From 63181ec9b5615927a12e4338a216b7772cb1384f Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Mon, 30 May 2022 08:57:57 +0100 Subject: [PATCH] implemented GetInfo for actix server --- example_plugin/src/example.rs | 108 ++++++++-------- example_plugin/src/lib.rs | 16 +-- server/src/actor.rs | 91 ++++++++++++-- server/src/network/connection.rs | 62 ++++++--- server/src/network/connection_initiator.rs | 139 +++++++++++++++++++++ server/src/network/listener.rs | 48 +++++-- server/src/network/mod.rs | 10 +- server/src/network/network_manager.rs | 108 ++++++++++------ 8 files changed, 446 insertions(+), 136 deletions(-) create mode 100644 server/src/network/connection_initiator.rs diff --git a/example_plugin/src/example.rs b/example_plugin/src/example.rs index 40dedc5..bd69a02 100644 --- a/example_plugin/src/example.rs +++ b/example_plugin/src/example.rs @@ -1,63 +1,63 @@ -use futures::lock::Mutex; -use serverlib::plugin::WeakPluginInterface; -use std::sync::Mutex as StdMutex; -use std::thread::sleep; -use std::time::Duration; +// use futures::lock::Mutex; +// use serverlib::plugin::WeakPluginInterface; +// use std::sync::Mutex as StdMutex; +// use std::thread::sleep; +// use std::time::Duration; -use serverlib::plugin::IPlugin; -use serverlib::plugin::PluginDetails; +// use serverlib::plugin::IPlugin; +// use serverlib::plugin::PluginDetails; -#[derive(Debug)] -pub struct ExamplePlugin { - number: Mutex, - interface: StdMutex>, -} +// #[derive(Debug)] +// pub struct ExamplePlugin { +// number: Mutex, +// interface: StdMutex>, +// } -impl Default for ExamplePlugin { - fn default() -> Self { - ExamplePlugin { - number: Mutex::new(0), - interface: StdMutex::default(), - } - } -} +// impl Default for ExamplePlugin { +// fn default() -> Self { +// ExamplePlugin { +// number: Mutex::new(0), +// interface: StdMutex::default(), +// } +// } +// } -#[async_trait::async_trait] -impl IPlugin for ExamplePlugin { - fn details(&self) -> PluginDetails { - PluginDetails { - display_name: "ExamplePlugin", - id: "io.github.michael-bailey.ExamplePlugin", - version: "0.0.1", - contacts: vec!["bailey-michael1@outlook.com"], - } - } +// #[async_trait::async_trait] +// impl IPlugin for ExamplePlugin { +// fn details(&self) -> PluginDetails { +// PluginDetails { +// display_name: "ExamplePlugin", +// id: "io.github.michael-bailey.ExamplePlugin", +// version: "0.0.1", +// contacts: vec!["bailey-michael1@outlook.com"], +// } +// } - fn set_interface(&self, interface: WeakPluginInterface) { - if let Ok(mut lock) = self.interface.lock() { - *lock = Some(interface); - } - } +// fn set_interface(&self, interface: WeakPluginInterface) { +// if let Ok(mut lock) = self.interface.lock() { +// *lock = Some(interface); +// } +// } - async fn event(&self) { - println!("Not Implemented"); - } +// async fn event(&self) { +// println!("Not Implemented"); +// } - fn init(&self) { - println!("[ExamplePlugin]: example init") - } +// fn init(&self) { +// println!("[ExamplePlugin]: example init") +// } - async fn run(&self) { - println!("Example!!!"); - sleep(Duration::new(1, 0)); - let mut a = self.number.lock().await; - *a = a.overflowing_add(1).0; - println!("[ExamplePlugin]: example run {}", *a); - } +// async fn run(&self) { +// println!("Example!!!"); +// sleep(Duration::new(1, 0)); +// let mut a = self.number.lock().await; +// *a = a.overflowing_add(1).0; +// println!("[ExamplePlugin]: example run {}", *a); +// } - fn deinit(&self) { - if let Some(mut lock) = self.number.try_lock() { - *lock = 0; - } - } -} +// fn deinit(&self) { +// if let Some(mut lock) = self.number.try_lock() { +// *lock = 0; +// } +// } +// } diff --git a/example_plugin/src/lib.rs b/example_plugin/src/lib.rs index 282291f..6bdc4d3 100644 --- a/example_plugin/src/lib.rs +++ b/example_plugin/src/lib.rs @@ -2,13 +2,13 @@ mod example; use std::sync::Arc; -use serverlib::plugin::Plugin; +// use serverlib::plugin::Plugin; -use crate::example::ExamplePlugin; -use serverlib::plugin::plugin::Plugin; -use std::sync::Arc; +// use crate::example::ExamplePlugin; +// use serverlib::plugin::plugin::Plugin; +// use std::sync::Arc; -#[no_mangle] -pub extern "C" fn get_plugin() -> Plugin { - Arc::new(ExamplePlugin::default()) -} +// #[no_mangle] +// pub extern "C" fn get_plugin() -> Plugin { +// Arc::new(ExamplePlugin::default()) +// } diff --git a/server/src/actor.rs b/server/src/actor.rs index 41b76e1..87b3193 100644 --- a/server/src/actor.rs +++ b/server/src/actor.rs @@ -1,18 +1,89 @@ mod network; pub(crate) mod prelude; -use network::NetworkManager; -use network::NetworkMessage::Ping; -use network::NetworkResponse::Pong; +use crate::network::ConnectionMessage; +use crate::network::NetworkOutput; +use actix::clock::sleep; +use actix::fut::wrap_future; +use actix::Actor; +use actix::ActorFutureExt; +use actix::Addr; +use actix::AsyncContext; +use actix::Context; +use actix::Handler; +use foundation::messages::network::NetworkSockOut; +use network::{NetworkManager, NetworkMessage}; +use std::time::Duration; + +/// This struct is the main actor of teh server. +/// all other actors are ran through here. +struct Server { + network_manager: Option>, +} + +impl Server { + pub(crate) fn new() -> Addr { + Server { + network_manager: None, + } + .start() + } +} + +impl Actor for Server { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + self + .network_manager + .replace(NetworkManager::new(ctx.address().recipient())); + + if let Some(net_mgr) = self.network_manager.as_ref() { + net_mgr.do_send(NetworkMessage::StartListening); + } + } +} + +impl Handler for Server { + type Result = (); + fn handle( + &mut self, + msg: NetworkOutput, + ctx: &mut Self::Context, + ) -> Self::Result { + use ConnectionMessage::{CloseConnection, SendData}; + use NetworkOutput::{InfoRequested, NewClient}; + use NetworkSockOut::GotInfo; + println!("server received message"); + match msg { + // This uses promise like funcionality to queue + // a set of async operations, + // so they occur in the right order + InfoRequested(sender) => { + let fut = wrap_future( + sender.send(SendData( + serde_json::to_string(&GotInfo { + server_name: "String".to_owned(), + server_owner: "String".to_owned(), + }) + .expect("Failed to serialise"), + )), + ) + // equivalent to using .then() in js + .map(move |out, act: &mut Self, ctx| { + sender.do_send(CloseConnection); + }); + ctx.spawn(fut); + } + NewClient(_, _) => todo!(), + }; + } +} #[actix::main()] async fn main() { - let network = NetworkManager::new(); - - let pong = network.send(Ping).await; - if let Ok(Pong) = pong { - println!("received pong"); - } else { - println!("error occurred") + let server = Server::new(); + loop { + sleep(Duration::from_millis(500)).await; } } diff --git a/server/src/network/connection.rs b/server/src/network/connection.rs index 734a16d..cca5801 100644 --- a/server/src/network/connection.rs +++ b/server/src/network/connection.rs @@ -1,8 +1,7 @@ -/// # connection.rs -/// An actor that handles a TcpStream. use crate::prelude::ObservableMessage; use actix::fut::wrap_future; use actix::Actor; +use actix::ActorContext; use actix::Addr; use actix::AsyncContext; use actix::Context; @@ -10,17 +9,27 @@ use actix::Handler; use actix::Message; use actix::Recipient; use actix::SpawnHandle; +use futures::future::join_all; +use futures::Future; +use futures::FutureExt; +use serde::Serialize; +use std::io::Write; use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; use tokio::io::split; use tokio::io::AsyncBufReadExt; +use tokio::io::AsyncWriteExt; use tokio::io::BufReader; use tokio::io::ReadHalf; use tokio::io::WriteHalf; use tokio::net::TcpStream; +use tokio::sync::Mutex; +/// This is a message that can be sent to the Connection. #[derive(Message)] #[rtype(result = "()")] -enum ConnectionMessage { +pub(crate) enum ConnectionMessage { SendData(String), CloseConnection, } @@ -28,8 +37,8 @@ enum ConnectionMessage { #[derive(Message)] #[rtype(result = "()")] pub(crate) enum ConnectionOuput { - RecvData(String), - NoMessage, + RecvData(Addr, SocketAddr, String), + ConnectionClosed(Addr), } #[derive(Message)] @@ -49,7 +58,7 @@ enum SelfMessage { /// - loop_future: the future holding the receiving loop. pub(crate) struct Connection { read_half: Option>, - write_half: WriteHalf, + write_half: Arc>>, address: SocketAddr, observers: Vec>, loop_future: Option, @@ -63,7 +72,7 @@ impl Connection { let (read_half, write_half) = split(stream); Connection { read_half: Some(read_half), - write_half, + write_half: Arc::new(Mutex::new(write_half)), address, observers: Vec::new(), loop_future: None, @@ -128,12 +137,21 @@ impl Handler for Connection { fn handle( &mut self, msg: ConnectionMessage, - _ctx: &mut Self::Context, + ctx: &mut Self::Context, ) -> Self::Result { use ConnectionMessage::{CloseConnection, SendData}; + let writer = self.write_half.clone(); + match msg { - SendData(d) => {} - CloseConnection => {} + SendData(d) => { + ctx.spawn(wrap_future(async move { + let mut lock = writer.lock().await; + let mut buffer = Vec::new(); + writeln!(&mut buffer, "{}", d.as_str()); + lock.write_all(&buffer).await; + })); + } + CloseConnection => ctx.stop(), }; } } @@ -143,15 +161,31 @@ impl Handler for Connection { fn handle( &mut self, msg: SelfMessage, - _ctx: &mut Self::Context, + ctx: &mut Self::Context, ) -> Self::Result { use ConnectionOuput::RecvData; use SelfMessage::UpdateObserversWithData; match msg { UpdateObserversWithData(data) => { - for o in self.observers.clone() { - o.do_send(RecvData(data.clone())); - } + let send = ctx.address(); + let addr = self.address.clone(); + // this is a mess + let futs: Vec + Send>>> = self + .observers + .iter() + .cloned() + .map(|r| { + let send = send.clone(); + let data = data.clone(); + async move { + let _ = r.send(RecvData(send, addr, data)).await; + } + .boxed() + }) + .collect(); + let _ = ctx.spawn(wrap_future(async { + join_all(futs).await; + })); } }; } diff --git a/server/src/network/connection_initiator.rs b/server/src/network/connection_initiator.rs new file mode 100644 index 0000000..705d571 --- /dev/null +++ b/server/src/network/connection_initiator.rs @@ -0,0 +1,139 @@ +use crate::network::connection::ConnectionOuput; +use crate::network::Connection; +use crate::prelude::ObservableMessage; +use actix::fut::wrap_future; +use actix::Actor; +use actix::ActorContext; +use actix::Addr; +use actix::AsyncContext; +use actix::Context; +use actix::Handler; +use actix::Message; +use actix::Recipient; +use foundation::messages::network::{NetworkSockIn, NetworkSockOut}; +use foundation::ClientDetails; +use serde_json::{from_str, to_string}; +use std::net::SocketAddr; + +#[derive(Debug, Clone, Copy)] +enum ConnectionPhase { + Started, + Requested, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub(crate) enum InitiatorOutput { + InfoRequest(Addr), + ClientRequest(Addr, ClientDetails), +} + +/// # ConnectionInitiator +/// Handles the initiatin of a new connection. +/// +/// This will do one of two things: +/// - Create a new client and send it to the network manager. +/// - Request the eserver info and send it to the connection. +pub(crate) struct ConnectionInitiator { + delegate: Recipient, + connection: Addr, +} + +impl ConnectionInitiator { + pub(crate) fn new( + delegate: Recipient, + connection: Addr, + ) -> Addr { + ConnectionInitiator { + connection, + delegate, + } + .start() + } + + fn handle_request( + &mut self, + sender: Addr, + ctx: &mut ::Context, + address: SocketAddr, + data: String, + ) { + use InitiatorOutput::{ClientRequest, InfoRequest}; + use NetworkSockIn::{Connect, Info}; + use NetworkSockOut::{Connecting, GotInfo}; + use ObservableMessage::Unsubscribe; + + let msg = from_str::(data.as_str()) + .expect("error deserialising incomming message"); + + match msg { + Info => self + .delegate + .do_send(InfoRequest(sender)) + .expect("Failed to send info request Message"), + Connect { + uuid, + username, + address, + } => self + .delegate + .do_send(ClientRequest( + sender, + ClientDetails { + uuid, + username, + address, + public_key: None, + }, + )) + .expect("Failed to send connect request"), + }; + ctx.stop(); + } +} + +impl Actor for ConnectionInitiator { + type Context = Context; + + /// on start initiate the protocol. + /// also add self as a subscriber to the connection. + fn started(&mut self, ctx: &mut Self::Context) { + use super::ConnectionMessage::SendData; + use NetworkSockOut::Request; + use ObservableMessage::Subscribe; + + self + .connection + .do_send(Subscribe(ctx.address().recipient())); + + self + .connection + .do_send(SendData(to_string(&Request).unwrap())); + } + + /// once stopped remove self from the connection subscribers + fn stopped(&mut self, ctx: &mut Self::Context) { + use ObservableMessage::Unsubscribe; + self + .connection + .do_send(Unsubscribe(ctx.address().recipient())); + } +} + +impl Handler for ConnectionInitiator { + type Result = (); + fn handle( + &mut self, + msg: ConnectionOuput, + ctx: &mut Self::Context, + ) -> Self::Result { + use ConnectionOuput::{ConnectionClosed, RecvData}; + use ConnectionPhase::Requested; + match msg { + RecvData(sender, addr, data) => { + self.handle_request(sender, ctx, addr, data) + } + ConnectionClosed(_) => todo!(), + } + } +} diff --git a/server/src/network/listener.rs b/server/src/network/listener.rs index f05b73b..7d23e24 100644 --- a/server/src/network/listener.rs +++ b/server/src/network/listener.rs @@ -1,6 +1,6 @@ -/// # listener.rs -/// An actor for listening for new connections from the network use crate::network::connection::Connection; +use crate::network::ConnectionInitiator; +use crate::network::InitiatorOutput; use actix::fut::wrap_future; use actix::Actor; use actix::Addr; @@ -12,7 +12,6 @@ use actix::Recipient; use actix::SpawnHandle; use std::net::SocketAddr; use std::net::ToSocketAddrs; -use std::sync::Arc; use tokio::net::TcpListener; use tokio::sync::RwLock; @@ -26,46 +25,51 @@ pub(super) enum ListenerMessage { #[derive(Message)] #[rtype(result = "()")] pub(super) enum ListenerOutput { - Started, - StartFailed, NewConnection(Addr), - NoConnection, - Error, - Stopped, + InfoRequest(Addr), } pub(super) struct NetworkListener { address: SocketAddr, - // delegate: Arc>>, + delegate: Recipient, looper: Option, } impl NetworkListener { pub(crate) fn new( address: T, - // delegate: Recipient, + delegate: Recipient, ) -> Addr { NetworkListener { address: address .to_socket_addrs() .unwrap() .collect::>()[0], - // delegate: Arc::new(RwLock::new(delegate)), + delegate, looper: None, } .start() } + /// called when the actor is to start listening fn start_listening(&mut self, ctx: &mut ::Context) { + println!("Network listener started listening"); let addr = self.address.clone(); + let self_addr = ctx.address(); let loop_future = ctx.spawn(wrap_future(async move { let listener = TcpListener::bind(addr).await.unwrap(); while let Ok((stream, addr)) = listener.accept().await { + println!("new tcp connection"); let conn = Connection::new(stream, addr); + let connection_initiator = + ConnectionInitiator::new(self_addr.clone().recipient(), conn); } })); } + + /// called when the actor is to stop listening fn stop_listening(&mut self, ctx: &mut ::Context) { + println!("Network listener stopped listening"); if let Some(fut) = self.looper.take() { ctx.cancel_future(fut); } @@ -75,7 +79,9 @@ impl NetworkListener { impl Actor for NetworkListener { type Context = Context; - fn started(&mut self, ctx: &mut Self::Context) {} + fn started(&mut self, ctx: &mut Self::Context) { + println!("Network listener started"); + } } impl Handler for NetworkListener { @@ -92,3 +98,21 @@ impl Handler for NetworkListener { } } } + +impl Handler for NetworkListener { + type Result = (); + fn handle( + &mut self, + msg: InitiatorOutput, + ctx: &mut Self::Context, + ) -> Self::Result { + use InitiatorOutput::{ClientRequest, InfoRequest}; + match msg { + ClientRequest(addr, client_details) => {} + InfoRequest(addr) => { + println!("Got Info request"); + self.delegate.do_send(ListenerOutput::InfoRequest(addr)); + } + } + } +} diff --git a/server/src/network/mod.rs b/server/src/network/mod.rs index d47526f..cd45f30 100644 --- a/server/src/network/mod.rs +++ b/server/src/network/mod.rs @@ -1,7 +1,11 @@ mod connection; +mod connection_initiator; mod listener; mod network_manager; -use connection::Connection; -use listener::{ListenerMessage, NetworkListener}; -pub(crate) use network_manager::{NetworkManager, NetworkMessage, NetworkResponse}; +pub(crate) use connection::{Connection, ConnectionMessage, ConnectionOuput}; +pub(crate) use connection_initiator::{ConnectionInitiator, InitiatorOutput}; +use listener::{ListenerMessage, ListenerOutput, NetworkListener}; +pub(crate) use network_manager::{ + NetworkManager, NetworkMessage, NetworkOutput, +}; diff --git a/server/src/network/network_manager.rs b/server/src/network/network_manager.rs index 10b21ad..cc8c905 100644 --- a/server/src/network/network_manager.rs +++ b/server/src/network/network_manager.rs @@ -1,82 +1,120 @@ +use crate::network::connection::ConnectionOuput; +use crate::network::listener::ListenerOutput; +use crate::network::Connection; use crate::network::ListenerMessage; use crate::network::NetworkListener; +use crate::prelude::ObservableMessage; +use actix::fut::wrap_future; use actix::Actor; - use actix::Addr; use actix::AsyncContext; use actix::Context; use actix::Handler; use actix::Message; -use actix::MessageResponse; -use actix::SpawnHandle; -use std::time::Duration; -use tokio::net::TcpListener; +use actix::Recipient; +use foundation::ClientDetails; -#[derive(Message)] -#[rtype(result = "NetworkResponse")] +#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)] +#[rtype(result = "()")] pub(crate) enum NetworkMessage { StartListening, StopListening, - Ping, } -#[derive(MessageResponse, Debug, Ord, PartialOrd, Eq, PartialEq)] -pub(crate) enum NetworkResponse { - Listening, - NotListening, - Pong, - None, +#[derive(Message)] +#[rtype(result = "()")] +pub(crate) enum NetworkOutput { + NewClient(Addr, ClientDetails), + InfoRequested(Addr), } pub(crate) struct NetworkManager { - listener_addr: Addr, + listener_addr: Option>, + delegate: Recipient, } impl NetworkManager { - pub(crate) fn new() -> Addr { + pub(crate) fn new( + delegate: Recipient, + ) -> Addr { NetworkManager { - listener_addr: NetworkListener::new("0.0.0.0:5600"), + listener_addr: None, + delegate, } .start() } - fn start_listener( - &mut self, - _ctx: &mut ::Context, - ) -> NetworkResponse { - NetworkResponse::Listening + fn start_listener(&mut self, _ctx: &mut ::Context) { + use ListenerMessage::StartListening; + if let Some(addr) = self.listener_addr.as_ref() { + addr.do_send(StartListening); + } } - fn stop_listener( - &mut self, - _ctx: &mut ::Context, - ) -> NetworkResponse { + fn stop_listener(&mut self, _ctx: &mut ::Context) { use ListenerMessage::StopListening; - use NetworkResponse::NotListening; - self.listener_addr.do_send(StopListening); - NotListening + if let Some(addr) = self.listener_addr.as_ref() { + addr.do_send(StopListening); + } + } + + fn new_connection( + &mut self, + ctx: &mut ::Context, + connection: Addr, + ) { + println!("Got new connection"); + let delegate = self.delegate.clone(); + ctx.spawn(wrap_future(async move { + // delegate.send(NewConnection(recipient)) + // delegate.send().await; + // delegate.send().await; + })); } } impl Actor for NetworkManager { type Context = Context; - fn started(&mut self, _ctx: &mut Self::Context) {} + fn started(&mut self, ctx: &mut Self::Context) { + println!("started network manager"); + let recipient = ctx.address().recipient(); + self + .listener_addr + .replace(NetworkListener::new("0.0.0.0:5600", recipient)); + } } impl Handler for NetworkManager { - type Result = NetworkResponse; + type Result = (); fn handle( &mut self, msg: NetworkMessage, ctx: &mut ::Context, ) -> >::Result { - use NetworkMessage::{Ping, StartListening, StopListening}; - use NetworkResponse::{None, Pong}; + use NetworkMessage::{StartListening, StopListening}; match msg { StartListening => self.start_listener(ctx), - StopListening => None, - Ping => Pong, + StopListening => self.stop_listener(ctx), } } } + +impl Handler for NetworkManager { + type Result = (); + fn handle( + &mut self, + msg: ListenerOutput, + ctx: &mut Self::Context, + ) -> Self::Result { + use ListenerOutput::{InfoRequest, NewConnection}; + match msg { + NewConnection(connection) => self.new_connection(ctx, connection), + InfoRequest(connection) => self + .delegate + .do_send(NetworkOutput::InfoRequested(connection)) + .expect("failed to send message"), + _ => todo!(), + }; + } +}