removed the read loop, and replaced it with a recurrent messages.

This commit is contained in:
michael-bailey 2023-01-25 23:30:52 +00:00
parent 9e4bcfeb27
commit e0dfdd118e
3 changed files with 115 additions and 99 deletions

View File

@ -1,6 +1,7 @@
use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc};
use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
use actix::{
clock::timeout,
fut::wrap_future,
Actor,
ActorContext,
@ -12,7 +13,7 @@ use actix::{
SpawnHandle,
WeakRecipient,
};
use futures::{future::join_all, Future, FutureExt};
use futures::{future::join_all, stream::Buffered, Future, FutureExt};
use tokio::{
io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::TcpStream,
@ -35,11 +36,9 @@ use crate::{
/// - observers: A list of observers to events created by the connection.
/// - loop_future: the future holding the receiving loop.
pub struct Connection {
read_half: Option<ReadHalf<TcpStream>>,
write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
address: SocketAddr,
observers: Vec<WeakRecipient<ConnectionObservableOutput>>,
loop_future: Option<SpawnHandle>,
}
impl Connection {
@ -48,14 +47,85 @@ impl Connection {
/// returns: the Addr of the connection.
pub(crate) fn new(stream: TcpStream, address: SocketAddr) -> Addr<Self> {
let (read_half, write_half) = split(stream);
Connection {
read_half: Some(read_half),
let addr = Connection {
write_half: Arc::new(Mutex::new(write_half)),
address,
observers: Vec::new(),
loop_future: None,
}
.start()
.start();
addr.do_send(ConnectionPrivateMessage::DoRead(BufReader::new(read_half)));
addr
}
#[inline]
fn broadcast(
&self,
ctx: &mut <Self as Actor>::Context,
data: ConnectionObservableOutput,
) {
let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.observers
.iter()
.cloned()
.map(|r| {
let data = data.clone();
async move {
if let Some(r) = r.upgrade() {
let _ = r.send(data).await;
}
}
.boxed()
})
.collect();
let _ = ctx.spawn(wrap_future(async {
join_all(futs).await;
}));
}
#[inline]
fn do_read(
&mut self,
ctx: &mut <Self as Actor>::Context,
mut buf_reader: BufReader<ReadHalf<TcpStream>>,
) {
let address = self.address;
let weak_addr = ctx.address().downgrade();
let read_fut = async move {
let dur = Duration::from_millis(100);
let mut buffer_string: String = Default::default();
let read_fut = buf_reader.read_line(&mut buffer_string);
let Ok(Ok(len)) = timeout(dur, read_fut).await else {
println!("[Connection] timeout reached");
if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader));
}
return;
};
if len == 0 {
println!("[Connection] readline returned 0");
return;
}
if let Some(addr) = weak_addr.upgrade() {
let _ = addr
.send(ConnectionPrivateMessage::Broadcast(
ConnectionObservableOutput::RecvData(
addr.downgrade(),
address,
buffer_string.clone(),
),
))
.await;
}
if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader));
}
};
ctx.spawn(wrap_future(read_fut));
}
}
@ -67,49 +137,6 @@ impl Actor for Connection {
/// then eneters loop readling lines from the tcp stream
fn started(&mut self, ctx: &mut Self::Context) {
println!("[Connection] started");
let weak_addr = ctx.address().downgrade();
let read_half = self
.read_half
.take()
.expect("What the hell did you do wrong");
let mut reader = BufReader::new(read_half);
let mut buffer_string = String::new();
let address = self.address;
let reader_fut = wrap_future(async move {
while let Ok(len) = reader.read_line(&mut buffer_string).await {
if len == 0 {
println!("[Connection] readline returned 0");
break;
}
if let Some(addr) = weak_addr.upgrade() {
let _ = addr
.send(ConnectionPrivateMessage::Broadcast(
ConnectionObservableOutput::RecvData(
addr.downgrade(),
address,
buffer_string.clone(),
),
))
.await;
}
buffer_string.clear();
println!("[Connection] send data to observers");
}
})
.map(|_out, _a: &mut Connection, ctx| {
println!("[Connection] readline returned 0");
let addr = ctx.address();
addr.do_send(ConnectionPrivateMessage::Broadcast(
ConnectionObservableOutput::ConnectionClosed(addr.downgrade()),
));
});
self.loop_future = Some(ctx.spawn(reader_fut));
}
fn stopped(&mut self, ctx: &mut Self::Context) {
@ -150,7 +177,7 @@ impl Handler<ObservableMessage<ConnectionObservableOutput>> for Connection {
}
}
impl Handler<super::messages::ConnectionMessage> for Connection {
impl Handler<ConnectionMessage> for Connection {
type Result = ();
fn handle(
&mut self,
@ -220,25 +247,9 @@ impl Handler<ConnectionPrivateMessage> for Connection {
) -> Self::Result {
use ConnectionPrivateMessage::Broadcast;
match msg {
Broadcast(data) => {
// this is a mess
let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.observers
.iter()
.cloned()
.map(|r| {
let data = data.clone();
async move {
if let Some(r) = r.upgrade() {
let _ = r.send(data).await;
}
}
.boxed()
})
.collect();
let _ = ctx.spawn(wrap_future(async {
join_all(futs).await;
}));
Broadcast(data) => self.broadcast(ctx, data),
ConnectionPrivateMessage::DoRead(buf_reader) => {
self.do_read(ctx, buf_reader)
}
};
}

View File

@ -1,6 +1,10 @@
use std::net::SocketAddr;
use actix::{Message, WeakAddr};
use tokio::{
io::{BufReader, ReadHalf},
net::TcpStream,
};
use crate::prelude::actors::Connection;
@ -23,4 +27,5 @@ pub(crate) enum ConnectionObservableOutput {
#[rtype(result = "()")]
pub(super) enum ConnectionPrivateMessage {
Broadcast(ConnectionObservableOutput),
DoRead(BufReader<ReadHalf<TcpStream>>),
}

View File

@ -1,31 +1,31 @@
//! # Network
//!
//! This module contains network code for the server.
//!
//! This includes:
//! - The network manager: For that handles all server network connections.
//! - The network listener: For listening for connections on a port.
//! - The conneciton: An abstraction over sockets sockets, for actix.
//! - The connection initiator: For initiating new connections to the server
//!
//! ## Diagrams
//!
//! ```mermaid
//! sequenceDiagram
//! Server->>NetworkManager: creates
//! NetworkManager->>NetworkListener: create
//! NetworkManager->>+NetworkListener: start listening
//!
//! loop async tcp listen
//! NetworkListener->>NetworkListener: check for new connections
//! end
//!
//! NetworkListener->>Connection: create from socket
//! NetworkListener->>NetworkManager: new connection
//! NetworkManager->>Server: new connection
//!
//! Server->>ConnectionInitiator: create with connection
//! ```
#![doc = r"# Network
This module contains network code for the server.
This includes:
- The network manager: For that handles all server network connections.
- The network listener: For listening for connections on a port.
- The conneciton: An abstraction over sockets sockets, for actix.
- The connection initiator: For initiating new connections to the server
## Diagrams
```mermaid
sequenceDiagram
Server->>NetworkManager: creates
NetworkManager->>NetworkListener: create
NetworkManager->>+NetworkListener: start listening
loop async tcp listen
NetworkListener->>NetworkListener: check for new connections
end
NetworkListener->>Connection: create from socket
NetworkListener->>NetworkManager: new connection
NetworkManager->>Server: new connection
Server->>ConnectionInitiator: create with connection
```"]
mod connection;
mod connection_initiator;