merge develop into master #20

Merged
michael-bailey merged 181 commits from develop into master 2023-12-01 21:48:28 +00:00
11 changed files with 449 additions and 160 deletions
Showing only changes of commit d7c47f3b3b - Show all commits

View File

@ -6,16 +6,16 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "serverlib"
path = "src/lib.rs"
# [lib]
# name = "serverlib"
# path = "src/lib.rs"
# [[bin]]
# name = "server"
# path = "src/main.rs"
[[bin]]
name = "server"
path = "src/main.rs"
[[bin]]
name = "server-actix"
path = "src/actor.rs"
[dependencies]
@ -30,8 +30,9 @@ openssl = "0.10.33"
tokio = { version = "1.9.0", features = ["full"] }
futures = "0.3.16"
async-trait = "0.1.52"
actix = "0.12"
actix = "0.13"
mlua = { version = "0.7.3", features=["lua54", "async", "serde", "macros"] }
libloading = "0.7"
aquamarine = "0.1.11"
foundation = {path = '../foundation'}

135
server/src/actix_server.rs Normal file
View File

@ -0,0 +1,135 @@
//! # actix_server
//! this holds the server actor
//! the server acts as teh main actor
//! and supervisor to the actor system.
use crate::client_management::Client;
use crate::client_management::ClientManager;
use crate::client_management::ClientManagerOutput;
use crate::network::Connection;
use crate::network::ConnectionInitiator;
use crate::network::ConnectionMessage;
use crate::network::NetworkOutput;
use actix::fut::wrap_future;
use actix::Actor;
use actix::ActorFutureExt;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use crate::client_management::ClientManagerMessage;
use foundation::messages::network::NetworkSockOut;
use foundation::ClientDetails;
use crate::network::{NetworkManager, NetworkMessage};
/// This struct is the main actor of the server.
/// all other actors are ran through here.
pub struct ServerActor {
network_manager: Option<Addr<NetworkManager>>,
client_management: Option<Addr<ClientManager>>,
}
impl ServerActor {
pub(crate) fn new() -> Addr<Self> {
ServerActor {
network_manager: None,
client_management: None,
}
.start()
}
pub(crate) fn client_request(
&mut self,
ctx: &mut <Self as Actor>::Context,
addr: Addr<Connection>,
details: ClientDetails
) {
use ClientManagerMessage::{AddClient};
if let Some(mgr) = self.client_management.as_ref() {
let client = Client::new(addr, details);
mgr.do_send(AddClient(client));
}
}
pub(crate) fn info_request(
&mut self,
ctx: &mut <Self as Actor>::Context,
sender: Addr<Connection>,
) {
use NetworkSockOut::GotInfo;
use ConnectionMessage::{CloseConnection, SendData};
let fut = wrap_future(
sender.send(SendData(
serde_json::to_string(&GotInfo {
server_name: "String".to_owned(),
server_owner: "String".to_owned(),
})
.expect("Failed to serialise"),
)),
)
// equivalent to using .then() in js
.map(move |_out, _act: &mut Self, _ctx| {
sender.do_send(CloseConnection);
});
ctx.spawn(fut);
}
}
impl Actor for ServerActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let recp = ctx.address();
self
.network_manager
.replace(NetworkManager::new(recp.clone().recipient().downgrade()));
self
.client_management
.replace(ClientManager::new(recp.clone().recipient().downgrade()));
if let Some(net_mgr) = self.network_manager.as_ref() {
net_mgr.do_send(NetworkMessage::StartListening);
}
}
}
impl Handler<NetworkOutput> for ServerActor {
type Result = ();
fn handle(
&mut self,
msg: NetworkOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
use NetworkOutput::{InfoRequested, NewClient,NewConnection};
use NetworkSockOut::GotInfo;
println!("[ServerActor] received message");
match msg {
// This uses promise like funcionality to queue
// a set of async operations,
// so they occur in the right order
InfoRequested(sender) => self.info_request(ctx, sender),
// A new client is to be added
NewClient(addr, details) => {
}
// A new client is to be added
NewConnection(_) => todo!(),
};
}
}
impl Handler<ClientManagerOutput> for ServerActor {
type Result = ();
fn handle(
&mut self,
_msg: ClientManagerOutput,
_ctx: &mut Self::Context,
) -> Self::Result {
use ClientManagerOutput::{};
}
}

View File

@ -1,89 +1,17 @@
mod actix_server;
mod client_management;
mod network;
pub(crate) mod prelude;
use crate::network::ConnectionMessage;
use crate::network::NetworkOutput;
use actix::clock::sleep;
use actix::fut::wrap_future;
use actix::Actor;
use actix::ActorFutureExt;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use foundation::messages::network::NetworkSockOut;
use network::{NetworkManager, NetworkMessage};
use std::time::Duration;
pub(crate) use actix_server::ServerActor;
/// This struct is the main actor of teh server.
/// all other actors are ran through here.
struct Server {
network_manager: Option<Addr<NetworkManager>>,
}
impl Server {
pub(crate) fn new() -> Addr<Self> {
Server {
network_manager: None,
}
.start()
}
}
impl Actor for Server {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self
.network_manager
.replace(NetworkManager::new(ctx.address().recipient()));
if let Some(net_mgr) = self.network_manager.as_ref() {
net_mgr.do_send(NetworkMessage::StartListening);
}
}
}
impl Handler<NetworkOutput> for Server {
type Result = ();
fn handle(
&mut self,
msg: NetworkOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
use NetworkOutput::{InfoRequested, NewClient};
use NetworkSockOut::GotInfo;
println!("server received message");
match msg {
// This uses promise like funcionality to queue
// a set of async operations,
// so they occur in the right order
InfoRequested(sender) => {
let fut = wrap_future(
sender.send(SendData(
serde_json::to_string(&GotInfo {
server_name: "String".to_owned(),
server_owner: "String".to_owned(),
})
.expect("Failed to serialise"),
)),
)
// equivalent to using .then() in js
.map(move |out, act: &mut Self, ctx| {
sender.do_send(CloseConnection);
});
ctx.spawn(fut);
}
NewClient(_, _) => todo!(),
};
}
}
use tokio::time::sleep;
use tokio::time::Duration;
#[actix::main()]
async fn main() {
let server = Server::new();
let _server = ServerActor::new();
loop {
sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(1000)).await;
}
}

View File

@ -0,0 +1,58 @@
use crate::network::Connection;
use crate::prelude::ObservableMessage;
use actix::{Actor, Addr, Context, Handler, Message, WeakAddr};
use foundation::ClientDetails;
use uuid::Uuid;
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientMessage {
AddClient(Uuid, Addr<Client>),
RemoveClient(Uuid),
}
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientObservableMessage {
SendRequest(WeakAddr<Client>, Uuid, String),
}
/// # Client
/// This represents a connected client.
/// it will handle received message from a connection.
pub(crate) struct Client {
connection: Addr<Connection>,
details: ClientDetails,
}
impl Client {
pub(crate) fn new(
connection: Addr<Connection>,
details: ClientDetails,
) -> Addr<Self> {
Client {
connection,
details,
}
.start()
}
}
impl Actor for Client {
type Context = Context<Self>;
}
impl Handler<ClientMessage> for Client {
type Result = ();
fn handle(
&mut self,
msg: ClientMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
match msg {
_ => todo!(),
}
}
}
impl Handler<ObservableMessage<ClientObservableMessage>> for Client {}

View File

@ -0,0 +1,70 @@
use crate::client_management::Client;
use actix::Actor;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::WeakRecipient;
use std::collections::HashMap;
use uuid::Uuid;
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientManagerMessage {
AddClient(Uuid, Addr<Client>),
RemoveClient(Uuid),
}
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientManagerOutput {}
pub(crate) struct ClientManager {
clients: HashMap<Uuid, Addr<Client>>,
delegate: WeakRecipient<ClientManagerOutput>,
}
impl ClientManager {
pub(crate) fn new(
delegate: WeakRecipient<ClientManagerOutput>,
) -> Addr<Self> {
ClientManager {
delegate,
clients: HashMap::new(),
}
.start()
}
fn add_client(&mut self, ctx: Context<Self>, uuid: Uuid, addr: Addr<Client>) {
use crate::prelude::ObservableMessage::Subscribe;
let recp = ctx.address().recipient().downgrade();
addr.do_send(Subscribe(recp));
self.clients.insert(uuid, addr)
}
}
impl Actor for ClientManager {
type Context = Context<Self>;
}
impl Handler<ClientManagerMessage> for ClientManager {
type Result = ();
fn handle(
&mut self,
msg: ClientManagerMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use ClientManagerMessage::{AddClient, RemoveClient};
match msg {
// todo: Add subscription to the client.
AddClient(uuid, addr) => self.add_client(uuid, addr),
// todo: remove subscription to client.
RemoveClient(addr) => {
if let Some(index) = self.clients.iter().position(|i| i == &addr) {
self.clients.remove(index);
}
}
}
}
}

View File

@ -0,0 +1,7 @@
mod client;
mod client_manager;
pub(crate) use client::Client;
pub(crate) use client_manager::{
ClientManager, ClientManagerMessage, ClientManagerOutput,
};

View File

@ -88,6 +88,7 @@ impl Actor for Connection {
/// takes out eh read_half ad turns it into a buffered reader
/// then eneters loop readling lines from the tcp stream
fn started(&mut self, ctx: &mut Self::Context) {
println!("[Connection] started");
let addr = ctx.address();
let read_half = self
.read_half
@ -98,6 +99,7 @@ impl Actor for Connection {
let mut buffer_string = String::new();
while let Ok(_) = reader.read_line(&mut buffer_string).await {
println!("[Connection] read line");
use SelfMessage::UpdateObserversWithData;
addr
.send(UpdateObserversWithData(buffer_string.clone()))
@ -106,6 +108,14 @@ impl Actor for Connection {
}
}));
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use ConnectionOuput::ConnectionClosed;
println!("[Connection] stopped");
for recp in self.observers.iter() {
recp.do_send(ConnectionClosed(ctx.address()));
}
}
}
impl Handler<ObservableMessage<ConnectionOuput>> for Connection {
@ -118,9 +128,11 @@ impl Handler<ObservableMessage<ConnectionOuput>> for Connection {
use ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Connection] adding subscriber");
self.observers.push(r);
}
Unsubscribe(r) => {
println!("[Connection] removing subscriber");
self.observers = self
.observers
.clone()
@ -145,6 +157,7 @@ impl Handler<ConnectionMessage> for Connection {
match msg {
SendData(d) => {
ctx.spawn(wrap_future(async move {
println!("[Connection] sending data");
let mut lock = writer.lock().await;
let mut buffer = Vec::new();
writeln!(&mut buffer, "{}", d.as_str());

View File

@ -10,6 +10,7 @@ use actix::Context;
use actix::Handler;
use actix::Message;
use actix::Recipient;
use actix::WeakRecipient;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::ClientDetails;
use serde_json::{from_str, to_string};
@ -24,8 +25,8 @@ enum ConnectionPhase {
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum InitiatorOutput {
InfoRequest(Addr<Connection>),
ClientRequest(Addr<Connection>, ClientDetails),
InfoRequest(Addr<ConnectionInitiator>, Addr<Connection>),
ClientRequest(Addr<ConnectionInitiator>, Addr<Connection>, ClientDetails),
}
/// # ConnectionInitiator
@ -35,13 +36,13 @@ pub(crate) enum InitiatorOutput {
/// - Create a new client and send it to the network manager.
/// - Request the eserver info and send it to the connection.
pub(crate) struct ConnectionInitiator {
delegate: Recipient<InitiatorOutput>,
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
}
impl ConnectionInitiator {
pub(crate) fn new(
delegate: Recipient<InitiatorOutput>,
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
) -> Addr<Self> {
ConnectionInitiator {
@ -64,20 +65,17 @@ impl ConnectionInitiator {
use ObservableMessage::Unsubscribe;
let msg = from_str::<NetworkSockIn>(data.as_str())
.expect("error deserialising incomming message");
match msg {
Info => self
.delegate
.do_send(InfoRequest(sender))
.expect("Failed to send info request Message"),
Connect {
uuid,
username,
address,
} => self
.delegate
.do_send(ClientRequest(
.expect("[ConnectionInitiator] error deserialising incomming message");
println!("[ConnectionInitiator] matching request");
if let Some(delegate) = self.delegate.upgrade() {
match msg {
Info => delegate.do_send(InfoRequest(ctx.address(), sender)),
Connect {
uuid,
username,
address,
} => delegate.do_send(ClientRequest(
ctx.address(),
sender,
ClientDetails {
uuid,
@ -85,10 +83,10 @@ impl ConnectionInitiator {
address,
public_key: None,
},
))
.expect("Failed to send connect request"),
};
ctx.stop();
)),
};
ctx.stop();
}
}
}
@ -102,6 +100,8 @@ impl Actor for ConnectionInitiator {
use NetworkSockOut::Request;
use ObservableMessage::Subscribe;
println!("[ConnectionInitiator] started");
self
.connection
.do_send(Subscribe(ctx.address().recipient()));
@ -114,6 +114,7 @@ impl Actor for ConnectionInitiator {
/// once stopped remove self from the connection subscribers
fn stopped(&mut self, ctx: &mut Self::Context) {
use ObservableMessage::Unsubscribe;
println!("[ConnectionInitiator] stopped");
self
.connection
.do_send(Unsubscribe(ctx.address().recipient()));
@ -129,11 +130,8 @@ impl Handler<ConnectionOuput> for ConnectionInitiator {
) -> Self::Result {
use ConnectionOuput::{ConnectionClosed, RecvData};
use ConnectionPhase::Requested;
match msg {
RecvData(sender, addr, data) => {
self.handle_request(sender, ctx, addr, data)
}
ConnectionClosed(_) => todo!(),
if let RecvData(sender, addr, data) = msg {
self.handle_request(sender, ctx, addr, data)
}
}
}

View File

@ -13,7 +13,6 @@ use actix::SpawnHandle;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
#[derive(Message)]
#[rtype(result = "()")]
@ -26,7 +25,6 @@ pub(super) enum ListenerMessage {
#[rtype(result = "()")]
pub(super) enum ListenerOutput {
NewConnection(Addr<Connection>),
InfoRequest(Addr<Connection>),
}
pub(super) struct NetworkListener {
@ -53,23 +51,24 @@ impl NetworkListener {
/// called when the actor is to start listening
fn start_listening(&mut self, ctx: &mut <Self as Actor>::Context) {
println!("Network listener started listening");
println!("[NetworkListener] started listening");
let addr = self.address.clone();
let self_addr = ctx.address();
let delegate = self.delegate.clone();
let loop_future = ctx.spawn(wrap_future(async move {
use ListenerOutput::NewConnection;
let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, addr)) = listener.accept().await {
println!("new tcp connection");
println!("[NetworkListener] accepted socket");
let conn = Connection::new(stream, addr);
let connection_initiator =
ConnectionInitiator::new(self_addr.clone().recipient(), conn);
delegate.do_send(NewConnection(conn));
}
}));
}
/// called when the actor is to stop listening
fn stop_listening(&mut self, ctx: &mut <Self as Actor>::Context) {
println!("Network listener stopped listening");
println!("[NetworkListener] stopped listening");
if let Some(fut) = self.looper.take() {
ctx.cancel_future(fut);
}
@ -79,8 +78,12 @@ impl NetworkListener {
impl Actor for NetworkListener {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("Network listener started");
fn started(&mut self, _ctx: &mut Self::Context) {
println!("[NetworkListener] started");
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("[NetworkListener] stopped");
}
}
@ -98,21 +101,3 @@ impl Handler<ListenerMessage> for NetworkListener {
}
}
}
impl Handler<InitiatorOutput> for NetworkListener {
type Result = ();
fn handle(
&mut self,
msg: InitiatorOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use InitiatorOutput::{ClientRequest, InfoRequest};
match msg {
ClientRequest(addr, client_details) => {}
InfoRequest(addr) => {
println!("Got Info request");
self.delegate.do_send(ListenerOutput::InfoRequest(addr));
}
}
}
}

View File

@ -1,3 +1,32 @@
//! # Network
//!
//! This module contains network code for the server.
//!
//! This includes:
//! - The network manager: For that handles all server network connections.
//! - The network listener: For listening for connections on a port.
//! - The conneciton: An abstraction over sockets sockets, for actix.
//! - The connection initiator: For initiating new connections to the server
//!
//! ## Diagrams
//!
//! ```mermaid
//! sequenceDiagram
//! Server->>NetworkManager: creates
//! NetworkManager->>NetworkListener: create
//! NetworkManager->>+NetworkListener: start listening
//!
//! loop async tcp listen
//! NetworkListener->>NetworkListener: check for new connections
//! end
//!
//! NetworkListener->>Connection: create from socket
//! NetworkListener->>NetworkManager: new connection
//! NetworkManager->>Server: new connection
//!
//! Server->>ConnectionInitiator: create with connection
//! ```
mod connection;
mod connection_initiator;
mod listener;

View File

@ -1,17 +1,20 @@
use crate::network::connection::ConnectionOuput;
//! # network_manager
//! This module contains the network manager actor
//! it's role involves handling new oncomming network connections
use crate::network::listener::ListenerOutput;
use crate::network::Connection;
use crate::network::ConnectionInitiator;
use crate::network::InitiatorOutput;
use crate::network::ListenerMessage;
use crate::network::NetworkListener;
use crate::prelude::ObservableMessage;
use actix::fut::wrap_future;
use actix::Actor;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::Recipient;
use actix::WeakRecipient;
use foundation::ClientDetails;
#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)]
@ -24,22 +27,25 @@ pub(crate) enum NetworkMessage {
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum NetworkOutput {
NewConnection(Addr<Connection>),
NewClient(Addr<Connection>, ClientDetails),
InfoRequested(Addr<Connection>),
}
pub(crate) struct NetworkManager {
listener_addr: Option<Addr<NetworkListener>>,
delegate: Recipient<NetworkOutput>,
delegate: WeakRecipient<NetworkOutput>,
initiators: Vec<Addr<ConnectionInitiator>>,
}
impl NetworkManager {
pub(crate) fn new(
delegate: Recipient<NetworkOutput>,
delegate: WeakRecipient<NetworkOutput>,
) -> Addr<NetworkManager> {
NetworkManager {
listener_addr: None,
delegate,
initiators: Vec::new(),
}
.start()
}
@ -58,18 +64,65 @@ impl NetworkManager {
}
}
/// Handles a new connection from the Listener.
/// This creates a new ConnectionInitaliser.
/// This completes the first part of the protocol.
#[inline]
fn new_connection(
&mut self,
ctx: &mut <Self as Actor>::Context,
connection: Addr<Connection>,
) {
println!("Got new connection");
let delegate = self.delegate.clone();
ctx.spawn(wrap_future(async move {
// delegate.send(NewConnection(recipient))
// delegate.send().await;
// delegate.send().await;
}));
println!("[NetworkManager] Got new connection");
let init = ConnectionInitiator::new(
ctx.address().recipient().downgrade(),
connection,
);
self.initiators.push(init);
}
#[inline]
fn remove_initiator(&mut self, sender: Addr<ConnectionInitiator>) {
let index = self.initiators.iter().position(|i| *i == sender).unwrap();
println!("[NetworkManager] removed initiator at:{}", index);
self.initiators.remove(index);
}
/// handles a initiator client request
/// this will, forward the conenction and client details
/// to the server actor to be dispatched to the appropriate
/// manager
#[inline]
#[allow(unreachable_code, unused_variables)]
fn client_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: Addr<ConnectionInitiator>,
_connection: Addr<Connection>,
_client_details: ClientDetails,
) {
println!("[NetworkManager] recieved client request");
todo!();
self.remove_initiator(sender);
}
/// This sends the connection to the server
/// which will in turn take over the protocol by sending
/// the servers infomation.
#[inline]
fn info_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: Addr<ConnectionInitiator>,
connection: Addr<Connection>,
) {
use NetworkOutput::InfoRequested;
println!("[NetworkManager] Got recieved info request");
if let Some(delegate) = self.delegate.upgrade() {
delegate.do_send(InfoRequested(connection));
}
self.remove_initiator(sender);
}
}
@ -107,14 +160,26 @@ impl Handler<ListenerOutput> for NetworkManager {
msg: ListenerOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ListenerOutput::{InfoRequest, NewConnection};
use ListenerOutput::NewConnection;
match msg {
NewConnection(connection) => self.new_connection(ctx, connection),
InfoRequest(connection) => self
.delegate
.do_send(NetworkOutput::InfoRequested(connection))
.expect("failed to send message"),
_ => todo!(),
};
}
}
impl Handler<InitiatorOutput> for NetworkManager {
type Result = ();
fn handle(
&mut self,
msg: InitiatorOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use InitiatorOutput::{ClientRequest, InfoRequest};
match msg {
ClientRequest(sender, addr, client_details) => {
self.client_request(ctx, sender, addr, client_details)
}
InfoRequest(sender, addr) => self.info_request(ctx, sender, addr),
}
}
}