Compare commits

...

25 Commits

Author SHA1 Message Date
Simon Wicky c28410f6de allow multiple messages decryption 2023-08-02 13:59:06 +02:00
Simon Wicky 203633cabf noise between client and gateway, first try 2023-07-28 14:33:01 +02:00
Simon Wicky 331a0328c7 remove absolute dependence on topology 2023-07-28 12:09:13 +02:00
Simon Wicky 9e3bb6ef24 error handling on try_read + EOF 2023-07-28 09:25:44 +02:00
Simon Wicky 4014467496 revamp poll_read for Noise Stream 2023-07-27 12:25:17 +02:00
Simon Wicky 3cd17be26f error handling 2023-07-27 10:56:35 +02:00
Simon Wicky 9d91145f0a somewhat working stream 2023-07-26 16:18:40 +02:00
Simon Wicky 83d0dfc657 bit of cleanup 2023-07-26 15:47:43 +02:00
Simon Wicky f91a22cb6a store extra bytes in storage 2023-07-26 14:49:09 +02:00
Simon Wicky 981f567131 first try at noisestream 2023-07-26 14:44:56 +02:00
Simon Wicky c58331f9b0 change to sphinx key initiator side 2023-07-25 09:52:04 +02:00
Simon Wicky 5d26fefaaa hash psk 2023-07-25 09:23:51 +02:00
Simon Wicky dc77e3f962 swap id key for sphinx key 2023-07-24 16:54:08 +02:00
Simon Wicky eca406d9a7 more debugging... 2023-07-24 16:26:32 +02:00
Simon Wicky 139bde0176 more debugging 2023-07-24 16:19:31 +02:00
Simon Wicky a4c0be13f8 continue debugging 2023-07-24 15:16:32 +02:00
Simon Wicky f680222b91 add debug info 2023-07-24 11:48:18 +02:00
Simon Wicky 27b399331b debug info 2023-07-24 11:27:36 +02:00
Simon Wicky 3228bb5aa0 swap misplaced arguments for upgrade responder 2023-07-24 11:27:26 +02:00
Simon Wicky 8e4516a0a8 find node by ip not by full ip,port addr 2023-07-24 10:44:49 +02:00
Simon Wicky 234656aba8 draft a full noise handshake, with dummy secret 2023-07-24 10:18:22 +02:00
Simon Wicky 15f43c705a bring what's needed for secret, but epoch 2023-07-21 16:37:52 +02:00
Simon Wicky 044251d60f bring private key to the noise handshake. still does nothing though 2023-07-21 14:41:31 +02:00
Simon Wicky 5424568e81 add NoiseStream decorator, does nothing yet 2023-07-21 12:02:03 +02:00
Simon Wicky b939978c28 add topology refresher to gateways and mixnodes 2023-07-21 09:20:23 +02:00
37 changed files with 1182 additions and 120 deletions
Generated
+239 -69
View File
@@ -14,6 +14,15 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aead"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877"
dependencies = [
"generic-array 0.14.7",
]
[[package]]
name = "aead"
version = "0.5.2"
@@ -48,17 +57,31 @@ dependencies = [
"cpufeatures",
]
[[package]]
name = "aes-gcm"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df5f85a83a7d8b0442b6aa7b504b8212c1733da07b98aae43d4bc21b2cb3cdf6"
dependencies = [
"aead 0.4.3",
"aes 0.7.5",
"cipher 0.3.0",
"ctr 0.8.0",
"ghash 0.4.4",
"subtle 2.4.1",
]
[[package]]
name = "aes-gcm"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e1366e0c69c9f927b1fa5ce2c7bf9eafc8f9268c0b9800729e8b267612447c"
dependencies = [
"aead",
"aead 0.5.2",
"aes 0.8.2",
"cipher 0.4.4",
"ctr 0.9.2",
"ghash",
"ghash 0.5.0",
"subtle 2.4.1",
]
@@ -244,7 +267,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -255,7 +278,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -472,7 +495,7 @@ version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe"
dependencies = [
"digest 0.10.6",
"digest 0.10.7",
]
[[package]]
@@ -486,7 +509,7 @@ dependencies = [
"cc",
"cfg-if",
"constant_time_eq",
"digest 0.10.6",
"digest 0.10.7",
]
[[package]]
@@ -618,6 +641,18 @@ dependencies = [
"keystream",
]
[[package]]
name = "chacha20"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c80e5460aa66fe3b91d40bcbdab953a597b60053e34d684ac6903f863b680a6"
dependencies = [
"cfg-if",
"cipher 0.3.0",
"cpufeatures",
"zeroize",
]
[[package]]
name = "chacha20"
version = "0.9.1"
@@ -629,16 +664,29 @@ dependencies = [
"cpufeatures",
]
[[package]]
name = "chacha20poly1305"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a18446b09be63d457bbec447509e85f662f32952b035ce892290396bc0b0cff5"
dependencies = [
"aead 0.4.3",
"chacha20 0.8.2",
"cipher 0.3.0",
"poly1305 0.7.2",
"zeroize",
]
[[package]]
name = "chacha20poly1305"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35"
dependencies = [
"aead",
"chacha20",
"aead 0.5.2",
"chacha20 0.9.1",
"cipher 0.4.4",
"poly1305",
"poly1305 0.8.0",
"zeroize",
]
@@ -770,7 +818,7 @@ dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -899,13 +947,13 @@ version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb"
dependencies = [
"aes-gcm",
"aes-gcm 0.10.1",
"base64 0.20.0",
"hkdf 0.12.3",
"hmac 0.12.1",
"percent-encoding",
"rand 0.8.5",
"sha2 0.10.6",
"sha2 0.10.7",
"subtle 2.4.1",
"time 0.3.21",
"version_check",
@@ -1308,6 +1356,20 @@ dependencies = [
"zeroize",
]
[[package]]
name = "curve25519-dalek"
version = "4.0.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d4ba9852b42210c7538b75484f9daa0655e9a3ac04f693747bb0f02cf3cfe16"
dependencies = [
"cfg-if",
"fiat-crypto",
"packed_simd_2",
"platforms",
"subtle 2.4.1",
"zeroize",
]
[[package]]
name = "cw-controllers"
version = "0.13.4"
@@ -1501,9 +1563,9 @@ dependencies = [
[[package]]
name = "digest"
version = "0.10.6"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer 0.10.4",
"crypto-common",
@@ -1603,7 +1665,7 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d"
dependencies = [
"curve25519-dalek",
"curve25519-dalek 3.2.0",
"ed25519",
"rand 0.7.3",
"serde",
@@ -1618,7 +1680,7 @@ version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c24f403d068ad0b359e577a77f92392118be3f3c927538f2bb544a5ecd828c6"
dependencies = [
"curve25519-dalek",
"curve25519-dalek 3.2.0",
"hashbrown 0.12.3",
"hex",
"rand_core 0.6.4",
@@ -1677,7 +1739,7 @@ checksum = "eecf8589574ce9b895052fa12d69af7a233f99e6107f5cb8dd1044f2a17bfdcb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -1832,6 +1894,12 @@ dependencies = [
"subtle 2.4.1",
]
[[package]]
name = "fiat-crypto"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e825f6987101665dea6ec934c09ec6d721de7bc1bf92248e1d5810c8cd636b77"
[[package]]
name = "figment"
version = "0.10.8"
@@ -2029,7 +2097,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -2140,6 +2208,16 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "ghash"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99"
dependencies = [
"opaque-debug 0.3.0",
"polyval 0.5.3",
]
[[package]]
name = "ghash"
version = "0.5.0"
@@ -2147,7 +2225,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d930750de5717d2dd0b8c0d42c076c0e884c81a73e6cab859bbd2339c71e3e40"
dependencies = [
"opaque-debug 0.3.0",
"polyval",
"polyval 0.6.0",
]
[[package]]
@@ -2158,7 +2236,7 @@ checksum = "e77ac7b51b8e6313251737fcef4b1c01a2ea102bde68415b62c0ee9268fec357"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -2439,7 +2517,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest 0.10.6",
"digest 0.10.7",
]
[[package]]
@@ -3010,6 +3088,12 @@ dependencies = [
"pkg-config",
]
[[package]]
name = "libm"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc7aa29613bd6a620df431842069224d8bc9011086b1db4c0e0cd47fa03ec9a"
[[package]]
name = "libm"
version = "0.2.7"
@@ -3069,12 +3153,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.17"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if",
]
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
[[package]]
name = "loom"
@@ -3307,7 +3388,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg 1.1.0",
"libm",
"libm 0.2.7",
]
[[package]]
@@ -3587,7 +3668,7 @@ dependencies = [
"rand 0.7.3",
"serde",
"serde_json",
"sha2 0.10.6",
"sha2 0.10.7",
"sqlx 0.6.3",
"tap",
"tempfile",
@@ -3749,7 +3830,7 @@ dependencies = [
"bs58",
"cipher 0.4.4",
"ctr 0.9.2",
"digest 0.10.6",
"digest 0.10.7",
"ed25519-dalek",
"generic-array 0.14.7",
"hkdf 0.12.3",
@@ -3817,6 +3898,7 @@ dependencies = [
"log",
"nym-api-requests",
"nym-bin-common",
"nym-client-core",
"nym-coconut-interface",
"nym-config",
"nym-credentials",
@@ -3825,10 +3907,12 @@ dependencies = [
"nym-mixnet-client",
"nym-mixnode-common",
"nym-network-defaults",
"nym-noise",
"nym-pemstore",
"nym-sphinx",
"nym-statistics-common",
"nym-task",
"nym-topology",
"nym-types",
"nym-validator-client",
"once_cell",
@@ -3860,6 +3944,7 @@ dependencies = [
"nym-crypto",
"nym-gateway-requests",
"nym-network-defaults",
"nym-noise",
"nym-pemstore",
"nym-sphinx",
"nym-task",
@@ -3923,6 +4008,9 @@ version = "0.1.0"
dependencies = [
"futures",
"log",
"nym-client-core",
"nym-crypto",
"nym-noise",
"nym-sphinx",
"nym-task",
"tokio",
@@ -3965,11 +4053,13 @@ dependencies = [
"lazy_static",
"log",
"nym-bin-common",
"nym-client-core",
"nym-config",
"nym-contracts-common",
"nym-crypto",
"nym-mixnet-client",
"nym-mixnode-common",
"nym-noise",
"nym-nonexhaustive-delayqueue",
"nym-pemstore",
"nym-sphinx",
@@ -4140,6 +4230,21 @@ dependencies = [
"wasm-utils",
]
[[package]]
name = "nym-noise"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"log",
"nym-topology",
"pin-project",
"sha2 0.10.7",
"snow",
"thiserror",
"tokio",
]
[[package]]
name = "nym-nonexhaustive-delayqueue"
version = "0.1.0"
@@ -4181,10 +4286,10 @@ name = "nym-outfox"
version = "0.1.0"
dependencies = [
"blake3",
"chacha20",
"chacha20poly1305",
"chacha20 0.9.1",
"chacha20poly1305 0.10.1",
"criterion",
"curve25519-dalek",
"curve25519-dalek 3.2.0",
"fastrand",
"getrandom 0.2.10",
"log",
@@ -4539,7 +4644,7 @@ dependencies = [
name = "nym-store-cipher"
version = "0.1.0"
dependencies = [
"aes-gcm",
"aes-gcm 0.10.1",
"argon2",
"generic-array 0.14.7",
"getrandom 0.2.10",
@@ -4758,7 +4863,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -4906,6 +5011,16 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "packed_simd_2"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1914cd452d8fccd6f9db48147b29fd4ae05bea9dc5d9ad578509f72415de282"
dependencies = [
"cfg-if",
"libm 0.1.4",
]
[[package]]
name = "pairing"
version = "0.20.0"
@@ -5024,7 +5139,7 @@ dependencies = [
"proc-macro2",
"proc-macro2-diagnostics 0.10.0",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -5101,7 +5216,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -5112,7 +5227,7 @@ checksum = "745a452f8eb71e39ffd8ee32b3c5f51d03845f99786fa9b68db6ff509c505411"
dependencies = [
"once_cell",
"pest",
"sha2 0.10.6",
"sha2 0.10.7",
]
[[package]]
@@ -5132,7 +5247,7 @@ checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -5164,6 +5279,12 @@ version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
[[package]]
name = "platforms"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3d7ddaed09e0eb771a79ab0fd64609ba0afb0a8366421957936ad14cbd13630"
[[package]]
name = "plotters"
version = "0.3.4"
@@ -5208,6 +5329,17 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "poly1305"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede"
dependencies = [
"cpufeatures",
"opaque-debug 0.3.0",
"universal-hash 0.4.1",
]
[[package]]
name = "poly1305"
version = "0.8.0"
@@ -5216,7 +5348,19 @@ checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf"
dependencies = [
"cpufeatures",
"opaque-debug 0.3.0",
"universal-hash",
"universal-hash 0.5.0",
]
[[package]]
name = "polyval"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1"
dependencies = [
"cfg-if",
"cpufeatures",
"opaque-debug 0.3.0",
"universal-hash 0.4.1",
]
[[package]]
@@ -5228,7 +5372,7 @@ dependencies = [
"cfg-if",
"cpufeatures",
"opaque-debug 0.3.0",
"universal-hash",
"universal-hash 0.5.0",
]
[[package]]
@@ -5283,9 +5427,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.58"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
dependencies = [
"unicode-ident",
]
@@ -5311,7 +5455,7 @@ checksum = "606c4ba35817e2922a308af55ad51bab3645b59eae5c570d4a6cf07e36bd493b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
"version_check",
"yansi",
]
@@ -5406,9 +5550,9 @@ checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
[[package]]
name = "quote"
version = "1.0.27"
version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f4f29d145265ec1c483c7c654450edde0bfe043d3938d6972630663356d9500"
checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965"
dependencies = [
"proc-macro2",
]
@@ -5701,7 +5845,7 @@ checksum = "8d2275aab483050ab2a7364c1a46604865ee7d6906684e08db0f090acf74f9e7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -6253,7 +6397,7 @@ checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -6286,7 +6430,7 @@ checksum = "bcec881020c684085e55a25f7fd888954d56609ef363479dc5a1305eb0d40cab"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -6331,7 +6475,7 @@ checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.6",
"digest 0.10.7",
]
[[package]]
@@ -6342,7 +6486,7 @@ checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.6",
"digest 0.10.7",
]
[[package]]
@@ -6360,13 +6504,13 @@ dependencies = [
[[package]]
name = "sha2"
version = "0.10.6"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.6",
"digest 0.10.7",
]
[[package]]
@@ -6478,6 +6622,22 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "snow"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ccba027ba85743e09d15c03296797cad56395089b832b48b5a5217880f57733"
dependencies = [
"aes-gcm 0.9.4",
"blake2 0.10.6",
"chacha20poly1305 0.9.1",
"curve25519-dalek 4.0.0-rc.1",
"rand_core 0.6.4",
"rustc_version 0.4.0",
"sha2 0.10.7",
"subtle 2.4.1",
]
[[package]]
name = "socket2"
version = "0.4.9"
@@ -6500,7 +6660,7 @@ dependencies = [
"bs58",
"byteorder",
"chacha",
"curve25519-dalek",
"curve25519-dalek 3.2.0",
"digest 0.9.0",
"hkdf 0.11.0",
"hmac 0.11.0",
@@ -6613,7 +6773,7 @@ dependencies = [
"paste",
"percent-encoding",
"rustls 0.19.1",
"sha2 0.10.6",
"sha2 0.10.7",
"smallvec",
"sqlformat 0.1.8",
"sqlx-rt 0.5.13",
@@ -6661,7 +6821,7 @@ dependencies = [
"percent-encoding",
"rustls 0.20.8",
"rustls-pemfile",
"sha2 0.10.6",
"sha2 0.10.7",
"smallvec",
"sqlformat 0.2.1",
"sqlx-rt 0.6.3",
@@ -6684,7 +6844,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"sha2 0.10.6",
"sha2 0.10.7",
"sqlx-core 0.5.13",
"sqlx-rt 0.5.13",
"syn 1.0.109",
@@ -6703,7 +6863,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"sha2 0.10.6",
"sha2 0.10.7",
"sqlx-core 0.6.3",
"sqlx-rt 0.6.3",
"syn 1.0.109",
@@ -6847,9 +7007,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.16"
version = "2.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01"
checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0"
dependencies = [
"proc-macro2",
"quote",
@@ -7009,22 +7169,22 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
version = "1.0.40"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.40"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -7161,7 +7321,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -7397,7 +7557,7 @@ checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@@ -7675,6 +7835,16 @@ dependencies = [
"extension-traits",
]
[[package]]
name = "universal-hash"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05"
dependencies = [
"generic-array 0.14.7",
"subtle 2.4.1",
]
[[package]]
name = "universal-hash"
version = "0.5.0"
@@ -8208,7 +8378,7 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a0c105152107e3b96f6a00a65e86ce82d9b125230e1c4302940eca58ff71f4f"
dependencies = [
"curve25519-dalek",
"curve25519-dalek 3.2.0",
"rand_core 0.5.1",
"serde",
"zeroize",
@@ -8237,5 +8407,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
@@ -306,15 +306,21 @@ where
{
let gateway_address = gateway_config.gateway_listener.clone();
let gateway_id = gateway_config.gateway_id;
let gateway_sphinx = gateway_config.gateway_sphinx;
// TODO: in theory, at this point, this should be infallible
let gateway_identity = identity::PublicKey::from_base58_string(gateway_id)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
let gateway_sphinx_key = encryption::PublicKey::from_base58_string(gateway_sphinx)
.map_err(ClientCoreError::UnableToCreateSphinxKeyFromGatewayId)?;
let mut gateway_client = GatewayClient::new(
gateway_address,
managed_keys.identity_keypair(),
managed_keys.encryption_keypair(),
gateway_identity,
gateway_sphinx_key,
Some(managed_keys.must_get_gateway_shared_key()),
mixnet_message_sender,
ack_sender,
@@ -83,6 +83,16 @@ impl<'a> TopologyReadPermit<'a> {
Ok(topology)
}
pub fn try_get_raw_topology_ref(&'a self) -> Result<&'a NymTopology, NymTopologyError> {
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
let topology = self
.permit
.as_ref()
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
Ok(topology)
}
}
impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
@@ -9,8 +9,8 @@ use nym_topology::provider_trait::TopologyProvider;
use nym_topology::NymTopologyError;
use std::time::Duration;
mod accessor;
pub(crate) mod nym_api_provider;
pub mod accessor;
pub mod nym_api_provider;
// TODO: move it to config later
const MAX_FAILURE_COUNT: usize = 10;
@@ -9,7 +9,7 @@ use rand::prelude::SliceRandom;
use rand::thread_rng;
use url::Url;
pub(crate) struct NymApiTopologyProvider {
pub struct NymApiTopologyProvider {
validator_client: nym_validator_client::client::NymApiClient,
nym_api_urls: Vec<Url>,
@@ -18,7 +18,7 @@ pub(crate) struct NymApiTopologyProvider {
}
impl NymApiTopologyProvider {
pub(crate) fn new(mut nym_api_urls: Vec<Url>, client_version: String) -> Self {
pub fn new(mut nym_api_urls: Vec<Url>, client_version: String) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
NymApiTopologyProvider {
+11 -1
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use nym_config::defaults::NymNetworkDetails;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{encryption, identity};
use nym_sphinx::params::{PacketSize, PacketType};
use serde::{Deserialize, Serialize};
use std::time::Duration;
@@ -207,6 +207,8 @@ pub struct GatewayEndpointConfig {
/// If initially omitted, a random gateway will be chosen from the available topology.
pub gateway_id: String,
pub gateway_sphinx: String,
/// Address of the gateway owner to which the client should send messages.
pub gateway_owner: String,
@@ -219,11 +221,13 @@ impl GatewayEndpointConfig {
#[cfg_attr(target_arch = "wasm32", wasm_bindgen(constructor))]
pub fn new(
gateway_id: String,
gateway_sphinx: String,
gateway_owner: String,
gateway_listener: String,
) -> GatewayEndpointConfig {
GatewayEndpointConfig {
gateway_id,
gateway_sphinx,
gateway_owner,
gateway_listener,
}
@@ -236,6 +240,11 @@ impl GatewayEndpointConfig {
identity::PublicKey::from_base58_string(&self.gateway_id)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)
}
pub fn try_get_gateway_sphinx_key(&self) -> Result<encryption::PublicKey, ClientCoreError> {
encryption::PublicKey::from_base58_string(&self.gateway_sphinx)
.map_err(ClientCoreError::UnableToCreateSphinxKeyFromGatewayId)
}
}
impl From<nym_topology::gateway::Node> for GatewayEndpointConfig {
@@ -243,6 +252,7 @@ impl From<nym_topology::gateway::Node> for GatewayEndpointConfig {
let gateway_listener = node.clients_address();
GatewayEndpointConfig {
gateway_id: node.identity_key.to_base58_string(),
gateway_sphinx: node.sphinx_key.to_base58_string(),
gateway_owner: node.owner,
gateway_listener,
}
@@ -68,6 +68,7 @@ pub struct ConfigV1_1_20<T> {
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq, Serialize)]
pub struct GatewayEndpointConfigV1_1_20 {
pub gateway_id: String,
pub gateway_sphinx: String,
pub gateway_owner: String,
pub gateway_listener: String,
}
@@ -76,6 +77,7 @@ impl From<GatewayEndpointConfigV1_1_20> for GatewayEndpointConfigV1_1_20_2 {
fn from(value: GatewayEndpointConfigV1_1_20) -> Self {
GatewayEndpointConfigV1_1_20_2 {
gateway_id: value.gateway_id,
gateway_sphinx: value.gateway_sphinx,
gateway_owner: value.gateway_owner,
gateway_listener: value.gateway_listener,
}
@@ -73,6 +73,8 @@ pub struct GatewayEndpointConfigV1_1_20_2 {
/// If initially omitted, a random gateway will be chosen from the available topology.
pub gateway_id: String,
pub gateway_sphinx: String,
/// Address of the gateway owner to which the client should send messages.
pub gateway_owner: String,
@@ -84,6 +86,7 @@ impl From<GatewayEndpointConfigV1_1_20_2> for GatewayEndpointConfig {
fn from(value: GatewayEndpointConfigV1_1_20_2) -> Self {
GatewayEndpointConfig {
gateway_id: value.gateway_id,
gateway_sphinx: value.gateway_sphinx,
gateway_owner: value.gateway_owner,
gateway_listener: value.gateway_listener,
}
+4
View File
@@ -1,6 +1,7 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_crypto::asymmetric::encryption::KeyRecoveryError;
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_topology::gateway::GatewayConversionError;
@@ -58,6 +59,9 @@ pub enum ClientCoreError {
#[error("The gateway id is invalid - {0}")]
UnableToCreatePublicKeyFromGatewayId(Ed25519RecoveryError),
#[error("The gateway sphinx is invalid - {0}")]
UnableToCreateSphinxKeyFromGatewayId(KeyRecoveryError),
#[error("The identity of the gateway is unknown - did you run init?")]
GatewayIdUnknown,
+4 -1
View File
@@ -5,7 +5,7 @@ use crate::config::GatewayEndpointConfig;
use crate::error::ClientCoreError;
use futures::{SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{encryption, identity};
use nym_gateway_client::GatewayClient;
use nym_gateway_requests::registration::handshake::SharedKeys;
use nym_topology::{filter::VersionFilterable, gateway};
@@ -200,12 +200,15 @@ pub(super) fn uniformly_random_gateway<R: Rng>(
pub(super) async fn register_with_gateway(
gateway: &GatewayEndpointConfig,
our_identity: Arc<identity::KeyPair>,
our_sphinx: Arc<encryption::KeyPair>,
) -> Result<Arc<SharedKeys>, ClientCoreError> {
let timeout = Duration::from_millis(1500);
let mut gateway_client: GatewayClient<DirectSigningNyxdClient, _> = GatewayClient::new_init(
gateway.gateway_listener.clone(),
gateway.try_get_gateway_identity_key()?,
gateway.try_get_gateway_sphinx_key()?,
our_identity.clone(),
our_sphinx.clone(),
timeout,
);
gateway_client
+4 -1
View File
@@ -370,8 +370,11 @@ where
// get our identity key
let our_identity = managed_keys.identity_keypair();
let our_sphinx = managed_keys.encryption_keypair();
// Establish connection, authenticate and generate keys for talking with the gateway
let shared_keys = helpers::register_with_gateway(&gateway_details, our_identity).await?;
let shared_keys =
helpers::register_with_gateway(&gateway_details, our_identity, our_sphinx).await?;
let persisted_details = PersistedGatewayDetails::new(gateway_details, &shared_keys);
@@ -27,6 +27,7 @@ nym-sphinx = { path = "../../nymsphinx" }
nym-pemstore = { path = "../../pemstore" }
nym-validator-client = { path = "../validator-client" }
nym-task = { path = "../../task" }
nym-noise = { path = "../../nymnoise"}
serde = { workspace = true, features = ["derive"] }
@@ -14,24 +14,27 @@ use nym_bandwidth_controller::BandwidthController;
use nym_coconut_interface::Credential;
use nym_credential_storage::ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{encryption, identity};
use nym_gateway_requests::authentication::encrypted_address::EncryptedAddressBytes;
use nym_gateway_requests::iv::IV;
use nym_gateway_requests::registration::handshake::{client_handshake, SharedKeys};
use nym_gateway_requests::{BinaryRequest, ClientControlRequest, ServerResponse, PROTOCOL_VERSION};
use nym_network_defaults::{REMAINING_BANDWIDTH_THRESHOLD, TOKENS_TO_BURN};
use nym_noise::upgrade_noise_initiator;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use rand::rngs::OsRng;
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tungstenite::protocol::Message;
#[cfg(not(target_arch = "wasm32"))]
use nym_validator_client::nyxd::traits::DkgQueryClient;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
use tokio_tungstenite::client_async;
#[cfg(target_arch = "wasm32")]
use nym_bandwidth_controller::wasm_mockups::DkgQueryClient;
@@ -49,7 +52,9 @@ pub struct GatewayClient<C, St> {
bandwidth_remaining: i64,
gateway_address: String,
gateway_identity: identity::PublicKey,
gateway_sphinx: encryption::PublicKey,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
shared_key: Option<Arc<SharedKeys>>,
connection: SocketState,
packet_router: PacketRouter,
@@ -75,7 +80,9 @@ impl<C, St> GatewayClient<C, St> {
pub fn new(
gateway_address: String,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
gateway_identity: identity::PublicKey,
gateway_sphinx: encryption::PublicKey,
// TODO: make it mandatory. if you don't want to pass it, use `new_init`
shared_key: Option<Arc<SharedKeys>>,
mixnet_message_sender: MixnetMessageSender,
@@ -89,8 +96,10 @@ impl<C, St> GatewayClient<C, St> {
disabled_credentials_mode: true,
bandwidth_remaining: 0,
gateway_address,
gateway_identity,
local_identity,
local_sphinx,
gateway_identity,
gateway_sphinx,
shared_key,
connection: SocketState::NotConnected,
packet_router: PacketRouter::new(ack_sender, mixnet_message_sender, shutdown.clone()),
@@ -163,7 +172,50 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
let ws_stream = match connect_async(&self.gateway_address).await {
let socket_addr: SocketAddr = self.gateway_address.parse().unwrap();
let connection_fut = TcpStream::connect(socket_addr);
//arbitrary TO, it's a POC
let noise_conn = match tokio::time::timeout(Duration::from_secs(5), connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to gateway");
let noise_stream = match upgrade_noise_initiator(
stream,
None, //as a client, the gateway cannot know my pub key
&self.local_sphinx.private_key().to_bytes(),
&self.gateway_sphinx.to_bytes(),
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!(
"Failed to perform Noise handshake with {:?} - {err}",
self.gateway_address
);
return Err(GatewayClientError::ConnectionNotEstablished);
}
};
debug!(
"Noise initiator handshake completed for {:?}",
self.gateway_address
);
noise_stream
}
Err(err) => {
debug!("failed to establish connection to gateway (err: {})", err);
return Err(GatewayClientError::NetworkIoError(err));
}
},
Err(_) => {
debug!("failed to connect to {} within 5s", self.gateway_address);
return Err(GatewayClientError::Timeout);
}
};
let ws_address = format!("ws://{}", self.gateway_address);
let ws_stream = match client_async(ws_address, noise_conn).await {
Ok((ws_stream, _)) => ws_stream,
Err(e) => return Err(GatewayClientError::NetworkError(e)),
};
@@ -773,7 +825,9 @@ impl<C> GatewayClient<C, EphemeralCredentialStorage> {
pub fn new_init(
gateway_address: String,
gateway_identity: identity::PublicKey,
gateway_sphinx: encryption::PublicKey,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
response_timeout_duration: Duration,
) -> Self {
use futures::channel::mpsc;
@@ -791,7 +845,9 @@ impl<C> GatewayClient<C, EphemeralCredentialStorage> {
bandwidth_remaining: 0,
gateway_address,
gateway_identity,
gateway_sphinx,
local_identity,
local_sphinx,
shared_key: None,
connection: SocketState::NotConnected,
packet_router,
@@ -19,6 +19,9 @@ pub enum GatewayClientError {
#[error("There was a network error - {0}")]
NetworkError(#[from] WsError),
#[error("There was a network error - {0}")]
NetworkIoError(#[from] io::Error),
// TODO: see if `JsValue` is a reasonable type for this
#[cfg(target_arch = "wasm32")]
#[error("There was a network error")]
@@ -9,14 +9,13 @@ use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use log::*;
use nym_gateway_requests::registration::handshake::SharedKeys;
use nym_noise::NoiseStream;
use nym_task::TaskClient;
use std::sync::Arc;
use tungstenite::Message;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::WebSocketStream;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures;
@@ -26,7 +25,7 @@ use wasm_utils::websocket::JSWebsocket;
// type alias for not having to type the whole thing every single time (and now it makes it easier
// to use different types based on compilation target)
#[cfg(not(target_arch = "wasm32"))]
type WsConn = WebSocketStream<MaybeTlsStream<TcpStream>>;
type WsConn = WebSocketStream<NoiseStream>;
#[cfg(target_arch = "wasm32")]
type WsConn = JSWebsocket;
@@ -15,3 +15,6 @@ tokio-util = { version = "0.7.4", features = ["codec"] }
# internal
nym-sphinx = { path = "../../nymsphinx" }
nym-task = { path = "../../task" }
nym-client-core = { path = "../../client-core" }
nym-noise = { path = "../../nymnoise"}
nym-crypto = { path = "../../crypto" }
+58 -9
View File
@@ -4,6 +4,9 @@
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_crypto::asymmetric::encryption;
use nym_noise::upgrade_noise_initiator_with_topology;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
@@ -59,6 +62,8 @@ pub trait SendWithoutResponse {
pub struct Client {
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
config: Config,
topology_access: TopologyAccessor,
local_identity: Arc<encryption::KeyPair>,
}
struct ConnectionSender {
@@ -76,10 +81,16 @@ impl ConnectionSender {
}
impl Client {
pub fn new(config: Config) -> Client {
pub fn new(
config: Config,
topology_access: TopologyAccessor,
local_identity: Arc<encryption::KeyPair>,
) -> Client {
Client {
conn_new: HashMap::new(),
config,
topology_access,
local_identity,
}
}
@@ -88,6 +99,9 @@ impl Client {
receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: &AtomicU32,
topology_access: TopologyAccessor,
local_public_key: &[u8],
local_private_key: &[u8],
) {
let connection_fut = TcpStream::connect(address);
@@ -97,7 +111,32 @@ impl Client {
debug!("Managed to establish connection to {}", address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
Framed::new(stream, NymCodec)
//Get the topology, because we need the keys for the handshake
let topology_permit = topology_access.get_read_permit().await;
let topology_ref = match topology_permit.try_get_raw_topology_ref() {
Ok(topology) => topology,
Err(err) => {
error!("Cannot perform Noise handshake to {address}, due to topology error - {err}");
return;
}
};
let noise_stream = match upgrade_noise_initiator_with_topology(
stream,
topology_ref,
local_public_key,
local_private_key,
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {address} - {err}");
return;
}
};
debug!("Noise initiator handshake completed for {:?}", address);
Framed::new(noise_stream, NymCodec)
}
Err(err) => {
debug!(
@@ -175,6 +214,10 @@ impl Client {
// copy the value before moving into another task
let initial_connection_timeout = self.config.initial_connection_timeout;
let topology_access_clone = self.topology_access.clone();
let local_public_key = self.local_identity.public_key().to_bytes();
let local_private_key = self.local_identity.private_key().to_bytes();
tokio::spawn(async move {
// before executing the manager, wait for what was specified, if anything
if let Some(backoff) = backoff {
@@ -187,6 +230,9 @@ impl Client {
receiver,
initial_connection_timeout,
&current_reconnection_attempt,
topology_access_clone,
&local_public_key,
&local_private_key,
)
.await
});
@@ -255,13 +301,16 @@ mod tests {
use super::*;
fn dummy_client() -> Client {
Client::new(Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
use_legacy_version: false,
})
Client::new(
Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
use_legacy_version: false,
},
TopologyAccessor::new(),
)
}
#[test]
@@ -5,7 +5,10 @@ use crate::client::{Client, Config, SendWithoutResponse};
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_crypto::asymmetric::encryption;
use nym_sphinx::forwarding::packet::MixPacket;
use std::sync::Arc;
use std::time::Duration;
pub type MixForwardingSender = mpsc::UnboundedSender<MixPacket>;
@@ -26,6 +29,8 @@ impl PacketForwarder {
initial_connection_timeout: Duration,
maximum_connection_buffer_size: usize,
use_legacy_version: bool,
topology_access: TopologyAccessor,
local_identity: Arc<encryption::KeyPair>,
shutdown: nym_task::TaskClient,
) -> (PacketForwarder, MixForwardingSender) {
let client_config = Config::new(
@@ -40,7 +45,7 @@ impl PacketForwarder {
(
PacketForwarder {
mixnet_client: Client::new(client_config),
mixnet_client: Client::new(client_config, topology_access, local_identity),
packet_receiver,
shutdown,
},
+18
View File
@@ -0,0 +1,18 @@
[package]
name = "nym-noise"
version = "0.1.0"
authors = ["Simon Wicky <simon@nymtech.net>"]
edition = "2021"
[dependencies]
snow = "0.9.2"
futures = "0.3"
tokio = { version = "1.24.1", features = ["net","io-util"] }
pin-project = "1"
log = "0.4.19"
sha2 = "0.10.7"
bytes = "1.0"
thiserror = "1.0.44"
# internal
nym-topology = { path = "../topology"}
+355
View File
@@ -0,0 +1,355 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use log::*;
use nym_topology::NymTopology;
use pin_project::pin_project;
use sha2::{Digest, Sha256};
use snow::error::Prerequisite;
use snow::Builder;
use snow::Error;
use snow::TransportState;
use std::cmp::min;
use std::collections::VecDeque;
use std::io;
use std::io::ErrorKind;
use std::pin::Pin;
use std::task::Poll;
use thiserror::Error;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::ReadBuf;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
};
const NOISE_HS_PATTERN: &str = "Noise_XKpsk3_25519_AESGCM_SHA256";
const MAXMSGLEN: usize = 65535;
const TAGLEN: usize = 16;
const HEADER_SIZE: usize = 2;
#[derive(Error, Debug)]
pub enum NoiseError {
#[error("encountered a Noise decryption error")]
DecryptionError,
#[error("encountered a Noise Protocol error - {0}")]
ProtocolError(Error),
#[error("encountered an IO error - {0}")]
IoError(#[from] io::Error),
}
impl From<Error> for NoiseError {
fn from(err: Error) -> Self {
match err {
Error::Decrypt => NoiseError::DecryptionError,
err => NoiseError::ProtocolError(err),
}
}
}
/// Wrapper around a TcpStream
#[pin_project]
pub struct NoiseStream {
#[pin]
inner_stream: TcpStream,
noise: TransportState,
enc_storage: VecDeque<u8>,
dec_storage: VecDeque<u8>,
}
impl NoiseStream {
fn new(inner_stream: TcpStream, noise: TransportState) -> NoiseStream {
NoiseStream {
inner_stream,
noise,
enc_storage: VecDeque::with_capacity(MAXMSGLEN + HEADER_SIZE),
dec_storage: VecDeque::with_capacity(MAXMSGLEN + HEADER_SIZE),
}
}
}
impl AsyncRead for NoiseStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let projected_self = self.project();
let enc_storage = projected_self.enc_storage;
let ready_to_read = projected_self.inner_stream.poll_read_ready(cx);
match ready_to_read {
Poll::Pending => {
//no new data, waking is already scheduled.
//Nothing new to decrypt, only check if we can return something from dec_storage, happens after
}
Poll::Ready(Ok(())) => {
//Read what we can into enc_storage, decrypt what we can into dec_storage
let mut tcp_buf = vec![0u8; MAXMSGLEN + HEADER_SIZE];
if let Ok(tcp_len) = projected_self.inner_stream.try_read(&mut tcp_buf) {
if tcp_len == 0 && projected_self.dec_storage.len() == 0 {
//EOF
return Poll::Ready(Ok(()));
}
enc_storage.extend(&tcp_buf[..tcp_len]);
//we can at least read the length
while enc_storage.len() >= HEADER_SIZE {
let msg_len = ((enc_storage[0] as usize) << 8) + (enc_storage[1] as usize);
//no more messages to read
if enc_storage.len() < HEADER_SIZE + msg_len {
break;
}
//we have a full message to decrypt
//remove size
enc_storage.pop_front();
enc_storage.pop_front();
let noise_msg = enc_storage.drain(..msg_len).collect::<Vec<u8>>();
let mut dec_msg = vec![0u8; MAXMSGLEN];
let len = match projected_self.noise.read_message(&noise_msg, &mut dec_msg)
{
Ok(len) => len,
Err(_) => return Poll::Ready(Err(ErrorKind::InvalidData.into())),
};
projected_self.dec_storage.extend(&dec_msg[..len]);
}
}
}
//an error occured, let's return it right away
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
}
//check if we can return something
let read_len = min(buf.remaining(), projected_self.dec_storage.len());
if read_len > 0 {
buf.put_slice(
&projected_self
.dec_storage
.drain(..read_len)
.collect::<Vec<u8>>(),
);
return Poll::Ready(Ok(()));
}
//can't return anything, schedule the wakeup and return pending
match projected_self.inner_stream.poll_read_ready(cx) {
Poll::Ready(Ok(())) => {
//we got data in the meantime, we can wake up immediately
cx.waker().wake_by_ref();
}
_ => {}
}
Poll::Pending
}
}
impl AsyncWrite for NoiseStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let projected_self = self.project();
let mut noise_buf = vec![0u8; MAXMSGLEN];
let len = match projected_self.noise.write_message(buf, &mut noise_buf) {
Ok(len) => len,
Err(_) => return Poll::Ready(Err(ErrorKind::InvalidInput.into())),
};
let to_send = [&[(len >> 8) as u8, (len & 0xff) as u8], &noise_buf[..len]].concat();
match projected_self.inner_stream.poll_write(cx, &to_send) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Ready(Ok(n)) => {
//didn't send a thing, no problem for the underlying stream
if n == 0 {
return Poll::Ready(Ok(0));
}
//we sent the whole thing, no problem for the underlying stream
//We must guarantee that the return number is <= buf.len()
if n == to_send.len() {
return Poll::Ready(Ok(n - HEADER_SIZE - TAGLEN));
}
//We didn't write the whole message, the stream will be corrupted
return Poll::Ready(Err(ErrorKind::WriteZero.into()));
}
}
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.project().inner_stream.poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.project().inner_stream.poll_shutdown(cx)
}
}
pub async fn upgrade_noise_initiator(
mut conn: TcpStream,
local_public_key: Option<&[u8]>,
local_private_key: &[u8],
remote_pub_key: &[u8],
) -> Result<NoiseStream, NoiseError> {
trace!("Perform Noise Handshake, initiator side");
//In case the local key cannot be known by the remote party, e.g. in a client-gateway connection
let secret = [local_public_key.unwrap_or(&[]), remote_pub_key].concat();
let secret_hash = Sha256::digest(secret);
let builder = Builder::new(NOISE_HS_PATTERN.parse().unwrap()); //This cannot fail, hardcoded pattern must be correct
let mut handshake = builder
.local_private_key(local_private_key)
.remote_public_key(&remote_pub_key)
.psk(3, &secret_hash)
.build_initiator()?;
//Actual Handshake
let mut buf = vec![0u8; MAXMSGLEN];
// -> e, es
let len = handshake.write_message(&[], &mut buf)?;
send(&mut conn, &buf[..len]).await?;
// <- e, ee
handshake.read_message(&recv(&mut conn).await?, &mut buf)?;
// -> s, se, psk
let len = handshake.write_message(&[], &mut buf)?;
send(&mut conn, &buf[..len]).await?;
let noise = handshake.into_transport_mode()?;
Ok(NoiseStream::new(conn, noise))
}
pub async fn upgrade_noise_initiator_with_topology(
conn: TcpStream,
topology: &NymTopology,
local_public_key: &[u8],
local_private_key: &[u8],
) -> Result<NoiseStream, NoiseError> {
//Get init material
let responder_addr = match conn.peer_addr() {
Ok(addr) => addr,
Err(err) => {
error!("Unable to extract peer address from connection - {err}");
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
}
};
let remote_pub_key = match topology.find_node_key_by_mix_host(responder_addr) {
Some(pub_key) => pub_key.to_bytes(),
None => {
error!(
"Cannot find public key for node with address {:?}",
responder_addr
);
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
}
};
upgrade_noise_initiator(
conn,
Some(local_public_key),
local_private_key,
&remote_pub_key,
)
.await
}
pub async fn upgrade_noise_responder(
mut conn: TcpStream,
local_public_key: &[u8],
local_private_key: &[u8],
remote_pub_key: Option<&[u8]>,
) -> Result<NoiseStream, NoiseError> {
trace!("Perform Noise Handshake, responder side");
//If the remote_key cannot be kwnown, e.g. in a client-gateway connection
let secret = [&remote_pub_key.unwrap_or(&[]), local_public_key].concat();
let secret_hash = Sha256::digest(secret);
let builder = Builder::new(NOISE_HS_PATTERN.parse().unwrap()); //This cannot fail, hardcoded pattern must be correct
let mut handshake = builder
.local_private_key(local_private_key)
.psk(3, &secret_hash)
.build_responder()?;
//Actual Handshake
let mut buf = vec![0u8; MAXMSGLEN];
// <- e, es
handshake.read_message(&recv(&mut conn).await?, &mut buf)?;
// -> e, ee
let len = handshake.write_message(&[], &mut buf)?;
send(&mut conn, &buf[..len]).await?;
// <- s, se, psk
handshake.read_message(&recv(&mut conn).await?, &mut buf)?;
let noise = handshake.into_transport_mode()?;
Ok(NoiseStream::new(conn, noise))
}
pub async fn upgrade_noise_responder_with_topology(
conn: TcpStream,
topology: &NymTopology,
local_public_key: &[u8],
local_private_key: &[u8],
) -> Result<NoiseStream, NoiseError> {
//Get init material
let initiator_addr = match conn.peer_addr() {
Ok(addr) => addr,
Err(err) => {
error!("Unable to extract peer address from connection - {err}");
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
}
};
let remote_pub_key = match topology.find_node_key_by_mix_host(initiator_addr) {
Some(pub_key) => pub_key.to_bytes(),
None => {
error!(
"Cannot find public key for node with address {:?}",
initiator_addr
);
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
}
};
upgrade_noise_responder(
conn,
local_public_key,
local_private_key,
Some(&remote_pub_key),
)
.await
}
/// Hyper-basic stream transport receiver. 16-bit BE size followed by payload.
async fn recv(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
let mut msg_len_buf = [0u8; HEADER_SIZE];
stream.read_exact(&mut msg_len_buf).await?;
let msg_len = ((msg_len_buf[0] as usize) << 8) + (msg_len_buf[1] as usize);
let mut msg = vec![0u8; msg_len];
stream.read_exact(&mut msg[..]).await?;
Ok(msg)
}
/// Hyper-basic stream transport sender. 16-bit BE size followed by payload.
async fn send(stream: &mut TcpStream, buf: &[u8]) -> io::Result<()> {
let msg_len_buf = [(buf.len() >> 8) as u8, (buf.len() & 0xff) as u8];
stream.write_all(&msg_len_buf).await?;
stream.write_all(buf).await?;
Ok(())
}
+1 -1
View File
@@ -67,7 +67,7 @@ impl Node {
}
pub fn clients_address(&self) -> String {
format!("ws://{}:{}", self.host, self.clients_port)
format!("{}:{}", self.host, self.clients_port)
}
}
+20
View File
@@ -3,6 +3,7 @@
use crate::filter::VersionFilterable;
use log::warn;
use nym_crypto::asymmetric::encryption;
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, MixId};
use nym_sphinx_addressing::nodes::NodeIdentity;
@@ -107,6 +108,25 @@ impl NymTopology {
None
}
pub fn find_node_key_by_mix_host(
&self,
mix_host: SocketAddr,
) -> Option<&encryption::PublicKey> {
for node in self.gateways.iter() {
if node.mix_host.ip() == mix_host.ip() {
return Some(&node.sphinx_key);
}
}
for nodes in self.mixes.values() {
for node in nodes {
if node.mix_host.ip() == mix_host.ip() {
return Some(&node.sphinx_key);
}
}
}
None
}
pub fn find_gateway(&self, gateway_identity: IdentityKeyRef) -> Option<&gateway::Node> {
self.gateways
.iter()
+3
View File
@@ -61,6 +61,9 @@ nym-statistics-common = { path = "../common/statistics" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-validator-client = { path = "../common/client-libs/validator-client", features = [ "nyxd-client" ] }
nym-client-core = { path = "../common/client-core" }
nym-topology = { path = "../common/topology" }
nym-noise = { path = "../common/nymnoise" }
[build-dependencies]
tokio = { version = "1.24.1", features = ["rt-multi-thread", "macros"] }
+37
View File
@@ -33,6 +33,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min
const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000);
const DEFAULT_STORED_MESSAGE_FILENAME_LENGTH: u16 = 16;
const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: i64 = 100;
@@ -74,6 +77,9 @@ pub struct Config {
#[serde(default)]
pub debug: Debug,
#[serde(default)]
pub topology: Topology,
}
impl NymConfigTemplate for Config {
@@ -89,6 +95,7 @@ impl Config {
storage_paths: GatewayPaths::new_default(id.as_ref()),
logging: Default::default(),
debug: Default::default(),
topology: Default::default(),
}
}
@@ -302,3 +309,33 @@ impl Default for Debug {
}
}
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Topology {
/// The uniform delay every which clients are querying the directory server
/// to try to obtain a compatible network topology to send sphinx packets through.
#[serde(with = "humantime_serde")]
pub topology_refresh_rate: Duration,
/// During topology refresh, test packets are sent through every single possible network
/// path. This timeout determines waiting period until it is decided that the packet
/// did not reach its destination.
#[serde(with = "humantime_serde")]
pub topology_resolution_timeout: Duration,
/// Specifies whether the client should not refresh the network topology after obtaining
/// the first valid instance.
/// Supersedes `topology_refresh_rate_ms`.
pub disable_refreshing: bool,
}
impl Default for Topology {
fn default() -> Self {
Topology {
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_refreshing: false,
}
}
}
+1
View File
@@ -69,6 +69,7 @@ impl From<ConfigV1_1_20> for Config {
},
logging: value.logging.into(),
debug: value.debug.into(),
topology: Default::default(),
}
}
}
@@ -6,8 +6,9 @@ use crate::node::client_handling::websocket::connection_handler::coconut::Coconu
use crate::node::client_handling::websocket::connection_handler::FreshHandler;
use crate::node::storage::Storage;
use log::*;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_noise::upgrade_noise_responder;
use rand::rngs::OsRng;
use std::net::SocketAddr;
use std::process;
@@ -17,6 +18,7 @@ use tokio::task::JoinHandle;
pub(crate) struct Listener {
address: SocketAddr,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
only_coconut_credentials: bool,
pub(crate) coconut_verifier: Arc<CoconutVerifier>,
}
@@ -25,12 +27,14 @@ impl Listener {
pub(crate) fn new(
address: SocketAddr,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
only_coconut_credentials: bool,
coconut_verifier: Arc<CoconutVerifier>,
) -> Self {
Listener {
address,
local_identity,
local_sphinx,
only_coconut_credentials,
coconut_verifier,
}
@@ -68,9 +72,23 @@ impl Listener {
trace!("received a socket connection from {remote_addr}");
// TODO: I think we *REALLY* need a mechanism for having a maximum number of connected
// clients or spawned tokio tasks -> perhaps a worker system?
let noise_stream = match upgrade_noise_responder(
socket,
&self.local_sphinx.public_key().to_bytes(),
&self.local_sphinx.private_key().to_bytes(),
None, //connection from client, no remote pub key
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {remote_addr} - {err}");
return;
}
};
let handle = FreshHandler::new(
OsRng,
socket,
noise_stream,
self.only_coconut_credentials,
outbound_mix_sender.clone(),
Arc::clone(&self.local_identity),
@@ -8,8 +8,11 @@ use crate::node::storage::error::StorageError;
use crate::node::storage::Storage;
use futures::StreamExt;
use log::*;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_crypto::asymmetric::encryption;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_mixnode_common::packet_processor::processor::ProcessedFinalHop;
use nym_noise::upgrade_noise_responder_with_topology;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
@@ -17,6 +20,7 @@ use nym_sphinx::DestinationAddressBytes;
use nym_task::TaskClient;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
@@ -31,6 +35,8 @@ pub(crate) struct ConnectionHandler<St: Storage> {
active_clients_store: ActiveClientsStore,
storage: St,
ack_sender: MixForwardingSender,
topology_access: TopologyAccessor,
local_identity: Arc<encryption::KeyPair>,
}
impl<St: Storage + Clone> Clone for ConnectionHandler<St> {
@@ -49,6 +55,8 @@ impl<St: Storage + Clone> Clone for ConnectionHandler<St> {
active_clients_store: self.active_clients_store.clone(),
storage: self.storage.clone(),
ack_sender: self.ack_sender.clone(),
topology_access: self.topology_access.clone(),
local_identity: self.local_identity.clone(),
}
}
}
@@ -59,6 +67,8 @@ impl<St: Storage> ConnectionHandler<St> {
storage: St,
ack_sender: MixForwardingSender,
active_clients_store: ActiveClientsStore,
topology_access: TopologyAccessor,
local_identity: Arc<encryption::KeyPair>,
) -> Self {
ConnectionHandler {
packet_processor,
@@ -66,6 +76,8 @@ impl<St: Storage> ConnectionHandler<St> {
storage,
active_clients_store,
ack_sender,
topology_access,
local_identity,
}
}
@@ -182,7 +194,33 @@ impl<St: Storage> ConnectionHandler<St> {
) {
debug!("Starting connection handler for {:?}", remote);
shutdown.mark_as_success();
let mut framed_conn = Framed::new(conn, NymCodec);
let topology_access_clonne = self.topology_access.clone();
let topology_permit = topology_access_clonne.get_read_permit().await;
let topology_ref = match topology_permit.try_get_raw_topology_ref() {
Ok(topology) => topology,
Err(err) => {
error!("Cannot connect to {remote}, due to topology error - {err}");
return;
}
};
let noise_stream = match upgrade_noise_responder_with_topology(
conn,
topology_ref,
&self.local_identity.public_key().to_bytes(),
&self.local_identity.private_key().to_bytes(),
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {remote} - {err}");
return;
}
};
debug!("Noise responder handshake completed for {:?}", remote);
let mut framed_conn = Framed::new(noise_stream, NymCodec);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
@@ -212,9 +250,6 @@ impl<St: Storage> ConnectionHandler<St> {
}
}
info!(
"Closing connection from {:?}",
framed_conn.into_inner().peer_addr()
);
info!("Closing connection from {:?}", remote);
}
}
+73 -3
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use self::storage::PersistentStorage;
use crate::config::Config;
use crate::config::{Config, Topology};
use crate::error::GatewayError;
use crate::node::client_handling::active_clients::ActiveClientsStore;
use crate::node::client_handling::websocket;
@@ -12,17 +12,23 @@ use crate::node::statistics::collector::GatewayStatisticsCollector;
use crate::node::storage::Storage;
use log::*;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_client_core::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use nym_client_core::client::topology_control::TopologyRefresher;
use nym_client_core::client::topology_control::TopologyRefresherConfig;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
use nym_network_defaults::NymNetworkDetails;
use nym_statistics_common::collector::StatisticsSender;
use nym_task::{TaskClient, TaskManager};
use nym_topology::provider_trait::TopologyProvider;
use nym_validator_client::Client;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use url::Url;
pub(crate) mod client_handling;
pub(crate) mod mixnet_handling;
@@ -124,6 +130,7 @@ impl<St> Gateway<St> {
&self,
ack_sender: MixForwardingSender,
active_clients_store: ActiveClientsStore,
topology_access: TopologyAccessor,
shutdown: TaskClient,
) where
St: Storage + Clone + 'static,
@@ -138,6 +145,8 @@ impl<St> Gateway<St> {
self.storage.clone(),
ack_sender,
active_clients_store,
topology_access,
Arc::clone(&self.sphinx_keypair),
);
let listening_address = SocketAddr::new(
@@ -167,6 +176,7 @@ impl<St> Gateway<St> {
websocket::Listener::new(
listening_address,
Arc::clone(&self.identity_keypair),
Arc::clone(&self.sphinx_keypair),
self.config.gateway.only_coconut_credentials,
coconut_verifier,
)
@@ -178,7 +188,11 @@ impl<St> Gateway<St> {
);
}
fn start_packet_forwarder(&self, shutdown: TaskClient) -> MixForwardingSender {
fn start_packet_forwarder(
&self,
topology_access: TopologyAccessor,
shutdown: TaskClient,
) -> MixForwardingSender {
info!("Starting mix packet forwarder...");
let (mut packet_forwarder, packet_sender) = PacketForwarder::new(
@@ -187,6 +201,8 @@ impl<St> Gateway<St> {
self.config.debug.initial_connection_timeout,
self.config.debug.maximum_connection_buffer_size,
self.config.debug.use_legacy_framed_packet_version,
topology_access,
Arc::clone(&self.sphinx_keypair),
shutdown,
);
@@ -234,6 +250,47 @@ impl<St> Gateway<St> {
client
}
fn setup_topology_provider(nym_api_urls: Vec<Url>) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
Box::new(NymApiTopologyProvider::new(
nym_api_urls,
env!("CARGO_PKG_VERSION").to_string(),
))
}
// future responsible for periodically polling directory server and updating
// the current global view of topology
async fn start_topology_refresher(
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: Topology,
topology_accessor: TopologyAccessor,
mut shutdown: TaskClient,
) {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The topology refesher is not going to be started");
shutdown.mark_as_success();
} else {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
topology_refresher.start_with_shutdown(shutdown);
}
}
async fn check_if_bonded(&self) -> Result<bool, GatewayError> {
// TODO: if anything, this should be getting data directly from the contract
// as opposed to the validator API
@@ -268,12 +325,25 @@ impl<St> Gateway<St> {
CoconutVerifier::new(nyxd_client)
};
let mix_forwarding_channel = self.start_packet_forwarder(shutdown.subscribe());
let topology_provider = Self::setup_topology_provider(self.config.get_nym_api_endpoints());
let shared_topology_access = TopologyAccessor::new();
Self::start_topology_refresher(
topology_provider,
self.config.topology,
shared_topology_access.clone(),
shutdown.subscribe(),
)
.await;
let mix_forwarding_channel =
self.start_packet_forwarder(shared_topology_access.clone(), shutdown.subscribe());
let active_clients_store = ActiveClientsStore::new();
self.start_mix_socket_listener(
mix_forwarding_channel.clone(),
active_clients_store.clone(),
shared_topology_access,
shutdown.subscribe(),
);
+2
View File
@@ -56,9 +56,11 @@ nym-pemstore = { path = "../common/pemstore", version = "0.3.0" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-topology = { path = "../common/topology" }
nym-client-core = { path = "../common/client-core/" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
cpu-cycles = { path = "../cpu-cycles", optional = true }
nym-noise = { path = "../common/nymnoise" }
[dev-dependencies]
tokio = { version = "1.21.2", features = [
+37
View File
@@ -44,6 +44,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min
const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Derive default path to mixnodes's config directory.
/// It should get resolved to `$HOME/.nym/mixnodes/<id>/config`
pub fn default_config_directory<P: AsRef<Path>>(id: P) -> PathBuf {
@@ -85,6 +88,9 @@ pub struct Config {
#[serde(default)]
pub debug: Debug,
#[serde(default)]
pub topology: Topology,
}
impl NymConfigTemplate for Config {
@@ -101,6 +107,7 @@ impl Config {
verloc: Default::default(),
logging: Default::default(),
debug: Default::default(),
topology: Default::default(),
}
}
@@ -292,3 +299,33 @@ impl Default for Debug {
}
}
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Topology {
/// The uniform delay every which clients are querying the directory server
/// to try to obtain a compatible network topology to send sphinx packets through.
#[serde(with = "humantime_serde")]
pub topology_refresh_rate: Duration,
/// During topology refresh, test packets are sent through every single possible network
/// path. This timeout determines waiting period until it is decided that the packet
/// did not reach its destination.
#[serde(with = "humantime_serde")]
pub topology_resolution_timeout: Duration,
/// Specifies whether the client should not refresh the network topology after obtaining
/// the first valid instance.
/// Supersedes `topology_refresh_rate_ms`.
pub disable_refreshing: bool,
}
impl Default for Topology {
fn default() -> Self {
Topology {
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_refreshing: false,
}
}
}
+1
View File
@@ -94,6 +94,7 @@ impl From<ConfigV1_1_21> for Config {
verloc: value.verloc.into(),
logging: value.logging.into(),
debug: value.debug.into(),
topology: Default::default(),
}
}
}
@@ -7,12 +7,16 @@ use crate::node::listener::connection_handler::packet_processing::{
use crate::node::packet_delayforwarder::PacketDelayForwardSender;
use crate::node::TaskClient;
use futures::StreamExt;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_crypto::asymmetric::encryption;
use nym_mixnode_common::measure;
use nym_noise::upgrade_noise_responder_with_topology;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::Delay as SphinxDelay;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::codec::Framed;
@@ -25,16 +29,22 @@ pub(crate) mod packet_processing;
pub(crate) struct ConnectionHandler {
packet_processor: PacketProcessor,
delay_forwarding_channel: PacketDelayForwardSender,
topology_access: TopologyAccessor,
local_identity: Arc<encryption::KeyPair>,
}
impl ConnectionHandler {
pub(crate) fn new(
packet_processor: PacketProcessor,
delay_forwarding_channel: PacketDelayForwardSender,
topology_access: TopologyAccessor,
local_identity: Arc<encryption::KeyPair>,
) -> Self {
ConnectionHandler {
packet_processor,
delay_forwarding_channel,
topology_access,
local_identity,
}
}
@@ -85,8 +95,34 @@ impl ConnectionHandler {
mut shutdown: TaskClient,
) {
debug!("Starting connection handler for {:?}", remote);
shutdown.mark_as_success();
let mut framed_conn = Framed::new(conn, NymCodec);
let topology_permit = self.topology_access.get_read_permit().await;
let topology_ref = match topology_permit.try_get_raw_topology_ref() {
Ok(topology) => topology,
Err(err) => {
error!("Cannot connect to {remote}, due to topology error - {err}");
return;
}
};
let noise_stream = match upgrade_noise_responder_with_topology(
conn,
topology_ref,
&self.local_identity.public_key().to_bytes(),
&self.local_identity.private_key().to_bytes(),
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {remote} - {err}");
return;
}
};
debug!("Noise responder handshake completed for {:?}", remote);
let mut framed_conn = Framed::new(noise_stream, NymCodec);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
@@ -118,10 +154,7 @@ impl ConnectionHandler {
}
}
info!(
"Closing connection from {:?}",
framed_conn.into_inner().peer_addr()
);
info!("Closing connection from {:?}", remote);
log::trace!("ConnectionHandler: Exiting");
}
}
+79 -5
View File
@@ -1,7 +1,7 @@
// Copyright 2020-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::config::Config;
use crate::config::{Config, Topology};
use crate::node::http::{
description::description,
hardware::hardware,
@@ -17,14 +17,20 @@ use crate::node::node_statistics::SharedNodeStats;
use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender};
use nym_bin_common::output_format::OutputFormat;
use nym_bin_common::version_checker::parse_version;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_client_core::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use nym_client_core::client::topology_control::TopologyRefresher;
use nym_client_core::client::topology_control::TopologyRefresherConfig;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
use nym_task::{TaskClient, TaskManager};
use nym_topology::provider_trait::TopologyProvider;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::net::SocketAddr;
use std::process;
use std::sync::Arc;
use url::Url;
#[cfg(feature = "cpucycles")]
use tracing::{error, info, warn};
@@ -146,6 +152,7 @@ impl MixNode {
&self,
node_stats_update_sender: node_statistics::UpdateSender,
delay_forwarding_channel: PacketDelayForwardSender,
topology_access: TopologyAccessor,
shutdown: TaskClient,
) {
info!("Starting socket listener...");
@@ -153,7 +160,12 @@ impl MixNode {
let packet_processor =
PacketProcessor::new(self.sphinx_keypair.private_key(), node_stats_update_sender);
let connection_handler = ConnectionHandler::new(packet_processor, delay_forwarding_channel);
let connection_handler = ConnectionHandler::new(
packet_processor,
delay_forwarding_channel,
topology_access,
Arc::clone(&self.sphinx_keypair),
);
let listening_address = SocketAddr::new(
self.config.mixnode.listening_address,
@@ -166,6 +178,7 @@ impl MixNode {
fn start_packet_delay_forwarder(
&mut self,
node_stats_update_sender: node_statistics::UpdateSender,
topology_access: TopologyAccessor,
shutdown: TaskClient,
) -> PacketDelayForwardSender {
info!("Starting packet delay-forwarder...");
@@ -179,7 +192,11 @@ impl MixNode {
);
let mut packet_forwarder = DelayForwarder::new(
nym_mixnet_client::Client::new(client_config),
nym_mixnet_client::Client::new(
client_config,
topology_access,
Arc::clone(&self.sphinx_keypair),
),
node_stats_update_sender,
shutdown,
);
@@ -230,6 +247,47 @@ impl MixNode {
atomic_verloc_results
}
fn setup_topology_provider(nym_api_urls: Vec<Url>) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
Box::new(NymApiTopologyProvider::new(
nym_api_urls,
env!("CARGO_PKG_VERSION").to_string(),
))
}
// future responsible for periodically polling directory server and updating
// the current global view of topology
async fn start_topology_refresher(
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: Topology,
topology_accessor: TopologyAccessor,
mut shutdown: TaskClient,
) {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The topology refesher is not going to be started");
shutdown.mark_as_success();
} else {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
topology_refresher.start_with_shutdown(shutdown);
}
}
fn random_api_client(&self) -> nym_validator_client::NymApiClient {
let endpoints = self.config.get_nym_api_endpoints();
let nym_api = endpoints
@@ -276,11 +334,27 @@ impl MixNode {
let (node_stats_pointer, node_stats_update_sender) =
self.start_node_stats_controller(shutdown.subscribe());
let delay_forwarding_channel = self
.start_packet_delay_forwarder(node_stats_update_sender.clone(), shutdown.subscribe());
let topology_provider = Self::setup_topology_provider(self.config.get_nym_api_endpoints());
let shared_topology_access = TopologyAccessor::new();
Self::start_topology_refresher(
topology_provider,
self.config.topology,
shared_topology_access.clone(),
shutdown.subscribe(),
)
.await;
let delay_forwarding_channel = self.start_packet_delay_forwarder(
node_stats_update_sender.clone(),
shared_topology_access.clone(),
shutdown.subscribe(),
);
self.start_socket_listener(
node_stats_update_sender,
delay_forwarding_channel,
shared_topology_access,
shutdown.subscribe(),
);
let atomic_verloc_results = self.start_verloc_measurements(shutdown.subscribe());
+3
View File
@@ -105,6 +105,7 @@ impl<'a> NetworkMonitorBuilder<'a> {
self.config,
gateway_status_update_sender,
Arc::clone(&identity_keypair),
Arc::clone(&encryption_keypair),
self.config.debug.gateway_sending_rate,
bandwidth_controller,
self.config.debug.disabled_credentials_mode,
@@ -177,6 +178,7 @@ fn new_packet_sender(
config: &config::NetworkMonitor,
gateways_status_updater: GatewayClientUpdateSender,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
max_sending_rate: usize,
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
disabled_credentials_mode: bool,
@@ -184,6 +186,7 @@ fn new_packet_sender(
PacketSender::new(
gateways_status_updater,
local_identity,
local_sphinx,
config.debug.gateway_response_timeout,
config.debug.gateway_connection_timeout,
config.debug.max_concurrent_gateway_clients,
@@ -310,6 +310,7 @@ impl PacketPreparer {
GatewayPackets::new(
route.gateway_clients_address(),
route.gateway_identity(),
route.gateway_sphinx(),
mix_packets,
)
}
@@ -387,6 +388,7 @@ impl PacketPreparer {
let route_ext = test_route.test_message_ext(test_nonce);
let gateway_address = test_route.gateway_clients_address();
let gateway_identity = test_route.gateway_identity();
let gateway_sphinx = test_route.gateway_sphinx();
let mut mix_tester = self.ephemeral_mix_tester(test_route);
@@ -408,7 +410,9 @@ impl PacketPreparer {
let gateway_packets = all_gateway_packets
.entry(gateway_identity.to_bytes())
.or_insert_with(|| GatewayPackets::empty(gateway_address, gateway_identity));
.or_insert_with(|| {
GatewayPackets::empty(gateway_address, gateway_identity, gateway_sphinx)
});
gateway_packets.push_packets(mix_packets);
// and generate test packets for gateways (note the variable recipient)
@@ -436,7 +440,9 @@ impl PacketPreparer {
// or create a new one
let gateway_packets = all_gateway_packets
.entry(gateway_identity.to_bytes())
.or_insert_with(|| GatewayPackets::empty(gateway_address, gateway_identity));
.or_insert_with(|| {
GatewayPackets::empty(gateway_address, gateway_identity, gateway_sphinx)
});
gateway_packets.push_packets(gateway_mix_packets);
}
}
+20 -2
View File
@@ -15,6 +15,7 @@ use log::{debug, info, trace, warn};
use nym_bandwidth_controller::BandwidthController;
use nym_config::defaults::REMAINING_BANDWIDTH_THRESHOLD;
use nym_credential_storage::persistent_storage::PersistentStorage;
use nym_crypto::asymmetric::encryption;
use nym_crypto::asymmetric::identity::{self, PUBLIC_KEY_LENGTH};
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::{AcknowledgementReceiver, GatewayClient, MixnetMessageReceiver};
@@ -38,6 +39,8 @@ pub(crate) struct GatewayPackets {
/// Public key of the target gateway.
pub(crate) pub_key: identity::PublicKey,
pub(crate) encryption_key: encryption::PublicKey,
/// All the packets that are going to get sent to the gateway.
pub(crate) packets: Vec<MixPacket>,
}
@@ -46,19 +49,26 @@ impl GatewayPackets {
pub(crate) fn new(
clients_address: String,
pub_key: identity::PublicKey,
encryption_key: encryption::PublicKey,
packets: Vec<MixPacket>,
) -> Self {
GatewayPackets {
clients_address,
pub_key,
encryption_key,
packets,
}
}
pub(crate) fn empty(clients_address: String, pub_key: identity::PublicKey) -> Self {
pub(crate) fn empty(
clients_address: String,
pub_key: identity::PublicKey,
encryption_key: encryption::PublicKey,
) -> Self {
GatewayPackets {
clients_address,
pub_key,
encryption_key,
packets: Vec::new(),
}
}
@@ -79,6 +89,7 @@ impl GatewayPackets {
struct FreshGatewayClientData {
gateways_status_updater: GatewayClientUpdateSender,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
gateway_response_timeout: Duration,
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
disabled_credentials_mode: bool,
@@ -134,6 +145,7 @@ impl PacketSender {
pub(crate) fn new(
gateways_status_updater: GatewayClientUpdateSender,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
gateway_response_timeout: Duration,
gateway_connection_timeout: Duration,
max_concurrent_clients: usize,
@@ -146,6 +158,7 @@ impl PacketSender {
fresh_gateway_client_data: Arc::new(FreshGatewayClientData {
gateways_status_updater,
local_identity,
local_sphinx,
gateway_response_timeout,
bandwidth_controller,
disabled_credentials_mode,
@@ -171,6 +184,7 @@ impl PacketSender {
fn new_gateway_client_handle(
address: String,
identity: identity::PublicKey,
sphinx: encryption::PublicKey,
fresh_gateway_client_data: &FreshGatewayClientData,
) -> (
GatewayClientHandle,
@@ -187,7 +201,9 @@ impl PacketSender {
let mut gateway_client = GatewayClient::new(
address,
Arc::clone(&fresh_gateway_client_data.local_identity),
Arc::clone(&fresh_gateway_client_data.local_sphinx),
identity,
sphinx,
None,
message_sender,
ack_sender,
@@ -267,6 +283,7 @@ impl PacketSender {
async fn create_new_gateway_client_handle_and_authenticate(
address: String,
identity: identity::PublicKey,
sphinx: encryption::PublicKey,
fresh_gateway_client_data: &FreshGatewayClientData,
gateway_connection_timeout: Duration,
) -> Option<(
@@ -274,7 +291,7 @@ impl PacketSender {
(MixnetMessageReceiver, AcknowledgementReceiver),
)> {
let (new_client, (message_receiver, ack_receiver)) =
Self::new_gateway_client_handle(address, identity, fresh_gateway_client_data);
Self::new_gateway_client_handle(address, identity, sphinx, fresh_gateway_client_data);
// Put this in timeout in case the gateway has incorrectly set their ulimit and our connection
// gets stuck in their TCP queue and just hangs on our end but does not terminate
@@ -351,6 +368,7 @@ impl PacketSender {
Self::create_new_gateway_client_handle_and_authenticate(
packets.clients_address,
packets.pub_key,
packets.encryption_key,
&fresh_gateway_client_data,
gateway_connection_timeout,
)
@@ -3,7 +3,7 @@
use crate::network_monitor::test_packet::NymApiTestMessageExt;
use crate::network_monitor::ROUTE_TESTING_TEST_NONCE;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{encryption, identity};
use nym_topology::{gateway, mix, NymTopology};
use std::fmt::{Debug, Formatter};
@@ -63,6 +63,10 @@ impl TestRoute {
self.gateway().identity_key
}
pub(crate) fn gateway_sphinx(&self) -> encryption::PublicKey {
self.gateway().sphinx_key
}
pub(crate) fn topology(&self) -> &NymTopology {
&self.nodes
}