Compare commits

...

3 Commits

Author SHA1 Message Date
Jon Häggblad 14a3186c8a wip 2023-01-16 13:40:14 +01:00
Jon Häggblad e924a4e869 wip 2023-01-16 10:40:11 +01:00
Jon Häggblad dae7568b6f wip 2023-01-16 10:40:11 +01:00
6 changed files with 14 additions and 11 deletions
@@ -93,23 +93,20 @@ impl Client {
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await { let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res { Ok(stream_res) => match stream_res {
Ok(stream) => { Ok(stream) => {
debug!("Managed to establish connection to {}", address); debug!("Managed to establish connection to {address}");
// if we managed to connect, reset the reconnection count (whatever it might have been) // if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release); current_reconnection.store(0, Ordering::Release);
Framed::new(stream, SphinxCodec) Framed::new(stream, SphinxCodec)
} }
Err(err) => { Err(err) => {
debug!( debug!("failed to establish connection to {address} (err: {err})");
"failed to establish connection to {} (err: {})",
address, err
);
return; return;
} }
}, },
Err(_) => { Err(_err) => {
debug!( debug!(
"failed to connect to {} within {:?}", "failed to connect to {address} within {:?}",
address, connection_timeout connection_timeout
); );
// we failed to connect - increase reconnection attempt // we failed to connect - increase reconnection attempt
@@ -43,6 +43,7 @@ impl ActiveClientsStore {
/// ///
/// * `client`: address of the client for which to remove the handle. /// * `client`: address of the client for which to remove the handle.
pub(crate) fn disconnect(&self, client: DestinationAddressBytes) { pub(crate) fn disconnect(&self, client: DestinationAddressBytes) {
log::debug!("ActiveClientsStore:: explicit disconnect: {}", client);
self.0.remove(&client); self.0.remove(&client);
} }
@@ -53,6 +54,7 @@ impl ActiveClientsStore {
/// * `client`: address of the client for which to insert the handle. /// * `client`: address of the client for which to insert the handle.
/// * `handle`: the sender channel for all mix packets to be pushed back onto the websocket /// * `handle`: the sender channel for all mix packets to be pushed back onto the websocket
pub(crate) fn insert(&self, client: DestinationAddressBytes, handle: MixMessageSender) { pub(crate) fn insert(&self, client: DestinationAddressBytes, handle: MixMessageSender) {
log::debug!("ActiveClientsStore::insert: {}", client);
self.0.insert(client, handle); self.0.insert(client, handle);
} }
@@ -99,6 +99,7 @@ pub(crate) struct AuthenticatedHandler<R, S, St> {
// explicitly remove handle from the global store upon being dropped // explicitly remove handle from the global store upon being dropped
impl<R, S, St> Drop for AuthenticatedHandler<R, S, St> { impl<R, S, St> Drop for AuthenticatedHandler<R, S, St> {
fn drop(&mut self) { fn drop(&mut self) {
log::debug!("AuthenticatedHandler::drop");
self.inner self.inner
.active_clients_store .active_clients_store
.disconnect(self.client.address) .disconnect(self.client.address)
@@ -134,6 +135,7 @@ where
/// Explicitly removes handle from the global store. /// Explicitly removes handle from the global store.
fn disconnect(self) { fn disconnect(self) {
log::debug!("AuthenticatedHandler::disconnect");
self.inner self.inner
.active_clients_store .active_clients_store
.disconnect(self.client.address) .disconnect(self.client.address)
@@ -95,7 +95,7 @@ pub(crate) async fn handle_connection<R, S, St>(
_ => (), _ => (),
} }
trace!("Managed to perform websocket handshake!"); log::debug!("Managed to perform websocket handshake!");
match shutdown match shutdown
.run_future(handle.perform_initial_authentication()) .run_future(handle.perform_initial_authentication())
@@ -112,5 +112,5 @@ pub(crate) async fn handle_connection<R, S, St>(
Some(Some(auth_handle)) => auth_handle.listen_for_requests(shutdown).await, Some(Some(auth_handle)) => auth_handle.listen_for_requests(shutdown).await,
} }
trace!("The handler is done!"); log::debug!("The handler is done!");
} }
@@ -70,7 +70,7 @@ impl Listener {
connection = tcp_listener.accept() => { connection = tcp_listener.accept() => {
match connection { match connection {
Ok((socket, remote_addr)) => { Ok((socket, remote_addr)) => {
trace!("received a socket connection from {remote_addr}"); debug!("received a socket connection from {remote_addr}");
// TODO: I think we *REALLY* need a mechanism for having a maximum number of connected // TODO: I think we *REALLY* need a mechanism for having a maximum number of connected
// clients or spawned tokio tasks -> perhaps a worker system? // clients or spawned tokio tasks -> perhaps a worker system?
let handle = FreshHandler::new( let handle = FreshHandler::new(
+2
View File
@@ -333,6 +333,8 @@ where
.map_err(|source| GatewayError::CoconutVerifierCreationFailure { source })? .map_err(|source| GatewayError::CoconutVerifierCreationFailure { source })?
}; };
// The packet forwarder just forwards packets onto the mixnet. When unable to send it will
// just drop the packet and assume that the originator will try to retransmit
let mix_forwarding_channel = self.start_packet_forwarder(shutdown.subscribe()); let mix_forwarding_channel = self.start_packet_forwarder(shutdown.subscribe());
let active_clients_store = ActiveClientsStore::new(); let active_clients_store = ActiveClientsStore::new();