Merge branch 'feature/actix' into develop

This commit is contained in:
michael-bailey 2022-06-16 20:05:52 +02:00
commit 306526e1c2
42 changed files with 1743 additions and 1274 deletions

View File

@ -1,33 +1,33 @@
mod worker;
mod managers;
mod worker;
mod worker_message;
use cursive::{
menu::{Item, Tree},
traits::Nameable,
views::{Dialog, TextView},
Cursive,
CursiveExt,
};
use worker::Worker;
use cursive::{Cursive, CursiveExt};
use cursive::menu::{Item, Tree};
use cursive::traits::Nameable;
use cursive::views::{Dialog, TextView};
fn main() {
let mut app = Cursive::default();
let worker_stream =
Worker::new(app.cb_sink().clone()).start();
let worker_stream = Worker::new(app.cb_sink().clone()).start();
app.set_user_data(worker_stream);
app.add_layer(Dialog::new()
.content(TextView::new("Hello world").with_name("TextView"))
.button("close", |s| s.quit()));
app.add_layer(
Dialog::new()
.content(TextView::new("Hello world").with_name("TextView"))
.button("close", |s| s.quit()),
);
app.menubar().autohide = false;
app.menubar().add_subtree(
"Application",
Tree::new()
.item(
Item::leaf("About", |s| s.quit())
).delimiter().item(
Item::leaf("Quit",|s| s.quit())
)
.item(Item::leaf("About", |s| s.quit()))
.delimiter()
.item(Item::leaf("Quit", |s| s.quit())),
);
app.set_fps(30);
app.run();

View File

@ -1,18 +1,25 @@
use std::{
io::{Error, ErrorKind},
mem,
sync::{atomic::AtomicBool, Arc},
};
use async_trait::async_trait;
use std::io::{Error, ErrorKind};
use std::mem;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::net::ToSocketAddrs;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
use foundation::{
connection::Connection,
messages::{
client::{ClientStreamIn, ClientStreamOut},
network::{NetworkSockIn, NetworkSockOut},
},
prelude::IManager,
};
use tokio::{
net::ToSocketAddrs,
sync::{mpsc::Sender, Mutex},
};
use uuid::Uuid;
use crate::managers::NetworkManagerMessage;
use foundation::connection::Connection;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::prelude::IManager;
pub struct NetworkManager<M>
where
@ -144,13 +151,14 @@ where
#[cfg(test)]
mod test {
use crate::managers::network::NetworkManagerMessage;
use crate::managers::NetworkManager;
use serverlib::Server;
use std::future::Future;
use serverlib::Server;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use crate::managers::{network::NetworkManagerMessage, NetworkManager};
async fn wrap_setup<T, F>(test: T)
where
T: FnOnce(u16) -> F,

View File

@ -1,5 +1,4 @@
use foundation::ClientDetails;
use foundation::messages::network::NetworkSockOut;
use foundation::{messages::network::NetworkSockOut, ClientDetails};
#[derive(Debug)]
pub enum NetworkManagerMessage {
@ -9,16 +8,22 @@ pub enum NetworkManagerMessage {
server_name: String,
server_owner: String,
},
Error(&'static str)
Error(&'static str),
}
impl From<NetworkSockOut> for NetworkManagerMessage {
fn from(other: NetworkSockOut) -> Self {
use NetworkSockOut::{GotInfo as OldInfo};
use NetworkManagerMessage::{Info as NewInfo, Error};
use NetworkManagerMessage::{Error, Info as NewInfo};
use NetworkSockOut::GotInfo as OldInfo;
match other {
OldInfo {server_name,server_owner} => NewInfo {server_name,server_owner},
_ => Error("Error occurred with conversion")
OldInfo {
server_name,
server_owner,
} => NewInfo {
server_name,
server_owner,
},
_ => Error("Error occurred with conversion"),
}
}
}
@ -27,13 +32,21 @@ impl PartialEq for NetworkManagerMessage {
fn eq(&self, other: &Self) -> bool {
use NetworkManagerMessage::Info;
match self {
Info {server_owner, server_name} => {
if let Info {server_owner: other_owner,server_name: other_name} = other {
return server_owner == other_owner && server_name == other_name;
Info {
server_owner,
server_name,
} => {
if let Info {
server_owner: other_owner,
server_name: other_name,
} = other
{
return server_owner == other_owner
&& server_name == other_name;
}
false
}
_ => {false}
_ => false,
}
}
}
}

View File

@ -3,5 +3,5 @@ mod network;
#[path = "message.rs"]
mod message;
pub use network::NetworkManager;
pub use message::NetworkManagerMessage;
pub use network::NetworkManager;

View File

@ -1,18 +1,25 @@
use std::{
io::{Error, ErrorKind},
mem,
sync::{atomic::AtomicBool, Arc},
};
use async_trait::async_trait;
use std::io::{Error, ErrorKind};
use std::mem;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::net::ToSocketAddrs;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
use foundation::{
connection::Connection,
messages::{
client::{ClientStreamIn, ClientStreamOut},
network::{NetworkSockIn, NetworkSockOut},
},
prelude::IManager,
};
use tokio::{
net::ToSocketAddrs,
sync::{mpsc::Sender, Mutex},
};
use uuid::Uuid;
use crate::managers::NetworkManagerMessage;
use foundation::connection::Connection;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::prelude::IManager;
pub struct NetworkManager<M>
where
@ -144,13 +151,14 @@ where
#[cfg(test)]
mod test {
use crate::managers::network::NetworkManagerMessage;
use crate::managers::NetworkManager;
use serverlib::Server;
use std::future::Future;
use serverlib::Server;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use crate::managers::{network::NetworkManagerMessage, NetworkManager};
async fn wrap_setup<T, F>(test: T)
where
T: FnOnce(u16) -> F,

View File

@ -1,27 +1,30 @@
use std::sync::Arc;
use std::thread::spawn;
use std::time::Duration;
use std::{sync::Arc, thread::spawn, time::Duration};
use crossbeam_channel::Sender as CrossSender;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Sender as TokioSender};
use tokio::sync::Mutex;
use tokio::time::sleep;
use foundation::ClientDetails;
use crate::{Cursive, TextView};
use crate::managers::{NetworkManager};
use crate::worker_message::WorkerMessage;
use tokio::{
runtime::Runtime,
sync::{
mpsc::{channel, Sender as TokioSender},
Mutex,
},
time::sleep,
};
use crate::{
managers::NetworkManager,
worker_message::WorkerMessage,
Cursive,
TextView,
};
pub type CursiveSender = CrossSender<Box<dyn FnOnce(&mut Cursive) + Send>>;
pub struct Worker
{
pub struct Worker {
cursive_sender: CursiveSender,
network_manager: Arc<NetworkManager<WorkerMessage>>,
number: Arc<Mutex<usize>>,
#[allow(unused)]
@ -31,24 +34,22 @@ pub struct Worker
impl Worker {
pub fn new(sender: CursiveSender) -> Worker {
#[allow(unused)]
let (tx,rx) = channel::<WorkerMessage>(16);
let (tx, rx) = channel::<WorkerMessage>(16);
Worker {
network_manager: NetworkManager::new(tx.clone()),
number: Arc::new(Mutex::new(0)),
user_details: Mutex::new(None),
cursive_sender: sender
cursive_sender: sender,
}
}
pub fn start(self) -> TokioSender<WorkerMessage> {
#[allow(unused)]
let (tx,rx) = channel::<WorkerMessage>(16);
let (tx, rx) = channel::<WorkerMessage>(16);
spawn(move || {
let sender = self.cursive_sender.clone();
let rt = Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
let tmp_num = self.number.clone();
#[allow(unused)]
let network_manager = self.network_manager.clone();
@ -56,17 +57,19 @@ impl Worker {
let a = &tmp_num;
loop {
let num = Arc::clone(&a);
sleep(Duration::new(1,0)).await;
let _ = sender.send(Box::new( move |s| {
sleep(Duration::new(1, 0)).await;
let _ = sender.send(Box::new(move |s| {
let num = &num.clone();
let mut num_lock = num.blocking_lock();
*num_lock += 1;
let a = *num_lock;
s.find_name::<TextView>("TextView").unwrap().set_content(a.to_string());
s.find_name::<TextView>("TextView")
.unwrap()
.set_content(a.to_string());
}));
}
})
});
tx
}
}
}

View File

@ -12,13 +12,18 @@ pub enum WorkerMessage {
impl From<NetworkManagerMessage> for WorkerMessage {
fn from(other: NetworkManagerMessage) -> Self {
#[allow(unused)]
use WorkerMessage::{Info as NewInfo, Error as NewError};
use NetworkManagerMessage::{Error, Info as OldInfo};
#[allow(unused)]
use NetworkManagerMessage::{Info as OldInfo, Error};
use WorkerMessage::{Error as NewError, Info as NewInfo};
match other {
OldInfo {server_name, server_owner}
=> NewInfo {server_owner,server_name},
_ => todo!()
OldInfo {
server_name,
server_owner,
} => NewInfo {
server_owner,
server_name,
},
_ => todo!(),
}
}
}

View File

@ -1,63 +1,63 @@
use futures::lock::Mutex;
use serverlib::plugin::WeakPluginInterface;
use std::sync::Mutex as StdMutex;
use std::thread::sleep;
use std::time::Duration;
// use futures::lock::Mutex;
// use serverlib::plugin::WeakPluginInterface;
// use std::sync::Mutex as StdMutex;
// use std::thread::sleep;
// use std::time::Duration;
use serverlib::plugin::IPlugin;
use serverlib::plugin::PluginDetails;
// use serverlib::plugin::IPlugin;
// use serverlib::plugin::PluginDetails;
#[derive(Debug)]
pub struct ExamplePlugin {
number: Mutex<u8>,
interface: StdMutex<Option<WeakPluginInterface>>,
}
// #[derive(Debug)]
// pub struct ExamplePlugin {
// number: Mutex<u8>,
// interface: StdMutex<Option<WeakPluginInterface>>,
// }
impl Default for ExamplePlugin {
fn default() -> Self {
ExamplePlugin {
number: Mutex::new(0),
interface: StdMutex::default(),
}
}
}
// impl Default for ExamplePlugin {
// fn default() -> Self {
// ExamplePlugin {
// number: Mutex::new(0),
// interface: StdMutex::default(),
// }
// }
// }
#[async_trait::async_trait]
impl IPlugin for ExamplePlugin {
fn details(&self) -> PluginDetails {
PluginDetails {
display_name: "ExamplePlugin",
id: "io.github.michael-bailey.ExamplePlugin",
version: "0.0.1",
contacts: vec!["bailey-michael1@outlook.com"],
}
}
// #[async_trait::async_trait]
// impl IPlugin for ExamplePlugin {
// fn details(&self) -> PluginDetails {
// PluginDetails {
// display_name: "ExamplePlugin",
// id: "io.github.michael-bailey.ExamplePlugin",
// version: "0.0.1",
// contacts: vec!["bailey-michael1@outlook.com"],
// }
// }
fn set_interface(&self, interface: WeakPluginInterface) {
if let Ok(mut lock) = self.interface.lock() {
*lock = Some(interface);
}
}
// fn set_interface(&self, interface: WeakPluginInterface) {
// if let Ok(mut lock) = self.interface.lock() {
// *lock = Some(interface);
// }
// }
async fn event(&self) {
println!("Not Implemented");
}
// async fn event(&self) {
// println!("Not Implemented");
// }
fn init(&self) {
println!("[ExamplePlugin]: example init")
}
// fn init(&self) {
// println!("[ExamplePlugin]: example init")
// }
async fn run(&self) {
println!("Example!!!");
sleep(Duration::new(1, 0));
let mut a = self.number.lock().await;
*a = a.overflowing_add(1).0;
println!("[ExamplePlugin]: example run {}", *a);
}
// async fn run(&self) {
// println!("Example!!!");
// sleep(Duration::new(1, 0));
// let mut a = self.number.lock().await;
// *a = a.overflowing_add(1).0;
// println!("[ExamplePlugin]: example run {}", *a);
// }
fn deinit(&self) {
if let Some(mut lock) = self.number.try_lock() {
*lock = 0;
}
}
}
// fn deinit(&self) {
// if let Some(mut lock) = self.number.try_lock() {
// *lock = 0;
// }
// }
// }

View File

@ -2,13 +2,13 @@ mod example;
use std::sync::Arc;
use serverlib::plugin::Plugin;
// use serverlib::plugin::Plugin;
use crate::example::ExamplePlugin;
use serverlib::plugin::plugin::Plugin;
use std::sync::Arc;
// use crate::example::ExamplePlugin;
// use serverlib::plugin::plugin::Plugin;
// use std::sync::Arc;
#[no_mangle]
pub extern "C" fn get_plugin() -> Plugin {
Arc::new(ExamplePlugin::default())
}
// #[no_mangle]
// pub extern "C" fn get_plugin() -> Plugin {
// Arc::new(ExamplePlugin::default())
// }

View File

@ -1,13 +1,16 @@
use std::io::{Error, ErrorKind};
use std::io::Write;
use std::mem;
use std::sync::Arc;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io;
use tokio::io::{AsyncWriteExt, BufReader, AsyncBufReadExt, ReadHalf, WriteHalf};
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::sync::Mutex;
use std::{
io::{Error, ErrorKind, Write},
mem,
sync::Arc,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
io,
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::{TcpStream, ToSocketAddrs},
sync::Mutex,
};
#[derive(Debug)]
pub struct Connection {
@ -22,24 +25,28 @@ impl Connection {
stream_tx: Mutex::new(None),
})
}
pub async fn connect<T: ToSocketAddrs>(&self, host: T) -> Result<(), Error> {
pub async fn connect<T: ToSocketAddrs>(
&self,
host: T,
) -> Result<(), Error> {
let connection = TcpStream::connect(host).await?;
let (rd, wd) = io::split(connection);
let mut writer_lock = self.stream_tx.lock().await;
let mut reader_lock = self.stream_rx.lock().await;
let _ = mem::replace(&mut *writer_lock, Some(wd));
let _ = mem::replace(&mut *reader_lock, Some(BufReader::new(rd)));
Ok(())
}
pub async fn write<T>(&self, message: T) -> Result<(), Error>
where T: Serialize {
let mut out_buffer = Vec::new();
pub async fn write<T>(&self, message: T) -> Result<(), Error>
where
T: Serialize,
{
let mut out_buffer = Vec::new();
let out = serde_json::to_string(&message).unwrap();
@ -56,11 +63,13 @@ impl Connection {
Ok(())
} else {
Err(Error::new(ErrorKind::Interrupted, "Writer does not exist"))
}
};
}
pub async fn read<T>(&self) -> Result<T,Error>
where T: DeserializeOwned {
pub async fn read<T>(&self) -> Result<T, Error>
where
T: DeserializeOwned,
{
let mut buffer = String::new();
let mut reader_lock = self.stream_rx.lock().await;
let old = mem::replace(&mut *reader_lock, None);
@ -87,48 +96,49 @@ impl From<TcpStream> for Connection {
#[cfg(test)]
mod test {
use std::future::Future;
use std::io::Error;
use std::panic;
use std::{future::Future, io::Error, panic};
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use serde::{Serialize,Deserialize};
use crate::connection::Connection;
#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum TestMessages {
Ping,
Pong
Pong,
}
#[tokio::test]
async fn a() -> Result<(), Error> {
wrap_setup(|port| {
async move {
println!("{}", port);
let connection = Connection::new();
connection.connect(format!("localhost:{}", &port)).await.unwrap();
connection.write(&TestMessages::Ping).await.unwrap();
let res = connection.read::<TestMessages>().await.unwrap();
assert_eq!(res, TestMessages::Pong);
}
}).await
wrap_setup(|port| async move {
println!("{}", port);
let connection = Connection::new();
connection
.connect(format!("localhost:{}", &port))
.await
.unwrap();
connection.write(&TestMessages::Ping).await.unwrap();
let res = connection.read::<TestMessages>().await.unwrap();
assert_eq!(res, TestMessages::Pong);
})
.await
}
async fn wrap_setup<T,F>(test: T) -> Result<(), std::io::Error>
where T: FnOnce(u16) -> F + panic::UnwindSafe,
F: Future
async fn wrap_setup<T, F>(test: T) -> Result<(), std::io::Error>
where
T: FnOnce(u16) -> F + panic::UnwindSafe,
F: Future,
{
let server = TcpListener::bind("localhost:0").await?;
let addr = server.local_addr()?;
// create tokio server execution
tokio::spawn(async move {
while let Ok((stream, addr)) = server.accept().await {
use TestMessages::{Ping,Pong};
use TestMessages::{Ping, Pong};
println!("[server]: Connected {}", &addr);
let connection = Connection::from(stream);
if let Ok(Ping) = connection.read::<TestMessages>().await {
@ -136,7 +146,7 @@ mod test {
}
}
});
test(addr.port()).await;
Ok(())
}

View File

@ -3,8 +3,10 @@
#[cfg(test)]
mod test {
use openssl::sha::sha256;
use openssl::symm::{Cipher, Crypter, Mode};
use openssl::{
sha::sha256,
symm::{Cipher, Crypter, Mode},
};
#[test]
fn testEncryption() {

View File

@ -1,10 +1,13 @@
use crate::event::event_result::EventResultBuilder;
use crate::event::EventResult;
use crate::event::EventResultType;
use std::collections::HashMap;
use futures::channel::oneshot::{channel, Receiver, Sender};
use crate::event::{
event_result::EventResultBuilder,
EventResult,
EventResultType,
};
/// # Eventw
/// Object that holds details about an event being passed through the application.
///
@ -70,7 +73,11 @@ impl<T> EventBuilder<T> {
}
}
pub fn add_arg<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
pub fn add_arg<K: Into<String>, V: Into<String>>(
mut self,
key: K,
value: V,
) -> Self {
self.args.insert(key.into(), value.into());
self
}

View File

@ -1,6 +1,7 @@
use futures::channel::oneshot::Sender;
use std::collections::HashMap;
use futures::channel::oneshot::Sender;
pub enum EventResultType {
Success,
NoResponse,
@ -15,7 +16,10 @@ pub struct EventResult {
}
impl EventResult {
pub fn create(result_type: EventResultType, sender: Sender<EventResult>) -> EventResultBuilder {
pub fn create(
result_type: EventResultType,
sender: Sender<EventResult>,
) -> EventResultBuilder {
EventResultBuilder::new(result_type, sender)
}
}
@ -29,7 +33,10 @@ pub struct EventResultBuilder {
}
impl EventResultBuilder {
pub(self) fn new(result_type: EventResultType, sender: Sender<EventResult>) -> Self {
pub(self) fn new(
result_type: EventResultType,
sender: Sender<EventResult>,
) -> Self {
Self {
code: result_type,
args: HashMap::default(),
@ -43,8 +50,7 @@ impl EventResultBuilder {
}
pub fn send(self) {
self
.sender
self.sender
.send(EventResult {
code: self.code,
args: self.args,

View File

@ -3,6 +3,7 @@ mod event;
mod event_result;
mod responder;
pub use self::responder::IResponder;
pub use event::{Event, EventBuilder};
pub use event_result::{EventResult, EventResultType};
pub use self::responder::IResponder;

View File

@ -1,9 +1,11 @@
use crate::event::Event;
use std::sync::Weak;
use crate::event::Event;
pub trait IResponder<T>
where
T: Sync + Send {
T: Sync + Send,
{
fn post_event(&self, event: Event<T>) {
if let Some(next) = self.get_next() {
if let Some(next) = next.upgrade() {

View File

@ -1,10 +1,9 @@
extern crate core;
pub mod connection;
pub mod encryption;
pub mod event;
pub mod messages;
pub mod prelude;
pub mod connection;
pub mod test;
use serde::{Deserialize, Serialize};

View File

@ -1,36 +1,35 @@
use crate::ClientDetails;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// # ClientMessage
/// This enum defined the message that a client can receive from the server
use crate::ClientDetails;
/// This enum defined the message that the server will receive from a client
/// This uses the serde library to transform to and from json.
///
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ClientStreamIn {
Connected,
Update,
SendMessage { to: Uuid, content: String },
SendGlobalMessage { content: String },
Disconnect,
}
/// This enum defined the message that the server will send to a client
/// This uses the serde library to transform to and from json.
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum ClientStreamOut {
Connected,
ConnectedClients { clients: Vec<ClientDetails> },
UserMessage { from: Uuid, content: String },
GlobalMessage { from: Uuid, content: String },
ConnectedClients { clients: Vec<ClientDetails> },
Disconnected,
Connected,
// error cases
Error,
}
@ -40,7 +39,7 @@ impl PartialEq for ClientStreamOut {
match (self, other) {
(Connected, Connected) => true,
(Disconnected, Disconnected) => true,
_ => false
_ => false,
}
}
}

View File

@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Message the server will receive from a socket
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum NetworkSockIn {
@ -12,6 +13,7 @@ pub enum NetworkSockIn {
},
}
/// Message the server will send through a socket
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum NetworkSockOut {
@ -22,19 +24,26 @@ pub enum NetworkSockOut {
server_owner: String,
},
Connecting,
Error
Error,
}
impl PartialEq for NetworkSockOut {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(NetworkSockOut::Request, NetworkSockOut::Request) => true,
(NetworkSockOut::GotInfo {server_name,server_owner},
NetworkSockOut::GotInfo {server_owner: owner_other,server_name: name_other})
=> server_name == name_other && server_owner == owner_other,
(
NetworkSockOut::GotInfo {
server_name,
server_owner,
},
NetworkSockOut::GotInfo {
server_owner: owner_other,
server_name: name_other,
},
) => server_name == name_other && server_owner == owner_other,
(NetworkSockOut::Connecting, NetworkSockOut::Connecting) => true,
_ => false
_ => false,
}
}
}

View File

@ -1,6 +1,9 @@
use std::{
sync::{Arc, Weak},
time::Duration,
};
use async_trait::async_trait;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::time::sleep;
/// # IManager

View File

@ -1,23 +1,25 @@
use std::io::{Error};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::join;
use tokio::net::{TcpStream,TcpListener};
use std::{io::Error, net::SocketAddr, sync::Arc};
use tokio::{
join,
net::{TcpListener, TcpStream},
};
use crate::connection::Connection;
pub async fn create_connection_pair()
-> Result<(Arc<Connection>, (Arc<Connection>, SocketAddr )), Error> {
pub async fn create_connection_pair(
) -> Result<(Arc<Connection>, (Arc<Connection>, SocketAddr)), Error> {
let listener: TcpListener = TcpListener::bind("localhost:0000").await?;
let port = listener.local_addr()?.port();
let (server_res,client_res) = join!(
let (server_res, client_res) = join!(
async { TcpStream::connect(format!("localhost:{}", port)).await },
async { listener.accept().await }
);
let (client,addr) = client_res?;
let (client, addr) = client_res?;
let server = Arc::new(Connection::from(server_res?));
let client = Arc::new(Connection::from(client));
Ok((server,(client,addr)))
}
Ok((server, (client, addr)))
}

View File

@ -1,3 +1,3 @@
mod connection_pair;
pub use connection_pair::create_connection_pair;
pub use connection_pair::create_connection_pair;

View File

@ -1,5 +1,5 @@
hard_tabs = true
max_width = 100
max_width = 80
imports_indent = "Block"
imports_layout = "HorizontalVertical"
imports_granularity = "Crate"

View File

@ -6,9 +6,13 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "serverlib"
path = "src/lib.rs"
# [lib]
# name = "serverlib"
# path = "src/lib.rs"
# [[bin]]
# name = "server"
# path = "src/main.rs"
[[bin]]
name = "server"
@ -26,8 +30,10 @@ openssl = "0.10.33"
tokio = { version = "1.9.0", features = ["full"] }
futures = "0.3.16"
async-trait = "0.1.52"
actix = "0.12"
actix = "0.13"
mlua = { version = "0.7.3", features=["lua54", "async", "serde", "macros"] }
libloading = "0.7"
aquamarine = "0.1.11"
tokio-stream = "0.1.9"
foundation = {path = '../foundation'}

View File

@ -1,80 +0,0 @@
use crate::client::Client;
use crate::messages::ServerMessage;
use std::sync::{Arc, Weak};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
#[derive(Clone, Debug)]
pub struct Message {
content: String,
sender: Weak<Client<>>,
}
impl Message {
#[allow(unused)]
pub fn new(content: String, sender: Weak<Client<>>) -> Message {
Message { content, sender }
}
}
enum ChatManagerMessage {
AddMessage {sender: Weak<Client<>>, content: String}
}
pub struct ChatManager {
messages: Mutex<Vec<Message>>,
server_channel: Sender<ServerMessage>,
#[allow(unused)]
tx: Sender<ChatManagerMessage>,
rx: Mutex<Receiver<ChatManagerMessage>>,
}
impl ChatManager {
#[allow(unused)]
pub fn new(server_channel: Sender<ServerMessage>) -> Arc<Self> {
let (tx, rx) = channel::<ChatManagerMessage>(1024);
let manager = Arc::new(ChatManager {
messages: Mutex::new(Vec::new()),
server_channel,
tx,
rx: Mutex::new(rx),
});
manager.start();
manager
}
#[allow(unused)]
fn start(self: &Arc<ChatManager>) {
let manager = self.clone();
tokio::spawn(async move {
use ServerMessage::{BroadcastGlobalMessage};
use ChatManagerMessage::{AddMessage};
while let Some(message) = manager.rx.lock().await.recv().await {
match message {
AddMessage { content,sender } => {
let sender = &sender.upgrade().unwrap().details.uuid;
manager.server_channel.send(
BroadcastGlobalMessage {sender: sender.clone(), content}
).await.unwrap();
}
}
}
});
}
#[allow(unused)]
pub async fn add_message(self: &Arc<Self>, sender: Weak<Client>, content: String) {
let mut a = self.messages.lock().await;
a.push(Message::new(content, sender))
}
#[allow(unused)]
pub async fn get_all_messages(self: &Arc<Self>) -> Vec<Message> {
self.messages.lock().await.clone()
}
}

View File

@ -1,247 +0,0 @@
use std::cmp::Ordering;
use std::io::Error;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use async_trait::async_trait;
use mlua::prelude::LuaUserData;
use mlua::{UserDataFields, UserDataMethods};
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::ClientDetails;
use foundation::connection::Connection;
use foundation::messages::client::ClientStreamOut::{Connected, Disconnected};
use foundation::prelude::IManager;
use crate::messages::{ClientMessage};
/// # ClientInMessage
///
/// Messages that are sent internally
/// when functions are called on the client
#[derive(Serialize, Deserialize)]
enum ClientInMessage {
MessageTo,
UpdateRequest,
}
/// # Client
/// This struct represents a connected user.
///
/// ## Attributes
/// - details: store of the clients infomation.
///
/// - stream: The socket for the connected client.
/// - stream_reader: the buffered reader used to receive messages
/// - stream_writer: the buffered writer used to send messages
/// - owner: An optional reference to the owning object.
#[derive(Debug)]
pub struct Client<Out: 'static>
where
Out: From<ClientMessage> + Send
{
pub details: ClientDetails,
out_channel: Sender<Out>,
connection: Arc<Connection>,
}
impl<Out> Client<Out>
where
Out: From<ClientMessage> + Send {
pub fn new(
uuid: Uuid,
username: String,
address: String,
out_channel: Sender<Out>,
connection: Arc<Connection>
) -> Arc<Client<Out>> {
Arc::new(Client {
details: ClientDetails {
uuid,
username,
address: address.to_string(),
public_key: None,
},
connection,
out_channel,
})
}
async fn handle_connection(&self, value: Result<ClientStreamIn, Error>) {
match value {
Ok(ClientStreamIn::Disconnect) => {
println!(
"[Client {:?}]: Disconnect received",
self.details.uuid
);
self.disconnect().await;
return;
}
Ok(ClientStreamIn::SendMessage { to, content }) => {
let _ = self.out_channel.send(
ClientMessage::IncomingMessage {from: self.details.uuid, to, content}.into()
).await;
}
Ok(ClientStreamIn::SendGlobalMessage { content }) => {
let _ = self.out_channel.send(
ClientMessage::IncomingGlobalMessage {from: self.details.uuid, content}.into()
).await;
}
_ => {
self.error("Command not found").await;
}
}
}
pub async fn broadcast_message(&self, from: Uuid, content: String) -> Result<(), Error> {
self.connection.write(ClientStreamOut::GlobalMessage { from, content }).await?;
Ok(())
}
pub async fn user_message(&self, from: Uuid, content: String) -> Result<(), Error> {
self.connection.write(ClientStreamOut::UserMessage { from, content }).await?;
Ok(())
}
async fn disconnect(&self) {
let _ = self.out_channel
.send(ClientMessage::Disconnect {
id: self.details.uuid,
}.into()).await;
}
async fn error(&self, msg: &str) {
let _ = self.connection.write(ClientStreamOut::Error).await;
}
}
#[async_trait]
impl<Out> IManager for Client<Out>
where
Out: From<ClientMessage> + Send
{
async fn init(self: &Arc<Self>)
where
Self: Send + Sync + 'static
{
let _ = self.connection.write(Connected).await;
}
async fn run(self: &Arc<Self>) {
let client = self.clone();
select! {
val = self.connection.read::<ClientStreamIn>() => {
tokio::spawn(async move {
client.handle_connection(val).await;
});
}
}
}
}
// MARK: - use to handle disconnecting
impl<Out> Drop for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn drop(&mut self) {
let connection = self.connection.clone();
let id = self.details.uuid.clone();
tokio::spawn(async move {
let _ = connection.write(Disconnected).await;
});
}
}
// MARK: - used for sorting.
impl<Out> PartialEq for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn eq(&self, other: &Self) -> bool {
self.details.uuid == other.details.uuid
}
}
impl<Out> Eq for Client<Out>
where
Out: From<ClientMessage> + Send
{}
impl<Out> PartialOrd for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<Out: 'static> Ord for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn cmp(&self, other: &Self) -> Ordering {
self.details.uuid.cmp(&other.details.uuid)
}
}
#[cfg(test)]
mod test {
use std::io::Error;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use foundation::connection::Connection;
use foundation::messages::client::ClientStreamOut;
use foundation::messages::client::ClientStreamOut::{Connected, Disconnected};
use foundation::prelude::IManager;
use foundation::test::create_connection_pair;
use crate::client::{Client};
use crate::messages::ClientMessage;
use crate::messages::ClientMessage::Disconnect;
#[tokio::test]
async fn create_client_and_drop() -> Result<(), Error> {
let (sender, mut receiver) =
channel::<ClientMessage>(1024);
let (server, (client_conn, addr)) =
create_connection_pair().await?;
// client details
let uuid = Uuid::new_v4();
let username = "TestUser".to_string();
let client = Client::new(
uuid,
username,
addr.to_string(),
sender.clone(),
server
);
client.start();
let res = client_conn.read::<ClientStreamOut>().await?;
assert_eq!(res, Connected);
drop(client);
let res = client_conn.read::<ClientStreamOut>().await?;
assert_eq!(res, Disconnected);
// fetch from out_channel
let disconnect_msg = receiver.recv().await.unwrap();
assert_eq!(disconnect_msg, Disconnect {id: uuid});
Ok(())
}
}

View File

@ -0,0 +1,275 @@
use std::net::SocketAddr;
use actix::{
Actor,
Addr,
ArbiterHandle,
AsyncContext,
Context,
Handler,
Message,
MessageResponse,
Recipient,
Running,
WeakAddr,
};
use foundation::{
messages::client::{ClientStreamIn, ClientStreamOut},
ClientDetails,
};
use serde_json::{from_str, to_string};
use uuid::Uuid;
use crate::{
client_management::client::ClientObservableMessage::{
SendGlobalMessageRequest,
SendMessageRequest,
UpdateRequest,
},
network::{
Connection,
ConnectionMessage,
ConnectionMessage::SendData,
ConnectionOuput,
},
prelude::{
ObservableMessage,
ObservableMessage::{Subscribe, Unsubscribe},
},
};
/// Message sent ot the clients delegate
#[derive(Message)]
#[rtype(result = "()")]
pub enum ClientMessage {
SendUpdate(Vec<ClientDetails>),
SendMessage { from: Uuid, content: String },
SendGlobalMessage { from: Uuid, content: String },
}
#[derive(Message)]
#[rtype(result = "ClientDetailsResponse")]
pub struct ClientDataMessage;
#[derive(MessageResponse)]
pub struct ClientDetailsResponse(pub ClientDetails);
/// messages the client will send to itself
enum SelfMessage {
ReceivedMessage(ClientStreamIn),
}
/// message that is sent to all observers of the current client.
#[derive(Message, Clone)]
#[rtype(result = "()")]
pub enum ClientObservableMessage {
SendMessageRequest(WeakAddr<Client>, Uuid, String),
SendGlobalMessageRequest(WeakAddr<Client>, String),
UpdateRequest(WeakAddr<Client>),
}
/// # Client
/// This represents a connected client.
/// it will handle received message from a connection.
pub struct Client {
connection: Addr<Connection>,
details: ClientDetails,
observers: Vec<Recipient<ClientObservableMessage>>,
}
impl Client {
pub(crate) fn new(
connection: Addr<Connection>,
details: ClientDetails,
) -> Addr<Self> {
Client {
connection,
details,
observers: Vec::default(),
}
.start()
}
fn handle_request(
&mut self,
ctx: &mut Context<Client>,
sender: Addr<Connection>,
addr: SocketAddr,
data: String,
) {
use ClientStreamIn::{
Disconnect,
SendGlobalMessage,
SendMessage,
Update,
};
let msg = from_str::<ClientStreamIn>(data.as_str())
.expect("[Client] failed to decode incoming message");
match msg {
Update => self.handle_update(ctx),
SendMessage { to, content } => self.handle_send(ctx, to, content),
SendGlobalMessage { content } => {
self.handle_global_send(ctx, content)
}
Disconnect => self.handle_disconnect(ctx),
_ => todo!(),
}
}
#[inline]
fn handle_update(&self, ctx: &mut Context<Client>) {
self.broadcast(UpdateRequest(ctx.address().downgrade()));
}
#[inline]
fn handle_send(
&self,
ctx: &mut Context<Client>,
to: Uuid,
content: String,
) {
self.broadcast(SendMessageRequest(
ctx.address().downgrade(),
to,
content,
));
}
#[inline]
fn handle_global_send(&self, ctx: &mut Context<Client>, content: String) {
self.broadcast(SendGlobalMessageRequest(
ctx.address().downgrade(),
content,
));
}
#[inline]
fn handle_disconnect(&self, ctx: &mut Context<Client>) {
todo!()
}
#[inline]
fn broadcast(&self, message: ClientObservableMessage) {
for recp in &self.observers {
recp.do_send(message.clone());
}
}
}
impl Actor for Client {
type Context = Context<Self>;
// tells the client that it has been connected.
fn started(&mut self, ctx: &mut Self::Context) {
use ClientStreamOut::Connected;
use ConnectionMessage::SendData;
println!("[Client] started");
self.connection
.do_send(Subscribe(ctx.address().recipient()));
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&Connected).unwrap(),
));
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use ClientStreamOut::Disconnected;
use ConnectionMessage::SendData;
self.connection
.do_send(Unsubscribe(ctx.address().recipient()));
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&Disconnected).unwrap(),
));
}
}
impl Handler<ClientDataMessage> for Client {
type Result = ClientDetailsResponse;
fn handle(
&mut self,
msg: ClientDataMessage,
ctx: &mut Self::Context,
) -> Self::Result {
ClientDetailsResponse(self.details.clone())
}
}
// Handles incoming messages to the client.
impl Handler<ClientMessage> for Client {
type Result = ();
fn handle(
&mut self,
msg: ClientMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use ClientMessage::{SendGlobalMessage, SendMessage, SendUpdate};
use ClientStreamOut::{ConnectedClients, GlobalMessage, UserMessage};
match msg {
SendUpdate(clients) => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&ConnectedClients { clients })
.expect("[Client] Failed to encode string"),
)),
SendMessage { content, from } => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&UserMessage { from, content })
.expect("[Client] Failed to encode string"),
)),
SendGlobalMessage { from, content } => {
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&GlobalMessage {
from,
content,
})
.expect("[Client] Failed to encode string"),
))
}
_ => todo!(),
}
}
}
// Handles outputs from the connection.
impl Handler<ConnectionOuput> for Client {
type Result = ();
fn handle(
&mut self,
msg: ConnectionOuput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionOuput::RecvData;
match msg {
RecvData(sender, addr, data) => {
self.handle_request(ctx, sender, addr, data)
}
_ => todo!(),
}
}
}
impl Handler<ObservableMessage<ClientObservableMessage>> for Client {
type Result = ();
fn handle(
&mut self,
msg: ObservableMessage<ClientObservableMessage>,
ctx: &mut Self::Context,
) -> Self::Result {
use ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Client] adding subscriber");
self.observers.push(r);
}
Unsubscribe(r) => {
println!("[Client] removing subscriber");
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a != &r)
.collect();
}
}
}
}

View File

@ -0,0 +1,233 @@
use std::collections::HashMap;
use actix::{
fut::{wrap_future, wrap_stream},
Actor,
ActorFutureExt,
ActorStreamExt,
Addr,
ArbiterHandle,
AsyncContext,
Context,
Handler,
MailboxError,
Message,
MessageResponse,
Recipient,
Running,
StreamHandler,
WeakAddr,
WeakRecipient,
};
use foundation::{
messages::client::{ClientStreamIn, ClientStreamIn::SendGlobalMessage},
ClientDetails,
};
use futures::{SinkExt, TryStreamExt};
use tokio_stream::StreamExt;
use uuid::Uuid;
use crate::{
client_management::{
client::{
ClientDataMessage,
ClientMessage,
ClientMessage::SendMessage,
ClientObservableMessage,
},
Client,
},
network::NetworkOutput,
prelude::ObservableMessage,
};
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientManagerMessage {
AddClient(Uuid, Addr<Client>),
RemoveClient(Uuid),
}
#[derive(Message)]
#[rtype(result = "()")]
pub enum ClientManagerOutput {
UpdateRequest(Addr<ClientManager>),
}
pub struct ClientManager {
clients: HashMap<Uuid, Addr<Client>>,
delegate: WeakRecipient<ClientManagerOutput>,
}
impl ClientManager {
pub(crate) fn send_update(
&mut self,
ctx: &mut Context<Self>,
addr: WeakAddr<Client>,
) {
println!("[ClientManager] sending update to client");
use ClientMessage::SendUpdate;
let self_addr = ctx.address();
if let Some(to_send) = addr.upgrade() {
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
let collection = tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage))
.map(|val| val.unwrap().0)
// .filter(|val| )
.collect();
let fut = wrap_future(async move {
let a: Vec<_> = collection.await;
to_send.send(SendUpdate(a)).await;
});
ctx.spawn(fut);
}
}
pub(crate) fn send_message_request(
&self,
ctx: &mut Context<ClientManager>,
sender: WeakAddr<Client>,
uuid: Uuid,
content: String,
) {
println!("[ClientManager] sending message to client");
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
let collection = tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage))
.map(|val| val.unwrap().0)
.collect();
let fut = wrap_future(async move {
if let Some(sender) = sender.upgrade() {
let from: Uuid =
sender.send(ClientDataMessage).await.unwrap().0.uuid;
let client_details: Vec<ClientDetails> = collection.await;
let pos = client_details.iter().position(|i| i.uuid == from);
if let Some(pos) = pos {
sender.send(SendMessage { content, from }).await;
}
}
});
ctx.spawn(fut);
}
pub(crate) fn send_global_message_request(
&self,
ctx: &mut Context<ClientManager>,
sender: WeakAddr<Client>,
content: String,
) {
use ClientMessage::SendGlobalMessage;
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
if let Some(sender) = sender.upgrade() {
let fut = wrap_future(async move {
let from: Uuid =
sender.send(ClientDataMessage).await.unwrap().0.uuid;
let collection = tokio_stream::iter(client_addr)
.then(move |addr| {
addr.send(SendGlobalMessage {
content: content.clone(),
from,
})
})
.collect();
let a: Vec<_> = collection.await;
});
ctx.spawn(fut);
}
}
}
impl ClientManager {
pub(crate) fn new(
delegate: WeakRecipient<ClientManagerOutput>,
) -> Addr<Self> {
ClientManager {
delegate,
clients: HashMap::new(),
}
.start()
}
fn add_client(
&mut self,
ctx: &mut Context<ClientManager>,
uuid: Uuid,
addr: Addr<Client>,
) {
println!("[ClientManager] adding client");
use crate::prelude::ObservableMessage::Subscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
addr.do_send(Subscribe(recp));
self.clients.insert(uuid, addr);
}
fn remove_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid) {
println!("[ClientManager] removing client");
use crate::prelude::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) {
addr.do_send(Unsubscribe(recp));
}
}
}
impl Actor for ClientManager {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("[ClientManager] started");
}
}
impl Handler<ClientManagerMessage> for ClientManager {
type Result = ();
fn handle(
&mut self,
msg: ClientManagerMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ClientManagerMessage::{AddClient, RemoveClient};
match msg {
// todo: Add subscription to the client.
AddClient(uuid, addr) => self.add_client(ctx, uuid, addr),
// todo: remove subscription to client.
RemoveClient(uuid) => self.remove_client(ctx, uuid),
}
}
}
impl Handler<ClientObservableMessage> for ClientManager {
type Result = ();
fn handle(
&mut self,
msg: ClientObservableMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ClientObservableMessage::{
SendGlobalMessageRequest,
SendMessageRequest,
UpdateRequest,
};
match msg {
SendMessageRequest(addr, uuid, content) => {
self.send_message_request(ctx, addr, uuid, content)
}
SendGlobalMessageRequest(addr, content) => {
self.send_global_message_request(ctx, addr, content)
}
UpdateRequest(addr) => self.send_update(ctx, addr),
_ => todo!(),
}
}
}

View File

@ -0,0 +1,9 @@
mod client;
mod client_manager;
pub(crate) use client::Client;
pub(crate) use client_manager::{
ClientManager,
ClientManagerMessage,
ClientManagerOutput,
};

View File

@ -1,209 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use futures::future::join_all;
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use uuid::Uuid;
use async_trait::async_trait;
use foundation::connection::Connection;
use foundation::prelude::IManager;
use crate::client::Client;
use crate::messages::ClientMessage;
#[derive(Debug)]
pub enum ClientMgrMessage {
#[allow(dead_code)]
Remove {
id: Uuid,
},
#[allow(dead_code)]
SendClients {
to: Uuid,
},
SendMessage {
from: Uuid,
to: Uuid,
content: String,
},
BroadcastGlobalMessage {
from: Uuid,
content: String,
},
}
impl From<ClientMessage> for ClientMgrMessage {
fn from(msg: ClientMessage) -> Self {
use ClientMessage::{Disconnect, IncomingGlobalMessage, IncomingMessage};
match msg {
IncomingMessage { from, to, content } => ClientMgrMessage::SendMessage { from, to, content },
IncomingGlobalMessage { from, content } => {
ClientMgrMessage::BroadcastGlobalMessage { from, content }
}
Disconnect { id } => ClientMgrMessage::Remove { id },
_ => unimplemented!(),
}
}
}
/// # ClientManager
/// This struct manages all users connected to the server.
///
/// ## Attributes
/// - clients: a vector of all clients being managed.
/// - server_channel: a channel to the parent that manages this object.
/// - tx: the sender that clients will send their messages to.
/// - rx: the receiver where messages are sent to.
pub struct ClientManager<Out: 'static>
where
Out: From<ClientMgrMessage> + Send,
{
pub clients: Mutex<HashMap<Uuid, Arc<Client<ClientMgrMessage>>>>,
#[allow(dead_code)]
server_channel: Mutex<Sender<Out>>,
tx: Sender<ClientMgrMessage>,
rx: Mutex<Receiver<ClientMgrMessage>>,
}
impl<Out> ClientManager<Out>
where
Out: From<ClientMgrMessage> + Send,
{
pub fn new(out_channel: Sender<Out>) -> Arc<Self> {
let (tx, rx) = channel(1024);
Arc::new(ClientManager {
clients: Mutex::default(),
server_channel: Mutex::new(out_channel),
tx,
rx: Mutex::new(rx),
})
}
#[allow(dead_code)]
pub async fn get_count(&self) -> usize {
self.clients.lock().await.len()
}
pub async fn add_client(
&self,
id: Uuid,
username: String,
address: String,
connection: Arc<Connection>,
) {
let client = Client::new(id, username, address, self.tx.clone(), connection);
client.start();
let mut lock = self.clients.lock().await;
lock.insert(client.details.uuid, client);
}
#[allow(dead_code)]
pub async fn remove_client(&self, id: Uuid) {
let mut lock = self.clients.lock().await;
lock.remove(&id);
}
pub async fn handle_channel(&self, message: Option<ClientMgrMessage>) {
use ClientMgrMessage::{BroadcastGlobalMessage, Remove, SendClients, SendMessage};
println!("Handling channel");
match message {
Some(Remove { id }) => {
println!("[Client Manager]: removing client: {:?}", &id);
let mut lock = self.clients.lock().await;
lock.remove(&id);
}
Some(SendClients { to: _ }) => {
let lock = self.clients.lock().await;
let futures = lock
.iter()
.map(|(_, _)| async { println!("Send message to Client") });
join_all(futures).await;
}
Some(BroadcastGlobalMessage { from, content }) => {
let lock = self.clients.lock().await;
let futures =
lock
.iter()
.map(|(_, c)| (c.clone(), content.clone()))
.map(|(c, s)| async move {
c.broadcast_message(from, s).await.unwrap();
});
join_all(futures).await;
}
Some(SendMessage { from, to, content }) => {
let lock = self.clients.lock().await;
let client = lock.get(&to).unwrap();
let _ = client.user_message(from, content).await;
}
Some(Remove { id }) => {
self.clients.lock().await.remove(&id);
}
_ => {
unimplemented!()
}
}
}
}
#[async_trait]
impl<Out> IManager for ClientManager<Out>
where
Out: From<ClientMgrMessage> + Send,
{
async fn run(self: &Arc<Self>) {
loop {
let mut receiver = self.rx.lock().await;
select! {
val = receiver.recv() => {
self.handle_channel(val).await;
}
}
}
}
}
#[cfg(test)]
mod test {
use crate::client_manager::{ClientManager, ClientMgrMessage};
use foundation::messages::client::ClientStreamOut;
use foundation::prelude::IManager;
use foundation::test::create_connection_pair;
use std::io::Error;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
#[tokio::test]
async fn add_new_client_to_manager() -> Result<(), Error> {
let (sender, mut receiver) = channel::<ClientMgrMessage>(1024);
let (server, (client, addr)) = create_connection_pair().await?;
let client_manager = ClientManager::new(sender);
client_manager.start();
let id = Uuid::new_v4();
let username = "TestUser".to_string();
client_manager
.add_client(id, username.clone(), addr.to_string(), server)
.await;
assert_eq!(client_manager.get_count().await, 1);
let msg = client.read::<ClientStreamOut>().await?;
assert_eq!(msg, ClientStreamOut::Connected);
Ok(())
}
}

View File

@ -1,10 +0,0 @@
use crate::client::Client;
use std::sync::Arc;
use uuid::Uuid;
pub enum EventType<'a> {
NewConnection,
// Todo: - change client to use traits
ClientAdded(Uuid),
Custom(&'a str),
}

View File

@ -1,11 +0,0 @@
// mod chat_manager;
mod client;
mod client_manager;
mod event_type;
mod lua;
mod messages;
mod network_manager;
// pub mod plugin;
mod server;
pub use server::Server;

View File

@ -1,23 +1,20 @@
// pub mod chat_manager;
pub mod client;
pub mod client_manager;
mod event_type;
mod lua;
pub mod messages;
pub mod network_manager;
// mod plugin;
pub mod server;
//! # actor
//! This is the main module of the actix server.
//! It starts the actor runtime and then sleeps
//! for the duration of the program.
use std::io;
use clap::{App, Arg};
pub(crate) mod server;
pub(crate) mod client_management;
pub(crate) mod network;
pub(crate) mod prelude;
use server::Server;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> io::Result<()> {
let server = Server::new().await.unwrap();
server.start().await;
Ok(())
#[actix::main()]
async fn main() {
let _server = Server::new();
loop {
sleep(Duration::from_millis(1000)).await;
}
}

View File

@ -1,52 +0,0 @@
use uuid::Uuid;
/// # ClientMessage
///
/// These messages are send from the client to a receiver
/// when events from the client happen that need to be delegated
///
/// ## Variants
///
///
/// ## Methods
///
#[derive(Debug)]
pub enum ClientMessage {
#[allow(dead_code)]
Connected,
#[allow(dead_code)]
IncomingMessage {
from: Uuid,
to: Uuid,
content: String,
},
#[allow(dead_code)]
IncomingGlobalMessage {
from: Uuid,
content: String,
},
#[allow(dead_code)]
RequestedUpdate {
from: Uuid,
},
Disconnect {
id: Uuid,
},
Error,
}
impl PartialEq for ClientMessage {
fn eq(&self, other: &Self) -> bool {
use ClientMessage::{Connected, Disconnect, Error};
match (self, other) {
(Connected, Connected) => true,
(Error, Error) => true,
(Disconnect { id, .. }, Disconnect { id: other_id, .. }) => id == other_id,
_ => false,
}
}
}

View File

@ -0,0 +1,216 @@
use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc};
use actix::{
fut::wrap_future,
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
Message,
Recipient,
SpawnHandle,
};
use futures::{future::join_all, Future, FutureExt};
use serde::Serialize;
use tokio::{
io::{
split,
AsyncBufReadExt,
AsyncWriteExt,
BufReader,
ReadHalf,
WriteHalf,
},
net::TcpStream,
sync::Mutex,
};
use crate::prelude::ObservableMessage;
/// This is a message that can be sent to the Connection.
#[derive(Message)]
#[rtype(result = "()")]
pub enum ConnectionMessage {
SendData(String),
CloseConnection,
}
#[derive(Message)]
#[rtype(result = "()")]
pub enum ConnectionOuput {
RecvData(Addr<Connection>, SocketAddr, String),
ConnectionClosed(Addr<Connection>),
}
#[derive(Message)]
#[rtype(result = "()")]
enum SelfMessage {
UpdateObserversWithData(String),
}
/// # Connection
/// This manages a TcpStream for a given connection.
///
/// ## Fields
/// - read_half: A temporary store fr the read half of the connection.
/// - write_half: The write half of the connection.
/// - address: The socket address of the conneciton.
/// - observers: A list of observers to events created by the connection.
/// - loop_future: the future holding the receiving loop.
pub struct Connection {
read_half: Option<ReadHalf<TcpStream>>,
write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
address: SocketAddr,
observers: Vec<Recipient<ConnectionOuput>>,
loop_future: Option<SpawnHandle>,
}
impl Connection {
/// Creates a new Conneciton actor from a Tokio TcpStream,
/// and start's its execution.
/// returns: the Addr of the connection.
pub(super) fn new(stream: TcpStream, address: SocketAddr) -> Addr<Self> {
let (read_half, write_half) = split(stream);
Connection {
read_half: Some(read_half),
write_half: Arc::new(Mutex::new(write_half)),
address,
observers: Vec::new(),
loop_future: None,
}
.start()
}
}
impl Actor for Connection {
type Context = Context<Self>;
/// runs when the actor is started.
/// takes out eh read_half ad turns it into a buffered reader
/// then eneters loop readling lines from the tcp stream
fn started(&mut self, ctx: &mut Self::Context) {
println!("[Connection] started");
let addr = ctx.address();
let read_half = self
.read_half
.take()
.expect("What the hell did yu do wrong");
ctx.spawn(wrap_future(async move {
let mut reader = BufReader::new(read_half);
let mut buffer_string = String::new();
while let Ok(len) = reader.read_line(&mut buffer_string).await {
use ConnectionMessage::CloseConnection;
use SelfMessage::UpdateObserversWithData;
if len == 0 {
println!("[Connection] connection closed");
addr.send(CloseConnection).await.expect(
"[Connection] failed to send close message to self",
);
return;
}
println!("[Connection] read line");
addr.send(UpdateObserversWithData(buffer_string.clone()))
.await;
buffer_string.clear();
}
}));
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use ConnectionOuput::ConnectionClosed;
println!("[Connection] stopped");
for recp in self.observers.iter() {
recp.do_send(ConnectionClosed(ctx.address()));
}
}
}
impl Handler<ObservableMessage<ConnectionOuput>> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ObservableMessage<ConnectionOuput>,
_ctx: &mut Self::Context,
) -> <Self as actix::Handler<ObservableMessage<ConnectionOuput>>>::Result {
use ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Connection] adding subscriber");
self.observers.push(r);
}
Unsubscribe(r) => {
println!("[Connection] removing subscriber");
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a != &r)
.collect();
}
};
}
}
impl Handler<ConnectionMessage> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ConnectionMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
let writer = self.write_half.clone();
match msg {
SendData(d) => {
ctx.spawn(wrap_future(async move {
println!("[Connection] sending data");
let mut lock = writer.lock().await;
let mut buffer = Vec::new();
writeln!(&mut buffer, "{}", d.as_str());
lock.write_all(&buffer).await;
}));
}
CloseConnection => ctx.stop(),
};
}
}
impl Handler<SelfMessage> for Connection {
type Result = ();
fn handle(
&mut self,
msg: SelfMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionOuput::RecvData;
use SelfMessage::UpdateObserversWithData;
match msg {
UpdateObserversWithData(data) => {
let send = ctx.address();
let addr = self.address.clone();
// this is a mess
let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.observers
.iter()
.cloned()
.map(|r| {
let send = send.clone();
let data = data.clone();
async move {
let _ = r.send(RecvData(send, addr, data)).await;
}
.boxed()
})
.collect();
let _ = ctx.spawn(wrap_future(async {
join_all(futs).await;
}));
}
};
}
}

View File

@ -0,0 +1,164 @@
use std::net::SocketAddr;
use actix::{
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
Message,
Recipient,
WeakRecipient,
};
use foundation::{
messages::{
client::{ClientStreamOut, ClientStreamOut::Error},
network::{NetworkSockIn, NetworkSockOut},
},
ClientDetails,
};
use serde_json::{from_str, to_string};
use crate::{
network::{connection::ConnectionOuput, Connection, ConnectionMessage},
prelude::ObservableMessage,
};
#[derive(Debug, Clone, Copy)]
enum ConnectionPhase {
Started,
Requested,
}
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum InitiatorOutput {
InfoRequest(Addr<ConnectionInitiator>, Addr<Connection>),
ClientRequest(Addr<ConnectionInitiator>, Addr<Connection>, ClientDetails),
}
/// # ConnectionInitiator
/// Handles the initiatin of a new connection.
///
/// This will do one of two things:
/// - Create a new client and send it to the network manager.
/// - Request the eserver info and send it to the connection.
pub struct ConnectionInitiator {
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
}
impl ConnectionInitiator {
pub(crate) fn new(
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
) -> Addr<Self> {
ConnectionInitiator {
connection,
delegate,
}
.start()
}
fn handle_request(
&mut self,
sender: Addr<Connection>,
ctx: &mut <Self as Actor>::Context,
address: SocketAddr,
data: String,
) {
use InitiatorOutput::{ClientRequest, InfoRequest};
use NetworkSockIn::{Connect, Info};
use NetworkSockOut::{Connecting, GotInfo};
use ObservableMessage::Unsubscribe;
let msg = from_str::<NetworkSockIn>(data.as_str());
if let Err(e) = msg.as_ref() {
println!("[ConnectionInitiator] error decoding message {}", e);
self.error(ctx, sender);
return;
}
let msg = msg.unwrap();
println!("[ConnectionInitiator] matching request");
if let Some(delegate) = self.delegate.upgrade() {
match msg {
Info => delegate.do_send(InfoRequest(ctx.address(), sender)),
Connect {
uuid,
username,
address,
} => delegate.do_send(ClientRequest(
ctx.address(),
sender,
ClientDetails {
uuid,
username,
address,
public_key: None,
},
)),
};
ctx.stop();
}
}
fn error(
&mut self,
ctx: &mut <Self as Actor>::Context,
sender: Addr<Connection>,
) {
use ConnectionMessage::{CloseConnection, SendData};
sender.do_send(SendData(
to_string::<ClientStreamOut>(&Error)
.expect("failed to convert error to string"),
));
sender.do_send(CloseConnection);
ctx.stop()
}
}
impl Actor for ConnectionInitiator {
type Context = Context<Self>;
/// on start initiate the protocol.
/// also add self as a subscriber to the connection.
fn started(&mut self, ctx: &mut Self::Context) {
use NetworkSockOut::Request;
use ObservableMessage::Subscribe;
use super::ConnectionMessage::SendData;
println!("[ConnectionInitiator] started");
self.connection
.do_send(Subscribe(ctx.address().recipient()));
self.connection
.do_send(SendData(to_string(&Request).unwrap()));
}
/// once stopped remove self from the connection subscribers
fn stopped(&mut self, ctx: &mut Self::Context) {
use ObservableMessage::Unsubscribe;
println!("[ConnectionInitiator] stopped");
self.connection
.do_send(Unsubscribe(ctx.address().recipient()));
}
}
impl Handler<ConnectionOuput> for ConnectionInitiator {
type Result = ();
fn handle(
&mut self,
msg: ConnectionOuput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionOuput::{ConnectionClosed, RecvData};
use ConnectionPhase::Requested;
if let RecvData(sender, addr, data) = msg {
self.handle_request(sender, ctx, addr, data)
}
}
}

View File

@ -0,0 +1,108 @@
use std::net::{SocketAddr, ToSocketAddrs};
use actix::{
fut::wrap_future,
Actor,
Addr,
AsyncContext,
Context,
Handler,
Message,
Recipient,
SpawnHandle,
};
use tokio::net::TcpListener;
use crate::network::{
connection::Connection,
ConnectionInitiator,
InitiatorOutput,
};
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ListenerMessage {
StartListening,
StopListening,
}
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ListenerOutput {
NewConnection(Addr<Connection>),
}
pub(super) struct NetworkListener {
address: SocketAddr,
delegate: Recipient<ListenerOutput>,
looper: Option<SpawnHandle>,
}
impl NetworkListener {
pub(crate) fn new<T: ToSocketAddrs>(
address: T,
delegate: Recipient<ListenerOutput>,
) -> Addr<NetworkListener> {
NetworkListener {
address: address
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()[0],
delegate,
looper: None,
}
.start()
}
/// called when the actor is to start listening
fn start_listening(&mut self, ctx: &mut <Self as Actor>::Context) {
println!("[NetworkListener] started listening");
let addr = self.address.clone();
let self_addr = ctx.address();
let delegate = self.delegate.clone();
let loop_future = ctx.spawn(wrap_future(async move {
use ListenerOutput::NewConnection;
let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, addr)) = listener.accept().await {
println!("[NetworkListener] accepted socket");
let conn = Connection::new(stream, addr);
delegate.do_send(NewConnection(conn));
}
}));
}
/// called when the actor is to stop listening
fn stop_listening(&mut self, ctx: &mut <Self as Actor>::Context) {
println!("[NetworkListener] stopped listening");
if let Some(fut) = self.looper.take() {
ctx.cancel_future(fut);
}
}
}
impl Actor for NetworkListener {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("[NetworkListener] started");
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("[NetworkListener] stopped");
}
}
impl Handler<ListenerMessage> for NetworkListener {
type Result = ();
fn handle(
&mut self,
msg: ListenerMessage,
ctx: &mut <Self as actix::Actor>::Context,
) -> Self::Result {
use ListenerMessage::{StartListening, StopListening};
match msg {
StartListening => self.start_listening(ctx),
StopListening => self.stop_listening(ctx),
}
}
}

42
server/src/network/mod.rs Normal file
View File

@ -0,0 +1,42 @@
//! # Network
//!
//! This module contains network code for the server.
//!
//! This includes:
//! - The network manager: For that handles all server network connections.
//! - The network listener: For listening for connections on a port.
//! - The conneciton: An abstraction over sockets sockets, for actix.
//! - The connection initiator: For initiating new connections to the server
//!
//! ## Diagrams
//!
//! ```mermaid
//! sequenceDiagram
//! Server->>NetworkManager: creates
//! NetworkManager->>NetworkListener: create
//! NetworkManager->>+NetworkListener: start listening
//!
//! loop async tcp listen
//! NetworkListener->>NetworkListener: check for new connections
//! end
//!
//! NetworkListener->>Connection: create from socket
//! NetworkListener->>NetworkManager: new connection
//! NetworkManager->>Server: new connection
//!
//! Server->>ConnectionInitiator: create with connection
//! ```
mod connection;
mod connection_initiator;
mod listener;
mod network_manager;
pub(crate) use connection::{Connection, ConnectionMessage, ConnectionOuput};
pub(crate) use connection_initiator::{ConnectionInitiator, InitiatorOutput};
use listener::{ListenerMessage, ListenerOutput, NetworkListener};
pub(crate) use network_manager::{
NetworkManager,
NetworkMessage,
NetworkOutput,
};

View File

@ -0,0 +1,189 @@
//! # network_manager
//! This module contains the network manager actor
//! it's role involves handling new oncomming network connections
use actix::{
Actor,
Addr,
AsyncContext,
Context,
Handler,
Message,
WeakRecipient,
};
use foundation::ClientDetails;
use crate::network::{
listener::ListenerOutput,
Connection,
ConnectionInitiator,
InitiatorOutput,
InitiatorOutput::ClientRequest,
ListenerMessage,
NetworkListener,
};
#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)]
#[rtype(result = "()")]
pub enum NetworkMessage {
StartListening,
StopListening,
}
#[derive(Message)]
#[rtype(result = "()")]
pub enum NetworkOutput {
NewClient(Addr<Connection>, ClientDetails),
InfoRequested(Addr<Connection>),
}
pub struct NetworkManager {
listener_addr: Option<Addr<NetworkListener>>,
delegate: WeakRecipient<NetworkOutput>,
initiators: Vec<Addr<ConnectionInitiator>>,
}
impl NetworkManager {
pub fn new(delegate: WeakRecipient<NetworkOutput>) -> Addr<NetworkManager> {
NetworkManager {
listener_addr: None,
delegate,
initiators: Vec::new(),
}
.start()
}
fn start_listener(&mut self, _ctx: &mut <Self as actix::Actor>::Context) {
use ListenerMessage::StartListening;
if let Some(addr) = self.listener_addr.as_ref() {
addr.do_send(StartListening);
}
}
fn stop_listener(&mut self, _ctx: &mut <Self as actix::Actor>::Context) {
use ListenerMessage::StopListening;
if let Some(addr) = self.listener_addr.as_ref() {
addr.do_send(StopListening);
}
}
/// Handles a new connection from the Listener.
/// This creates a new ConnectionInitaliser.
/// This completes the first part of the protocol.
#[inline]
fn new_connection(
&mut self,
ctx: &mut <Self as Actor>::Context,
connection: Addr<Connection>,
) {
println!("[NetworkManager] Got new connection");
let init = ConnectionInitiator::new(
ctx.address().recipient().downgrade(),
connection,
);
self.initiators.push(init);
}
#[inline]
fn remove_initiator(&mut self, sender: Addr<ConnectionInitiator>) {
let index = self.initiators.iter().position(|i| *i == sender).unwrap();
println!("[NetworkManager] removed initiator at:{}", index);
self.initiators.remove(index);
}
/// handles a initiator client request
/// this will, forward the conenction and client details
/// to the server actor to be dispatched to the appropriate
/// manager
#[inline]
fn client_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: Addr<ConnectionInitiator>,
connection: Addr<Connection>,
client_details: ClientDetails,
) {
use NetworkOutput::NewClient;
println!("[NetworkManager] recieved client request");
if let Some(delegate) = self.delegate.upgrade() {
delegate.do_send(NewClient(connection, client_details));
}
self.remove_initiator(sender);
}
/// This sends the connection to the server
/// which will in turn take over the protocol by sending
/// the servers infomation.
#[inline]
fn info_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: Addr<ConnectionInitiator>,
connection: Addr<Connection>,
) {
use NetworkOutput::InfoRequested;
println!("[NetworkManager] Got recieved info request");
if let Some(delegate) = self.delegate.upgrade() {
delegate.do_send(InfoRequested(connection));
}
self.remove_initiator(sender);
}
}
impl Actor for NetworkManager {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("started network manager");
let recipient = ctx.address().recipient();
self.listener_addr
.replace(NetworkListener::new("0.0.0.0:5600", recipient));
}
}
impl Handler<NetworkMessage> for NetworkManager {
type Result = ();
fn handle(
&mut self,
msg: NetworkMessage,
ctx: &mut <Self as actix::Actor>::Context,
) -> <Self as Handler<NetworkMessage>>::Result {
use NetworkMessage::{StartListening, StopListening};
match msg {
StartListening => self.start_listener(ctx),
StopListening => self.stop_listener(ctx),
}
}
}
impl Handler<ListenerOutput> for NetworkManager {
type Result = ();
fn handle(
&mut self,
msg: ListenerOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ListenerOutput::NewConnection;
match msg {
NewConnection(connection) => self.new_connection(ctx, connection),
};
}
}
impl Handler<InitiatorOutput> for NetworkManager {
type Result = ();
fn handle(
&mut self,
msg: InitiatorOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use InitiatorOutput::{ClientRequest, InfoRequest};
match msg {
ClientRequest(sender, addr, client_details) => {
self.client_request(ctx, sender, addr, client_details)
}
InfoRequest(sender, addr) => self.info_request(ctx, sender, addr),
}
}
}

View File

@ -1,257 +0,0 @@
use std::io::{Error, ErrorKind};
use std::sync::Arc;
use uuid::Uuid;
use async_trait::async_trait;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
use foundation::connection::Connection;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::prelude::IManager;
#[derive(Debug)]
pub enum NetworkManagerMessage {
ClientConnecting {
uuid: Uuid,
address: String,
username: String,
connection: Arc<Connection>,
},
}
impl PartialEq for NetworkManagerMessage {
fn eq(&self, other: &Self) -> bool {
use NetworkManagerMessage::ClientConnecting;
match (self, other) {
(
ClientConnecting {
uuid,
address,
username,
..
},
ClientConnecting {
uuid: other_uuid,
address: other_address,
username: other_username,
..
},
) => uuid == other_uuid && address == other_address && username == other_username,
#[allow(unreachable_patterns)]
_ => false,
}
}
}
/// # NetworkManager
///
/// This handles all new incoming connections to the server, involved with the chat services.
///
/// ## Fields
/// - address: the socket address that the server is listening on.
/// - listener: the TcpListener that is receiving connections.
/// - out_channel: the channel that will be sent events from NetworkManager.
pub struct NetworkManager<Out>
where
Out: From<NetworkManagerMessage> + Send,
{
listener: Mutex<TcpListener>,
out_channel: Sender<Out>,
}
impl<Out> NetworkManager<Out>
where
Out: From<NetworkManagerMessage> + Send,
{
pub async fn new(
address: &str,
out_channel: Sender<Out>,
) -> Result<Arc<NetworkManager<Out>>, Error> {
let listener = TcpListener::bind(address).await?;
Ok(Arc::new(NetworkManager {
listener: Mutex::new(listener),
out_channel,
}))
}
/// This fetches the port from the NetworkManager
pub async fn port(&self) -> u16 {
self.listener.lock().await.local_addr().unwrap().port()
}
/// This fetches the IP address from the NetworkManager
#[allow(dead_code)]
pub async fn address(&self) -> String {
self
.listener
.lock()
.await
.local_addr()
.unwrap()
.ip()
.to_string()
}
async fn handle_connection(&self, connection: Arc<Connection>) -> Result<(), Error> {
use NetworkSockIn::{Connect, Info};
use NetworkSockOut::{Connecting, GotInfo, Request};
connection.write(Request).await?;
match connection.read().await? {
Info => {
connection
.write(GotInfo {
server_name: "TestServer".into(),
server_owner: "Michael".into(),
})
.await?
}
Connect {
uuid,
address,
username,
} => {
connection.write(Connecting).await?;
let _ = self
.out_channel
.send(
NetworkManagerMessage::ClientConnecting {
uuid,
address,
username,
connection,
}
.into(),
)
.await;
}
#[allow(unreachable_patterns)]
_ => {
return Err(Error::new(
ErrorKind::InvalidData,
"Did not receive valid message",
));
}
}
Ok(())
}
}
#[async_trait]
impl<Out: 'static> IManager for NetworkManager<Out>
where
Out: From<NetworkManagerMessage> + Send,
{
async fn run(self: &Arc<Self>) {
let lock = self.listener.lock().await;
select! {
val = lock.accept() => {
if let Ok((stream, _addr)) = val {
let conn = self.clone();
tokio::spawn(async move {
let _ = conn.handle_connection(Arc::new(stream.into())).await;
});
}
}
}
}
}
#[cfg(test)]
mod test {
use crate::network_manager::{
NetworkManager, NetworkManagerMessage, NetworkManagerMessage::ClientConnecting,
};
use foundation::connection::Connection;
use foundation::messages::network::NetworkSockIn::{Connect, Info};
use foundation::messages::network::NetworkSockOut;
use foundation::messages::network::NetworkSockOut::{Connecting, GotInfo, Request};
use foundation::prelude::IManager;
use std::io::Error;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
#[tokio::test]
async fn test_network_fetch_info() -> Result<(), Error> {
let (tx, _rx) = channel::<NetworkManagerMessage>(16);
let network_manager = NetworkManager::new("localhost:0", tx).await?;
network_manager.start();
let port = network_manager.port().await;
let client = Connection::new();
client.connect(format!("localhost:{}", port)).await?;
assert_eq!(client.read::<NetworkSockOut>().await?, Request);
client.write(Info).await?;
let out = client.read::<NetworkSockOut>().await?;
assert_eq!(
out,
GotInfo {
server_owner: "Michael".into(),
server_name: "TestServer".into()
}
);
Ok(())
}
#[tokio::test]
async fn test_network_login() -> Result<(), Error> {
let (tx, mut rx) = channel::<NetworkManagerMessage>(16);
let network_manager = NetworkManager::new("localhost:0", tx).await?;
network_manager.start();
let port = network_manager.port().await;
let client = Connection::new();
client.connect(format!("localhost:{}", port)).await?;
assert_eq!(client.read::<NetworkSockOut>().await?, Request);
// construct client data
let uuid = Uuid::new_v4();
let address = "localhost";
let username = "TestUser";
client
.write(Connect {
uuid,
address: address.to_string(),
username: username.to_string(),
})
.await?;
let res: NetworkSockOut = client.read().await?;
assert_eq!(res, Connecting);
let network_out = rx.recv().await.unwrap();
assert_eq!(
network_out,
ClientConnecting {
uuid,
address: address.to_string(),
username: username.to_string(),
connection: client
}
);
Ok(())
}
}

18
server/src/prelude/mod.rs Normal file
View File

@ -0,0 +1,18 @@
//! # prelude
//! A module that coalesces different types into one module of defined structure
mod observer;
pub mod actors {
//! exports all actors used in the program.
pub use crate::server::Server;
pub(crate) use crate::network::{Connection, ConnectionInitiator, NetworkManager};
pub(crate) use crate::client_management::{Client,ClientManager};
}
pub mod messages {
//! exports all messages used in the program.
pub(crate) use super::observer::ObservableMessage;
pub(crate) use crate::network::{NetworkMessage,NetworkOutput,ConnectionMessage,ConnectionOuput};
pub(crate) use crate::client_management::{ClientManagerOutput,ClientManagerMessage};
}

View File

@ -0,0 +1,17 @@
//! # observer.rs
//! crates a message type for the observer pattern.
use actix::{Message, Recipient};
/// # ObservableMessage
/// represents common messages for observers
#[derive(Message)]
#[rtype(result = "()")]
pub enum ObservableMessage<M>
where
M: Message + Send,
M::Result: Send,
{
Subscribe(Recipient<M>),
Unsubscribe(Recipient<M>),
}

View File

@ -1,154 +1,138 @@
use std::io::Error;
use std::sync::Arc;
//! # actix_server
//! this holds the server actor
//! the server acts as teh main actor
//! and supervisor to the actor system.
use uuid::Uuid;
use tokio::sync::{
mpsc::{channel, Receiver},
Mutex,
use actix::{
fut::wrap_future,
Actor,
ActorFutureExt,
Addr,
AsyncContext,
Context,
Handler,
};
use foundation::{messages::network::NetworkSockOut, ClientDetails};
// use crate::plugin::{PluginManager, PluginManagerMessage};
use crate::{
client_manager::{ClientManager, ClientMgrMessage},
network_manager::{NetworkManager, NetworkManagerMessage},
client_management::{
Client,
ClientManager,
ClientManagerMessage,
ClientManagerOutput,
},
network::{
Connection,
ConnectionInitiator,
ConnectionMessage,
NetworkManager,
NetworkMessage,
NetworkOutput,
},
};
#[derive(Debug, Clone)]
pub enum ServerMessage {
ClientConnected {
uuid: Uuid,
address: String,
username: String,
connection: Arc<Connection>,
},
BroadcastGlobalMessage {
from: Uuid,
content: String,
},
}
impl From<NetworkManagerMessage> for ServerMessage {
fn from(msg: NetworkManagerMessage) -> Self {
use NetworkManagerMessage::ClientConnecting;
match msg {
ClientConnecting {
uuid,
address,
username,
connection,
} => ServerMessage::ClientConnected {
uuid,
address,
username,
connection,
},
#[allow(unreachable_patterns)]
_ => unimplemented!(),
}
}
}
impl From<ClientMgrMessage> for ServerMessage {
fn from(msg: ClientMgrMessage) -> Self {
use ClientMgrMessage::BroadcastGlobalMessage;
match msg {
BroadcastGlobalMessage { from, content } => {
ServerMessage::BroadcastGlobalMessage { from, content }
}
_ => unimplemented!(),
}
}
}
// impl From<PluginManagerMessage> for ServerMessage {
// fn from(_: PluginManagerMessage) -> Self {
// todo!()
// }
// }
/// # Server
/// authors: @michael-bailey, @Mitch161
/// This Represents a server instance.
/// It is composed of a client manager and a network manager.
///
/// # Attributes
/// - client_manager: The servers client manager.
/// - network_manager: The servers network manager.
/// - receiver: The servers channel for communication by managers.
/// - lua: The servers lua context, used for running lua scripts.
///
/// This struct is the main actor of the server.
/// all other actors are ran through here.
pub struct Server {
pub client_manager: Arc<ClientManager<ServerMessage>>,
network_manager: Arc<NetworkManager<ServerMessage>>,
// plugin_manager: Arc<PluginManager<ServerMessage>>,
receiver: Mutex<Receiver<ServerMessage>>,
network_manager: Option<Addr<NetworkManager>>,
client_management: Option<Addr<ClientManager>>,
}
impl Server {
/// Create a new server object
pub async fn new() -> Result<Arc<Server>, Error> {
let (sender, receiver) = channel(1024);
pub(crate) fn new() -> Addr<Self> {
Server {
network_manager: None,
client_management: None,
}
.start()
}
let server = Arc::new(Server {
client_manager: ClientManager::new(sender.clone()),
network_manager: NetworkManager::new("0.0.0.0:5600", sender.clone()).await?,
// plugin_manager: PluginManager::new(sender),
receiver: Mutex::new(receiver),
pub(crate) fn client_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
addr: Addr<Connection>,
details: ClientDetails,
) {
use ClientManagerMessage::AddClient;
if let Some(mgr) = self.client_management.as_ref() {
let client = Client::new(addr, details.clone());
mgr.do_send(AddClient(details.uuid, client));
}
}
pub(crate) fn info_request(
&mut self,
ctx: &mut <Self as Actor>::Context,
sender: Addr<Connection>,
) {
use ConnectionMessage::{CloseConnection, SendData};
use NetworkSockOut::GotInfo;
let fut = wrap_future(
sender.send(SendData(
serde_json::to_string(&GotInfo {
server_name: "String".to_owned(),
server_owner: "String".to_owned(),
})
.expect("Failed to serialise"),
)),
)
// equivalent to using .then() in js
.map(move |_out, _act: &mut Self, _ctx| {
sender.do_send(CloseConnection);
});
Ok(server)
ctx.spawn(fut);
}
}
pub async fn port(self: &Arc<Server>) -> u16 {
self.network_manager.port().await
}
impl Actor for Server {
type Context = Context<Self>;
pub async fn start(self: &Arc<Server>) {
// start client manager and network manager
self.network_manager.clone().start();
self.client_manager.clone().start();
// let _ = self.plugin_manager.clone().load().await;
fn started(&mut self, ctx: &mut Self::Context) {
let addr = ctx.address();
// clone block items
let server = self.clone();
self.network_manager
.replace(NetworkManager::new(addr.clone().recipient().downgrade()));
loop {
let mut lock = server.receiver.lock().await;
if let Some(message) = lock.recv().await {
println!("[server]: received message {:?}", &message);
self.client_management.replace(ClientManager::new(
addr.clone().recipient::<ClientManagerOutput>().downgrade(),
));
match message {
ServerMessage::ClientConnected {
uuid,
address,
username,
connection,
} => {
server
.client_manager
.add_client(uuid, username, address, connection)
.await
}
ServerMessage::BroadcastGlobalMessage {
from: _,
content: _,
} => {
// server
// .client_manager
// .clone()
// .send_message(
// ClientMgrMessage::BroadcastGlobalMessage {sender, content}
// ).await
}
#[allow(unreachable_patterns)]
_ => {
unimplemented!()
}
}
}
if let Some(net_mgr) = self.network_manager.as_ref() {
net_mgr.do_send(NetworkMessage::StartListening);
}
}
}
impl Handler<NetworkOutput> for Server {
type Result = ();
fn handle(
&mut self,
msg: NetworkOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
use NetworkOutput::{InfoRequested, NewClient};
use NetworkSockOut::GotInfo;
println!("[ServerActor] received message");
match msg {
// This uses promise like funcionality to queue
// a set of async operations,
// so they occur in the right order
InfoRequested(sender) => self.info_request(ctx, sender),
// A new client is to be added
NewClient(addr, details) => self.client_request(ctx, addr, details),
};
}
}
impl Handler<ClientManagerOutput> for Server {
type Result = ();
fn handle(
&mut self,
msg: ClientManagerOutput,
ctx: &mut Self::Context,
) -> Self::Result {
todo!()
}
}