Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 12886852cc | |||
| 7058e7139d | |||
| 73f3552b52 | |||
| 54a3a5b5d8 | |||
| c0044274f7 |
Generated
+124
-104
@@ -193,6 +193,51 @@ version = "1.1.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "axum"
|
||||||
|
version = "0.5.16"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c9e3356844c4d6a6d6467b8da2cffb4a2820be256f50a3a386c9d152bab31043"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"axum-core",
|
||||||
|
"bitflags",
|
||||||
|
"bytes",
|
||||||
|
"futures-util",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"hyper",
|
||||||
|
"itoa 1.0.1",
|
||||||
|
"matchit",
|
||||||
|
"memchr",
|
||||||
|
"mime",
|
||||||
|
"percent-encoding",
|
||||||
|
"pin-project-lite",
|
||||||
|
"serde",
|
||||||
|
"sync_wrapper",
|
||||||
|
"tokio",
|
||||||
|
"tower",
|
||||||
|
"tower-http",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "axum-core"
|
||||||
|
version = "0.2.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"bytes",
|
||||||
|
"futures-util",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"mime",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bandwidth-claim-contract"
|
name = "bandwidth-claim-contract"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -685,22 +730,21 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "console-api"
|
name = "console-api"
|
||||||
version = "0.1.2"
|
version = "0.4.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "cc347c19eb5b940f396ac155822caee6662f850d97306890ac3773ed76c90c5a"
|
checksum = "e57ff02e8ad8e06ab9731d5dc72dc23bef9200778eae1a89d555d8c42e5d4a86"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"prost 0.9.0",
|
"prost 0.11.0",
|
||||||
"prost-types 0.9.0",
|
"prost-types 0.11.1",
|
||||||
"tonic",
|
"tonic",
|
||||||
"tonic-build",
|
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "console-subscriber"
|
name = "console-subscriber"
|
||||||
version = "0.1.3"
|
version = "0.1.8"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "565a7dfea2d10dd0e5c57cc394d5d441b1910960d8c9211ed14135e0e6ec3a20"
|
checksum = "22a3a81dfaf6b66bce5d159eddae701e3a002f194d378cbf7be5f053c281d9be"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"console-api",
|
"console-api",
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
@@ -708,7 +752,7 @@ dependencies = [
|
|||||||
"futures",
|
"futures",
|
||||||
"hdrhistogram",
|
"hdrhistogram",
|
||||||
"humantime 2.1.0",
|
"humantime 2.1.0",
|
||||||
"prost-types 0.9.0",
|
"prost-types 0.11.1",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"thread_local",
|
"thread_local",
|
||||||
@@ -1664,12 +1708,6 @@ dependencies = [
|
|||||||
"version_check",
|
"version_check",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "fixedbitset"
|
|
||||||
version = "0.4.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flate2"
|
name = "flate2"
|
||||||
version = "1.0.22"
|
version = "1.0.22"
|
||||||
@@ -2266,9 +2304,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "http"
|
name = "http"
|
||||||
version = "0.2.6"
|
version = "0.2.8"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03"
|
checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"fnv",
|
"fnv",
|
||||||
@@ -2277,15 +2315,21 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "http-body"
|
name = "http-body"
|
||||||
version = "0.4.4"
|
version = "0.4.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
|
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"http",
|
"http",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "http-range-header"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "httparse"
|
name = "httparse"
|
||||||
version = "1.6.0"
|
version = "1.6.0"
|
||||||
@@ -2761,6 +2805,12 @@ version = "0.1.9"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
|
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "matchit"
|
||||||
|
version = "0.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "maxminddb"
|
name = "maxminddb"
|
||||||
version = "0.23.0"
|
version = "0.23.0"
|
||||||
@@ -2903,12 +2953,6 @@ dependencies = [
|
|||||||
"version_check",
|
"version_check",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "multimap"
|
|
||||||
version = "0.8.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "multisig-contract-common"
|
name = "multisig-contract-common"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -3114,6 +3158,7 @@ dependencies = [
|
|||||||
"coconut-interface",
|
"coconut-interface",
|
||||||
"completions",
|
"completions",
|
||||||
"config",
|
"config",
|
||||||
|
"console-subscriber",
|
||||||
"credential-storage",
|
"credential-storage",
|
||||||
"credentials",
|
"credentials",
|
||||||
"crypto",
|
"crypto",
|
||||||
@@ -3154,6 +3199,7 @@ dependencies = [
|
|||||||
"colored",
|
"colored",
|
||||||
"completions",
|
"completions",
|
||||||
"config",
|
"config",
|
||||||
|
"console-subscriber",
|
||||||
"credentials",
|
"credentials",
|
||||||
"crypto",
|
"crypto",
|
||||||
"dashmap",
|
"dashmap",
|
||||||
@@ -3282,6 +3328,7 @@ dependencies = [
|
|||||||
"coconut-interface",
|
"coconut-interface",
|
||||||
"completions",
|
"completions",
|
||||||
"config",
|
"config",
|
||||||
|
"console-subscriber",
|
||||||
"credential-storage",
|
"credential-storage",
|
||||||
"credentials",
|
"credentials",
|
||||||
"crypto",
|
"crypto",
|
||||||
@@ -3833,16 +3880,6 @@ dependencies = [
|
|||||||
"sha-1 0.8.2",
|
"sha-1 0.8.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "petgraph"
|
|
||||||
version = "0.6.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f"
|
|
||||||
dependencies = [
|
|
||||||
"fixedbitset",
|
|
||||||
"indexmap",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pickledb"
|
name = "pickledb"
|
||||||
version = "0.4.1"
|
version = "0.4.1"
|
||||||
@@ -4007,16 +4044,6 @@ dependencies = [
|
|||||||
"yansi",
|
"yansi",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "prost"
|
|
||||||
version = "0.9.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
|
|
||||||
dependencies = [
|
|
||||||
"bytes",
|
|
||||||
"prost-derive 0.9.0",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "prost"
|
name = "prost"
|
||||||
version = "0.10.3"
|
version = "0.10.3"
|
||||||
@@ -4028,36 +4055,13 @@ dependencies = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "prost-build"
|
name = "prost"
|
||||||
version = "0.9.0"
|
version = "0.11.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
|
checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"heck 0.3.3",
|
"prost-derive 0.11.0",
|
||||||
"itertools",
|
|
||||||
"lazy_static",
|
|
||||||
"log",
|
|
||||||
"multimap",
|
|
||||||
"petgraph",
|
|
||||||
"prost 0.9.0",
|
|
||||||
"prost-types 0.9.0",
|
|
||||||
"regex",
|
|
||||||
"tempfile",
|
|
||||||
"which",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "prost-derive"
|
|
||||||
version = "0.9.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe"
|
|
||||||
dependencies = [
|
|
||||||
"anyhow",
|
|
||||||
"itertools",
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -4074,13 +4078,16 @@ dependencies = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "prost-types"
|
name = "prost-derive"
|
||||||
version = "0.9.0"
|
version = "0.11.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
|
checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"anyhow",
|
||||||
"prost 0.9.0",
|
"itertools",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -4093,6 +4100,16 @@ dependencies = [
|
|||||||
"prost 0.10.3",
|
"prost 0.10.3",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost-types"
|
||||||
|
version = "0.11.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"prost 0.11.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proxy-helpers"
|
name = "proxy-helpers"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -5566,6 +5583,12 @@ dependencies = [
|
|||||||
"unicode-xid",
|
"unicode-xid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "sync_wrapper"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "synstructure"
|
name = "synstructure"
|
||||||
version = "0.12.6"
|
version = "0.12.6"
|
||||||
@@ -5974,12 +5997,13 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tonic"
|
name = "tonic"
|
||||||
version = "0.6.2"
|
version = "0.8.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ff08f4649d10a70ffa3522ca559031285d8e421d727ac85c60825761818f5d0a"
|
checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-stream",
|
"async-stream",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"axum",
|
||||||
"base64",
|
"base64",
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
@@ -5991,11 +6015,11 @@ dependencies = [
|
|||||||
"hyper-timeout",
|
"hyper-timeout",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"pin-project",
|
"pin-project",
|
||||||
"prost 0.9.0",
|
"prost 0.11.0",
|
||||||
"prost-derive 0.9.0",
|
"prost-derive 0.11.0",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tokio-util 0.6.9",
|
"tokio-util 0.7.3",
|
||||||
"tower",
|
"tower",
|
||||||
"tower-layer",
|
"tower-layer",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
@@ -6003,18 +6027,6 @@ dependencies = [
|
|||||||
"tracing-futures",
|
"tracing-futures",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tonic-build"
|
|
||||||
version = "0.6.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "9403f1bafde247186684b230dc6f38b5cd514584e8bec1dd32514be4745fa757"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"prost-build",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "topology"
|
name = "topology"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -6049,6 +6061,25 @@ dependencies = [
|
|||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tower-http"
|
||||||
|
version = "0.3.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
"bytes",
|
||||||
|
"futures-core",
|
||||||
|
"futures-util",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"http-range-header",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tower",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-layer"
|
name = "tower-layer"
|
||||||
version = "0.3.1"
|
version = "0.3.1"
|
||||||
@@ -6681,17 +6712,6 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "which"
|
|
||||||
version = "4.2.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae"
|
|
||||||
dependencies = [
|
|
||||||
"either",
|
|
||||||
"lazy_static",
|
|
||||||
"libc",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "winapi"
|
name = "winapi"
|
||||||
version = "0.3.9"
|
version = "0.3.9"
|
||||||
|
|||||||
@@ -175,7 +175,10 @@ impl LoopCoverTrafficStream<OsRng> {
|
|||||||
// - we run out of memory
|
// - we run out of memory
|
||||||
// - the receiver channel is closed
|
// - the receiver channel is closed
|
||||||
// in either case there's no recovery and we can only panic
|
// in either case there's no recovery and we can only panic
|
||||||
self.mix_tx.unbounded_send(vec![cover_message]).unwrap();
|
//self.mix_tx.unbounded_send(vec![cover_message]).unwrap();
|
||||||
|
if let Err(err) = self.mix_tx.try_send(vec![cover_message]) {
|
||||||
|
log::error!("Failed to send cover traffic: {}", err);
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
|
// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
|
||||||
// sure how `yield_now()` works - whether it just notifies the scheduler or whether it
|
// sure how `yield_now()` works - whether it just notifies the scheduler or whether it
|
||||||
@@ -197,7 +200,7 @@ impl LoopCoverTrafficStream<OsRng> {
|
|||||||
sample_poisson_duration(&mut self.rng, self.average_cover_message_sending_delay);
|
sample_poisson_duration(&mut self.rng, self.average_cover_message_sending_delay);
|
||||||
self.next_delay = Box::pin(time::sleep(sampled));
|
self.next_delay = Box::pin(time::sleep(sampled));
|
||||||
|
|
||||||
spawn_future(async move {
|
spawn_future("loop cover traffic stream", async move {
|
||||||
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
|
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
|
||||||
|
|
||||||
while !shutdown.is_shutdown() {
|
while !shutdown.is_shutdown() {
|
||||||
@@ -208,6 +211,7 @@ impl LoopCoverTrafficStream<OsRng> {
|
|||||||
}
|
}
|
||||||
next = self.next() => {
|
next = self.next() => {
|
||||||
if next.is_some() {
|
if next.is_some() {
|
||||||
|
//log::debug!("loop cover traffic: got next msg to send to mix traffic");
|
||||||
self.on_new_message().await;
|
self.on_new_message().await;
|
||||||
} else {
|
} else {
|
||||||
log::trace!("LoopCoverTrafficStream: Stopping since channel closed");
|
log::trace!("LoopCoverTrafficStream: Stopping since channel closed");
|
||||||
|
|||||||
@@ -8,8 +8,12 @@ use gateway_client::GatewayClient;
|
|||||||
use log::*;
|
use log::*;
|
||||||
use nymsphinx::forwarding::packet::MixPacket;
|
use nymsphinx::forwarding::packet::MixPacket;
|
||||||
|
|
||||||
pub type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;
|
//pub type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;
|
||||||
pub type BatchMixMessageReceiver = mpsc::UnboundedReceiver<Vec<MixPacket>>;
|
//pub type BatchMixMessageReceiver = mpsc::UnboundedReceiver<Vec<MixPacket>>;
|
||||||
|
//pub type BatchMixMessageSender = mpsc::Sender<Vec<MixPacket>>;
|
||||||
|
//pub type BatchMixMessageReceiver = mpsc::Receiver<Vec<MixPacket>>;
|
||||||
|
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
|
||||||
|
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
|
||||||
|
|
||||||
const MAX_FAILURE_COUNT: usize = 100;
|
const MAX_FAILURE_COUNT: usize = 100;
|
||||||
|
|
||||||
@@ -38,6 +42,7 @@ impl MixTrafficController {
|
|||||||
|
|
||||||
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
|
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
|
||||||
debug_assert!(!mix_packets.is_empty());
|
debug_assert!(!mix_packets.is_empty());
|
||||||
|
//log::debug!("on_messages: {}", mix_packets.len());
|
||||||
|
|
||||||
let result = if mix_packets.len() == 1 {
|
let result = if mix_packets.len() == 1 {
|
||||||
let mix_packet = mix_packets.pop().unwrap();
|
let mix_packet = mix_packets.pop().unwrap();
|
||||||
@@ -67,13 +72,14 @@ impl MixTrafficController {
|
|||||||
|
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
pub fn start_with_shutdown(mut self, mut shutdown: task::ShutdownListener) {
|
pub fn start_with_shutdown(mut self, mut shutdown: task::ShutdownListener) {
|
||||||
spawn_future(async move {
|
spawn_future("mix traffic controller", async move {
|
||||||
debug!("Started MixTrafficController with graceful shutdown support");
|
debug!("Started MixTrafficController with graceful shutdown support");
|
||||||
|
|
||||||
while !shutdown.is_shutdown() {
|
while !shutdown.is_shutdown() {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
mix_packets = self.mix_rx.next() => match mix_packets {
|
mix_packets = self.mix_rx.recv() => match mix_packets {
|
||||||
Some(mix_packets) => {
|
Some(mix_packets) => {
|
||||||
|
//log::debug!("received mix packet to send to gateway");
|
||||||
self.on_messages(mix_packets).await;
|
self.on_messages(mix_packets).await;
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
|
|||||||
+1
-1
@@ -33,7 +33,7 @@ impl AcknowledgementListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn on_ack(&mut self, ack_content: Vec<u8>) {
|
async fn on_ack(&mut self, ack_content: Vec<u8>) {
|
||||||
debug!("Received an ack");
|
trace!("Received an ack");
|
||||||
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
|
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
|
||||||
.map(FragmentIdentifier::try_from_bytes)
|
.map(FragmentIdentifier::try_from_bytes)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -188,6 +188,7 @@ where
|
|||||||
config.average_ack_delay,
|
config.average_ack_delay,
|
||||||
)
|
)
|
||||||
.with_custom_real_message_packet_size(config.packet_size);
|
.with_custom_real_message_packet_size(config.packet_size);
|
||||||
|
//.with_custom_real_message_packet_size(nymsphinx::params::PacketSize::ExtendedPacket);
|
||||||
|
|
||||||
// will listen for any acks coming from the network
|
// will listen for any acks coming from the network
|
||||||
let acknowledgement_listener = AcknowledgementListener::new(
|
let acknowledgement_listener = AcknowledgementListener::new(
|
||||||
@@ -243,7 +244,7 @@ where
|
|||||||
let mut action_controller = self.action_controller;
|
let mut action_controller = self.action_controller;
|
||||||
|
|
||||||
let shutdown_handle = shutdown.clone();
|
let shutdown_handle = shutdown.clone();
|
||||||
spawn_future(async move {
|
spawn_future("acknowledgement listener", async move {
|
||||||
acknowledgement_listener
|
acknowledgement_listener
|
||||||
.run_with_shutdown(shutdown_handle)
|
.run_with_shutdown(shutdown_handle)
|
||||||
.await;
|
.await;
|
||||||
@@ -251,7 +252,7 @@ where
|
|||||||
});
|
});
|
||||||
|
|
||||||
let shutdown_handle = shutdown.clone();
|
let shutdown_handle = shutdown.clone();
|
||||||
spawn_future(async move {
|
spawn_future("input message listener", async move {
|
||||||
input_message_listener
|
input_message_listener
|
||||||
.run_with_shutdown(shutdown_handle)
|
.run_with_shutdown(shutdown_handle)
|
||||||
.await;
|
.await;
|
||||||
@@ -259,7 +260,7 @@ where
|
|||||||
});
|
});
|
||||||
|
|
||||||
let shutdown_handle = shutdown.clone();
|
let shutdown_handle = shutdown.clone();
|
||||||
spawn_future(async move {
|
spawn_future("retransmission request listener", async move {
|
||||||
retransmission_request_listener
|
retransmission_request_listener
|
||||||
.run_with_shutdown(shutdown_handle)
|
.run_with_shutdown(shutdown_handle)
|
||||||
.await;
|
.await;
|
||||||
@@ -267,14 +268,14 @@ where
|
|||||||
});
|
});
|
||||||
|
|
||||||
let shutdown_handle = shutdown.clone();
|
let shutdown_handle = shutdown.clone();
|
||||||
spawn_future(async move {
|
spawn_future("sent notification listener", async move {
|
||||||
sent_notification_listener
|
sent_notification_listener
|
||||||
.run_with_shutdown(shutdown_handle)
|
.run_with_shutdown(shutdown_handle)
|
||||||
.await;
|
.await;
|
||||||
debug!("The sent notification listener has finished execution!");
|
debug!("The sent notification listener has finished execution!");
|
||||||
});
|
});
|
||||||
|
|
||||||
spawn_future(async move {
|
spawn_future("action controller", async move {
|
||||||
action_controller.run_with_shutdown(shutdown).await;
|
action_controller.run_with_shutdown(shutdown).await;
|
||||||
debug!("The controller has finished execution!");
|
debug!("The controller has finished execution!");
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -173,7 +173,7 @@ impl RealMessagesController<OsRng> {
|
|||||||
let ack_control = self.ack_control;
|
let ack_control = self.ack_control;
|
||||||
|
|
||||||
let shutdown_handle = shutdown.clone();
|
let shutdown_handle = shutdown.clone();
|
||||||
spawn_future(async move {
|
spawn_future("out queue control", async move {
|
||||||
out_queue_control.run_with_shutdown(shutdown_handle).await;
|
out_queue_control.run_with_shutdown(shutdown_handle).await;
|
||||||
debug!("The out queue controller has finished execution!");
|
debug!("The out queue controller has finished execution!");
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
use crate::client::mix_traffic::BatchMixMessageSender;
|
use crate::client::mix_traffic::BatchMixMessageSender;
|
||||||
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
||||||
use crate::client::topology_control::TopologyAccessor;
|
use crate::client::topology_control::TopologyAccessor;
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc::{self};
|
||||||
use futures::task::{Context, Poll};
|
use futures::task::{Context, Poll};
|
||||||
use futures::{Future, Stream, StreamExt};
|
use futures::{Future, Stream, StreamExt};
|
||||||
use log::*;
|
use log::*;
|
||||||
@@ -20,6 +20,8 @@ use std::collections::VecDeque;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
@@ -27,6 +29,8 @@ use tokio::time;
|
|||||||
#[cfg(target_arch = "wasm32")]
|
#[cfg(target_arch = "wasm32")]
|
||||||
use wasm_timer;
|
use wasm_timer;
|
||||||
|
|
||||||
|
const REDUCE_DELAY_THRESHOLD: usize = 100;
|
||||||
|
|
||||||
/// Configurable parameters of the `OutQueueControl`
|
/// Configurable parameters of the `OutQueueControl`
|
||||||
pub(crate) struct Config {
|
pub(crate) struct Config {
|
||||||
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
|
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
|
||||||
@@ -89,6 +93,13 @@ where
|
|||||||
#[cfg(target_arch = "wasm32")]
|
#[cfg(target_arch = "wasm32")]
|
||||||
next_delay: Option<Pin<Box<wasm_timer::Delay>>>,
|
next_delay: Option<Pin<Box<wasm_timer::Delay>>>,
|
||||||
|
|
||||||
|
/// The current message delay, only different from `average_message_sending_delay` if there has been
|
||||||
|
/// backpressure from the gateway client to slow down.
|
||||||
|
current_average_message_sending_delay: Duration,
|
||||||
|
|
||||||
|
/// Count the number of successful sends to determine if we should reduce sending delay
|
||||||
|
number_of_consectivive_successful_sends: usize,
|
||||||
|
|
||||||
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
|
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
|
||||||
/// out to the network without any further delays.
|
/// out to the network without any further delays.
|
||||||
mix_tx: BatchMixMessageSender,
|
mix_tx: BatchMixMessageSender,
|
||||||
@@ -151,11 +162,14 @@ where
|
|||||||
our_full_destination: Recipient,
|
our_full_destination: Recipient,
|
||||||
topology_access: TopologyAccessor,
|
topology_access: TopologyAccessor,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
let current_average_message_sending_delay = config.average_message_sending_delay;
|
||||||
OutQueueControl {
|
OutQueueControl {
|
||||||
config,
|
config,
|
||||||
ack_key,
|
ack_key,
|
||||||
sent_notifier,
|
sent_notifier,
|
||||||
next_delay: None,
|
next_delay: None,
|
||||||
|
current_average_message_sending_delay,
|
||||||
|
number_of_consectivive_successful_sends: 0,
|
||||||
mix_tx,
|
mix_tx,
|
||||||
real_receiver,
|
real_receiver,
|
||||||
our_full_destination,
|
our_full_destination,
|
||||||
@@ -173,10 +187,12 @@ where
|
|||||||
self.sent_notifier.unbounded_send(frag_id).unwrap();
|
self.sent_notifier.unbounded_send(frag_id).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_lines)]
|
||||||
async fn on_message(&mut self, next_message: StreamMessage) {
|
async fn on_message(&mut self, next_message: StreamMessage) {
|
||||||
trace!("created new message");
|
trace!("created new message");
|
||||||
|
|
||||||
let next_message = match next_message {
|
//let next_message = match next_message {
|
||||||
|
match next_message {
|
||||||
StreamMessage::Cover => {
|
StreamMessage::Cover => {
|
||||||
// TODO for way down the line: in very rare cases (during topology update) we might have
|
// TODO for way down the line: in very rare cases (during topology update) we might have
|
||||||
// to wait a really tiny bit before actually obtaining the permit hence messing with our
|
// to wait a really tiny bit before actually obtaining the permit hence messing with our
|
||||||
@@ -195,7 +211,7 @@ where
|
|||||||
}
|
}
|
||||||
let topology_ref = topology_ref_option.unwrap();
|
let topology_ref = topology_ref_option.unwrap();
|
||||||
|
|
||||||
generate_loop_cover_packet(
|
let next_message = generate_loop_cover_packet(
|
||||||
&mut self.rng,
|
&mut self.rng,
|
||||||
topology_ref,
|
topology_ref,
|
||||||
&self.ack_key,
|
&self.ack_key,
|
||||||
@@ -204,11 +220,120 @@ where
|
|||||||
self.config.average_packet_delay,
|
self.config.average_packet_delay,
|
||||||
self.config.cover_packet_size,
|
self.config.cover_packet_size,
|
||||||
)
|
)
|
||||||
.expect("Somehow failed to generate a loop cover message with a valid topology")
|
.expect("Somehow failed to generate a loop cover message with a valid topology");
|
||||||
|
|
||||||
|
log::info!("capacity: {}", self.mix_tx.capacity());
|
||||||
|
if self.mix_tx.max_capacity() - self.mix_tx.capacity() > 4 && self.number_of_consectivive_successful_sends > 10 {
|
||||||
|
self.current_average_message_sending_delay =
|
||||||
|
self.current_average_message_sending_delay.mul_f64(1.2);
|
||||||
|
log::error!(
|
||||||
|
"new average_message_sending_delay: {:?}",
|
||||||
|
self.current_average_message_sending_delay
|
||||||
|
);
|
||||||
|
self.number_of_consectivive_successful_sends = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.mix_tx.try_send(vec![next_message]) {
|
||||||
|
Ok(_) => {
|
||||||
|
self.number_of_consectivive_successful_sends += 1;
|
||||||
|
//if self.number_of_consectivive_successful_sends > REDUCE_DELAY_THRESHOLD {
|
||||||
|
// self.number_of_consectivive_successful_sends = 0;
|
||||||
|
// log::error!(
|
||||||
|
// "old average_message_sending_delay: {:?}",
|
||||||
|
// self.current_average_message_sending_delay
|
||||||
|
// );
|
||||||
|
// self.current_average_message_sending_delay =
|
||||||
|
// self.current_average_message_sending_delay.mul_f64(0.9);
|
||||||
|
// log::error!(
|
||||||
|
// "new average_message_sending_delay: {:?}",
|
||||||
|
// self.current_average_message_sending_delay
|
||||||
|
// );
|
||||||
|
//}
|
||||||
|
}
|
||||||
|
Err(TrySendError::Full(err)) => {
|
||||||
|
log::error!("Failed to send");
|
||||||
|
//self.number_of_consectivive_successful_sends = 0;
|
||||||
|
//// Increase average send delay
|
||||||
|
//log::error!(
|
||||||
|
// "old average_message_sending_delay: {:?}",
|
||||||
|
// self.current_average_message_sending_delay
|
||||||
|
//);
|
||||||
|
////self.current_average_message_sending_delay *= 2;
|
||||||
|
//self.current_average_message_sending_delay =
|
||||||
|
// self.current_average_message_sending_delay.mul_f64(1.2);
|
||||||
|
//log::error!(
|
||||||
|
// "new average_message_sending_delay: {:?}",
|
||||||
|
// self.current_average_message_sending_delay
|
||||||
|
//);
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
Err(TrySendError::Closed(err)) => {
|
||||||
|
log::warn!("should not happen during normal operation!");
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
StreamMessage::Real(real_message) => {
|
StreamMessage::Real(real_message) => {
|
||||||
self.sent_notify(real_message.fragment_id);
|
let RealMessage {
|
||||||
real_message.mix_packet
|
mix_packet: next_message,
|
||||||
|
fragment_id,
|
||||||
|
} = *real_message;
|
||||||
|
log::info!("capacity: {}", self.mix_tx.capacity());
|
||||||
|
if self.mix_tx.max_capacity() - self.mix_tx.capacity() > 4 && self.number_of_consectivive_successful_sends > 10 {
|
||||||
|
self.current_average_message_sending_delay =
|
||||||
|
self.current_average_message_sending_delay.mul_f64(1.2);
|
||||||
|
log::error!(
|
||||||
|
"new average_message_sending_delay: {:?}",
|
||||||
|
self.current_average_message_sending_delay
|
||||||
|
);
|
||||||
|
self.number_of_consectivive_successful_sends = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.mix_tx.try_send(vec![next_message]) {
|
||||||
|
Ok(_) => {
|
||||||
|
self.sent_notify(fragment_id);
|
||||||
|
self.number_of_consectivive_successful_sends += 1;
|
||||||
|
//if self.number_of_consectivive_successful_sends > REDUCE_DELAY_THRESHOLD {
|
||||||
|
// self.number_of_consectivive_successful_sends = 0;
|
||||||
|
// log::error!(
|
||||||
|
// "old average_message_sending_delay: {:?}",
|
||||||
|
// self.current_average_message_sending_delay
|
||||||
|
// );
|
||||||
|
// self.current_average_message_sending_delay =
|
||||||
|
// self.current_average_message_sending_delay.mul_f64(0.9);
|
||||||
|
// log::error!(
|
||||||
|
// "new average_message_sending_delay: {:?}",
|
||||||
|
// self.current_average_message_sending_delay
|
||||||
|
// );
|
||||||
|
//}
|
||||||
|
}
|
||||||
|
Err(TrySendError::Full(err)) => {
|
||||||
|
log::error!("Failed to send, channel full, will retry: {}", fragment_id);
|
||||||
|
//self.number_of_consectivive_successful_sends = 0;
|
||||||
|
// Re-queue at the front
|
||||||
|
let mut msg = err;
|
||||||
|
assert!(msg.len() == 1);
|
||||||
|
let msg = msg.pop().unwrap();
|
||||||
|
let new_real_message = RealMessage::new(msg, real_message.fragment_id);
|
||||||
|
self.received_buffer.push_front(new_real_message);
|
||||||
|
|
||||||
|
// Increase average send delay
|
||||||
|
//log::error!(
|
||||||
|
// "old average_message_sending_delay: {:?}",
|
||||||
|
// self.current_average_message_sending_delay
|
||||||
|
//);
|
||||||
|
////self.current_average_message_sending_delay *= 2;
|
||||||
|
//self.current_average_message_sending_delay =
|
||||||
|
// self.current_average_message_sending_delay.mul_f64(1.2);
|
||||||
|
//log::error!(
|
||||||
|
// "new average_message_sending_delay: {:?}",
|
||||||
|
// self.current_average_message_sending_delay
|
||||||
|
//);
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
Err(TrySendError::Closed(err)) => {
|
||||||
|
log::warn!("should not happen during normal operation!");
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -216,12 +341,13 @@ where
|
|||||||
// - we run out of memory
|
// - we run out of memory
|
||||||
// - the receiver channel is closed
|
// - the receiver channel is closed
|
||||||
// in either case there's no recovery and we can only panic
|
// in either case there's no recovery and we can only panic
|
||||||
if let Err(err) = self.mix_tx.unbounded_send(vec![next_message]) {
|
//if let Err(err) = self.mix_tx.unbounded_send(vec![next_message]) {
|
||||||
log::warn!(
|
// log::warn!(
|
||||||
"Failed to send {} packets (possible process shutdown?)",
|
// "Failed to send {} packets (possible process shutdown?)",
|
||||||
err.into_inner().len()
|
// err.into_inner().len()
|
||||||
);
|
// );
|
||||||
}
|
//}
|
||||||
|
//self.mix_tx.try_send(vec![next_message]).unwrap();
|
||||||
|
|
||||||
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
|
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
|
||||||
// the UnboundedReceiver [of mix_rx] will not get a chance to read anything
|
// the UnboundedReceiver [of mix_rx] will not get a chance to read anything
|
||||||
@@ -238,12 +364,15 @@ where
|
|||||||
if let Some(ref mut next_delay) = &mut self.next_delay {
|
if let Some(ref mut next_delay) = &mut self.next_delay {
|
||||||
// it is not yet time to return a message
|
// it is not yet time to return a message
|
||||||
if next_delay.as_mut().poll(cx).is_pending() {
|
if next_delay.as_mut().poll(cx).is_pending() {
|
||||||
|
//log::debug!("poisson: pending");
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
|
//} else {
|
||||||
|
//log::debug!("poisson: not pending");
|
||||||
};
|
};
|
||||||
|
|
||||||
// we know it's time to send a message, so let's prepare delay for the next one
|
// we know it's time to send a message, so let's prepare delay for the next one
|
||||||
// Get the `now` by looking at the current `delay` deadline
|
// Get the `now` by looking at the current `delay` deadline
|
||||||
let avg_delay = self.config.average_message_sending_delay;
|
let avg_delay = self.current_average_message_sending_delay;
|
||||||
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
|
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
|
||||||
|
|
||||||
// The next interval value is `next_poisson_delay` after the one that just
|
// The next interval value is `next_poisson_delay` after the one that just
|
||||||
@@ -262,6 +391,11 @@ where
|
|||||||
|
|
||||||
// check if we have anything immediately available
|
// check if we have anything immediately available
|
||||||
if let Some(real_available) = self.received_buffer.pop_front() {
|
if let Some(real_available) = self.received_buffer.pop_front() {
|
||||||
|
//log::debug!("real available");
|
||||||
|
log::debug!(
|
||||||
|
"sending from received_buffer: {}",
|
||||||
|
self.received_buffer.len()
|
||||||
|
);
|
||||||
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
|
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -273,6 +407,7 @@ where
|
|||||||
|
|
||||||
// if there are more messages available, return first one and store the rest
|
// if there are more messages available, return first one and store the rest
|
||||||
Poll::Ready(Some(real_messages)) => {
|
Poll::Ready(Some(real_messages)) => {
|
||||||
|
log::debug!("sending from real_receiver: {}", real_messages.len());
|
||||||
self.received_buffer = real_messages.into();
|
self.received_buffer = real_messages.into();
|
||||||
// we MUST HAVE received at least ONE message
|
// we MUST HAVE received at least ONE message
|
||||||
Poll::Ready(Some(StreamMessage::Real(Box::new(
|
Poll::Ready(Some(StreamMessage::Real(Box::new(
|
||||||
@@ -281,9 +416,13 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// otherwise construct a dummy one
|
// otherwise construct a dummy one
|
||||||
Poll::Pending => Poll::Ready(Some(StreamMessage::Cover)),
|
Poll::Pending => {
|
||||||
|
//log::debug!("cover message");
|
||||||
|
Poll::Ready(Some(StreamMessage::Cover))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
log::debug!("poisson: not send");
|
||||||
// we never set an initial delay - let's do it now
|
// we never set an initial delay - let's do it now
|
||||||
cx.waker().wake_by_ref();
|
cx.waker().wake_by_ref();
|
||||||
|
|
||||||
@@ -347,14 +486,29 @@ where
|
|||||||
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
|
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
|
||||||
debug!("Started OutQueueControl with graceful shutdown support");
|
debug!("Started OutQueueControl with graceful shutdown support");
|
||||||
|
|
||||||
|
//let stream = self.delay_stream();
|
||||||
|
//let stream = delay_stream(self.config.average_message_sending_delay);
|
||||||
|
//tokio::pin!(stream);
|
||||||
|
|
||||||
while !shutdown.is_shutdown() {
|
while !shutdown.is_shutdown() {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
_ = shutdown.recv() => {
|
_ = shutdown.recv() => {
|
||||||
log::trace!("OutQueueControl: Received shutdown");
|
log::trace!("OutQueueControl: Received shutdown");
|
||||||
}
|
}
|
||||||
|
//next_message = stream.next() => match next_message {
|
||||||
|
// Some(next_message) => {
|
||||||
|
// log::debug!("out queue control: got next_message to send to mix traffic");
|
||||||
|
// self.on_message(next_message).await;
|
||||||
|
// },
|
||||||
|
// None => {
|
||||||
|
// log::trace!("OutQueueControl: Stopping since channel closed");
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
//}
|
||||||
next_message = self.next() => match next_message {
|
next_message = self.next() => match next_message {
|
||||||
Some(next_message) => {
|
Some(next_message) => {
|
||||||
|
//log::debug!("out queue control: got next_message to send to mix traffic");
|
||||||
self.on_message(next_message).await;
|
self.on_message(next_message).await;
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
@@ -378,6 +532,19 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//fn delay_stream(avg_delay: Duration) -> impl futures::Stream<Item = StreamMessage> + 'static {
|
||||||
|
// let mut rng = rand::rngs::OsRng;
|
||||||
|
//
|
||||||
|
// futures::stream::unfold((rng, avg_delay), |(mut rng, avg_delay)| async {
|
||||||
|
// //let avg_delay = out_queue_control.config.average_message_sending_delay;
|
||||||
|
// let next_poisson_delay = sample_poisson_duration(&mut rng, avg_delay);
|
||||||
|
// time::sleep(next_poisson_delay).await;
|
||||||
|
// //let a = 1;
|
||||||
|
// let msg = StreamMessage::Cover;
|
||||||
|
// Some((msg, (rng, avg_delay)))
|
||||||
|
// })
|
||||||
|
//}
|
||||||
|
|
||||||
impl<R> Stream for OutQueueControl<R>
|
impl<R> Stream for OutQueueControl<R>
|
||||||
where
|
where
|
||||||
R: CryptoRng + Rng + Unpin,
|
R: CryptoRng + Rng + Unpin,
|
||||||
|
|||||||
@@ -208,7 +208,7 @@ impl ReceivedMessagesBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
|
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
|
||||||
debug!(
|
trace!(
|
||||||
"Processing {:?} new message that might get added to the buffer!",
|
"Processing {:?} new message that might get added to the buffer!",
|
||||||
msgs.len()
|
msgs.len()
|
||||||
);
|
);
|
||||||
@@ -436,12 +436,12 @@ impl ReceivedMessagesBufferController {
|
|||||||
let mut request_receiver = self.request_receiver;
|
let mut request_receiver = self.request_receiver;
|
||||||
|
|
||||||
let shutdown_handle = shutdown.clone();
|
let shutdown_handle = shutdown.clone();
|
||||||
spawn_future(async move {
|
spawn_future("fragmented message receiver", async move {
|
||||||
fragmented_message_receiver
|
fragmented_message_receiver
|
||||||
.run_with_shutdown(shutdown_handle)
|
.run_with_shutdown(shutdown_handle)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
spawn_future(async move {
|
spawn_future("request receiver", async move {
|
||||||
request_receiver.run_with_shutdown(shutdown).await;
|
request_receiver.run_with_shutdown(shutdown).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -305,7 +305,7 @@ impl TopologyRefresher {
|
|||||||
|
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
pub fn start_with_shutdown(mut self, mut shutdown: task::ShutdownListener) {
|
pub fn start_with_shutdown(mut self, mut shutdown: task::ShutdownListener) {
|
||||||
spawn_future(async move {
|
spawn_future("topology refresher", async move {
|
||||||
debug!("Started TopologyRefresher with graceful shutdown support");
|
debug!("Started TopologyRefresher with graceful shutdown support");
|
||||||
|
|
||||||
while !shutdown.is_shutdown() {
|
while !shutdown.is_shutdown() {
|
||||||
|
|||||||
@@ -13,11 +13,33 @@ where
|
|||||||
wasm_bindgen_futures::spawn_local(future);
|
wasm_bindgen_futures::spawn_local(future);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
//#[cfg(not(target_arch = "wasm32"))]
|
||||||
pub(crate) fn spawn_future<F>(future: F)
|
//pub(crate) fn spawn_future<F>(future: F)
|
||||||
|
//where
|
||||||
|
// F: Future + Send + 'static,
|
||||||
|
// F::Output: Send + 'static,
|
||||||
|
//{
|
||||||
|
// tokio::spawn(future);
|
||||||
|
//}
|
||||||
|
|
||||||
|
pub(crate) fn spawn_future<F>(task_name: &str, future: F)
|
||||||
where
|
where
|
||||||
F: Future + Send + 'static,
|
F: Future + Send + 'static,
|
||||||
F::Output: Send + 'static,
|
F::Output: Send + 'static,
|
||||||
{
|
{
|
||||||
|
//tokio::task::Builder::default()
|
||||||
|
// .name(task_name)
|
||||||
|
// .spawn(future);
|
||||||
tokio::spawn(future);
|
tokio::spawn(future);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//pub(crate) fn spawn_future_named<F>(future: F, task_name: &str)
|
||||||
|
//where
|
||||||
|
// F: Future + Send + 'static,
|
||||||
|
// F::Output: Send + 'static,
|
||||||
|
//{
|
||||||
|
// tokio::task::Builder::default()
|
||||||
|
// .name(task_name)
|
||||||
|
// .spawn(future);
|
||||||
|
// //tokio::spawn(future);
|
||||||
|
//}
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ rand = { version = "0.7.3", features = ["wasm-bindgen"] } # rng-related traits +
|
|||||||
serde = { version = "1.0.104", features = ["derive"] } # for config serialization/deserialization
|
serde = { version = "1.0.104", features = ["derive"] } # for config serialization/deserialization
|
||||||
sled = "0.34" # for storage of replySURB decryption keys
|
sled = "0.34" # for storage of replySURB decryption keys
|
||||||
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] } # async runtime
|
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] } # async runtime
|
||||||
|
console-subscriber = { version = "0.1.8"} # validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable"
|
||||||
tokio-tungstenite = "0.14" # websocket
|
tokio-tungstenite = "0.14" # websocket
|
||||||
|
|
||||||
## internal
|
## internal
|
||||||
|
|||||||
@@ -359,7 +359,9 @@ impl NymClient {
|
|||||||
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
|
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
|
||||||
// they are used by cover traffic stream and real traffic stream
|
// they are used by cover traffic stream and real traffic stream
|
||||||
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
|
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
|
||||||
let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
|
//let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
|
||||||
|
//let (sphinx_message_sender, sphinx_message_receiver) = mpsc::channel(4);
|
||||||
|
let (sphinx_message_sender, sphinx_message_receiver) = tokio::sync::mpsc::channel(16);
|
||||||
|
|
||||||
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
|
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
|
||||||
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
|
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ rand = { version = "0.7.3", features = ["wasm-bindgen"] }
|
|||||||
serde = { version = "1.0", features = ["derive"] } # for config serialization/deserialization
|
serde = { version = "1.0", features = ["derive"] } # for config serialization/deserialization
|
||||||
snafu = "0.6"
|
snafu = "0.6"
|
||||||
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
|
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
|
||||||
|
console-subscriber = { version = "0.1.8"} # validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable"
|
||||||
url = "2.2"
|
url = "2.2"
|
||||||
|
|
||||||
# internal
|
# internal
|
||||||
|
|||||||
@@ -348,7 +348,9 @@ impl NymClient {
|
|||||||
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
|
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
|
||||||
// they are used by cover traffic stream and real traffic stream
|
// they are used by cover traffic stream and real traffic stream
|
||||||
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
|
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
|
||||||
let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
|
//let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
|
||||||
|
//let (sphinx_message_sender, sphinx_message_receiver) = mpsc::channel(4);
|
||||||
|
let (sphinx_message_sender, sphinx_message_receiver) = tokio::sync::mpsc::channel(16);
|
||||||
|
|
||||||
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
|
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
|
||||||
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
|
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
|
||||||
|
|||||||
@@ -10,6 +10,9 @@ pub mod socks;
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
|
||||||
|
//console_subscriber::init();
|
||||||
|
|
||||||
setup_logging();
|
setup_logging();
|
||||||
println!("{}", banner());
|
println!("{}", banner());
|
||||||
|
|
||||||
|
|||||||
@@ -35,7 +35,8 @@ default-features = false
|
|||||||
# non-wasm-only dependencies
|
# non-wasm-only dependencies
|
||||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
|
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
|
||||||
version = "1.21.2"
|
version = "1.21.2"
|
||||||
features = ["macros", "rt", "net", "sync", "time"]
|
#features = ["macros", "rt", "net", "sync", "time"]
|
||||||
|
features = ["full", "tracing"]
|
||||||
|
|
||||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
|
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
|
||||||
version = "0.1.9"
|
version = "0.1.9"
|
||||||
|
|||||||
@@ -383,11 +383,13 @@ impl GatewayClient {
|
|||||||
) -> Result<(), GatewayClientError> {
|
) -> Result<(), GatewayClientError> {
|
||||||
match self.connection {
|
match self.connection {
|
||||||
SocketState::Available(ref mut conn) => {
|
SocketState::Available(ref mut conn) => {
|
||||||
|
log::debug!("SocketState::Available: sending size: {}", messages.len());
|
||||||
let stream_messages: Vec<_> = messages.into_iter().map(Ok).collect();
|
let stream_messages: Vec<_> = messages.into_iter().map(Ok).collect();
|
||||||
let mut send_stream = futures::stream::iter(stream_messages);
|
let mut send_stream = futures::stream::iter(stream_messages);
|
||||||
Ok(conn.send_all(&mut send_stream).await?)
|
Ok(conn.send_all(&mut send_stream).await?)
|
||||||
}
|
}
|
||||||
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
|
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
|
||||||
|
log::debug!("SocketState::PartiallyDelegated: sending size: {}", messages.len());
|
||||||
if let Err(err) = partially_delegated
|
if let Err(err) = partially_delegated
|
||||||
.batch_send_without_response(messages)
|
.batch_send_without_response(messages)
|
||||||
.await
|
.await
|
||||||
@@ -415,8 +417,12 @@ impl GatewayClient {
|
|||||||
msg: Message,
|
msg: Message,
|
||||||
) -> Result<(), GatewayClientError> {
|
) -> Result<(), GatewayClientError> {
|
||||||
match self.connection {
|
match self.connection {
|
||||||
SocketState::Available(ref mut conn) => Ok(conn.send(msg).await?),
|
SocketState::Available(ref mut conn) => {
|
||||||
|
//log::debug!("SocketState::Available: conn.send()");
|
||||||
|
Ok(conn.send(msg).await?)
|
||||||
|
},
|
||||||
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
|
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
|
||||||
|
//log::debug!("SocketState::PartiallyDelegated: send_without_response()");
|
||||||
if let Err(err) = partially_delegated.send_without_response(msg).await {
|
if let Err(err) = partially_delegated.send_without_response(msg).await {
|
||||||
error!("failed to send message without response - {}...", err);
|
error!("failed to send message without response - {}...", err);
|
||||||
// we must ensure we do not leave the task still active
|
// we must ensure we do not leave the task still active
|
||||||
@@ -607,6 +613,7 @@ impl GatewayClient {
|
|||||||
&mut self,
|
&mut self,
|
||||||
packets: Vec<MixPacket>,
|
packets: Vec<MixPacket>,
|
||||||
) -> Result<(), GatewayClientError> {
|
) -> Result<(), GatewayClientError> {
|
||||||
|
//log::debug!("batch_send_mix_packets: {}", packets.len());
|
||||||
if !self.authenticated {
|
if !self.authenticated {
|
||||||
return Err(GatewayClientError::NotAuthenticated);
|
return Err(GatewayClientError::NotAuthenticated);
|
||||||
}
|
}
|
||||||
@@ -677,6 +684,7 @@ impl GatewayClient {
|
|||||||
&mut self,
|
&mut self,
|
||||||
mix_packet: MixPacket,
|
mix_packet: MixPacket,
|
||||||
) -> Result<(), GatewayClientError> {
|
) -> Result<(), GatewayClientError> {
|
||||||
|
//log::debug!("send_mix_packet");
|
||||||
if !self.authenticated {
|
if !self.authenticated {
|
||||||
return Err(GatewayClientError::NotAuthenticated);
|
return Err(GatewayClientError::NotAuthenticated);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ pub struct Args {
|
|||||||
|
|
||||||
#[clap(
|
#[clap(
|
||||||
value_parser,
|
value_parser,
|
||||||
requires = "fundsDenom",
|
requires = "funds-denom",
|
||||||
help = "Amount to supply as funds in micro denomination (e.g. unym or unyx)"
|
help = "Amount to supply as funds in micro denomination (e.g. unym or unyx)"
|
||||||
)]
|
)]
|
||||||
pub funds: Option<u128>,
|
pub funds: Option<u128>,
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ pub struct Args {
|
|||||||
|
|
||||||
#[clap(
|
#[clap(
|
||||||
long,
|
long,
|
||||||
requires = "fundsDenom",
|
requires = "funds-denom",
|
||||||
help = "Amount to supply as funds in micro denomination (e.g. unym or unyx)"
|
help = "Amount to supply as funds in micro denomination (e.g. unym or unyx)"
|
||||||
)]
|
)]
|
||||||
pub funds: Option<u128>,
|
pub funds: Option<u128>,
|
||||||
|
|||||||
+1
-1
@@ -9,7 +9,7 @@ MIX_DENOM_DISPLAY=nym
|
|||||||
STAKE_DENOM=unyx
|
STAKE_DENOM=unyx
|
||||||
STAKE_DENOM_DISPLAY=nyx
|
STAKE_DENOM_DISPLAY=nyx
|
||||||
DENOMS_EXPONENT=6
|
DENOMS_EXPONENT=6
|
||||||
MIXNET_CONTRACT_ADDRESS=n1suhgf5svhu4usrurvxzlgn54ksxmn8gljarjtxqnapv8kjnp4nrsd3qaep
|
MIXNET_CONTRACT_ADDRESS=n1rjzps6qrmdqmf0xz4cn4x4rcmqeqzq6hnzqg4wcvd0r2lyasdq5sepn5s8
|
||||||
VESTING_CONTRACT_ADDRESS=n1xr3rq8yvd7qplsw5yx90ftsr2zdhg4e9z60h5duusgxpv72hud3sjkxkav
|
VESTING_CONTRACT_ADDRESS=n1xr3rq8yvd7qplsw5yx90ftsr2zdhg4e9z60h5duusgxpv72hud3sjkxkav
|
||||||
BANDWIDTH_CLAIM_CONTRACT_ADDRESS=n19lc9u84cz0yz3fww5283nucc9yvr8gsjmgeul0
|
BANDWIDTH_CLAIM_CONTRACT_ADDRESS=n19lc9u84cz0yz3fww5283nucc9yvr8gsjmgeul0
|
||||||
COCONUT_BANDWIDTH_CONTRACT_ADDRESS=n1ghd753shjuwexxywmgs4xz7x2q732vcn7ty4yw
|
COCONUT_BANDWIDTH_CONTRACT_ADDRESS=n1ghd753shjuwexxywmgs4xz7x2q732vcn7ty4yw
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ tokio-stream = { version = "0.1.9", features = ["fs"] }
|
|||||||
tokio-tungstenite = "0.14"
|
tokio-tungstenite = "0.14"
|
||||||
tokio-util = { version = "0.7.3", features = ["codec"] }
|
tokio-util = { version = "0.7.3", features = ["codec"] }
|
||||||
url = { version = "2.2", features = ["serde"] }
|
url = { version = "2.2", features = ["serde"] }
|
||||||
|
console-subscriber = { version = "0.1.8"} # validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable"
|
||||||
|
|
||||||
# internal
|
# internal
|
||||||
coconut-interface = { path = "../common/coconut-interface", optional = true }
|
coconut-interface = { path = "../common/coconut-interface", optional = true }
|
||||||
|
|||||||
@@ -29,6 +29,9 @@ struct Cli {
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
|
||||||
|
//console_subscriber::init();
|
||||||
|
|
||||||
setup_logging();
|
setup_logging();
|
||||||
println!("{}", banner());
|
println!("{}", banner());
|
||||||
LONG_VERSION
|
LONG_VERSION
|
||||||
|
|||||||
@@ -120,7 +120,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
|||||||
|
|
||||||
fn forward_ack(&self, forward_ack: Option<MixPacket>, client_address: DestinationAddressBytes) {
|
fn forward_ack(&self, forward_ack: Option<MixPacket>, client_address: DestinationAddressBytes) {
|
||||||
if let Some(forward_ack) = forward_ack {
|
if let Some(forward_ack) = forward_ack {
|
||||||
trace!(
|
debug!(
|
||||||
"Sending ack from packet for {} to {}",
|
"Sending ack from packet for {} to {}",
|
||||||
client_address,
|
client_address,
|
||||||
forward_ack.next_hop()
|
forward_ack.next_hop()
|
||||||
@@ -131,23 +131,32 @@ impl<St: Storage> ConnectionHandler<St> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_processed_packet(&mut self, processed_final_hop: ProcessedFinalHop) {
|
async fn handle_processed_packet(&mut self, processed_final_hop: ProcessedFinalHop) {
|
||||||
|
log::debug!("Handle processed packet");
|
||||||
let client_address = processed_final_hop.destination;
|
let client_address = processed_final_hop.destination;
|
||||||
let message = processed_final_hop.message;
|
let message = processed_final_hop.message;
|
||||||
let forward_ack = processed_final_hop.forward_ack;
|
let forward_ack = processed_final_hop.forward_ack;
|
||||||
|
|
||||||
// we failed to push message directly to the client - it's probably offline.
|
// we failed to push message directly to the client - it's probably offline.
|
||||||
// we should store it on the disk instead.
|
// we should store it on the disk instead.
|
||||||
|
let t = tokio::time::Instant::now();
|
||||||
|
|
||||||
match self.try_push_message_to_client(client_address, message) {
|
match self.try_push_message_to_client(client_address, message) {
|
||||||
Err(unsent_plaintext) => match self
|
Err(unsent_plaintext) => match self
|
||||||
.store_processed_packet_payload(client_address, unsent_plaintext)
|
.store_processed_packet_payload(client_address, unsent_plaintext)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Err(err) => error!("Failed to store client data - {}", err),
|
Err(err) => error!("Failed to store client data - {}", err),
|
||||||
Ok(_) => trace!("Stored packet for {}", client_address),
|
Ok(_) => debug!("Stored packet for {}", client_address),
|
||||||
},
|
},
|
||||||
Ok(_) => trace!("Pushed received packet to {}", client_address),
|
Ok(_) => debug!("Pushed received packet to {}", client_address),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let elapsed = t.elapsed();
|
||||||
|
if elapsed.as_millis() > 100 {
|
||||||
|
log::error!("push message top client, time elapsed: {:?}", elapsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// if we managed to either push message directly to the [online] client or store it at
|
// if we managed to either push message directly to the [online] client or store it at
|
||||||
// its inbox, it means that it must exist at this gateway, hence we can send the
|
// its inbox, it means that it must exist at this gateway, hence we can send the
|
||||||
// received ack back into the network
|
// received ack back into the network
|
||||||
@@ -161,6 +170,8 @@ impl<St: Storage> ConnectionHandler<St> {
|
|||||||
// question: can it also be per connection vs global?
|
// question: can it also be per connection vs global?
|
||||||
//
|
//
|
||||||
|
|
||||||
|
log::debug!("Handle received packet");
|
||||||
|
|
||||||
let processed_final_hop = match self.packet_processor.process_received(framed_sphinx_packet)
|
let processed_final_hop = match self.packet_processor.process_received(framed_sphinx_packet)
|
||||||
{
|
{
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user