added ability to add and remove clients

This commit is contained in:
michael-bailey 2022-06-11 23:20:11 +01:00
parent d7c47f3b3b
commit f22e00e54a
8 changed files with 141 additions and 58 deletions

View File

@ -22,7 +22,6 @@ use foundation::messages::network::NetworkSockOut;
use foundation::ClientDetails;
use crate::network::{NetworkManager, NetworkMessage};
/// This struct is the main actor of the server.
/// all other actors are ran through here.
pub struct ServerActor {
@ -41,14 +40,14 @@ impl ServerActor {
pub(crate) fn client_request(
&mut self,
ctx: &mut <Self as Actor>::Context,
_ctx: &mut <Self as Actor>::Context,
addr: Addr<Connection>,
details: ClientDetails
) {
use ClientManagerMessage::{AddClient};
if let Some(mgr) = self.client_management.as_ref() {
let client = Client::new(addr, details);
mgr.do_send(AddClient(client));
let client = Client::new(addr, details.clone());
mgr.do_send(AddClient(details.uuid, client));
}
}
@ -104,7 +103,7 @@ impl Handler<NetworkOutput> for ServerActor {
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
use NetworkOutput::{InfoRequested, NewClient,NewConnection};
use NetworkOutput::{InfoRequested, NewClient};
use NetworkSockOut::GotInfo;
println!("[ServerActor] received message");
match msg {
@ -113,11 +112,7 @@ impl Handler<NetworkOutput> for ServerActor {
// so they occur in the right order
InfoRequested(sender) => self.info_request(ctx, sender),
// A new client is to be added
NewClient(addr, details) => {
}
// A new client is to be added
NewConnection(_) => todo!(),
NewClient(addr, details) => self.client_request(ctx, addr, details),
};
}
}
@ -126,10 +121,13 @@ impl Handler<ClientManagerOutput> for ServerActor {
type Result = ();
fn handle(
&mut self,
_msg: ClientManagerOutput,
msg: ClientManagerOutput,
_ctx: &mut Self::Context,
) -> Self::Result {
use ClientManagerOutput::{};
match msg {
_ => todo!()
}
}
}

View File

@ -1,9 +1,14 @@
mod actix_server;
mod client_management;
mod network;
//! # actor
//! This is the main module of the actix server.
//! It starts the actor runtime and then sleeps
//! for the duration of the program.
pub(crate) mod actix_server;
pub(crate) mod client_management;
pub(crate) mod network;
pub(crate) mod prelude;
pub(crate) use actix_server::ServerActor;
use actix_server::ServerActor;
use tokio::time::sleep;
use tokio::time::Duration;

View File

@ -1,20 +1,24 @@
use crate::network::Connection;
use crate::prelude::ObservableMessage;
use actix::{Actor, Addr, Context, Handler, Message, WeakAddr};
use actix::{Actor, Addr, Context, Handler, Message, WeakAddr, Recipient, Running, ArbiterHandle};
use serde_json::to_string;
use foundation::ClientDetails;
use crate::network::ConnectionMessage;
use uuid::Uuid;
use foundation::messages::client::ClientStreamOut;
/// Message sent ot the clients delegate
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientMessage {
AddClient(Uuid, Addr<Client>),
RemoveClient(Uuid),
}
/// message that is sent to all observers of the current client.
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientObservableMessage {
SendRequest(WeakAddr<Client>, Uuid, String),
SendMessageRequest(WeakAddr<Client>, Uuid, String),
}
/// # Client
@ -23,6 +27,7 @@ pub(crate) enum ClientObservableMessage {
pub(crate) struct Client {
connection: Addr<Connection>,
details: ClientDetails,
observers: Vec<Recipient<ClientObservableMessage>>
}
impl Client {
@ -33,6 +38,7 @@ impl Client {
Client {
connection,
details,
observers: Vec::default(),
}
.start()
}
@ -40,6 +46,14 @@ impl Client {
impl Actor for Client {
type Context = Context<Self>;
// tells the client that it has been connected
fn started(&mut self, ctx: &mut Self::Context) {
use ClientStreamOut::Connected;
use ConnectionMessage::{SendData};
println!("[Client] started");
self.connection.do_send(SendData(to_string::<ClientStreamOut>(&Connected).unwrap()));
}
}
impl Handler<ClientMessage> for Client {
@ -55,4 +69,25 @@ impl Handler<ClientMessage> for Client {
}
}
impl Handler<ObservableMessage<ClientObservableMessage>> for Client {}
impl Handler<ObservableMessage<ClientObservableMessage>> for Client {
type Result = ();
fn handle(&mut self, msg: ObservableMessage<ClientObservableMessage>, ctx: &mut Self::Context) -> Self::Result {
use ObservableMessage::{Subscribe,Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Client] adding subscriber");
self.observers.push(r);
}
Unsubscribe(r) => {
println!("[Client] removing subscriber");
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a != &r)
.collect();
}
}
}
}

View File

@ -1,5 +1,5 @@
use crate::client_management::Client;
use actix::Actor;
use actix::{Actor, ArbiterHandle, Recipient, Running, WeakAddr};
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
@ -8,6 +8,8 @@ use actix::Message;
use actix::WeakRecipient;
use std::collections::HashMap;
use uuid::Uuid;
use crate::client_management::client::ClientObservableMessage;
use crate::prelude::ObservableMessage;
#[derive(Message)]
#[rtype(result = "()")]
@ -25,6 +27,12 @@ pub(crate) struct ClientManager {
delegate: WeakRecipient<ClientManagerOutput>,
}
impl ClientManager {
pub(crate) fn send_message_request(&self, ctx: &mut Context<ClientManager>, addr: WeakAddr<Client>, uuid: Uuid, content: String) {
todo!()
}
}
impl ClientManager {
pub(crate) fn new(
delegate: WeakRecipient<ClientManagerOutput>,
@ -36,16 +44,29 @@ impl ClientManager {
.start()
}
fn add_client(&mut self, ctx: Context<Self>, uuid: Uuid, addr: Addr<Client>) {
fn add_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid, addr: Addr<Client>) {
use crate::prelude::ObservableMessage::Subscribe;
let recp = ctx.address().recipient().downgrade();
let recp = ctx.address().recipient::<ClientObservableMessage>();
addr.do_send(Subscribe(recp));
self.clients.insert(uuid, addr)
self.clients.insert(uuid, addr);
}
fn remove_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid) {
use crate::prelude::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) {
addr.do_send(Unsubscribe(recp));
}
}
}
impl Actor for ClientManager {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("[ClientManager] started");
}
}
impl Handler<ClientManagerMessage> for ClientManager {
@ -53,18 +74,25 @@ impl Handler<ClientManagerMessage> for ClientManager {
fn handle(
&mut self,
msg: ClientManagerMessage,
_ctx: &mut Self::Context,
ctx: &mut Self::Context,
) -> Self::Result {
use ClientManagerMessage::{AddClient, RemoveClient};
match msg {
// todo: Add subscription to the client.
AddClient(uuid, addr) => self.add_client(uuid, addr),
AddClient(uuid, addr) => self.add_client(ctx, uuid, addr),
// todo: remove subscription to client.
RemoveClient(addr) => {
if let Some(index) = self.clients.iter().position(|i| i == &addr) {
self.clients.remove(index);
}
}
RemoveClient(uuid) => self.remove_client(ctx, uuid),
}
}
}
impl Handler<ClientObservableMessage> for ClientManager {
type Result = ();
fn handle(&mut self, msg: ClientObservableMessage, ctx: &mut Self::Context) -> Self::Result {
use ClientObservableMessage::SendMessageRequest;
match msg {
SendMessageRequest(addr, uuid, content) => self.send_message_request(ctx, addr, uuid, content),
}
}
}

View File

@ -29,7 +29,7 @@ use tokio::sync::Mutex;
/// This is a message that can be sent to the Connection.
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ConnectionMessage {
pub enum ConnectionMessage {
SendData(String),
CloseConnection,
}
@ -56,7 +56,7 @@ enum SelfMessage {
/// - address: The socket address of the conneciton.
/// - observers: A list of observers to events created by the connection.
/// - loop_future: the future holding the receiving loop.
pub(crate) struct Connection {
pub struct Connection {
read_half: Option<ReadHalf<TcpStream>>,
write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
address: SocketAddr,

View File

@ -1,7 +1,6 @@
use crate::network::connection::ConnectionOuput;
use crate::network::Connection;
use crate::network::{Connection, ConnectionMessage};
use crate::prelude::ObservableMessage;
use actix::fut::wrap_future;
use actix::Actor;
use actix::ActorContext;
use actix::Addr;
@ -11,6 +10,8 @@ use actix::Handler;
use actix::Message;
use actix::Recipient;
use actix::WeakRecipient;
use foundation::messages::client::ClientStreamOut;
use foundation::messages::client::ClientStreamOut::Error;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::ClientDetails;
use serde_json::{from_str, to_string};
@ -35,7 +36,7 @@ pub(crate) enum InitiatorOutput {
/// This will do one of two things:
/// - Create a new client and send it to the network manager.
/// - Request the eserver info and send it to the connection.
pub(crate) struct ConnectionInitiator {
pub struct ConnectionInitiator {
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
}
@ -64,8 +65,14 @@ impl ConnectionInitiator {
use NetworkSockOut::{Connecting, GotInfo};
use ObservableMessage::Unsubscribe;
let msg = from_str::<NetworkSockIn>(data.as_str())
.expect("[ConnectionInitiator] error deserialising incomming message");
let msg = from_str::<NetworkSockIn>(data.as_str());
if let Err(e) = msg.as_ref() {
println!("[ConnectionInitiator] error decoding message {}", e);
self.error(ctx, sender);
return;
}
let msg = msg.unwrap();
println!("[ConnectionInitiator] matching request");
if let Some(delegate) = self.delegate.upgrade() {
match msg {
@ -88,6 +95,20 @@ impl ConnectionInitiator {
ctx.stop();
}
}
fn error(
&mut self,
ctx: &mut <Self as Actor>::Context,
sender: Addr<Connection>,
) {
use ConnectionMessage::{CloseConnection, SendData};
sender.do_send(SendData(
to_string::<ClientStreamOut>(&Error)
.expect("failed to convert error to string"),
));
sender.do_send(CloseConnection);
ctx.stop()
}
}
impl Actor for ConnectionInitiator {
@ -102,12 +123,10 @@ impl Actor for ConnectionInitiator {
println!("[ConnectionInitiator] started");
self
.connection
self.connection
.do_send(Subscribe(ctx.address().recipient()));
self
.connection
self.connection
.do_send(SendData(to_string(&Request).unwrap()));
}
@ -115,8 +134,7 @@ impl Actor for ConnectionInitiator {
fn stopped(&mut self, ctx: &mut Self::Context) {
use ObservableMessage::Unsubscribe;
println!("[ConnectionInitiator] stopped");
self
.connection
self.connection
.do_send(Unsubscribe(ctx.address().recipient()));
}
}

View File

@ -6,6 +6,7 @@ use crate::network::listener::ListenerOutput;
use crate::network::Connection;
use crate::network::ConnectionInitiator;
use crate::network::InitiatorOutput;
use crate::network::InitiatorOutput::ClientRequest;
use crate::network::ListenerMessage;
use crate::network::NetworkListener;
use actix::Actor;
@ -19,29 +20,26 @@ use foundation::ClientDetails;
#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)]
#[rtype(result = "()")]
pub(crate) enum NetworkMessage {
pub enum NetworkMessage {
StartListening,
StopListening,
}
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum NetworkOutput {
NewConnection(Addr<Connection>),
pub enum NetworkOutput {
NewClient(Addr<Connection>, ClientDetails),
InfoRequested(Addr<Connection>),
}
pub(crate) struct NetworkManager {
pub struct NetworkManager {
listener_addr: Option<Addr<NetworkListener>>,
delegate: WeakRecipient<NetworkOutput>,
initiators: Vec<Addr<ConnectionInitiator>>,
}
impl NetworkManager {
pub(crate) fn new(
delegate: WeakRecipient<NetworkOutput>,
) -> Addr<NetworkManager> {
pub fn new(delegate: WeakRecipient<NetworkOutput>) -> Addr<NetworkManager> {
NetworkManager {
listener_addr: None,
delegate,
@ -94,16 +92,18 @@ impl NetworkManager {
/// to the server actor to be dispatched to the appropriate
/// manager
#[inline]
#[allow(unreachable_code, unused_variables)]
fn client_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: Addr<ConnectionInitiator>,
_connection: Addr<Connection>,
_client_details: ClientDetails,
connection: Addr<Connection>,
client_details: ClientDetails,
) {
use NetworkOutput::NewClient;
println!("[NetworkManager] recieved client request");
todo!();
if let Some(delegate) = self.delegate.upgrade() {
delegate.do_send(NewClient(connection, client_details));
}
self.remove_initiator(sender);
}
@ -132,8 +132,7 @@ impl Actor for NetworkManager {
fn started(&mut self, ctx: &mut Self::Context) {
println!("started network manager");
let recipient = ctx.address().recipient();
self
.listener_addr
self.listener_addr
.replace(NetworkListener::new("0.0.0.0:5600", recipient));
}
}

View File

@ -5,7 +5,7 @@ use actix::Recipient;
/// represents common messages for observers
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ObservableMessage<M>
pub enum ObservableMessage<M>
where
M: Message + Send,
M::Result: Send,