diff --git a/server/src/network/connection/actor.rs b/server/src/network/connection/actor.rs index cc10b54..98dbeb1 100644 --- a/server/src/network/connection/actor.rs +++ b/server/src/network/connection/actor.rs @@ -1,6 +1,7 @@ -use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc}; +use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc, time::Duration}; use actix::{ + clock::timeout, fut::wrap_future, Actor, ActorContext, @@ -12,7 +13,7 @@ use actix::{ SpawnHandle, WeakRecipient, }; -use futures::{future::join_all, Future, FutureExt}; +use futures::{future::join_all, stream::Buffered, Future, FutureExt}; use tokio::{ io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}, net::TcpStream, @@ -35,11 +36,9 @@ use crate::{ /// - observers: A list of observers to events created by the connection. /// - loop_future: the future holding the receiving loop. pub struct Connection { - read_half: Option>, write_half: Arc>>, address: SocketAddr, observers: Vec>, - loop_future: Option, } impl Connection { @@ -48,14 +47,85 @@ impl Connection { /// returns: the Addr of the connection. pub(crate) fn new(stream: TcpStream, address: SocketAddr) -> Addr { let (read_half, write_half) = split(stream); - Connection { - read_half: Some(read_half), + let addr = Connection { write_half: Arc::new(Mutex::new(write_half)), address, observers: Vec::new(), - loop_future: None, } - .start() + .start(); + addr.do_send(ConnectionPrivateMessage::DoRead(BufReader::new(read_half))); + addr + } + + #[inline] + fn broadcast( + &self, + ctx: &mut ::Context, + data: ConnectionObservableOutput, + ) { + let futs: Vec + Send>>> = self + .observers + .iter() + .cloned() + .map(|r| { + let data = data.clone(); + async move { + if let Some(r) = r.upgrade() { + let _ = r.send(data).await; + } + } + .boxed() + }) + .collect(); + let _ = ctx.spawn(wrap_future(async { + join_all(futs).await; + })); + } + + #[inline] + fn do_read( + &mut self, + ctx: &mut ::Context, + mut buf_reader: BufReader>, + ) { + let address = self.address; + let weak_addr = ctx.address().downgrade(); + + let read_fut = async move { + let dur = Duration::from_millis(100); + let mut buffer_string: String = Default::default(); + + let read_fut = buf_reader.read_line(&mut buffer_string); + let Ok(Ok(len)) = timeout(dur, read_fut).await else { + println!("[Connection] timeout reached"); + if let Some(addr) = weak_addr.upgrade() { + addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader)); + } + return; + }; + + if len == 0 { + println!("[Connection] readline returned 0"); + return; + } + + if let Some(addr) = weak_addr.upgrade() { + let _ = addr + .send(ConnectionPrivateMessage::Broadcast( + ConnectionObservableOutput::RecvData( + addr.downgrade(), + address, + buffer_string.clone(), + ), + )) + .await; + } + + if let Some(addr) = weak_addr.upgrade() { + addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader)); + } + }; + ctx.spawn(wrap_future(read_fut)); } } @@ -67,49 +137,6 @@ impl Actor for Connection { /// then eneters loop readling lines from the tcp stream fn started(&mut self, ctx: &mut Self::Context) { println!("[Connection] started"); - let weak_addr = ctx.address().downgrade(); - - let read_half = self - .read_half - .take() - .expect("What the hell did you do wrong"); - - let mut reader = BufReader::new(read_half); - let mut buffer_string = String::new(); - let address = self.address; - - let reader_fut = wrap_future(async move { - while let Ok(len) = reader.read_line(&mut buffer_string).await { - if len == 0 { - println!("[Connection] readline returned 0"); - break; - } - - if let Some(addr) = weak_addr.upgrade() { - let _ = addr - .send(ConnectionPrivateMessage::Broadcast( - ConnectionObservableOutput::RecvData( - addr.downgrade(), - address, - buffer_string.clone(), - ), - )) - .await; - } - buffer_string.clear(); - - println!("[Connection] send data to observers"); - } - }) - .map(|_out, _a: &mut Connection, ctx| { - println!("[Connection] readline returned 0"); - let addr = ctx.address(); - addr.do_send(ConnectionPrivateMessage::Broadcast( - ConnectionObservableOutput::ConnectionClosed(addr.downgrade()), - )); - }); - - self.loop_future = Some(ctx.spawn(reader_fut)); } fn stopped(&mut self, ctx: &mut Self::Context) { @@ -150,7 +177,7 @@ impl Handler> for Connection { } } -impl Handler for Connection { +impl Handler for Connection { type Result = (); fn handle( &mut self, @@ -220,25 +247,9 @@ impl Handler for Connection { ) -> Self::Result { use ConnectionPrivateMessage::Broadcast; match msg { - Broadcast(data) => { - // this is a mess - let futs: Vec + Send>>> = self - .observers - .iter() - .cloned() - .map(|r| { - let data = data.clone(); - async move { - if let Some(r) = r.upgrade() { - let _ = r.send(data).await; - } - } - .boxed() - }) - .collect(); - let _ = ctx.spawn(wrap_future(async { - join_all(futs).await; - })); + Broadcast(data) => self.broadcast(ctx, data), + ConnectionPrivateMessage::DoRead(buf_reader) => { + self.do_read(ctx, buf_reader) } }; } diff --git a/server/src/network/connection/messages.rs b/server/src/network/connection/messages.rs index 56e9d07..c8005ce 100644 --- a/server/src/network/connection/messages.rs +++ b/server/src/network/connection/messages.rs @@ -1,6 +1,10 @@ use std::net::SocketAddr; use actix::{Message, WeakAddr}; +use tokio::{ + io::{BufReader, ReadHalf}, + net::TcpStream, +}; use crate::prelude::actors::Connection; @@ -23,4 +27,5 @@ pub(crate) enum ConnectionObservableOutput { #[rtype(result = "()")] pub(super) enum ConnectionPrivateMessage { Broadcast(ConnectionObservableOutput), + DoRead(BufReader>), } diff --git a/server/src/network/mod.rs b/server/src/network/mod.rs index b6ef8d3..6e429da 100644 --- a/server/src/network/mod.rs +++ b/server/src/network/mod.rs @@ -1,31 +1,31 @@ -//! # Network -//! -//! This module contains network code for the server. -//! -//! This includes: -//! - The network manager: For that handles all server network connections. -//! - The network listener: For listening for connections on a port. -//! - The conneciton: An abstraction over sockets sockets, for actix. -//! - The connection initiator: For initiating new connections to the server -//! -//! ## Diagrams -//! -//! ```mermaid -//! sequenceDiagram -//! Server->>NetworkManager: creates -//! NetworkManager->>NetworkListener: create -//! NetworkManager->>+NetworkListener: start listening -//! -//! loop async tcp listen -//! NetworkListener->>NetworkListener: check for new connections -//! end -//! -//! NetworkListener->>Connection: create from socket -//! NetworkListener->>NetworkManager: new connection -//! NetworkManager->>Server: new connection -//! -//! Server->>ConnectionInitiator: create with connection -//! ``` +#![doc = r"# Network + +This module contains network code for the server. + +This includes: +- The network manager: For that handles all server network connections. +- The network listener: For listening for connections on a port. +- The conneciton: An abstraction over sockets sockets, for actix. +- The connection initiator: For initiating new connections to the server + +## Diagrams + +```mermaid +sequenceDiagram + Server->>NetworkManager: creates + NetworkManager->>NetworkListener: create + NetworkManager->>+NetworkListener: start listening + + loop async tcp listen + NetworkListener->>NetworkListener: check for new connections + end + + NetworkListener->>Connection: create from socket + NetworkListener->>NetworkManager: new connection + NetworkManager->>Server: new connection + + Server->>ConnectionInitiator: create with connection +```"] mod connection; mod connection_initiator;