merge develop into master #20

Merged
michael-bailey merged 181 commits from develop into master 2023-12-01 21:48:28 +00:00
3 changed files with 190 additions and 168 deletions
Showing only changes of commit c8246f5c86 - Show all commits

View File

@ -0,0 +1,171 @@
use std::net::SocketAddr;
use actix::{
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
WeakAddr,
WeakRecipient,
};
use foundation::{
messages::{
client::{ClientStreamOut, ClientStreamOut::Error},
network::{NetworkSockIn, NetworkSockOut},
},
ClientDetails,
};
use serde_json::{from_str, to_string};
use crate::{
network::InitiatorOutput,
prelude::{
actors::Connection,
messages::{
ConnectionMessage,
ConnectionObservableOutput,
ObservableMessage,
},
},
};
/// # ConnectionInitiator
/// Handles the initiatin of a new connection.
///
/// This will do one of two things:
/// - Create a new client and send it to the network manager.
/// - Request the eserver info and send it to the connection.
pub struct ConnectionInitiator {
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
}
impl ConnectionInitiator {
pub(crate) fn new(
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
) -> Addr<Self> {
ConnectionInitiator {
connection,
delegate,
}
.start()
}
fn handle_request(
&mut self,
sender: WeakAddr<Connection>,
ctx: &mut <Self as Actor>::Context,
_address: SocketAddr,
data: String,
) {
use InitiatorOutput::{ClientRequest, InfoRequest};
use NetworkSockIn::{Connect, Info};
let msg = from_str::<NetworkSockIn>(data.as_str());
if let Err(e) = msg.as_ref() {
println!("[ConnectionInitiator] error decoding message {}", e);
self.error(ctx, sender);
return;
}
let msg = msg.unwrap();
println!("[ConnectionInitiator] matching request");
if let (Some(delegate), Some(sender)) =
(self.delegate.upgrade(), sender.upgrade())
{
match msg {
Info => {
delegate.do_send(InfoRequest(ctx.address().downgrade(), sender))
}
Connect {
uuid,
username,
address,
} => delegate.do_send(ClientRequest(
ctx.address().downgrade(),
sender,
ClientDetails {
uuid,
username,
address,
public_key: None,
},
)),
};
ctx.stop();
}
}
fn error(
&mut self,
ctx: &mut <Self as Actor>::Context,
sender: WeakAddr<Connection>,
) {
use ConnectionMessage::{CloseConnection, SendData};
if let Some(sender) = sender.upgrade() {
sender.do_send(SendData(
to_string::<ClientStreamOut>(&Error {
msg: "Error in connection initiator?".to_owned(),
})
.unwrap(),
));
sender.do_send(CloseConnection);
}
ctx.stop()
}
}
impl Actor for ConnectionInitiator {
type Context = Context<Self>;
/// on start initiate the protocol.
/// also add self as a subscriber to the connection.
fn started(&mut self, ctx: &mut Self::Context) {
use ConnectionMessage::SendData;
use NetworkSockOut::Request;
use ObservableMessage::Subscribe;
println!("[ConnectionInitiator] started");
self
.connection
.do_send(Subscribe(ctx.address().recipient().downgrade()));
self
.connection
.do_send(SendData(to_string(&Request).unwrap()));
}
/// once stopped remove self from the connection subscribers
fn stopped(&mut self, ctx: &mut Self::Context) {
use ObservableMessage::Unsubscribe;
println!("[ConnectionInitiator] stopped");
self
.connection
.do_send(Unsubscribe(ctx.address().recipient().downgrade()));
}
}
impl Handler<ConnectionObservableOutput> for ConnectionInitiator {
type Result = ();
fn handle(
&mut self,
msg: ConnectionObservableOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionObservableOutput::RecvData;
if let RecvData(sender, addr, data) = msg {
self.handle_request(sender, ctx, addr, data)
}
}
}
impl Drop for ConnectionInitiator {
fn drop(&mut self) {
println!("[ConnectionInitiator] Dropping value")
}
}

View File

@ -0,0 +1,15 @@
use actix::{Addr, Message, WeakAddr};
use foundation::ClientDetails;
use crate::prelude::actors::{Connection, ConnectionInitiator};
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum InitiatorOutput {
InfoRequest(WeakAddr<ConnectionInitiator>, Addr<Connection>),
ClientRequest(
WeakAddr<ConnectionInitiator>,
Addr<Connection>,
ClientDetails,
),
}

View File

@ -1,169 +1,5 @@
use std::net::SocketAddr;
mod actor;
mod messages;
use actix::{
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
Message,
WeakAddr,
WeakRecipient,
};
use foundation::{
messages::{
client::{ClientStreamOut, ClientStreamOut::Error},
network::{NetworkSockIn, NetworkSockOut},
},
ClientDetails,
};
use serde_json::{from_str, to_string};
use crate::{
network::{connection::ConnectionObservableOutput, Connection, ConnectionMessage},
prelude::messages::ObservableMessage,
};
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum InitiatorOutput {
InfoRequest(WeakAddr<ConnectionInitiator>, Addr<Connection>),
ClientRequest(
WeakAddr<ConnectionInitiator>,
Addr<Connection>,
ClientDetails,
),
}
/// # ConnectionInitiator
/// Handles the initiatin of a new connection.
///
/// This will do one of two things:
/// - Create a new client and send it to the network manager.
/// - Request the eserver info and send it to the connection.
pub struct ConnectionInitiator {
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
}
impl ConnectionInitiator {
pub(crate) fn new(
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
) -> Addr<Self> {
ConnectionInitiator {
connection,
delegate,
}
.start()
}
fn handle_request(
&mut self,
sender: WeakAddr<Connection>,
ctx: &mut <Self as Actor>::Context,
_address: SocketAddr,
data: String,
) {
use InitiatorOutput::{ClientRequest, InfoRequest};
use NetworkSockIn::{Connect, Info};
let msg = from_str::<NetworkSockIn>(data.as_str());
if let Err(e) = msg.as_ref() {
println!("[ConnectionInitiator] error decoding message {}", e);
self.error(ctx, sender);
return;
}
let msg = msg.unwrap();
println!("[ConnectionInitiator] matching request");
if let (Some(delegate), Some(sender)) = (self.delegate.upgrade(), sender.upgrade()) {
match msg {
Info => delegate.do_send(InfoRequest(ctx.address().downgrade(), sender)),
Connect {
uuid,
username,
address,
} => delegate.do_send(ClientRequest(
ctx.address().downgrade(),
sender,
ClientDetails {
uuid,
username,
address,
public_key: None,
},
)),
};
ctx.stop();
}
}
fn error(&mut self, ctx: &mut <Self as Actor>::Context, sender: WeakAddr<Connection>) {
use ConnectionMessage::{CloseConnection, SendData};
if let Some(sender) = sender.upgrade() {
sender.do_send(SendData(
to_string::<ClientStreamOut>(&Error {
msg: "Error in connection initiator?".to_owned(),
})
.unwrap(),
));
sender.do_send(CloseConnection);
}
ctx.stop()
}
}
impl Actor for ConnectionInitiator {
type Context = Context<Self>;
/// on start initiate the protocol.
/// also add self as a subscriber to the connection.
fn started(&mut self, ctx: &mut Self::Context) {
use NetworkSockOut::Request;
use ObservableMessage::Subscribe;
use super::ConnectionMessage::SendData;
println!("[ConnectionInitiator] started");
self
.connection
.do_send(Subscribe(ctx.address().recipient().downgrade()));
self
.connection
.do_send(SendData(to_string(&Request).unwrap()));
}
/// once stopped remove self from the connection subscribers
fn stopped(&mut self, ctx: &mut Self::Context) {
use ObservableMessage::Unsubscribe;
println!("[ConnectionInitiator] stopped");
self
.connection
.do_send(Unsubscribe(ctx.address().recipient().downgrade()));
}
}
impl Handler<ConnectionObservableOutput> for ConnectionInitiator {
type Result = ();
fn handle(
&mut self,
msg: ConnectionObservableOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionObservableOutput::RecvData;
if let RecvData(sender, addr, data) = msg {
self.handle_request(sender, ctx, addr, data)
}
}
}
impl Drop for ConnectionInitiator {
fn drop(&mut self) {
println!("[ConnectionInitiator] Dropping value")
}
}
pub(crate) use actor::ConnectionInitiator;
pub(crate) use messages::InitiatorOutput;