turned some references into weak variaties, to prevent memory leaks.

This commit is contained in:
michael-bailey 2023-01-15 11:16:39 +00:00
parent c79da5c893
commit f81246e1fa
15 changed files with 463 additions and 285 deletions

11
Dockerfile Normal file
View File

@ -0,0 +1,11 @@
# First stage: build the server file.
FROM rust:alpine3.16 AS build
WORKDIR /app # avoid the root directory
COPY ./ ./
RUN cargo build --release --bin server
# Second stage: actually run the server file.
FROM alpine:latest
WORKDIR /app
COPY --from=build /app/target/release/server ./server
CMD server

View File

@ -44,23 +44,19 @@ impl Connection {
T: Serialize,
{
let mut out_buffer = Vec::new();
let out = serde_json::to_string(&message).unwrap();
let mut writer_lock = self.stream_tx.lock().await;
let old = mem::replace(&mut *writer_lock, None);
writeln!(&mut out_buffer, "{}", out)?;
let mut writer_lock = self.stream_tx.lock().await;
let old = mem::replace(&mut *writer_lock, None);
return if let Some(mut writer) = old {
writer.write_all(&out_buffer).await?;
writer.flush().await?;
let _ = mem::replace(&mut *writer_lock, Some(writer));
Ok(())
} else {
Err(Error::new(ErrorKind::Interrupted, "Writer does not exist"))
let Some(mut writer) = old else {
return Err(Error::new(ErrorKind::Interrupted, "Writer does not exist"));
};
writer.write_all(&out_buffer).await?;
writer.flush().await?;
let _ = mem::replace(&mut *writer_lock, Some(writer));
Ok(())
}
pub async fn read<T>(&self) -> Result<T, Error>

View File

@ -45,7 +45,6 @@ pub trait IManager {
if let Some(manager) = Weak::upgrade(&weak_self) {
manager.run().await
}
()
}
});
}

View File

@ -1,5 +1,8 @@
use actix::{Actor, Addr, AsyncContext, Context, Handler, Recipient};
use foundation::{messages::client::ClientStreamIn, ClientDetails};
use actix::{Actor, Addr, AsyncContext, Context, Handler, WeakRecipient};
use foundation::{
messages::client::{ClientStreamIn, ClientStreamOut},
ClientDetails,
};
use uuid::Uuid;
use crate::{
@ -9,8 +12,8 @@ use crate::{
ClientMessage,
ClientObservableMessage,
},
network::{Connection, ConnectionOuput},
prelude::messages::ObservableMessage,
network::{Connection, ConnectionObservableOutput},
prelude::messages::{ConnectionMessage, ObservableMessage},
};
/// # Client
@ -19,7 +22,7 @@ use crate::{
pub struct Client {
connection: Addr<Connection>,
details: ClientDetails,
observers: Vec<Recipient<ClientObservableMessage>>,
observers: Vec<WeakRecipient<ClientObservableMessage>>,
}
impl Client {
@ -34,39 +37,59 @@ impl Client {
#[inline]
fn get_clients(&self, ctx: &mut Context<Client>) {
println!("[Client] getting clients");
use ClientObservableMessage::GetClients;
self.broadcast(GetClients(ctx.address().downgrade()));
}
#[inline]
fn get_messages(&self, ctx: &mut Context<Client>) {
println!("[Client] getting messages");
use ClientObservableMessage::GetGlobalMessages;
self.broadcast(GetGlobalMessages(ctx.address().downgrade()));
}
#[inline]
fn send_message(&self, ctx: &mut Context<Client>, to: Uuid, content: String) {
println!("[Client] sending message");
use ClientObservableMessage::Message;
self.broadcast(Message(ctx.address().downgrade(), to, content));
}
#[inline]
fn send_gloal_message(&self, ctx: &mut Context<Client>, content: String) {
println!("[Client] sending global message");
use ClientObservableMessage::GlobalMessage;
self.broadcast(GlobalMessage(ctx.address().downgrade(), content));
}
#[inline]
fn disconnect(&self, _ctx: &mut Context<Client>) {
todo!()
println!("[Client] disconnecting");
use ClientObservableMessage::Disconnecting;
self.broadcast(Disconnecting(self.details.uuid));
}
#[inline]
fn broadcast(&self, message: ClientObservableMessage) {
println!("[Client] broadcasting message");
for recp in &self.observers {
recp.do_send(message.clone());
if let Some(upgraded) = recp.upgrade() {
upgraded.do_send(message.clone());
}
}
}
pub(crate) fn error(&self, msg: String) {
println!("[Client] sending error: {}", msg);
use serde_json::to_string;
use ConnectionMessage::SendData;
let msg = to_string::<ClientStreamOut>(&ClientStreamOut::Error { msg })
.expect("[Client] This should not fail");
self.connection.do_send(SendData(msg));
}
}
impl Actor for Client {
@ -74,7 +97,7 @@ impl Actor for Client {
// tells the client that it has been connected.
fn started(&mut self, ctx: &mut Self::Context) {
use foundation::messages::client::{ClientStreamOut, ClientStreamOut::Connected};
use foundation::messages::client::ClientStreamOut::Connected;
use serde_json::to_string;
use crate::{
@ -84,8 +107,8 @@ impl Actor for Client {
println!("[Client] started");
self
.connection
.do_send::<ObservableMessage<ConnectionOuput>>(Subscribe(
ctx.address().recipient(),
.do_send::<ObservableMessage<ConnectionObservableOutput>>(Subscribe(
ctx.address().recipient().downgrade(),
));
self
.connection
@ -93,7 +116,7 @@ impl Actor for Client {
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use foundation::messages::client::{ClientStreamOut, ClientStreamOut::Disconnected};
use foundation::messages::client::ClientStreamOut::Disconnected;
use serde_json::to_string;
use crate::{
@ -105,8 +128,8 @@ impl Actor for Client {
self
.connection
.do_send::<ObservableMessage<ConnectionOuput>>(Unsubscribe(
ctx.address().recipient(),
.do_send::<ObservableMessage<ConnectionObservableOutput>>(Unsubscribe(
ctx.address().recipient().downgrade(),
));
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&Disconnected).unwrap(),
@ -128,14 +151,11 @@ impl Handler<ClientDataMessage> for Client {
impl Handler<ClientMessage> for Client {
type Result = ();
fn handle(&mut self, msg: ClientMessage, _ctx: &mut Self::Context) -> Self::Result {
use foundation::messages::client::{
ClientStreamOut,
ClientStreamOut::{
ConnectedClients,
GlobalChatMessages,
GlobalMessage,
UserMessage,
},
use foundation::messages::client::ClientStreamOut::{
ConnectedClients,
GlobalChatMessages,
GlobalMessage,
UserMessage,
};
use serde_json::to_string;
@ -174,11 +194,15 @@ impl Handler<ClientMessage> for Client {
}
// Handles outputs from the connection.
impl Handler<ConnectionOuput> for Client {
impl Handler<ConnectionObservableOutput> for Client {
type Result = ();
fn handle(&mut self, msg: ConnectionOuput, ctx: &mut Self::Context) -> Self::Result {
use crate::network::ConnectionOuput::RecvData;
fn handle(
&mut self,
msg: ConnectionObservableOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use crate::network::ConnectionObservableOutput::RecvData;
if let RecvData(_sender, _addr, data) = msg {
use foundation::messages::client::ClientStreamIn::{
Disconnect,
@ -188,14 +212,16 @@ impl Handler<ConnectionOuput> for Client {
SendMessage,
};
use serde_json::from_str;
let msg = from_str::<ClientStreamIn>(data.as_str())
.expect("[Client] failed to decode incoming message");
match msg {
GetClients => self.get_clients(ctx),
GetMessages => self.get_messages(ctx),
SendMessage { to, content } => self.send_message(ctx, to, content),
SendGlobalMessage { content } => self.send_gloal_message(ctx, content),
Disconnect => self.disconnect(ctx),
if let Ok(msg) = from_str::<ClientStreamIn>(data.as_str()) {
match msg {
GetClients => self.get_clients(ctx),
GetMessages => self.get_messages(ctx),
SendMessage { to, content } => self.send_message(ctx, to, content),
SendGlobalMessage { content } => self.send_gloal_message(ctx, content),
Disconnect => self.disconnect(ctx),
}
} else {
self.error(format!("Failed to parse Message: {}", data));
}
}
}
@ -217,13 +243,20 @@ impl Handler<ObservableMessage<ClientObservableMessage>> for Client {
}
Unsubscribe(r) => {
println!("[Client] removing subscriber");
let r = r.upgrade();
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a != &r)
.filter(|a| a.upgrade() != r)
.collect();
}
}
}
}
impl Drop for Client {
fn drop(&mut self) {
println!("[Client] Dropping value")
}
}

View File

@ -40,4 +40,5 @@ pub enum ClientObservableMessage {
GlobalMessage(WeakAddr<Client>, String),
GetClients(WeakAddr<Client>),
GetGlobalMessages(WeakAddr<Client>),
Disconnecting(Uuid),
}

View File

@ -220,7 +220,7 @@ impl ClientManager {
println!("[ClientManager] adding client");
use crate::prelude::messages::ObservableMessage::Subscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
addr.do_send(Subscribe(recp));
addr.do_send(Subscribe(recp.downgrade()));
self.clients.insert(uuid, addr);
}
@ -229,7 +229,16 @@ impl ClientManager {
use crate::prelude::messages::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) {
addr.do_send(Unsubscribe(recp));
addr.do_send(Unsubscribe(recp.downgrade()));
}
}
fn disconnect_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid) {
println!("[ClientManager] disconnecting client");
use crate::prelude::messages::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) {
addr.do_send(Unsubscribe(recp.downgrade()));
}
}
}
@ -268,6 +277,7 @@ impl Handler<ClientObservableMessage> for ClientManager {
ctx: &mut Self::Context,
) -> Self::Result {
use crate::client_management::client::ClientObservableMessage::{
Disconnecting,
GetClients,
GetGlobalMessages,
GlobalMessage,
@ -280,6 +290,7 @@ impl Handler<ClientObservableMessage> for ClientManager {
}
GetClients(sender) => self.send_client_list(ctx, sender),
GetGlobalMessages(sender) => self.send_global_messages(ctx, sender),
Disconnecting(uuid) => self.disconnect_client(ctx, uuid),
}
}
}

View File

@ -14,7 +14,9 @@ use crate::{
arg_parser::Arguments,
builder::Builder,
messages::{
ConfigManagerDataMessage, ConfigManagerDataResponse, ConfigManagerOutput,
ConfigManagerDataMessage,
ConfigManagerDataResponse,
ConfigManagerOutput,
},
types::ConfigValue::{Dict, Number, String as ConfigString},
ConfigValue,
@ -145,6 +147,8 @@ impl From<Builder> for ConfigManager {
.ok()
.unwrap_or_else(|| Dict(BTreeMap::new()));
println!("[ConfigManager] got stored: {:?}", stored);
let mut root = stored.clone();
if let Dict(root) = &mut root {
builder.args.map(|v| {

View File

@ -0,0 +1,255 @@
use std::{
io::Write,
net::SocketAddr,
pin::Pin,
sync::{Arc, Weak},
};
use actix::{
fut::wrap_future,
Actor,
ActorContext,
ActorFutureExt,
Addr,
AsyncContext,
Context,
Handler,
Running,
SpawnHandle,
WeakRecipient,
};
use futures::{future::join_all, Future, FutureExt};
use tokio::{
io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::TcpStream,
sync::Mutex,
};
use super::{ConnectionMessage, ConnectionObservableOutput};
use crate::{
network::connection::messages::ConnectionPrivateMessage,
prelude::messages::ObservableMessage,
};
/// # Connection
/// This manages a TcpStream for a given connection.
///
/// ## Fields
/// - read_half: A temporary store fr the read half of the connection.
/// - write_half: The write half of the connection.
/// - address: The socket address of the conneciton.
/// - observers: A list of observers to events created by the connection.
/// - loop_future: the future holding the receiving loop.
pub struct Connection {
read_half: Option<ReadHalf<TcpStream>>,
write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
address: SocketAddr,
observers: Vec<WeakRecipient<ConnectionObservableOutput>>,
loop_future: Option<SpawnHandle>,
}
impl Connection {
/// Creates a new Conneciton actor from a Tokio TcpStream,
/// and start's its execution.
/// returns: the Addr of the connection.
pub(crate) fn new(stream: TcpStream, address: SocketAddr) -> Addr<Self> {
let (read_half, write_half) = split(stream);
Connection {
read_half: Some(read_half),
write_half: Arc::new(Mutex::new(write_half)),
address,
observers: Vec::new(),
loop_future: None,
}
.start()
}
}
impl Actor for Connection {
type Context = Context<Self>;
/// runs when the actor is started.
/// takes out eh read_half ad turns it into a buffered reader
/// then eneters loop readling lines from the tcp stream
fn started(&mut self, ctx: &mut Self::Context) {
println!("[Connection] started");
let addr = ctx.address().downgrade();
let read_half = self
.read_half
.take()
.expect("What the hell did you do wrong");
let mut reader = BufReader::new(read_half);
let mut buffer_string = String::new();
let address = self.address;
self.loop_future = Some(
ctx.spawn(
wrap_future(async move {
while let Ok(len) = reader.read_line(&mut buffer_string).await {
if len == 0 {
println!("[Connection] readline returned 0");
break;
}
if let Some(addr) = addr.upgrade() {
let _ = addr
.send(ConnectionPrivateMessage::Broadcast(
ConnectionObservableOutput::RecvData(
addr.downgrade(),
address,
buffer_string.clone(),
),
))
.await;
}
buffer_string.clear();
println!("[Connection] send data to observers");
}
})
.map(|_out, _a: &mut Connection, ctx| {
println!("[Connection] readline returned 0");
let addr = ctx.address();
addr.do_send(ConnectionPrivateMessage::Broadcast(
ConnectionObservableOutput::ConnectionClosed(addr.downgrade()),
));
}),
),
);
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use ConnectionObservableOutput::ConnectionClosed;
println!("[Connection] stopped");
for recp in self.observers.iter() {
if let Some(recp) = recp.upgrade() {
recp.do_send(ConnectionClosed(ctx.address().downgrade()))
}
}
}
}
impl Handler<ObservableMessage<ConnectionObservableOutput>> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ObservableMessage<ConnectionObservableOutput>,
_ctx: &mut Self::Context,
) -> <Self as actix::Handler<ObservableMessage<ConnectionObservableOutput>>>::Result {
use ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Connection] adding subscriber");
self.observers.push(r);
}
Unsubscribe(r) => {
println!("[Connection] removing subscriber");
let r = r.upgrade();
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a.upgrade() != r)
.collect();
}
};
}
}
impl Handler<super::messages::ConnectionMessage> for Connection {
type Result = ();
fn handle(&mut self, msg: ConnectionMessage, ctx: &mut Self::Context) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
let writer = Arc::downgrade(&self.write_half);
match msg {
SendData(d) => {
ctx.spawn(wrap_future(async move {
let Some(writer) = writer.upgrade() else {
return;
};
println!("[Connection] sending data");
let mut lock = writer.lock().await;
let mut buffer = Vec::new();
let _ = writeln!(&mut buffer, "{}", d.as_str());
let _ = lock.write_all(&buffer).await;
}));
}
CloseConnection => ctx.stop(),
};
}
}
// impl Handler<SelfMessage> for Connection {
// type Result = ();
// fn handle(&mut self, msg: SelfMessage, ctx: &mut Self::Context) -> Self::Result {
// use ConnectionObservableOutput::RecvData;
// use SelfMessage::UpdateObserversWithData;
// match msg {
// UpdateObserversWithData(data) => {
// let send = ctx.address();
// let addr = self.address;
// // this is a mess
// let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
// .observers
// .iter()
// .cloned()
// .map(|r| {
// let send = send.clone();
// let data = data.clone();
// async move {
// let _ = r.send(RecvData(send, addr, data)).await;
// }
// .boxed()
// })
// .collect();
// let _ = ctx.spawn(wrap_future(async {
// join_all(futs).await;
// }));
// }
// };
// }
// }
impl Handler<ConnectionPrivateMessage> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ConnectionPrivateMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionPrivateMessage::Broadcast;
match msg {
Broadcast(data) => {
// this is a mess
let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.observers
.iter()
.cloned()
.map(|r| {
let data = data.clone();
async move {
if let Some(r) = r.upgrade() {
let _ = r.send(data).await;
}
}
.boxed()
})
.collect();
let _ = ctx.spawn(wrap_future(async {
join_all(futs).await;
}));
}
};
}
}
impl Drop for Connection {
fn drop(&mut self) {
println!("[Connection] Dropping value")
}
}

View File

@ -0,0 +1,30 @@
use std::net::SocketAddr;
use actix::{Addr, Message, WeakAddr};
use tokio::{
io::BufReader,
net::{tcp::ReadHalf, TcpStream},
};
use crate::prelude::actors::Connection;
/// This is a message that can be sent to the Connection.
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ConnectionMessage {
SendData(String),
CloseConnection,
}
#[derive(Message, Clone)]
#[rtype(result = "()")]
pub(crate) enum ConnectionObservableOutput {
RecvData(WeakAddr<Connection>, SocketAddr, String),
ConnectionClosed(WeakAddr<Connection>),
}
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ConnectionPrivateMessage {
Broadcast(ConnectionObservableOutput),
}

View File

@ -1,202 +1,5 @@
use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc};
mod actor;
mod messages;
use actix::{
fut::wrap_future,
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
Message,
Recipient,
SpawnHandle,
};
use futures::{future::join_all, Future, FutureExt};
use tokio::{
io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::TcpStream,
sync::Mutex,
};
use crate::prelude::messages::ObservableMessage;
/// This is a message that can be sent to the Connection.
#[derive(Message)]
#[rtype(result = "()")]
pub enum ConnectionMessage {
SendData(String),
CloseConnection,
}
#[derive(Message)]
#[rtype(result = "()")]
pub enum ConnectionOuput {
RecvData(Addr<Connection>, SocketAddr, String),
ConnectionClosed(Addr<Connection>),
}
#[derive(Message)]
#[rtype(result = "()")]
enum SelfMessage {
UpdateObserversWithData(String),
}
/// # Connection
/// This manages a TcpStream for a given connection.
///
/// ## Fields
/// - read_half: A temporary store fr the read half of the connection.
/// - write_half: The write half of the connection.
/// - address: The socket address of the conneciton.
/// - observers: A list of observers to events created by the connection.
/// - loop_future: the future holding the receiving loop.
pub struct Connection {
read_half: Option<ReadHalf<TcpStream>>,
write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
address: SocketAddr,
observers: Vec<Recipient<ConnectionOuput>>,
_loop_future: Option<SpawnHandle>,
}
impl Connection {
/// Creates a new Conneciton actor from a Tokio TcpStream,
/// and start's its execution.
/// returns: the Addr of the connection.
pub(super) fn new(stream: TcpStream, address: SocketAddr) -> Addr<Self> {
let (read_half, write_half) = split(stream);
Connection {
read_half: Some(read_half),
write_half: Arc::new(Mutex::new(write_half)),
address,
observers: Vec::new(),
_loop_future: None,
}
.start()
}
}
impl Actor for Connection {
type Context = Context<Self>;
/// runs when the actor is started.
/// takes out eh read_half ad turns it into a buffered reader
/// then eneters loop readling lines from the tcp stream
fn started(&mut self, ctx: &mut Self::Context) {
println!("[Connection] started");
let addr = ctx.address();
let read_half = self
.read_half
.take()
.expect("What the hell did yu do wrong");
ctx.spawn(wrap_future(async move {
let mut reader = BufReader::new(read_half);
let mut buffer_string = String::new();
while let Ok(len) = reader.read_line(&mut buffer_string).await {
use ConnectionMessage::CloseConnection;
use SelfMessage::UpdateObserversWithData;
if len == 0 {
println!("[Connection] connection closed");
addr
.send(CloseConnection)
.await
.expect("[Connection] failed to send close message to self");
return;
}
println!("[Connection] read line");
let _ = addr
.send(UpdateObserversWithData(buffer_string.clone()))
.await;
buffer_string.clear();
}
}));
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use ConnectionOuput::ConnectionClosed;
println!("[Connection] stopped");
for recp in self.observers.iter() {
recp.do_send(ConnectionClosed(ctx.address()));
}
}
}
impl Handler<ObservableMessage<ConnectionOuput>> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ObservableMessage<ConnectionOuput>,
_ctx: &mut Self::Context,
) -> <Self as actix::Handler<ObservableMessage<ConnectionOuput>>>::Result {
use ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Connection] adding subscriber");
self.observers.push(r);
}
Unsubscribe(r) => {
println!("[Connection] removing subscriber");
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a != &r)
.collect();
}
};
}
}
impl Handler<ConnectionMessage> for Connection {
type Result = ();
fn handle(&mut self, msg: ConnectionMessage, ctx: &mut Self::Context) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
let writer = self.write_half.clone();
match msg {
SendData(d) => {
ctx.spawn(wrap_future(async move {
println!("[Connection] sending data");
let mut lock = writer.lock().await;
let mut buffer = Vec::new();
let _ = writeln!(&mut buffer, "{}", d.as_str());
let _ = lock.write_all(&buffer).await;
}));
}
CloseConnection => ctx.stop(),
};
}
}
impl Handler<SelfMessage> for Connection {
type Result = ();
fn handle(&mut self, msg: SelfMessage, ctx: &mut Self::Context) -> Self::Result {
use ConnectionOuput::RecvData;
use SelfMessage::UpdateObserversWithData;
match msg {
UpdateObserversWithData(data) => {
let send = ctx.address();
let addr = self.address;
// this is a mess
let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.observers
.iter()
.cloned()
.map(|r| {
let send = send.clone();
let data = data.clone();
async move {
let _ = r.send(RecvData(send, addr, data)).await;
}
.boxed()
})
.collect();
let _ = ctx.spawn(wrap_future(async {
join_all(futs).await;
}));
}
};
}
}
pub(crate) use actor::Connection;
pub(crate) use messages::{ConnectionMessage, ConnectionObservableOutput};

View File

@ -1,7 +1,15 @@
use std::net::SocketAddr;
use actix::{
Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, WeakRecipient,
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
Message,
WeakAddr,
WeakRecipient,
};
use foundation::{
messages::{
@ -13,15 +21,19 @@ use foundation::{
use serde_json::{from_str, to_string};
use crate::{
network::{connection::ConnectionOuput, Connection, ConnectionMessage},
network::{connection::ConnectionObservableOutput, Connection, ConnectionMessage},
prelude::messages::ObservableMessage,
};
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum InitiatorOutput {
InfoRequest(Addr<ConnectionInitiator>, Addr<Connection>),
ClientRequest(Addr<ConnectionInitiator>, Addr<Connection>, ClientDetails),
InfoRequest(WeakAddr<ConnectionInitiator>, Addr<Connection>),
ClientRequest(
WeakAddr<ConnectionInitiator>,
Addr<Connection>,
ClientDetails,
),
}
/// # ConnectionInitiator
@ -49,7 +61,7 @@ impl ConnectionInitiator {
fn handle_request(
&mut self,
sender: Addr<Connection>,
sender: WeakAddr<Connection>,
ctx: &mut <Self as Actor>::Context,
_address: SocketAddr,
data: String,
@ -66,15 +78,15 @@ impl ConnectionInitiator {
let msg = msg.unwrap();
println!("[ConnectionInitiator] matching request");
if let Some(delegate) = self.delegate.upgrade() {
if let (Some(delegate), Some(sender)) = (self.delegate.upgrade(), sender.upgrade()) {
match msg {
Info => delegate.do_send(InfoRequest(ctx.address(), sender)),
Info => delegate.do_send(InfoRequest(ctx.address().downgrade(), sender)),
Connect {
uuid,
username,
address,
} => delegate.do_send(ClientRequest(
ctx.address(),
ctx.address().downgrade(),
sender,
ClientDetails {
uuid,
@ -88,12 +100,17 @@ impl ConnectionInitiator {
}
}
fn error(&mut self, ctx: &mut <Self as Actor>::Context, sender: Addr<Connection>) {
fn error(&mut self, ctx: &mut <Self as Actor>::Context, sender: WeakAddr<Connection>) {
use ConnectionMessage::{CloseConnection, SendData};
sender.do_send(SendData(
to_string::<ClientStreamOut>(&Error).expect("failed to convert error to string"),
));
sender.do_send(CloseConnection);
if let Some(sender) = sender.upgrade() {
sender.do_send(SendData(
to_string::<ClientStreamOut>(&Error {
msg: "Error in connection initiator?".to_owned(),
})
.unwrap(),
));
sender.do_send(CloseConnection);
}
ctx.stop()
}
}
@ -113,7 +130,7 @@ impl Actor for ConnectionInitiator {
self
.connection
.do_send(Subscribe(ctx.address().recipient()));
.do_send(Subscribe(ctx.address().recipient().downgrade()));
self
.connection
@ -126,17 +143,27 @@ impl Actor for ConnectionInitiator {
println!("[ConnectionInitiator] stopped");
self
.connection
.do_send(Unsubscribe(ctx.address().recipient()));
.do_send(Unsubscribe(ctx.address().recipient().downgrade()));
}
}
impl Handler<ConnectionOuput> for ConnectionInitiator {
impl Handler<ConnectionObservableOutput> for ConnectionInitiator {
type Result = ();
fn handle(&mut self, msg: ConnectionOuput, ctx: &mut Self::Context) -> Self::Result {
use ConnectionOuput::RecvData;
fn handle(
&mut self,
msg: ConnectionObservableOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionObservableOutput::RecvData;
if let RecvData(sender, addr, data) = msg {
self.handle_request(sender, ctx, addr, data)
}
}
}
impl Drop for ConnectionInitiator {
fn drop(&mut self) {
println!("[ConnectionInitiator] Dropping value")
}
}

View File

@ -32,7 +32,7 @@ mod connection_initiator;
mod listener;
mod network_manager;
pub(crate) use connection::{Connection, ConnectionMessage, ConnectionOuput};
pub(crate) use connection::{Connection, ConnectionMessage, ConnectionObservableOutput};
pub(crate) use connection_initiator::{ConnectionInitiator, InitiatorOutput};
// use listener::{ListenerMessage, ListenerOutput, NetworkListener};
pub(crate) use network_manager::{

View File

@ -86,10 +86,12 @@ impl NetworkManager {
}
#[inline]
fn remove_initiator(&mut self, sender: Addr<ConnectionInitiator>) {
let index = self.initiators.iter().position(|i| *i == sender).unwrap();
println!("[NetworkManager] removed initiator at:{}", index);
self.initiators.remove(index);
fn remove_initiator(&mut self, sender: WeakAddr<ConnectionInitiator>) {
if let Some(sender) = sender.upgrade() {
let index = self.initiators.iter().position(|i| *i == sender).unwrap();
println!("[NetworkManager] removed initiator at:{}", index);
let _ = self.initiators.remove(index);
}
}
/// handles a initiator client request
@ -100,7 +102,7 @@ impl NetworkManager {
fn client_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: Addr<ConnectionInitiator>,
sender: WeakAddr<ConnectionInitiator>,
connection: Addr<Connection>,
client_details: ClientDetails,
) {
@ -119,7 +121,7 @@ impl NetworkManager {
fn info_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: Addr<ConnectionInitiator>,
sender: WeakAddr<ConnectionInitiator>,
connection: Addr<Connection>,
) {
use NetworkOutput::InfoRequested;

View File

@ -6,18 +6,24 @@ mod observer;
#[allow(unused_imports)]
pub mod actors {
//! exports all actors used in the program.
pub(crate) use crate::client_management::client::Client;
pub(crate) use crate::client_management::ClientManager;
pub(crate) use crate::network::{Connection, ConnectionInitiator, NetworkManager};
pub use crate::server::Server;
pub(crate) use crate::{
client_management::{client::Client, ClientManager},
network::{Connection, ConnectionInitiator, NetworkManager},
};
}
#[allow(unused_imports)]
pub mod messages {
//! exports all messages used in the program.
pub(crate) use super::observer::ObservableMessage;
pub(crate) use crate::client_management::{ClientManagerMessage, ClientManagerOutput};
pub(crate) use crate::network::{
ConnectionMessage, ConnectionOuput, NetworkMessage, NetworkOutput,
pub(crate) use crate::{
client_management::{ClientManagerMessage, ClientManagerOutput},
network::{
ConnectionMessage,
ConnectionObservableOutput,
NetworkMessage,
NetworkOutput,
},
};
}

View File

@ -1,7 +1,7 @@
//! # observer.rs
//! crates a message type for the observer pattern.
use actix::{Message, Recipient};
use actix::{Message, WeakRecipient};
/// # ObservableMessage
/// represents common messages for observers
@ -12,6 +12,6 @@ where
M: Message + Send,
M::Result: Send,
{
Subscribe(Recipient<M>),
Unsubscribe(Recipient<M>),
Subscribe(WeakRecipient<M>),
Unsubscribe(WeakRecipient<M>),
}