Added client updates for addition and removal of clients

This commit is contained in:
michael-bailey 2024-01-03 23:25:34 +00:00
parent 3dfa71a7b1
commit 79d1271626
7 changed files with 66 additions and 42 deletions

View File

@ -83,7 +83,7 @@ impl Client {
} }
} }
pub(crate) fn error(&self, msg: String) { pub(crate) fn _error(&self, msg: String) {
println!("[Client] sending error: {}", msg); println!("[Client] sending error: {}", msg);
use serde_json::to_string; use serde_json::to_string;
use ConnectionMessage::SendData; use ConnectionMessage::SendData;
@ -219,29 +219,33 @@ impl Handler<ConnectionObservableOutput> for Client {
msg: ConnectionObservableOutput, msg: ConnectionObservableOutput,
ctx: &mut Self::Context, ctx: &mut Self::Context,
) -> Self::Result { ) -> Self::Result {
use foundation::messages::client::ClientStreamIn::{
Disconnect,
GetClients,
GetMessages,
SendGlobalMessage,
SendMessage,
};
use serde_json::from_str;
use crate::network::ConnectionObservableOutput::RecvData; use crate::network::ConnectionObservableOutput::RecvData;
if let RecvData(_sender, _addr, data) = msg {
use foundation::messages::client::ClientStreamIn::{ match msg {
Disconnect, RecvData(_sender, data) => {
GetClients, if let Ok(msg) = from_str::<ClientStreamIn>(data.as_str()) {
GetMessages, match msg {
SendGlobalMessage, GetClients => self.get_clients(ctx),
SendMessage, GetMessages => self.get_messages(ctx),
}; SendMessage { to, content } => self.send_message(ctx, to, content),
use serde_json::from_str; SendGlobalMessage { content } => {
if let Ok(msg) = from_str::<ClientStreamIn>(data.as_str()) { self.send_gloal_message(ctx, content)
match msg { }
GetClients => self.get_clients(ctx), Disconnect => self.disconnect(ctx),
GetMessages => self.get_messages(ctx),
SendMessage { to, content } => self.send_message(ctx, to, content),
SendGlobalMessage { content } => {
self.send_gloal_message(ctx, content)
} }
Disconnect => self.disconnect(ctx),
} }
} else {
self.error(format!("Failed to parse Message: {}", data));
} }
ConnectionObservableOutput::ConnectionClosed(_) => self
.broadcast(ClientObservableMessage::Disconnecting(self.details.uuid)),
} }
} }
} }

View File

@ -58,7 +58,7 @@ impl ClientManager {
} }
pub(crate) fn send_client_list( pub(crate) fn send_client_list(
&mut self, &self,
ctx: &mut Context<Self>, ctx: &mut Context<Self>,
sender: WeakAddr<Client>, sender: WeakAddr<Client>,
) { ) {
@ -66,7 +66,7 @@ impl ClientManager {
use crate::client_management::client::ClientMessage::ClientList; use crate::client_management::client::ClientMessage::ClientList;
if let Some(to_send) = sender.upgrade() { if let Some(to_send) = sender.upgrade() {
let client_addr: Vec<Addr<Client>> = let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect(); self.clients.values().cloned().collect();
let collection = tokio_stream::iter(client_addr) let collection = tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage::Details)) .then(|addr| addr.send(ClientDataMessage::Details))
@ -115,7 +115,7 @@ impl ClientManager {
) { ) {
println!("[ClientManager] sending message to client"); println!("[ClientManager] sending message to client");
let client_addr: Vec<Addr<Client>> = let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect(); self.clients.values().cloned().collect();
let collection = tokio_stream::iter(client_addr.clone()) let collection = tokio_stream::iter(client_addr.clone())
.then(|addr| addr.send(ClientDataMessage::Details)) .then(|addr| addr.send(ClientDataMessage::Details))
@ -164,7 +164,7 @@ impl ClientManager {
use crate::client_management::client::ClientMessage::GloballySentMessage; use crate::client_management::client::ClientMessage::GloballySentMessage;
let client_addr: Vec<Addr<Client>> = let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect(); self.clients.values().cloned().collect();
if let Some(sender) = sender.upgrade() { if let Some(sender) = sender.upgrade() {
let cm = self.chat_manager.clone(); let cm = self.chat_manager.clone();
@ -227,16 +227,24 @@ impl ClientManager {
println!("[ClientManager] sending subscribe message to client"); println!("[ClientManager] sending subscribe message to client");
addr.do_send(Subscribe(recp.downgrade())); addr.do_send(Subscribe(recp.downgrade()));
self.clients.insert(uuid, addr); self.clients.insert(uuid, addr);
for (_k, v) in self.clients.clone() {
self.send_client_list(ctx, v.downgrade())
}
} }
fn remove_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid) { fn remove_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid) {
println!("[ClientManager] removing client"); println!("[ClientManager] removing client");
use crate::prelude::messages::ObservableMessage::Unsubscribe; use crate::prelude::messages::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>(); let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) { let addr = self.clients.remove(&uuid);
if let Some(addr) = addr {
println!("[ClientManager] sending unsubscribe message to client"); println!("[ClientManager] sending unsubscribe message to client");
addr.do_send(Unsubscribe(recp.downgrade())); addr.do_send(Unsubscribe(recp.downgrade()));
} }
println!("[ClientManager] sending client list to other clients");
for (_k, v) in self.clients.iter() {
self.send_client_list(ctx, v.downgrade())
}
} }
fn disconnect_client( fn disconnect_client(
@ -249,6 +257,7 @@ impl ClientManager {
let recp = ctx.address().recipient::<ClientObservableMessage>(); let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) { if let Some(addr) = self.clients.remove(&uuid) {
addr.do_send(Unsubscribe(recp.downgrade())); addr.do_send(Unsubscribe(recp.downgrade()));
self.remove_client(ctx, uuid);
} }
} }
} }

View File

@ -5,15 +5,13 @@ use actix::{
fut::wrap_future, fut::wrap_future,
Actor, Actor,
ActorContext, ActorContext,
ActorFutureExt,
Addr, Addr,
AsyncContext, AsyncContext,
Context, Context,
Handler, Handler,
SpawnHandle,
WeakRecipient, WeakRecipient,
}; };
use futures::{future::join_all, stream::Buffered, Future, FutureExt}; use futures::{future::join_all, Future, FutureExt};
use tokio::{ use tokio::{
io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}, io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::TcpStream, net::TcpStream,
@ -37,7 +35,7 @@ use crate::{
/// - loop_future: the future holding the receiving loop. /// - loop_future: the future holding the receiving loop.
pub struct Connection { pub struct Connection {
write_half: Arc<Mutex<WriteHalf<TcpStream>>>, write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
address: SocketAddr, _address: SocketAddr,
observers: Vec<WeakRecipient<ConnectionObservableOutput>>, observers: Vec<WeakRecipient<ConnectionObservableOutput>>,
} }
@ -49,7 +47,7 @@ impl Connection {
let (read_half, write_half) = split(stream); let (read_half, write_half) = split(stream);
let addr = Connection { let addr = Connection {
write_half: Arc::new(Mutex::new(write_half)), write_half: Arc::new(Mutex::new(write_half)),
address, _address: address,
observers: Vec::new(), observers: Vec::new(),
} }
.start(); .start();
@ -88,7 +86,6 @@ impl Connection {
ctx: &mut <Self as Actor>::Context, ctx: &mut <Self as Actor>::Context,
mut buf_reader: BufReader<ReadHalf<TcpStream>>, mut buf_reader: BufReader<ReadHalf<TcpStream>>,
) { ) {
let address = self.address;
let weak_addr = ctx.address().downgrade(); let weak_addr = ctx.address().downgrade();
let read_fut = async move { let read_fut = async move {
@ -97,7 +94,6 @@ impl Connection {
let read_fut = buf_reader.read_line(&mut buffer_string); let read_fut = buf_reader.read_line(&mut buffer_string);
let Ok(Ok(len)) = timeout(dur, read_fut).await else { let Ok(Ok(len)) = timeout(dur, read_fut).await else {
println!("[Connection] timeout reached");
if let Some(addr) = weak_addr.upgrade() { if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader)); addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader));
} }
@ -106,6 +102,9 @@ impl Connection {
if len == 0 { if len == 0 {
println!("[Connection] readline returned 0"); println!("[Connection] readline returned 0");
if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::Close);
}
return; return;
} }
@ -114,7 +113,6 @@ impl Connection {
.send(ConnectionPrivateMessage::Broadcast( .send(ConnectionPrivateMessage::Broadcast(
ConnectionObservableOutput::RecvData( ConnectionObservableOutput::RecvData(
addr.downgrade(), addr.downgrade(),
address,
buffer_string.clone(), buffer_string.clone(),
), ),
)) ))
@ -127,6 +125,11 @@ impl Connection {
}; };
ctx.spawn(wrap_future(read_fut)); ctx.spawn(wrap_future(read_fut));
} }
fn close_connection(&self, ctx: &mut <Self as Actor>::Context) {
use ConnectionObservableOutput::ConnectionClosed;
self.broadcast(ctx, ConnectionClosed(ctx.address().downgrade()))
}
} }
impl Actor for Connection { impl Actor for Connection {
@ -135,7 +138,7 @@ impl Actor for Connection {
/// runs when the actor is started. /// runs when the actor is started.
/// takes out eh read_half ad turns it into a buffered reader /// takes out eh read_half ad turns it into a buffered reader
/// then eneters loop readling lines from the tcp stream /// then eneters loop readling lines from the tcp stream
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, _ctx: &mut Self::Context) {
println!("[Connection] started"); println!("[Connection] started");
} }
@ -251,6 +254,7 @@ impl Handler<ConnectionPrivateMessage> for Connection {
ConnectionPrivateMessage::DoRead(buf_reader) => { ConnectionPrivateMessage::DoRead(buf_reader) => {
self.do_read(ctx, buf_reader) self.do_read(ctx, buf_reader)
} }
ConnectionPrivateMessage::Close => self.close_connection(ctx),
}; };
} }
} }

View File

@ -1,5 +1,3 @@
use std::net::SocketAddr;
use actix::{Message, WeakAddr}; use actix::{Message, WeakAddr};
use tokio::{ use tokio::{
io::{BufReader, ReadHalf}, io::{BufReader, ReadHalf},
@ -19,7 +17,7 @@ pub(crate) enum ConnectionMessage {
#[derive(Message, Clone)] #[derive(Message, Clone)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub(crate) enum ConnectionObservableOutput { pub(crate) enum ConnectionObservableOutput {
RecvData(WeakAddr<Connection>, SocketAddr, String), RecvData(WeakAddr<Connection>, String),
ConnectionClosed(WeakAddr<Connection>), ConnectionClosed(WeakAddr<Connection>),
} }
@ -28,4 +26,5 @@ pub(crate) enum ConnectionObservableOutput {
pub(super) enum ConnectionPrivateMessage { pub(super) enum ConnectionPrivateMessage {
Broadcast(ConnectionObservableOutput), Broadcast(ConnectionObservableOutput),
DoRead(BufReader<ReadHalf<TcpStream>>), DoRead(BufReader<ReadHalf<TcpStream>>),
Close,
} }

View File

@ -58,7 +58,6 @@ impl ConnectionInitiator {
&mut self, &mut self,
sender: WeakAddr<Connection>, sender: WeakAddr<Connection>,
ctx: &mut <Self as Actor>::Context, ctx: &mut <Self as Actor>::Context,
_address: SocketAddr,
data: String, data: String,
) { ) {
use InitiatorOutput::{ClientRequest, InfoRequest}; use InitiatorOutput::{ClientRequest, InfoRequest};
@ -158,8 +157,8 @@ impl Handler<ConnectionObservableOutput> for ConnectionInitiator {
) -> Self::Result { ) -> Self::Result {
use ConnectionObservableOutput::RecvData; use ConnectionObservableOutput::RecvData;
if let RecvData(sender, addr, data) = msg { if let RecvData(sender, data) = msg {
self.handle_request(sender, ctx, addr, data) self.handle_request(sender, ctx, data)
} }
} }
} }

View File

@ -57,7 +57,9 @@ impl NetworkListener {
let delegate = self.delegate.clone(); let delegate = self.delegate.clone();
ctx.spawn(wrap_future(async move { ctx.spawn(wrap_future(async move {
use ListenerOutput::NewConnection; use ListenerOutput::NewConnection;
let listener = TcpListener::bind(addr).await.unwrap(); let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, addr)) = listener.accept().await { while let Ok((stream, addr)) = listener.accept().await {
println!("[NetworkListener] accepted socket"); println!("[NetworkListener] accepted socket");
let conn = Connection::new(stream, addr); let conn = Connection::new(stream, addr);
@ -66,6 +68,7 @@ impl NetworkListener {
break; break;
}; };
println!("[NetworkListener] sending connection to delegate");
delegate.do_send(NewConnection(conn)) delegate.do_send(NewConnection(conn))
} }
})); }));

View File

@ -170,9 +170,12 @@ impl Actor for NetworkManager {
}, },
); );
ctx.spawn(fut); ctx.spawn(fut);
println!("[NetworkManager] Finished Starting");
} }
} }
fn stopped(&mut self, ctx: &mut Self::Context) {
println!("[NetworkManager] network manager stopped");
}
} }
impl Handler<NetworkMessage> for NetworkManager { impl Handler<NetworkMessage> for NetworkManager {
@ -215,7 +218,10 @@ impl Handler<ListenerOutput> for NetworkManager {
) -> Self::Result { ) -> Self::Result {
use ListenerOutput::NewConnection; use ListenerOutput::NewConnection;
match msg { match msg {
NewConnection(connection) => self.new_connection(ctx, connection), NewConnection(connection) => {
println!("new connection");
self.new_connection(ctx, connection)
}
}; };
} }
} }