Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 14a3186c8a | |||
| e924a4e869 | |||
| dae7568b6f |
@@ -93,23 +93,20 @@ impl Client {
|
|||||||
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
|
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
|
||||||
Ok(stream_res) => match stream_res {
|
Ok(stream_res) => match stream_res {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
debug!("Managed to establish connection to {}", address);
|
debug!("Managed to establish connection to {address}");
|
||||||
// if we managed to connect, reset the reconnection count (whatever it might have been)
|
// if we managed to connect, reset the reconnection count (whatever it might have been)
|
||||||
current_reconnection.store(0, Ordering::Release);
|
current_reconnection.store(0, Ordering::Release);
|
||||||
Framed::new(stream, SphinxCodec)
|
Framed::new(stream, SphinxCodec)
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!(
|
debug!("failed to establish connection to {address} (err: {err})");
|
||||||
"failed to establish connection to {} (err: {})",
|
|
||||||
address, err
|
|
||||||
);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_err) => {
|
||||||
debug!(
|
debug!(
|
||||||
"failed to connect to {} within {:?}",
|
"failed to connect to {address} within {:?}",
|
||||||
address, connection_timeout
|
connection_timeout
|
||||||
);
|
);
|
||||||
|
|
||||||
// we failed to connect - increase reconnection attempt
|
// we failed to connect - increase reconnection attempt
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ impl ActiveClientsStore {
|
|||||||
///
|
///
|
||||||
/// * `client`: address of the client for which to remove the handle.
|
/// * `client`: address of the client for which to remove the handle.
|
||||||
pub(crate) fn disconnect(&self, client: DestinationAddressBytes) {
|
pub(crate) fn disconnect(&self, client: DestinationAddressBytes) {
|
||||||
|
log::debug!("ActiveClientsStore:: explicit disconnect: {}", client);
|
||||||
self.0.remove(&client);
|
self.0.remove(&client);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -53,6 +54,7 @@ impl ActiveClientsStore {
|
|||||||
/// * `client`: address of the client for which to insert the handle.
|
/// * `client`: address of the client for which to insert the handle.
|
||||||
/// * `handle`: the sender channel for all mix packets to be pushed back onto the websocket
|
/// * `handle`: the sender channel for all mix packets to be pushed back onto the websocket
|
||||||
pub(crate) fn insert(&self, client: DestinationAddressBytes, handle: MixMessageSender) {
|
pub(crate) fn insert(&self, client: DestinationAddressBytes, handle: MixMessageSender) {
|
||||||
|
log::debug!("ActiveClientsStore::insert: {}", client);
|
||||||
self.0.insert(client, handle);
|
self.0.insert(client, handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -99,6 +99,7 @@ pub(crate) struct AuthenticatedHandler<R, S, St> {
|
|||||||
// explicitly remove handle from the global store upon being dropped
|
// explicitly remove handle from the global store upon being dropped
|
||||||
impl<R, S, St> Drop for AuthenticatedHandler<R, S, St> {
|
impl<R, S, St> Drop for AuthenticatedHandler<R, S, St> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
|
log::debug!("AuthenticatedHandler::drop");
|
||||||
self.inner
|
self.inner
|
||||||
.active_clients_store
|
.active_clients_store
|
||||||
.disconnect(self.client.address)
|
.disconnect(self.client.address)
|
||||||
@@ -134,6 +135,7 @@ where
|
|||||||
|
|
||||||
/// Explicitly removes handle from the global store.
|
/// Explicitly removes handle from the global store.
|
||||||
fn disconnect(self) {
|
fn disconnect(self) {
|
||||||
|
log::debug!("AuthenticatedHandler::disconnect");
|
||||||
self.inner
|
self.inner
|
||||||
.active_clients_store
|
.active_clients_store
|
||||||
.disconnect(self.client.address)
|
.disconnect(self.client.address)
|
||||||
|
|||||||
@@ -95,7 +95,7 @@ pub(crate) async fn handle_connection<R, S, St>(
|
|||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("Managed to perform websocket handshake!");
|
log::debug!("Managed to perform websocket handshake!");
|
||||||
|
|
||||||
match shutdown
|
match shutdown
|
||||||
.run_future(handle.perform_initial_authentication())
|
.run_future(handle.perform_initial_authentication())
|
||||||
@@ -112,5 +112,5 @@ pub(crate) async fn handle_connection<R, S, St>(
|
|||||||
Some(Some(auth_handle)) => auth_handle.listen_for_requests(shutdown).await,
|
Some(Some(auth_handle)) => auth_handle.listen_for_requests(shutdown).await,
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("The handler is done!");
|
log::debug!("The handler is done!");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ impl Listener {
|
|||||||
connection = tcp_listener.accept() => {
|
connection = tcp_listener.accept() => {
|
||||||
match connection {
|
match connection {
|
||||||
Ok((socket, remote_addr)) => {
|
Ok((socket, remote_addr)) => {
|
||||||
trace!("received a socket connection from {remote_addr}");
|
debug!("received a socket connection from {remote_addr}");
|
||||||
// TODO: I think we *REALLY* need a mechanism for having a maximum number of connected
|
// TODO: I think we *REALLY* need a mechanism for having a maximum number of connected
|
||||||
// clients or spawned tokio tasks -> perhaps a worker system?
|
// clients or spawned tokio tasks -> perhaps a worker system?
|
||||||
let handle = FreshHandler::new(
|
let handle = FreshHandler::new(
|
||||||
|
|||||||
@@ -333,6 +333,8 @@ where
|
|||||||
.map_err(|source| GatewayError::CoconutVerifierCreationFailure { source })?
|
.map_err(|source| GatewayError::CoconutVerifierCreationFailure { source })?
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// The packet forwarder just forwards packets onto the mixnet. When unable to send it will
|
||||||
|
// just drop the packet and assume that the originator will try to retransmit
|
||||||
let mix_forwarding_channel = self.start_packet_forwarder(shutdown.subscribe());
|
let mix_forwarding_channel = self.start_packet_forwarder(shutdown.subscribe());
|
||||||
|
|
||||||
let active_clients_store = ActiveClientsStore::new();
|
let active_clients_store = ActiveClientsStore::new();
|
||||||
|
|||||||
Reference in New Issue
Block a user