From 79d12716266d7265b17f9da7e9a42b9ffbb035ba Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Wed, 3 Jan 2024 23:25:34 +0000 Subject: [PATCH] Added client updates for addition and removal of clients --- server/src/client_management/client/actor.rs | 44 ++++++++++--------- .../src/client_management/client_manager.rs | 19 +++++--- server/src/network/connection/actor.rs | 22 ++++++---- server/src/network/connection/messages.rs | 5 +-- .../src/network/connection_initiator/actor.rs | 5 +-- server/src/network/listener/mod.rs | 3 ++ server/src/network/network_manager/actor.rs | 10 ++++- 7 files changed, 66 insertions(+), 42 deletions(-) diff --git a/server/src/client_management/client/actor.rs b/server/src/client_management/client/actor.rs index c553c7a..8c5f118 100644 --- a/server/src/client_management/client/actor.rs +++ b/server/src/client_management/client/actor.rs @@ -83,7 +83,7 @@ impl Client { } } - pub(crate) fn error(&self, msg: String) { + pub(crate) fn _error(&self, msg: String) { println!("[Client] sending error: {}", msg); use serde_json::to_string; use ConnectionMessage::SendData; @@ -219,29 +219,33 @@ impl Handler for Client { msg: ConnectionObservableOutput, ctx: &mut Self::Context, ) -> Self::Result { + use foundation::messages::client::ClientStreamIn::{ + Disconnect, + GetClients, + GetMessages, + SendGlobalMessage, + SendMessage, + }; + use serde_json::from_str; + use crate::network::ConnectionObservableOutput::RecvData; - if let RecvData(_sender, _addr, data) = msg { - use foundation::messages::client::ClientStreamIn::{ - Disconnect, - GetClients, - GetMessages, - SendGlobalMessage, - SendMessage, - }; - use serde_json::from_str; - if let Ok(msg) = from_str::(data.as_str()) { - match msg { - GetClients => self.get_clients(ctx), - GetMessages => self.get_messages(ctx), - SendMessage { to, content } => self.send_message(ctx, to, content), - SendGlobalMessage { content } => { - self.send_gloal_message(ctx, content) + + match msg { + RecvData(_sender, data) => { + if let Ok(msg) = from_str::(data.as_str()) { + match msg { + GetClients => self.get_clients(ctx), + GetMessages => self.get_messages(ctx), + SendMessage { to, content } => self.send_message(ctx, to, content), + SendGlobalMessage { content } => { + self.send_gloal_message(ctx, content) + } + Disconnect => self.disconnect(ctx), } - Disconnect => self.disconnect(ctx), } - } else { - self.error(format!("Failed to parse Message: {}", data)); } + ConnectionObservableOutput::ConnectionClosed(_) => self + .broadcast(ClientObservableMessage::Disconnecting(self.details.uuid)), } } } diff --git a/server/src/client_management/client_manager.rs b/server/src/client_management/client_manager.rs index d3c25b4..2148a4e 100644 --- a/server/src/client_management/client_manager.rs +++ b/server/src/client_management/client_manager.rs @@ -58,7 +58,7 @@ impl ClientManager { } pub(crate) fn send_client_list( - &mut self, + &self, ctx: &mut Context, sender: WeakAddr, ) { @@ -66,7 +66,7 @@ impl ClientManager { use crate::client_management::client::ClientMessage::ClientList; if let Some(to_send) = sender.upgrade() { let client_addr: Vec> = - self.clients.iter().map(|(_, v)| v).cloned().collect(); + self.clients.values().cloned().collect(); let collection = tokio_stream::iter(client_addr) .then(|addr| addr.send(ClientDataMessage::Details)) @@ -115,7 +115,7 @@ impl ClientManager { ) { println!("[ClientManager] sending message to client"); let client_addr: Vec> = - self.clients.iter().map(|(_, v)| v).cloned().collect(); + self.clients.values().cloned().collect(); let collection = tokio_stream::iter(client_addr.clone()) .then(|addr| addr.send(ClientDataMessage::Details)) @@ -164,7 +164,7 @@ impl ClientManager { use crate::client_management::client::ClientMessage::GloballySentMessage; let client_addr: Vec> = - self.clients.iter().map(|(_, v)| v).cloned().collect(); + self.clients.values().cloned().collect(); if let Some(sender) = sender.upgrade() { let cm = self.chat_manager.clone(); @@ -227,16 +227,24 @@ impl ClientManager { println!("[ClientManager] sending subscribe message to client"); addr.do_send(Subscribe(recp.downgrade())); self.clients.insert(uuid, addr); + for (_k, v) in self.clients.clone() { + self.send_client_list(ctx, v.downgrade()) + } } fn remove_client(&mut self, ctx: &mut Context, uuid: Uuid) { println!("[ClientManager] removing client"); use crate::prelude::messages::ObservableMessage::Unsubscribe; let recp = ctx.address().recipient::(); - if let Some(addr) = self.clients.remove(&uuid) { + let addr = self.clients.remove(&uuid); + if let Some(addr) = addr { println!("[ClientManager] sending unsubscribe message to client"); addr.do_send(Unsubscribe(recp.downgrade())); } + println!("[ClientManager] sending client list to other clients"); + for (_k, v) in self.clients.iter() { + self.send_client_list(ctx, v.downgrade()) + } } fn disconnect_client( @@ -249,6 +257,7 @@ impl ClientManager { let recp = ctx.address().recipient::(); if let Some(addr) = self.clients.remove(&uuid) { addr.do_send(Unsubscribe(recp.downgrade())); + self.remove_client(ctx, uuid); } } } diff --git a/server/src/network/connection/actor.rs b/server/src/network/connection/actor.rs index 98dbeb1..a5b7819 100644 --- a/server/src/network/connection/actor.rs +++ b/server/src/network/connection/actor.rs @@ -5,15 +5,13 @@ use actix::{ fut::wrap_future, Actor, ActorContext, - ActorFutureExt, Addr, AsyncContext, Context, Handler, - SpawnHandle, WeakRecipient, }; -use futures::{future::join_all, stream::Buffered, Future, FutureExt}; +use futures::{future::join_all, Future, FutureExt}; use tokio::{ io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}, net::TcpStream, @@ -37,7 +35,7 @@ use crate::{ /// - loop_future: the future holding the receiving loop. pub struct Connection { write_half: Arc>>, - address: SocketAddr, + _address: SocketAddr, observers: Vec>, } @@ -49,7 +47,7 @@ impl Connection { let (read_half, write_half) = split(stream); let addr = Connection { write_half: Arc::new(Mutex::new(write_half)), - address, + _address: address, observers: Vec::new(), } .start(); @@ -88,7 +86,6 @@ impl Connection { ctx: &mut ::Context, mut buf_reader: BufReader>, ) { - let address = self.address; let weak_addr = ctx.address().downgrade(); let read_fut = async move { @@ -97,7 +94,6 @@ impl Connection { let read_fut = buf_reader.read_line(&mut buffer_string); let Ok(Ok(len)) = timeout(dur, read_fut).await else { - println!("[Connection] timeout reached"); if let Some(addr) = weak_addr.upgrade() { addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader)); } @@ -106,6 +102,9 @@ impl Connection { if len == 0 { println!("[Connection] readline returned 0"); + if let Some(addr) = weak_addr.upgrade() { + addr.do_send(ConnectionPrivateMessage::Close); + } return; } @@ -114,7 +113,6 @@ impl Connection { .send(ConnectionPrivateMessage::Broadcast( ConnectionObservableOutput::RecvData( addr.downgrade(), - address, buffer_string.clone(), ), )) @@ -127,6 +125,11 @@ impl Connection { }; ctx.spawn(wrap_future(read_fut)); } + + fn close_connection(&self, ctx: &mut ::Context) { + use ConnectionObservableOutput::ConnectionClosed; + self.broadcast(ctx, ConnectionClosed(ctx.address().downgrade())) + } } impl Actor for Connection { @@ -135,7 +138,7 @@ impl Actor for Connection { /// runs when the actor is started. /// takes out eh read_half ad turns it into a buffered reader /// then eneters loop readling lines from the tcp stream - fn started(&mut self, ctx: &mut Self::Context) { + fn started(&mut self, _ctx: &mut Self::Context) { println!("[Connection] started"); } @@ -251,6 +254,7 @@ impl Handler for Connection { ConnectionPrivateMessage::DoRead(buf_reader) => { self.do_read(ctx, buf_reader) } + ConnectionPrivateMessage::Close => self.close_connection(ctx), }; } } diff --git a/server/src/network/connection/messages.rs b/server/src/network/connection/messages.rs index c8005ce..4876323 100644 --- a/server/src/network/connection/messages.rs +++ b/server/src/network/connection/messages.rs @@ -1,5 +1,3 @@ -use std::net::SocketAddr; - use actix::{Message, WeakAddr}; use tokio::{ io::{BufReader, ReadHalf}, @@ -19,7 +17,7 @@ pub(crate) enum ConnectionMessage { #[derive(Message, Clone)] #[rtype(result = "()")] pub(crate) enum ConnectionObservableOutput { - RecvData(WeakAddr, SocketAddr, String), + RecvData(WeakAddr, String), ConnectionClosed(WeakAddr), } @@ -28,4 +26,5 @@ pub(crate) enum ConnectionObservableOutput { pub(super) enum ConnectionPrivateMessage { Broadcast(ConnectionObservableOutput), DoRead(BufReader>), + Close, } diff --git a/server/src/network/connection_initiator/actor.rs b/server/src/network/connection_initiator/actor.rs index cb7e10c..b950792 100644 --- a/server/src/network/connection_initiator/actor.rs +++ b/server/src/network/connection_initiator/actor.rs @@ -58,7 +58,6 @@ impl ConnectionInitiator { &mut self, sender: WeakAddr, ctx: &mut ::Context, - _address: SocketAddr, data: String, ) { use InitiatorOutput::{ClientRequest, InfoRequest}; @@ -158,8 +157,8 @@ impl Handler for ConnectionInitiator { ) -> Self::Result { use ConnectionObservableOutput::RecvData; - if let RecvData(sender, addr, data) = msg { - self.handle_request(sender, ctx, addr, data) + if let RecvData(sender, data) = msg { + self.handle_request(sender, ctx, data) } } } diff --git a/server/src/network/listener/mod.rs b/server/src/network/listener/mod.rs index 7d254cf..fec10ae 100644 --- a/server/src/network/listener/mod.rs +++ b/server/src/network/listener/mod.rs @@ -57,7 +57,9 @@ impl NetworkListener { let delegate = self.delegate.clone(); ctx.spawn(wrap_future(async move { use ListenerOutput::NewConnection; + let listener = TcpListener::bind(addr).await.unwrap(); + while let Ok((stream, addr)) = listener.accept().await { println!("[NetworkListener] accepted socket"); let conn = Connection::new(stream, addr); @@ -66,6 +68,7 @@ impl NetworkListener { break; }; + println!("[NetworkListener] sending connection to delegate"); delegate.do_send(NewConnection(conn)) } })); diff --git a/server/src/network/network_manager/actor.rs b/server/src/network/network_manager/actor.rs index 1ca6621..567ea3c 100644 --- a/server/src/network/network_manager/actor.rs +++ b/server/src/network/network_manager/actor.rs @@ -170,9 +170,12 @@ impl Actor for NetworkManager { }, ); ctx.spawn(fut); - println!("[NetworkManager] Finished Starting"); } } + + fn stopped(&mut self, ctx: &mut Self::Context) { + println!("[NetworkManager] network manager stopped"); + } } impl Handler for NetworkManager { @@ -215,7 +218,10 @@ impl Handler for NetworkManager { ) -> Self::Result { use ListenerOutput::NewConnection; match msg { - NewConnection(connection) => self.new_connection(ctx, connection), + NewConnection(connection) => { + println!("new connection"); + self.new_connection(ctx, connection) + } }; } }