diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..a8e7835 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,7 @@ +.gitignore + +/target +/docs +/.vscode +/.idea +/.github \ No newline at end of file diff --git a/.github/workflows/create-docker-image.yml b/.github/workflows/create-docker-image.yml new file mode 100644 index 0000000..96e61eb --- /dev/null +++ b/.github/workflows/create-docker-image.yml @@ -0,0 +1,48 @@ +# +name: create-docker-image + +# Configures this workflow to run every time a change is pushed to the branch called `release`. +on: + push: + branches: ['master'] + +# Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds. +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +# There is a single job in this workflow. It's configured to run on the latest available version of Ubuntu. +jobs: + build-and-push-image: + runs-on: ubuntu-latest + # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job. + permissions: + contents: read + packages: write + # + steps: + - name: Checkout repository + uses: actions/checkout@v4 + # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. + - name: Log in to the Container registry + uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + # This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels. + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages. + # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository. + # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step. + - name: Build and push Docker image + uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4 + with: + context: . + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml deleted file mode 100644 index edf72fb..0000000 --- a/.github/workflows/rust.yml +++ /dev/null @@ -1,25 +0,0 @@ -name: Rust - -on: - push: - branches: [ ref-method ] - pull_request: - branches: [ master ] - -env: - CARGO_TERM_COLOR: always - -jobs: - build: - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-latest, macos-latest, windows-latest] - - steps: - - uses: actions/checkout@v2 - - name: check - run: cargo check --verbose - - name: Build - run: cargo build --verbose - diff --git a/.gitignore b/.gitignore index ad414a9..a7ac9da 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ Cargo.lock *.cer *.pem .vscode/settings.json +*.dylib +config_file.toml diff --git a/Cargo.toml b/Cargo.toml index 00e10ba..fca85e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,5 +3,6 @@ members = [ 'foundation', 'server', 'client', - 'serverctl' -] \ No newline at end of file + 'serverctl', + 'example_plugin' +] diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8f5148c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +# First stage: build the server file. +FROM rust:alpine AS build + +RUN apk add musl-dev + + +RUN apk upgrade --update-cache --available && \ + apk add openssl-dev && \ + rm -rf /var/cache/apk/* + +COPY . . +CMD ["cargo", "run", "--release", "--bin", "server"] diff --git a/README.md b/README.md index 260be15..de85e4c 100644 --- a/README.md +++ b/README.md @@ -1,32 +1 @@ -# Rust-chat-server - -A Chat server writen in rust to allow communication between peers. - ---- - -## Features: -- implemented: - - json based API. - - Server introspection. - - Peer discovery. - - sending messages to connected clients. - - -- todo: - - Encryption to server. - - server to server meshing. - - asynchronous client managment instead of threaded approach. - -## Goals: -- Learn the rust programming lanaguage. - - Ownership: how that affects normal programming styles. - - Borrowing and references: how this affects shared state. - - Lifetimes: how this affects data retention and sharing. -- Learn how to create networked programs. - - Application level protocol: how to get two programs to communicate via TCP sockets. - - Socket handling: Discovering ways to handle multiple socket connections without affecting performance. -- Learn common encryption protocols. - - Adding support for encrypted sockets. - - Pros and cons of symetric and asymetric encryption. - - resolving common encryption flaws - -> Questions: For questions please add a issue with the question label. It will eventually be responded to +# rust-chat-server \ No newline at end of file diff --git a/client/Cargo.toml b/client/Cargo.toml index bd28415..c36c13a 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -7,3 +7,16 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +cursive = "0.17" +uuid = {version = "1.1.2", features = ["serde", "v4"]} +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +crossbeam = "0.8.0" +crossbeam-channel = "0.5.0" +tokio = { version = "1.9.0", features = ["full"] } +futures = "0.3.16" + +async-trait = "0.1.52" + +server = {path = '../server'} +foundation = {path = '../foundation'} diff --git a/client/src/main.rs b/client/src/main.rs index a30eb95..f88edec 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -1,3 +1,34 @@ +mod managers; +mod worker; +mod worker_message; + +use cursive::{ + menu::{Item, Tree}, + traits::Nameable, + views::{Dialog, TextView}, + Cursive, + CursiveExt, +}; +use worker::Worker; + fn main() { - println!("Hello, world!"); + let mut app = Cursive::default(); + let worker_stream = Worker::new(app.cb_sink().clone()).start(); + + app.set_user_data(worker_stream); + app.add_layer( + Dialog::new() + .content(TextView::new("Hello world").with_name("TextView")) + .button("close", |s| s.quit()), + ); + app.menubar().autohide = false; + app.menubar().add_subtree( + "Application", + Tree::new() + .item(Item::leaf("About", |s| s.quit())) + .delimiter() + .item(Item::leaf("Quit", |s| s.quit())), + ); + app.set_fps(30); + app.run(); } diff --git a/client/src/managers/Network.rs b/client/src/managers/Network.rs new file mode 100644 index 0000000..ceb6f1b --- /dev/null +++ b/client/src/managers/Network.rs @@ -0,0 +1,221 @@ +use std::{ + io::{Error, ErrorKind}, + mem, + sync::{atomic::AtomicBool, Arc}, +}; + +use async_trait::async_trait; +use foundation::{ + connection::Connection, + messages::{ + client::{ClientStreamIn, ClientStreamOut}, + network::{NetworkSockIn, NetworkSockOut}, + }, + prelude::IManager, +}; +use tokio::{ + net::ToSocketAddrs, + sync::{mpsc::Sender, Mutex}, +}; +use uuid::Uuid; + +use crate::managers::NetworkManagerMessage; + +pub struct NetworkManager +where + M: From, +{ + #[allow(unused)] + server_connection: Mutex>>, + + #[allow(unused)] + cursive: Sender, + + is_logged_in: AtomicBool, +} + +impl NetworkManager +where + M: From, +{ + pub fn new(sender: Sender) -> Arc { + Arc::new(NetworkManager { + server_connection: Mutex::new(None), + cursive: sender, + is_logged_in: AtomicBool::new(false), + }) + } + + #[allow(unused)] + pub async fn info( + self: &Arc, + host: T, + ) -> Result { + let connection = Connection::new(); + println!("Created connection"); + connection.connect(host).await?; + let req = connection.read().await?; + + println!("request: {:?}", req); + + if let NetworkSockOut::Request = req { + connection + .write::(NetworkSockIn::Info) + .await?; + return Ok(connection.read::().await?.into()); + } else { + Err(Error::new( + ErrorKind::ConnectionAborted, + "Request not received", + )) + } + } + + #[allow(unused)] + pub async fn login( + self: &Arc, + host: String, + uuid: Uuid, + username: String, + address: String, + ) -> Result<(), Error> { + let connection = Connection::new(); + + let _ = connection.connect(host).await?; + + println!("created connection"); + + let req = connection.read().await?; + + println!("read request"); + + return if let NetworkSockOut::Request = req { + println!("got request"); + + connection + .write(NetworkSockIn::Connect { + username, + uuid, + address, + }) + .await?; + let res = connection.read().await?; + + // switch over to ClientStreamOut + if let ClientStreamOut::Connected = res { + let mut connection_lock = self.server_connection.lock().await; + let _ = mem::replace(&mut *connection_lock, Some(connection)); + Ok(()) + } else { + Err(Error::new( + ErrorKind::ConnectionRefused, + format!("expected connecting received: {:?}", res), + )) + } + } else { + println!("request not found"); + Err(Error::new( + ErrorKind::ConnectionAborted, + "Server did not send request", + )) + }; + } + + #[allow(unused)] + pub async fn logout(self: &Arc) -> Result<(), Error> { + let mut connection_lock = self.server_connection.lock().await; + let connection = mem::replace(&mut *connection_lock, None).unwrap(); + + connection.write(ClientStreamIn::Disconnect).await?; + + return if let ClientStreamOut::Disconnected = connection.read().await? { + Ok(()) + } else { + Err(Error::new( + ErrorKind::InvalidData, + "disconnect failed, forcing disconnect", + )) + }; + } +} + +#[async_trait] +impl IManager for NetworkManager +where + M: From + Send, +{ + async fn run(self: &Arc) { + println!("networkManager tick") + } +} + +#[cfg(test)] +mod test { + use std::future::Future; + + use serverlib::Server; + use tokio::sync::mpsc::channel; + use uuid::Uuid; + + use crate::managers::{network::NetworkManagerMessage, NetworkManager}; + + async fn wrap_setup(test: T) + where + T: FnOnce(u16) -> F, + F: Future, + { + let server = Server::new().await.unwrap(); + let port = server.port().await; + + tokio::spawn(async move { + server.start().await; + }); + test(port).await; + } + #[tokio::test] + async fn test_fetch_server_info() { + use NetworkManagerMessage::Info; + #[allow(unused)] + let (tx, rx) = channel::(16); + + wrap_setup(|port| async move { + let network = NetworkManager::new(tx); + let info = network + .info(format!("localhost:{}", port)) + .await + .expect("Failed to fetch info"); + assert_eq!( + info, + Info { + server_name: "oof".to_string(), + server_owner: "michael".to_string() + } + ); + }) + .await; + } + #[tokio::test] + async fn test_login_and_logout_to_server() { + #[allow(unused)] + let (tx, rx) = channel::(16); + + let network = NetworkManager::new(tx); + + println!("created network manger"); + + wrap_setup(|port| async move { + network + .login( + format!("localhost:{}", port), + Uuid::default(), + "user1".to_string(), + "localhost".to_string(), + ) + .await + .expect("login failed"); + + network.logout().await.expect("logout failed"); + }) + .await; + } +} diff --git a/client/src/managers/message.rs b/client/src/managers/message.rs new file mode 100644 index 0000000..39c43af --- /dev/null +++ b/client/src/managers/message.rs @@ -0,0 +1,51 @@ +use foundation::{messages::network::NetworkSockOut, ClientDetails}; + +#[derive(Debug)] +pub enum NetworkManagerMessage { + #[allow(unused)] + Users(Vec), + Info { + server_name: String, + server_owner: String, + }, + Error(&'static str), +} + +impl From for NetworkManagerMessage { + fn from(other: NetworkSockOut) -> Self { + use NetworkManagerMessage::{Error, Info as NewInfo}; + use NetworkSockOut::GotInfo as OldInfo; + match other { + OldInfo { + server_name, + server_owner, + } => NewInfo { + server_name, + server_owner, + }, + _ => Error("Error occurred with conversion"), + } + } +} + +impl PartialEq for NetworkManagerMessage { + fn eq(&self, other: &Self) -> bool { + use NetworkManagerMessage::Info; + match self { + Info { + server_owner, + server_name, + } => { + if let Info { + server_owner: other_owner, + server_name: other_name, + } = other + { + return server_owner == other_owner && server_name == other_name; + } + false + } + _ => false, + } + } +} diff --git a/client/src/managers/mod.rs b/client/src/managers/mod.rs new file mode 100644 index 0000000..52b3057 --- /dev/null +++ b/client/src/managers/mod.rs @@ -0,0 +1,7 @@ +mod network; + +#[path = "message.rs"] +mod message; + +pub use message::NetworkManagerMessage; +pub use network::NetworkManager; diff --git a/client/src/managers/network.rs b/client/src/managers/network.rs new file mode 100644 index 0000000..ceb6f1b --- /dev/null +++ b/client/src/managers/network.rs @@ -0,0 +1,221 @@ +use std::{ + io::{Error, ErrorKind}, + mem, + sync::{atomic::AtomicBool, Arc}, +}; + +use async_trait::async_trait; +use foundation::{ + connection::Connection, + messages::{ + client::{ClientStreamIn, ClientStreamOut}, + network::{NetworkSockIn, NetworkSockOut}, + }, + prelude::IManager, +}; +use tokio::{ + net::ToSocketAddrs, + sync::{mpsc::Sender, Mutex}, +}; +use uuid::Uuid; + +use crate::managers::NetworkManagerMessage; + +pub struct NetworkManager +where + M: From, +{ + #[allow(unused)] + server_connection: Mutex>>, + + #[allow(unused)] + cursive: Sender, + + is_logged_in: AtomicBool, +} + +impl NetworkManager +where + M: From, +{ + pub fn new(sender: Sender) -> Arc { + Arc::new(NetworkManager { + server_connection: Mutex::new(None), + cursive: sender, + is_logged_in: AtomicBool::new(false), + }) + } + + #[allow(unused)] + pub async fn info( + self: &Arc, + host: T, + ) -> Result { + let connection = Connection::new(); + println!("Created connection"); + connection.connect(host).await?; + let req = connection.read().await?; + + println!("request: {:?}", req); + + if let NetworkSockOut::Request = req { + connection + .write::(NetworkSockIn::Info) + .await?; + return Ok(connection.read::().await?.into()); + } else { + Err(Error::new( + ErrorKind::ConnectionAborted, + "Request not received", + )) + } + } + + #[allow(unused)] + pub async fn login( + self: &Arc, + host: String, + uuid: Uuid, + username: String, + address: String, + ) -> Result<(), Error> { + let connection = Connection::new(); + + let _ = connection.connect(host).await?; + + println!("created connection"); + + let req = connection.read().await?; + + println!("read request"); + + return if let NetworkSockOut::Request = req { + println!("got request"); + + connection + .write(NetworkSockIn::Connect { + username, + uuid, + address, + }) + .await?; + let res = connection.read().await?; + + // switch over to ClientStreamOut + if let ClientStreamOut::Connected = res { + let mut connection_lock = self.server_connection.lock().await; + let _ = mem::replace(&mut *connection_lock, Some(connection)); + Ok(()) + } else { + Err(Error::new( + ErrorKind::ConnectionRefused, + format!("expected connecting received: {:?}", res), + )) + } + } else { + println!("request not found"); + Err(Error::new( + ErrorKind::ConnectionAborted, + "Server did not send request", + )) + }; + } + + #[allow(unused)] + pub async fn logout(self: &Arc) -> Result<(), Error> { + let mut connection_lock = self.server_connection.lock().await; + let connection = mem::replace(&mut *connection_lock, None).unwrap(); + + connection.write(ClientStreamIn::Disconnect).await?; + + return if let ClientStreamOut::Disconnected = connection.read().await? { + Ok(()) + } else { + Err(Error::new( + ErrorKind::InvalidData, + "disconnect failed, forcing disconnect", + )) + }; + } +} + +#[async_trait] +impl IManager for NetworkManager +where + M: From + Send, +{ + async fn run(self: &Arc) { + println!("networkManager tick") + } +} + +#[cfg(test)] +mod test { + use std::future::Future; + + use serverlib::Server; + use tokio::sync::mpsc::channel; + use uuid::Uuid; + + use crate::managers::{network::NetworkManagerMessage, NetworkManager}; + + async fn wrap_setup(test: T) + where + T: FnOnce(u16) -> F, + F: Future, + { + let server = Server::new().await.unwrap(); + let port = server.port().await; + + tokio::spawn(async move { + server.start().await; + }); + test(port).await; + } + #[tokio::test] + async fn test_fetch_server_info() { + use NetworkManagerMessage::Info; + #[allow(unused)] + let (tx, rx) = channel::(16); + + wrap_setup(|port| async move { + let network = NetworkManager::new(tx); + let info = network + .info(format!("localhost:{}", port)) + .await + .expect("Failed to fetch info"); + assert_eq!( + info, + Info { + server_name: "oof".to_string(), + server_owner: "michael".to_string() + } + ); + }) + .await; + } + #[tokio::test] + async fn test_login_and_logout_to_server() { + #[allow(unused)] + let (tx, rx) = channel::(16); + + let network = NetworkManager::new(tx); + + println!("created network manger"); + + wrap_setup(|port| async move { + network + .login( + format!("localhost:{}", port), + Uuid::default(), + "user1".to_string(), + "localhost".to_string(), + ) + .await + .expect("login failed"); + + network.logout().await.expect("logout failed"); + }) + .await; + } +} diff --git a/client/src/worker.rs b/client/src/worker.rs new file mode 100644 index 0000000..a5ef6ac --- /dev/null +++ b/client/src/worker.rs @@ -0,0 +1,75 @@ +use std::{sync::Arc, thread::spawn, time::Duration}; + +use crossbeam_channel::Sender as CrossSender; +use foundation::ClientDetails; +use tokio::{ + runtime::Runtime, + sync::{ + mpsc::{channel, Sender as TokioSender}, + Mutex, + }, + time::sleep, +}; + +use crate::{ + managers::NetworkManager, + worker_message::WorkerMessage, + Cursive, + TextView, +}; + +pub type CursiveSender = CrossSender>; + +pub struct Worker { + cursive_sender: CursiveSender, + + network_manager: Arc>, + + number: Arc>, + + #[allow(unused)] + user_details: Mutex>, +} + +impl Worker { + pub fn new(sender: CursiveSender) -> Worker { + #[allow(unused)] + let (tx, rx) = channel::(16); + + Worker { + network_manager: NetworkManager::new(tx.clone()), + number: Arc::new(Mutex::new(0)), + user_details: Mutex::new(None), + cursive_sender: sender, + } + } + + pub fn start(self) -> TokioSender { + #[allow(unused)] + let (tx, rx) = channel::(16); + spawn(move || { + let sender = self.cursive_sender.clone(); + let rt = Runtime::new().unwrap(); + let tmp_num = self.number.clone(); + #[allow(unused)] + let network_manager = self.network_manager.clone(); + rt.block_on(async move { + let a = &tmp_num; + loop { + let num = Arc::clone(&a); + sleep(Duration::new(1, 0)).await; + let _ = sender.send(Box::new(move |s| { + let num = &num.clone(); + let mut num_lock = num.blocking_lock(); + *num_lock += 1; + let a = *num_lock; + s.find_name::("TextView") + .unwrap() + .set_content(a.to_string()); + })); + } + }) + }); + tx + } +} diff --git a/client/src/worker_message.rs b/client/src/worker_message.rs new file mode 100644 index 0000000..485dece --- /dev/null +++ b/client/src/worker_message.rs @@ -0,0 +1,29 @@ +use crate::managers::NetworkManagerMessage; + +pub enum WorkerMessage { + Info { + server_name: String, + server_owner: String, + }, + #[allow(unused)] + Error(String), +} + +impl From for WorkerMessage { + fn from(other: NetworkManagerMessage) -> Self { + #[allow(unused)] + use NetworkManagerMessage::{Error, Info as OldInfo}; + #[allow(unused)] + use WorkerMessage::{Error as NewError, Info as NewInfo}; + match other { + OldInfo { + server_name, + server_owner, + } => NewInfo { + server_owner, + server_name, + }, + _ => todo!(), + } + } +} diff --git a/example_plugin/Cargo.toml b/example_plugin/Cargo.toml new file mode 100644 index 0000000..30ea348 --- /dev/null +++ b/example_plugin/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "example_plugin" +version = "0.1.0" +authors = ["michael-bailey "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +crate-type = ["dylib"] +name = "ExamplePlugin" +path = "src/lib.rs" + + +[dependencies] +uuid = {version = "0.8", features = ["serde", "v4"]} +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +zeroize = "1.1.0" +futures = "0.3.16" +async-trait = "0.1.52" + +server = {path = "../server"} \ No newline at end of file diff --git a/example_plugin/src/example.rs b/example_plugin/src/example.rs new file mode 100644 index 0000000..bd69a02 --- /dev/null +++ b/example_plugin/src/example.rs @@ -0,0 +1,63 @@ +// use futures::lock::Mutex; +// use serverlib::plugin::WeakPluginInterface; +// use std::sync::Mutex as StdMutex; +// use std::thread::sleep; +// use std::time::Duration; + +// use serverlib::plugin::IPlugin; +// use serverlib::plugin::PluginDetails; + +// #[derive(Debug)] +// pub struct ExamplePlugin { +// number: Mutex, +// interface: StdMutex>, +// } + +// impl Default for ExamplePlugin { +// fn default() -> Self { +// ExamplePlugin { +// number: Mutex::new(0), +// interface: StdMutex::default(), +// } +// } +// } + +// #[async_trait::async_trait] +// impl IPlugin for ExamplePlugin { +// fn details(&self) -> PluginDetails { +// PluginDetails { +// display_name: "ExamplePlugin", +// id: "io.github.michael-bailey.ExamplePlugin", +// version: "0.0.1", +// contacts: vec!["bailey-michael1@outlook.com"], +// } +// } + +// fn set_interface(&self, interface: WeakPluginInterface) { +// if let Ok(mut lock) = self.interface.lock() { +// *lock = Some(interface); +// } +// } + +// async fn event(&self) { +// println!("Not Implemented"); +// } + +// fn init(&self) { +// println!("[ExamplePlugin]: example init") +// } + +// async fn run(&self) { +// println!("Example!!!"); +// sleep(Duration::new(1, 0)); +// let mut a = self.number.lock().await; +// *a = a.overflowing_add(1).0; +// println!("[ExamplePlugin]: example run {}", *a); +// } + +// fn deinit(&self) { +// if let Some(mut lock) = self.number.try_lock() { +// *lock = 0; +// } +// } +// } diff --git a/example_plugin/src/lib.rs b/example_plugin/src/lib.rs new file mode 100644 index 0000000..6bdc4d3 --- /dev/null +++ b/example_plugin/src/lib.rs @@ -0,0 +1,14 @@ +mod example; + +use std::sync::Arc; + +// use serverlib::plugin::Plugin; + +// use crate::example::ExamplePlugin; +// use serverlib::plugin::plugin::Plugin; +// use std::sync::Arc; + +// #[no_mangle] +// pub extern "C" fn get_plugin() -> Plugin { +// Arc::new(ExamplePlugin::default()) +// } diff --git a/foundation/Cargo.toml b/foundation/Cargo.toml index a20e1ab..4c7fc5b 100644 --- a/foundation/Cargo.toml +++ b/foundation/Cargo.toml @@ -8,18 +8,19 @@ edition = "2018" [lib] [dependencies] +chrono = {version = "0.4", features = ["serde", "rustc-serialize"] } +async-trait = "0.1.52" regex = "1" crossbeam = "0.8.0" crossbeam-channel = "0.5.0" crossbeam-queue = "0.3.1" -parking_lot = "0.11.1" -dashmap = "4.0.2" -rayon = "1.3.1" +rayon = "1.2.0" zeroize = "1.1.0" -crossterm = "0.19.0" log = "0.4" url = "2.2.0" -uuid = {version = "0.8", features = ["serde", "v4"]} -serde = { version = "1.0", features = ["derive"] } +futures = "0.3.16" serde_json = "1.0" - +openssl = "0.10" +uuid = {version = "1.1.2", features = ["serde", "v4"]} +tokio = { version = "1.9.0", features = ["full"] } +serde = { version = "1.0", features = ["derive"] } \ No newline at end of file diff --git a/foundation/src/connection.rs b/foundation/src/connection.rs new file mode 100644 index 0000000..fe3064b --- /dev/null +++ b/foundation/src/connection.rs @@ -0,0 +1,146 @@ +use std::{ + io::{Error, ErrorKind, Write}, + mem, + sync::Arc, +}; + +use serde::{de::DeserializeOwned, Serialize}; +use tokio::{ + io, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}, + net::{TcpStream, ToSocketAddrs}, + sync::Mutex, +}; + +#[derive(Debug)] +pub struct Connection { + stream_rx: Mutex>>>, + stream_tx: Mutex>>, +} + +impl Connection { + pub fn new() -> Arc { + Arc::new(Connection { + stream_rx: Mutex::new(None), + stream_tx: Mutex::new(None), + }) + } + + pub async fn connect(&self, host: T) -> Result<(), Error> { + let connection = TcpStream::connect(host).await?; + let (rd, wd) = io::split(connection); + + let mut writer_lock = self.stream_tx.lock().await; + let mut reader_lock = self.stream_rx.lock().await; + + let _ = mem::replace(&mut *writer_lock, Some(wd)); + let _ = mem::replace(&mut *reader_lock, Some(BufReader::new(rd))); + + Ok(()) + } + + pub async fn write(&self, message: T) -> Result<(), Error> + where + T: Serialize, + { + let mut out_buffer = Vec::new(); + let out = serde_json::to_string(&message).unwrap(); + let mut writer_lock = self.stream_tx.lock().await; + let old = mem::replace(&mut *writer_lock, None); + writeln!(&mut out_buffer, "{}", out)?; + + let Some(mut writer) = old else { + return Err(Error::new(ErrorKind::Interrupted, "Writer does not exist")); + }; + + writer.write_all(&out_buffer).await?; + writer.flush().await?; + let _ = mem::replace(&mut *writer_lock, Some(writer)); + Ok(()) + } + + pub async fn read(&self) -> Result + where + T: DeserializeOwned, + { + let mut buffer = String::new(); + let mut reader_lock = self.stream_rx.lock().await; + let old = mem::replace(&mut *reader_lock, None); + + if let Some(mut reader) = old { + let _ = reader.read_line(&mut buffer).await?; + let _ = mem::replace(&mut *reader_lock, Some(reader)); + Ok(serde_json::from_str(&buffer).unwrap()) + } else { + Err(Error::new(ErrorKind::Interrupted, "Reader does not exist")) + } + } +} + +impl From for Connection { + fn from(stream: TcpStream) -> Self { + let (rd, wd) = io::split(stream); + Connection { + stream_tx: Mutex::new(Some(wd)), + stream_rx: Mutex::new(Some(BufReader::new(rd))), + } + } +} + +#[cfg(test)] +mod test { + use std::{future::Future, io::Error, panic}; + + use serde::{Deserialize, Serialize}; + use tokio::net::TcpListener; + + use crate::connection::Connection; + + #[derive(Serialize, Deserialize, Debug, PartialEq)] + enum TestMessages { + Ping, + Pong, + } + + #[tokio::test] + async fn a() -> Result<(), Error> { + wrap_setup(|port| async move { + println!("{}", port); + let connection = Connection::new(); + connection + .connect(format!("localhost:{}", &port)) + .await + .unwrap(); + connection.write(&TestMessages::Ping).await.unwrap(); + let res = connection.read::().await.unwrap(); + + assert_eq!(res, TestMessages::Pong); + }) + .await + } + + async fn wrap_setup(test: T) -> Result<(), std::io::Error> + where + T: FnOnce(u16) -> F + panic::UnwindSafe, + F: Future, + { + let server = TcpListener::bind("localhost:0").await?; + let addr = server.local_addr()?; + + // create tokio server execution + tokio::spawn(async move { + while let Ok((stream, addr)) = server.accept().await { + use TestMessages::{Ping, Pong}; + + println!("[server]: Connected {}", &addr); + let connection = Connection::from(stream); + if let Ok(Ping) = connection.read::().await { + connection.write::(Pong).await.unwrap() + } + } + }); + + test(addr.port()).await; + Ok(()) + } +} diff --git a/foundation/src/encryption/mod.rs b/foundation/src/encryption/mod.rs new file mode 100644 index 0000000..0374cd8 --- /dev/null +++ b/foundation/src/encryption/mod.rs @@ -0,0 +1,41 @@ +// use openssl::sha::sha256; +// use openssl::symm::{Cipher, Crypter, Mode}; + +#[cfg(test)] +mod test { + use openssl::{ + sha::sha256, + symm::{Cipher, Crypter, Mode}, + }; + + #[test] + fn testEncryption() { + let plaintext = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.".as_bytes(); + let key = sha256(b"This is a key"); + let IV = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; + + let encrypter = + Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV)); + let mut ciphertext = vec![0u8; 1024]; + let cipherlen = encrypter + .unwrap() + .update(plaintext, ciphertext.as_mut_slice()) + .unwrap(); + + let decrypter = + Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV)); + let mut decrypted = vec![0u8; 1024]; + decrypter + .unwrap() + .update(&ciphertext[..cipherlen], decrypted.as_mut_slice()) + .unwrap(); + + println!("{:?}", plaintext); + println!("{:?}", ciphertext.as_slice()); + println!("{:?}", decrypted.as_slice()); + + println!("{:?}", plaintext.len()); + println!("{:?}", ciphertext.len()); + println!("{:?}", decrypted.len()); + } +} diff --git a/foundation/src/event/event.rs b/foundation/src/event/event.rs new file mode 100644 index 0000000..166328f --- /dev/null +++ b/foundation/src/event/event.rs @@ -0,0 +1,97 @@ +use std::collections::HashMap; + +use futures::channel::oneshot::{channel, Receiver, Sender}; + +use crate::event::{ + event_result::EventResultBuilder, + EventResult, + EventResultType, +}; + +/// # Eventw +/// Object that holds details about an event being passed through the application. +/// +/// ## Properties +/// - r#type: The event type +/// - args: A hashmap of arguments to be carried by the event +/// - sender: The sender to send the result for the event. +/// - receiver: The reciever of the event result from the event. +pub struct Event +where + T: Sync + Send, +{ + pub r#type: T, + args: HashMap, + sender: Sender, + receiver: Option>, +} + +impl Event +where + T: Sync + Send, +{ + /// Fetches an argument from the arguments of the event. + pub fn get_arg(&self, key: String) -> Option { + self.args.get(&key).cloned() + } + + /// Creates an event result using the sender of the event. + /// This consumes the event. + pub fn respond(self, result_type: EventResultType) -> EventResultBuilder { + EventResult::create(result_type, self.sender) + } + + /// Used to await the result of the event if required. + pub fn get_reciever(&mut self) -> Receiver { + self.receiver.take().unwrap() + } +} + +pub struct EventBuilder { + #[allow(dead_code)] + r#type: T, + + #[allow(dead_code)] + args: HashMap, + + #[allow(dead_code)] + sender: Sender, + + #[allow(dead_code)] + receiver: Option>, +} + +impl EventBuilder { + #[allow(dead_code)] + pub(super) fn new(r#type: T) -> EventBuilder { + let (sender, receiver) = channel(); + EventBuilder { + r#type, + args: HashMap::new(), + sender, + receiver: Some(receiver), + } + } + + pub fn add_arg, V: Into>( + mut self, + key: K, + value: V, + ) -> Self { + self.args.insert(key.into(), value.into()); + self + } + + #[allow(dead_code)] + pub(crate) fn build(self) -> Event + where + T: Sync + Send, + { + Event { + r#type: self.r#type, + args: self.args, + sender: self.sender, + receiver: self.receiver, + } + } +} diff --git a/foundation/src/event/event_result.rs b/foundation/src/event/event_result.rs new file mode 100644 index 0000000..3a7e57e --- /dev/null +++ b/foundation/src/event/event_result.rs @@ -0,0 +1,61 @@ +use std::collections::HashMap; + +use futures::channel::oneshot::Sender; + +pub enum EventResultType { + Success, + NoResponse, + InvalidArgs, + InvalidCode, + Other(String), +} + +pub struct EventResult { + code: EventResultType, + args: HashMap, +} + +impl EventResult { + pub fn create( + result_type: EventResultType, + sender: Sender, + ) -> EventResultBuilder { + EventResultBuilder::new(result_type, sender) + } +} + +/// # EventResultBuilder +/// Builds the result of an event +pub struct EventResultBuilder { + code: EventResultType, + args: HashMap, + sender: Sender, +} + +impl EventResultBuilder { + pub(self) fn new( + result_type: EventResultType, + sender: Sender, + ) -> Self { + Self { + code: result_type, + args: HashMap::default(), + sender, + } + } + + pub fn add_arg(mut self, key: String, value: String) -> Self { + self.args.insert(key, value); + self + } + + pub fn send(self) { + self + .sender + .send(EventResult { + code: self.code, + args: self.args, + }) + .ok(); + } +} diff --git a/foundation/src/event/mod.rs b/foundation/src/event/mod.rs new file mode 100644 index 0000000..caa0d30 --- /dev/null +++ b/foundation/src/event/mod.rs @@ -0,0 +1,9 @@ +#[allow(clippy::module_inception)] +mod event; +mod event_result; +mod responder; + +pub use event::{Event, EventBuilder}; +pub use event_result::{EventResult, EventResultType}; + +pub use self::responder::IResponder; diff --git a/foundation/src/event/responder.rs b/foundation/src/event/responder.rs new file mode 100644 index 0000000..0d1753d --- /dev/null +++ b/foundation/src/event/responder.rs @@ -0,0 +1,21 @@ +use std::sync::Weak; + +use crate::event::Event; + +pub trait IResponder +where + T: Sync + Send, +{ + fn post_event(&self, event: Event) { + if let Some(next) = self.get_next() { + if let Some(next) = next.upgrade() { + next.post_event(event); + return; + } + } + self.r#final(event); + } + fn get_next(&self) -> Option>>; + fn on_event(&self, event: Event); + fn r#final(&self, _event: Event) {} +} diff --git a/foundation/src/lib.rs b/foundation/src/lib.rs index 3ff3748..8e63b55 100644 --- a/foundation/src/lib.rs +++ b/foundation/src/lib.rs @@ -1,12 +1,27 @@ +extern crate core; +pub mod connection; +pub mod encryption; +pub mod event; pub mod messages; +pub mod models; pub mod prelude; +pub mod test; use serde::{Deserialize, Serialize}; use uuid::Uuid; -#[derive(Deserialize, Serialize, Debug, Clone)] +/** + * #ClientDetails. + * This defines the fileds a client would want to send when connecitng + * uuid: the unique id of the user. + * username: the users user name. + * address: the ip address of the connected user. + * public_key: the public key used when sending messages to the user. + */ +#[derive(Deserialize, Serialize, Debug, Clone, Default)] pub struct ClientDetails { - pub uuid: Uuid, - pub username: String, - pub address: String, -} \ No newline at end of file + pub uuid: Uuid, + pub username: String, + pub address: String, + pub public_key: Option>, +} diff --git a/foundation/src/messages/client.rs b/foundation/src/messages/client.rs index cabe3bc..065a54e 100644 --- a/foundation/src/messages/client.rs +++ b/foundation/src/messages/client.rs @@ -1,31 +1,42 @@ -use crate::ClientDetails; use serde::{Deserialize, Serialize}; - use uuid::Uuid; -/// # ClientMessage -/// This enum defined the message that a client can receive from the server -/// This uses the serde library to transform to and from json. -/// -#[derive(Serialize, Deserialize)] -pub enum ClientStreamIn { - Connected, +use crate::{models::message::Message, ClientDetails}; + +/// This enum defined the message that the server will receive from a client +/// This uses the serde library to transform to and from json. +#[derive(Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum ClientStreamIn { + GetClients, + GetMessages, - Update, SendMessage { to: Uuid, content: String }, SendGlobalMessage { content: String }, Disconnect, } -#[derive(Serialize, Deserialize)] +/// This enum defined the message that the server will send to a client +/// This uses the serde library to transform to and from json. +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "type")] pub enum ClientStreamOut { Connected, - UserMessage { from: Uuid, content: String }, - GlobalMessage { content: String }, + // get reequest messages + ConnectedClients { clients: Vec }, + GlobalChatMessages { messages: Vec }, - ConnectedClients {clients: Vec}, + // event messges + UserMessage { from: Uuid, content: String }, + GlobalMessage { from: Uuid, content: String }, + + ClientConnected { id: Uuid, username: String }, + ClientRemoved { id: Uuid }, Disconnected, + + // error cases + Error { msg: String }, } diff --git a/foundation/src/messages/network.rs b/foundation/src/messages/network.rs index 98a2683..0cff3b6 100644 --- a/foundation/src/messages/network.rs +++ b/foundation/src/messages/network.rs @@ -1,22 +1,49 @@ use serde::{Deserialize, Serialize}; +use uuid::Uuid; +/// Message the server will receive from a socket #[derive(Serialize, Deserialize)] +#[serde(tag = "type")] pub enum NetworkSockIn { Info, Connect { - uuid: String, + uuid: Uuid, username: String, address: String, }, } -#[derive(Serialize, Deserialize)] -pub enum NetworkSockOut<'a> { +/// Message the server will send through a socket +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "type")] +pub enum NetworkSockOut { Request, GotInfo { - server_name: &'a str, - server_owner: &'a str, + server_name: String, + server_owner: String, }, Connecting, + + Error, +} + +impl PartialEq for NetworkSockOut { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (NetworkSockOut::Request, NetworkSockOut::Request) => true, + ( + NetworkSockOut::GotInfo { + server_name, + server_owner, + }, + NetworkSockOut::GotInfo { + server_owner: owner_other, + server_name: name_other, + }, + ) => server_name == name_other && server_owner == owner_other, + (NetworkSockOut::Connecting, NetworkSockOut::Connecting) => true, + _ => false, + } + } } diff --git a/foundation/src/models/message.rs b/foundation/src/models/message.rs new file mode 100644 index 0000000..4e2021d --- /dev/null +++ b/foundation/src/models/message.rs @@ -0,0 +1,22 @@ +use chrono::{DateTime, Local}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Message { + id: Uuid, + from: Uuid, + content: String, + time: DateTime, +} + +impl Message { + pub fn new(from: Uuid, content: String) -> Self { + Self { + id: Uuid::new_v4(), + from, + content, + time: Local::now(), + } + } +} diff --git a/foundation/src/models/mod.rs b/foundation/src/models/mod.rs new file mode 100644 index 0000000..e216a50 --- /dev/null +++ b/foundation/src/models/mod.rs @@ -0,0 +1 @@ +pub mod message; diff --git a/foundation/src/prelude.rs b/foundation/src/prelude.rs index 92ff1d7..8908ab9 100644 --- a/foundation/src/prelude.rs +++ b/foundation/src/prelude.rs @@ -1,15 +1,55 @@ -use std::sync::Arc; +use std::{ + sync::{Arc, Weak}, + time::Duration, +}; -pub trait IMessagable { - fn send_message(&self, msg: TMessage); - fn set_sender(&self, sender: TSender); +use async_trait::async_trait; +use tokio::time::sleep; + +/// # IManager +/// This is used with all managers to implement multitasking +/// +/// ## Methods +/// - init: gets executed once before a tokio task is created +/// - run: gets called once every tick in a tokio task +/// - start: runs the init function then creates the tokio task for the run function +#[async_trait] +pub trait IManager { + /// This defines some setup before the tokio loop is started + async fn init(self: &Arc) + where + Self: Send + Sync + 'static, + { + } + + /// this is used to get a future that can be awaited + async fn run(self: &Arc); + + /// This is used to start a future through tokio + fn start(self: &Arc) + where + Self: Send + Sync + 'static, + { + let weak_self: Weak = Arc::downgrade(self); + + // this looks horrid but works + tokio::spawn(async move { + let weak_self = weak_self.clone(); + + let a = weak_self.upgrade().unwrap(); + a.init().await; + drop(a); + + loop { + sleep(Duration::new(1, 0)).await; + if let Some(manager) = Weak::upgrade(&weak_self) { + manager.run().await + } + } + }); + } } -pub trait ICooperative { - fn tick(&self); -} - -pub trait IPreemptive { - fn run(arc: &Arc); - fn start(arc: &Arc); +trait Visitor { + fn visit(&self, message: T); } diff --git a/foundation/src/test/connection_pair.rs b/foundation/src/test/connection_pair.rs new file mode 100644 index 0000000..00c7000 --- /dev/null +++ b/foundation/src/test/connection_pair.rs @@ -0,0 +1,25 @@ +use std::{io::Error, net::SocketAddr, sync::Arc}; + +use tokio::{ + join, + net::{TcpListener, TcpStream}, +}; + +use crate::connection::Connection; + +pub async fn create_connection_pair( +) -> Result<(Arc, (Arc, SocketAddr)), Error> { + let listener: TcpListener = TcpListener::bind("localhost:0000").await?; + + let port = listener.local_addr()?.port(); + + let (server_res, client_res) = join!( + async { TcpStream::connect(format!("localhost:{}", port)).await }, + async { listener.accept().await } + ); + + let (client, addr) = client_res?; + let server = Arc::new(Connection::from(server_res?)); + let client = Arc::new(Connection::from(client)); + Ok((server, (client, addr))) +} diff --git a/foundation/src/test/mod.rs b/foundation/src/test/mod.rs new file mode 100644 index 0000000..9e53c5c --- /dev/null +++ b/foundation/src/test/mod.rs @@ -0,0 +1,3 @@ +mod connection_pair; + +pub use connection_pair::create_connection_pair; diff --git a/rustfmt.toml b/rustfmt.toml index 779de58..400595e 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,2 +1,8 @@ +max_width = 80 hard_tabs = true -max_width = 90 \ No newline at end of file +tab_spaces = 2 +imports_layout = "HorizontalVertical" +imports_granularity = "Crate" +merge_imports = true +reorder_imports = true +group_imports = "StdExternalCrate" \ No newline at end of file diff --git a/scripts/test.lua b/scripts/test.lua new file mode 100644 index 0000000..83f40d6 --- /dev/null +++ b/scripts/test.lua @@ -0,0 +1,6 @@ +print("Test Script") + +print(Server.ClientManager:getCount()) + + +print("Test Script") \ No newline at end of file diff --git a/server/Cargo.toml b/server/Cargo.toml index a91dd21..47c5f03 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -6,13 +6,37 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +# [lib] +# name = "serverlib" +# path = "src/lib.rs" + +# [[bin]] +# name = "server" +# path = "src/main.rs" + +[[bin]] +name = "server" +path = "src/main.rs" + [dependencies] -clap = "2.33.3" -uuid = {version = "0.8", features = ["serde", "v4"]} +chrono = "0.4" +clap = {version = "4.4.8", features = ["derive"]} +uuid = {version = "1.1.2", features = ["serde", "v4"]} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" crossbeam = "0.8.0" crossbeam-channel = "0.5.0" +zeroize = "1.1.0" +openssl = "0.10.33" +tokio = { version = "1.9.0", features = ["full"] } +futures = "0.3.16" +async-trait = "0.1.52" +actix = "0.13" +rhai = {version = "1.7.0"} +mlua = { version = "0.9.2", features=["lua54", "async", "serde", "macros", "vendored"] } +libloading = "0.8.1" +toml = "0.8.8" +aquamarine = "0.3.2" +tokio-stream = "0.1.9" -[dependencies.foundation] -path = '../foundation' \ No newline at end of file +foundation = {path = '../foundation'} \ No newline at end of file diff --git a/server/src/client.rs b/server/src/client.rs deleted file mode 100644 index d5e1efd..0000000 --- a/server/src/client.rs +++ /dev/null @@ -1,280 +0,0 @@ -use crate::messages::ClientMessage; -use crate::messages::ServerMessage; -use foundation::prelude::IPreemptive; -use std::cmp::Ordering; -use std::io::BufRead; -use std::io::Write; -use std::io::{BufReader, BufWriter}; -use std::mem::replace; -use std::net::TcpStream; -use std::sync::Arc; -use std::sync::Mutex; - -use crossbeam_channel::{unbounded, Receiver, Sender}; -use serde::Serialize; -use uuid::Uuid; - -use foundation::ClientDetails; -use foundation::messages::client::{ClientStreamIn, ClientStreamOut}; -use foundation::prelude::IMessagable; - -/// # Client -/// This struct represents a connected user. -/// -/// ## Attrubutes -/// - uuid: The id of the connected user. -/// - username: The username of the connected user. -/// - address: The the address of the connected client. -/// -/// - stream: The socket for the connected client. -/// - stream_reader: the buffered reader used to receive messages -/// - stream_writer: the buffered writer used to send messages -/// - owner: An optional reference to the owning object. -#[derive(Debug, Serialize)] -pub struct Client { - pub uuid: Uuid, - username: String, - address: String, - pub details: ClientDetails, - - // non serializable - #[serde(skip)] - server_channel: Mutex>>, - - #[serde(skip)] - input: Sender, - - #[serde(skip)] - output: Receiver, - - #[serde(skip)] - stream: Mutex>, - - #[serde(skip)] - stream_reader: Mutex>>, - - #[serde(skip)] - stream_writer: Mutex>>, -} - -// client funciton implmentations -impl Client { - pub fn new( - uuid: String, - username: String, - address: String, - stream: TcpStream, - server_channel: Sender, - ) -> Arc { - let (sender, receiver) = unbounded(); - - let out_stream = stream.try_clone().unwrap(); - let in_stream = stream.try_clone().unwrap(); - - Arc::new(Client { - username: username.clone(), - uuid: Uuid::parse_str(&uuid).expect("invalid id"), - address: address.clone(), - - details: ClientDetails { - uuid: Uuid::parse_str(&uuid).expect("invalid id"), - username, - address, - }, - - server_channel: Mutex::new(Some(server_channel)), - - input: sender, - output: receiver, - - stream: Mutex::new(Some(stream)), - - stream_reader: Mutex::new(Some(BufReader::new(in_stream))), - stream_writer: Mutex::new(Some(BufWriter::new(out_stream))), - }) - } -} - -impl IMessagable> for Client { - fn send_message(&self, msg: ClientMessage) { - self.input - .send(msg) - .expect("failed to send message to client."); - } - fn set_sender(&self, sender: Sender) { - let mut server_lock = self.server_channel.lock().unwrap(); - let _ = replace(&mut *server_lock, Some(sender)); - } -} - -// cooperative multitasking implementation -impl IPreemptive for Client { - fn run(arc: &Arc) { - let arc1 = arc.clone(); - let arc2 = arc.clone(); - - // read thread - let _ = std::thread::Builder::new() - .name(format!("client thread recv [{:?}]", &arc.uuid)) - .spawn(move || { - use ClientMessage::{Disconnect}; - let arc = arc1; - - let mut buffer = String::new(); - let mut reader_lock = arc.stream_reader.lock().unwrap(); - let reader = reader_lock.as_mut().unwrap(); - - 'main: while let Ok(size) = reader.read_line(&mut buffer) { - if size == 0 { - arc.send_message(Disconnect); - break 'main; - } - - let command = serde_json::from_str::(buffer.as_str()); - match command { - Ok(ClientStreamIn::Disconnect) => { - println!("[Client {:?}]: Disconnect recieved", &arc.uuid); - arc.send_message(Disconnect); - break 'main; - } - Ok(ClientStreamIn::SendMessage { to, content }) => { - println!( - "[Client {:?}]: send message to: {:?}", - &arc.uuid, &to - ); - let lock = arc.server_channel.lock().unwrap(); - let sender = lock.as_ref().unwrap(); - let _ = sender.send(ServerMessage::ClientSendMessage { - from: arc.uuid, - to, - content, - }); - } - _ => println!("[Client {:?}]: command not found", &arc.uuid), - } - } - println!("[Client {:?}] exited thread 1", &arc.uuid); - }); - - // write thread - let _ = std::thread::Builder::new() - .name(format!("client thread msg [{:?}]", &arc.uuid)) - .spawn(move || { - let arc = arc2; - let mut writer_lock = arc.stream_writer.lock().unwrap(); - let writer = writer_lock.as_mut().unwrap(); - let mut buffer: Vec = Vec::new(); - - let _ = writeln!( - buffer, - "{}", - serde_json::to_string(&ClientStreamOut::Connected).unwrap() - ); - let _ = writer.write_all(&buffer); - let _ = writer.flush(); - - 'main: loop { - for message in arc.output.iter() { - use ClientMessage::{Disconnect,Message, Update}; - println!("[Client {:?}]: {:?}", &arc.uuid, message); - match message { - Disconnect => { - arc.server_channel - .lock() - .unwrap() - .as_mut() - .unwrap() - .send(ServerMessage::ClientDisconnected(arc.uuid)) - .unwrap(); - break 'main; - } - Message { from, content } => { - let _ = writeln!( - buffer, - "{}", - serde_json::to_string( - &ClientStreamOut::UserMessage { from, content } - ) - .unwrap() - ); - let _ = writer.write_all(&buffer); - let _ = writer.flush(); - } - Update {clients} => { - let client_details_vec: Vec = clients.iter().map(|client| &client.details).cloned().collect(); - let _ = writeln!( - buffer, - "{}", - serde_json::to_string( - &ClientStreamOut::ConnectedClients {clients: client_details_vec} - ).unwrap() - ); - let _ = writer.write_all(&buffer); - let _ = writer.flush(); - } - } - } - } - println!("[Client {:?}]: exited thread 2", &arc.uuid); - }); - } - - fn start(arc: &Arc) { - Client::run(arc) - } -} - -// default value implementation -impl Default for Client { - fn default() -> Self { - let (sender, reciever) = unbounded(); - Client { - username: "generic_client".to_string(), - uuid: Uuid::new_v4(), - address: "127.0.0.1".to_string(), - - details: ClientDetails { - uuid: Uuid::new_v4(), - username: "generic_client".to_string(), - address: "127.0.0.1".to_string(), - }, - - output: reciever, - input: sender, - - server_channel: Mutex::new(None), - - stream: Mutex::new(None), - - stream_reader: Mutex::new(None), - stream_writer: Mutex::new(None), - } - } -} - -// MARK: - used for sorting. -impl PartialEq for Client { - fn eq(&self, other: &Self) -> bool { - self.uuid == other.uuid - } -} - -impl Eq for Client {} - -impl PartialOrd for Client { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for Client { - fn cmp(&self, other: &Self) -> Ordering { - self.uuid.cmp(&other.uuid) - } -} - -impl Drop for Client { - fn drop(&mut self) { - println!("[Client] dropped!"); - } -} diff --git a/server/src/client_management/chat_manager/actor.rs b/server/src/client_management/chat_manager/actor.rs new file mode 100644 index 0000000..6460ab7 --- /dev/null +++ b/server/src/client_management/chat_manager/actor.rs @@ -0,0 +1,87 @@ +use actix::{Actor, Addr, Context, Handler}; +use foundation::models::message::Message; +use uuid::Uuid; + +use crate::client_management::chat_manager::messages::{ + ChatManagerDataMessage, + ChatManagerDataResponse, + ChatManagerMessage, +}; + +pub(crate) struct ChatManager { + messages: Vec, +} + +impl ChatManager { + pub fn new() -> Addr { + Self { + messages: Vec::new(), + } + .start() + } + + // no need for a remove methods because this is a read only system + fn add_message( + &mut self, + _ctx: &mut Context, + id: Uuid, + content: String, + ) { + println!( + "[ChatManager] add_message id: {:?} content: {:?}", + id, content + ); + self.messages.push(Message::new(id, content)) + } + + fn get_messages(&self, _ctx: &mut Context) -> ChatManagerDataResponse { + println!("[ChatManager] getting messages"); + ChatManagerDataResponse::GotMessages(self.messages.clone()) + } + + fn get_message( + &self, + _ctx: &mut Context, + index: usize, + ) -> ChatManagerDataResponse { + println!("[ChatManager] getting message index: {:?}", index); + ChatManagerDataResponse::GotMessage(self.messages.get(index).cloned()) + } +} + +impl Actor for ChatManager { + type Context = Context; +} + +impl Handler for ChatManager { + type Result = (); + + fn handle( + &mut self, + msg: ChatManagerMessage, + ctx: &mut Self::Context, + ) -> Self::Result { + println!("[ChatManager] got message: {:?}", msg); + match msg { + ChatManagerMessage::AddMessage(id, content) => { + self.add_message(ctx, id, content) + } + } + } +} + +impl Handler for ChatManager { + type Result = ChatManagerDataResponse; + + fn handle( + &mut self, + msg: ChatManagerDataMessage, + ctx: &mut Self::Context, + ) -> Self::Result { + println!("[ChatManager] got message: {:?}", msg); + match msg { + ChatManagerDataMessage::GetMessages => self.get_messages(ctx), + ChatManagerDataMessage::GetMessage(index) => self.get_message(ctx, index), + } + } +} diff --git a/server/src/client_management/chat_manager/messages.rs b/server/src/client_management/chat_manager/messages.rs new file mode 100644 index 0000000..67d70de --- /dev/null +++ b/server/src/client_management/chat_manager/messages.rs @@ -0,0 +1,23 @@ +use actix::{Message as ActixMessage, MessageResponse}; +use foundation::models::message::Message; +use uuid::Uuid; + +#[derive(ActixMessage, Debug)] +#[rtype(result = "()")] +pub enum ChatManagerMessage { + AddMessage(Uuid, String), +} + +#[allow(dead_code)] +#[derive(ActixMessage, Debug)] +#[rtype(result = "ChatManagerDataResponse")] +pub enum ChatManagerDataMessage { + GetMessages, + GetMessage(usize), +} + +#[derive(MessageResponse)] +pub enum ChatManagerDataResponse { + GotMessages(Vec), + GotMessage(Option), +} diff --git a/server/src/client_management/chat_manager/mod.rs b/server/src/client_management/chat_manager/mod.rs new file mode 100644 index 0000000..e8913b0 --- /dev/null +++ b/server/src/client_management/chat_manager/mod.rs @@ -0,0 +1,16 @@ +//! Contains all the structures for managing chat storage. +//! it contains: +//! - ChatManager +//! - Messages +//! - Mesage type + +mod actor; + +mod messages; + +pub(crate) use actor::ChatManager; +pub(crate) use messages::{ + ChatManagerDataMessage, + ChatManagerDataResponse, + ChatManagerMessage, +}; diff --git a/server/src/client_management/client/actor.rs b/server/src/client_management/client/actor.rs new file mode 100644 index 0000000..c553c7a --- /dev/null +++ b/server/src/client_management/client/actor.rs @@ -0,0 +1,281 @@ +use actix::{Actor, Addr, AsyncContext, Context, Handler, WeakRecipient}; +use foundation::{ + messages::client::{ClientStreamIn, ClientStreamOut}, + ClientDetails, +}; +use uuid::Uuid; + +use crate::{ + client_management::client::messages::{ + ClientDataMessage, + ClientDataResponse, + ClientMessage, + ClientObservableMessage, + }, + network::{Connection, ConnectionObservableOutput}, + prelude::messages::{ConnectionMessage, ObservableMessage}, +}; + +/// # Client +/// This represents a connected client. +/// it will handle received message from a connection. +pub struct Client { + connection: Addr, + details: ClientDetails, + observers: Vec>, +} + +impl Client { + pub(crate) fn new( + connection: Addr, + details: ClientDetails, + ) -> Addr { + Client { + connection, + details, + observers: Vec::default(), + } + .start() + } + + #[inline] + fn get_clients(&self, ctx: &mut Context) { + println!("[Client] getting clients"); + use ClientObservableMessage::GetClients; + self.broadcast(GetClients(ctx.address().downgrade())); + } + + #[inline] + fn get_messages(&self, ctx: &mut Context) { + println!("[Client] getting messages"); + use ClientObservableMessage::GetGlobalMessages; + self.broadcast(GetGlobalMessages(ctx.address().downgrade())); + } + + #[inline] + fn send_message(&self, ctx: &mut Context, to: Uuid, content: String) { + println!("[Client] sending message"); + use ClientObservableMessage::Message; + self.broadcast(Message(ctx.address().downgrade(), to, content)); + } + + #[inline] + fn send_gloal_message(&self, ctx: &mut Context, content: String) { + println!("[Client] sending global message"); + use ClientObservableMessage::GlobalMessage; + self.broadcast(GlobalMessage(ctx.address().downgrade(), content)); + } + + #[inline] + fn disconnect(&self, _ctx: &mut Context) { + println!("[Client] disconnecting"); + use ClientObservableMessage::Disconnecting; + self.broadcast(Disconnecting(self.details.uuid)); + } + + #[inline] + fn broadcast(&self, message: ClientObservableMessage) { + println!("[Client] broadcasting message"); + for recp in &self.observers { + if let Some(upgraded) = recp.upgrade() { + upgraded.do_send(message.clone()); + } + } + } + + pub(crate) fn error(&self, msg: String) { + println!("[Client] sending error: {}", msg); + use serde_json::to_string; + use ConnectionMessage::SendData; + + let msg = to_string::(&ClientStreamOut::Error { msg }) + .expect("[Client] This should not fail"); + + self.connection.do_send(SendData(msg)); + } +} + +impl Actor for Client { + type Context = Context; + + // tells the client that it has been connected. + fn started(&mut self, ctx: &mut Self::Context) { + use foundation::messages::client::ClientStreamOut::Connected; + use serde_json::to_string; + + use crate::{ + network::ConnectionMessage::SendData, + prelude::messages::ObservableMessage::Subscribe, + }; + println!("[Client] started"); + self + .connection + .do_send::>(Subscribe( + ctx.address().recipient().downgrade(), + )); + self + .connection + .do_send(SendData(to_string::(&Connected).unwrap())); + } + + fn stopped(&mut self, ctx: &mut Self::Context) { + use foundation::messages::client::ClientStreamOut::Disconnected; + use serde_json::to_string; + + use crate::{ + network::ConnectionMessage::SendData, + prelude::messages::ObservableMessage::Unsubscribe, + }; + + println!("[Client] stopped"); + + self + .connection + .do_send::>(Unsubscribe( + ctx.address().recipient().downgrade(), + )); + self.connection.do_send(SendData( + to_string::(&Disconnected).unwrap(), + )); + } +} + +impl Handler for Client { + type Result = ClientDataResponse; + fn handle( + &mut self, + msg: ClientDataMessage, + _ctx: &mut Self::Context, + ) -> Self::Result { + match msg { + ClientDataMessage::Details => { + ClientDataResponse::Details(self.details.clone()) + } + _ => todo!(), + } + } +} + +// Handles incoming messages to the client. +impl Handler for Client { + type Result = (); + fn handle( + &mut self, + msg: ClientMessage, + _ctx: &mut Self::Context, + ) -> Self::Result { + use foundation::messages::client::ClientStreamOut::{ + ConnectedClients, + GlobalChatMessages, + GlobalMessage, + UserMessage, + }; + use serde_json::to_string; + + use crate::{ + client_management::client::messages::ClientMessage::{ + ClientList, + ClientlySentMessage, + GloballySentMessage, + MessageList, + }, + network::ConnectionMessage::SendData, + }; + + match msg { + ClientList(clients) => self.connection.do_send(SendData( + to_string::(&ConnectedClients { clients }) + .expect("[Client] Failed to encode string"), + )), + + MessageList(messages) => self.connection.do_send(SendData( + to_string::(&GlobalChatMessages { messages }) + .expect("[Client] Failed to encode string"), + )), + + ClientlySentMessage { content, from } => { + self.connection.do_send(SendData( + to_string::(&UserMessage { from, content }) + .expect("[Client] Failed to encode string"), + )) + } + + GloballySentMessage { from, content } => { + self.connection.do_send(SendData( + to_string::(&GlobalMessage { from, content }) + .expect("[Client] Failed to encode string"), + )) + } + } + } +} + +// Handles outputs from the connection. +impl Handler for Client { + type Result = (); + + fn handle( + &mut self, + msg: ConnectionObservableOutput, + ctx: &mut Self::Context, + ) -> Self::Result { + use crate::network::ConnectionObservableOutput::RecvData; + if let RecvData(_sender, _addr, data) = msg { + use foundation::messages::client::ClientStreamIn::{ + Disconnect, + GetClients, + GetMessages, + SendGlobalMessage, + SendMessage, + }; + use serde_json::from_str; + if let Ok(msg) = from_str::(data.as_str()) { + match msg { + GetClients => self.get_clients(ctx), + GetMessages => self.get_messages(ctx), + SendMessage { to, content } => self.send_message(ctx, to, content), + SendGlobalMessage { content } => { + self.send_gloal_message(ctx, content) + } + Disconnect => self.disconnect(ctx), + } + } else { + self.error(format!("Failed to parse Message: {}", data)); + } + } + } +} + +impl Handler> for Client { + type Result = (); + + fn handle( + &mut self, + msg: ObservableMessage, + _ctx: &mut Self::Context, + ) -> Self::Result { + use crate::prelude::messages::ObservableMessage::{Subscribe, Unsubscribe}; + match msg { + Subscribe(r) => { + println!("[Client] adding subscriber"); + self.observers.push(r); + } + Unsubscribe(r) => { + println!("[Client] removing subscriber"); + let r = r.upgrade(); + self.observers = self + .observers + .clone() + .into_iter() + .filter(|a| a.upgrade() != r) + .collect(); + } + } + } +} + +impl Drop for Client { + fn drop(&mut self) { + println!("[Client] Dropping value") + } +} diff --git a/server/src/client_management/client/messages.rs b/server/src/client_management/client/messages.rs new file mode 100644 index 0000000..60e3d05 --- /dev/null +++ b/server/src/client_management/client/messages.rs @@ -0,0 +1,44 @@ +use actix::{Message, MessageResponse, WeakAddr}; +use foundation::{models::message::Message as StoredMessage, ClientDetails}; +use uuid::Uuid; + +use crate::client_management::client::Client; + +/// Message sent ot the clients delegate +#[derive(Message)] +#[rtype(result = "()")] +pub enum ClientMessage { + ClientList(Vec), + MessageList(Vec), + + ClientlySentMessage { from: Uuid, content: String }, + GloballySentMessage { from: Uuid, content: String }, +} + +#[derive(Message)] +#[rtype(result = "ClientDataResponse")] +pub enum ClientDataMessage { + Details, + Uuid, + Username, + Address, +} + +#[derive(MessageResponse)] +pub enum ClientDataResponse { + Details(ClientDetails), + Uuid(Uuid), + Username(String), + Address(String), +} + +/// message that is sent to all observers of the current client. +#[derive(Message, Clone)] +#[rtype(result = "()")] +pub enum ClientObservableMessage { + Message(WeakAddr, Uuid, String), + GlobalMessage(WeakAddr, String), + GetClients(WeakAddr), + GetGlobalMessages(WeakAddr), + Disconnecting(Uuid), +} diff --git a/server/src/client_management/client/mod.rs b/server/src/client_management/client/mod.rs new file mode 100644 index 0000000..0e94bc0 --- /dev/null +++ b/server/src/client_management/client/mod.rs @@ -0,0 +1,5 @@ +mod actor; +mod messages; + +pub use actor::Client; +pub use messages::*; diff --git a/server/src/client_management/client_manager.rs b/server/src/client_management/client_manager.rs new file mode 100644 index 0000000..d3c25b4 --- /dev/null +++ b/server/src/client_management/client_manager.rs @@ -0,0 +1,327 @@ +use std::collections::HashMap; + +use actix::{ + fut::wrap_future, + Actor, + ActorFutureExt, + Addr, + AsyncContext, + Context, + Handler, + WeakAddr, + WeakRecipient, +}; +use foundation::ClientDetails; +use tokio_stream::StreamExt; +use uuid::Uuid; + +use crate::client_management::{ + chat_manager::{ + ChatManager, + ChatManagerDataMessage, + ChatManagerDataResponse, + ChatManagerMessage, + }, + client::{ + Client, + ClientDataMessage, + ClientDataResponse, + ClientDataResponse::Details, + ClientMessage, + ClientObservableMessage, + }, + messages::{ + ClientManagerDataMessage, + ClientManagerDataResponse, + ClientManagerDataResponse::{ClientCount, Clients}, + ClientManagerMessage, + ClientManagerOutput, + }, +}; + +pub struct ClientManager { + clients: HashMap>, + chat_manager: Addr, + _delegate: WeakRecipient, +} + +impl ClientManager { + pub(crate) fn new( + delegate: WeakRecipient, + ) -> Addr { + ClientManager { + _delegate: delegate, + clients: HashMap::new(), + chat_manager: ChatManager::new(), + } + .start() + } + + pub(crate) fn send_client_list( + &mut self, + ctx: &mut Context, + sender: WeakAddr, + ) { + println!("[ClientManager] sending update to client"); + use crate::client_management::client::ClientMessage::ClientList; + if let Some(to_send) = sender.upgrade() { + let client_addr: Vec> = + self.clients.iter().map(|(_, v)| v).cloned().collect(); + + let collection = tokio_stream::iter(client_addr) + .then(|addr| addr.send(ClientDataMessage::Details)) + .map(|val| { + if let Details(details) = val.unwrap() { + details + } else { + ClientDetails::default() + } + }) + .collect(); + + let fut = wrap_future(async move { + let a: Vec<_> = collection.await; + let _ = to_send.send(ClientList(a)).await; + }); + + ctx.spawn(fut); + } + } + + pub(crate) fn send_global_messages( + &self, + ctx: &mut Context, + sender: WeakAddr, + ) { + if let Some(to_send) = sender.upgrade() { + let fut = wrap_future( + self.chat_manager.send(ChatManagerDataMessage::GetMessages), + ) + .map(move |out, _a, _ctx| { + if let Ok(ChatManagerDataResponse::GotMessages(res)) = out { + to_send.do_send(ClientMessage::MessageList(res)); + } + }); + ctx.spawn(fut); + }; + } + + pub(crate) fn send_message_request( + &self, + ctx: &mut Context, + sender: WeakAddr, + to: Uuid, + content: String, + ) { + println!("[ClientManager] sending message to client"); + let client_addr: Vec> = + self.clients.iter().map(|(_, v)| v).cloned().collect(); + + let collection = tokio_stream::iter(client_addr.clone()) + .then(|addr| addr.send(ClientDataMessage::Details)) + .map(|val| val.unwrap()) + .map(|val: ClientDataResponse| { + if let Details(details) = val { + details + } else { + ClientDetails::default() + } + }) + .collect(); + + let fut = wrap_future(async move { + if let Some(sender) = sender.upgrade() { + let sender_details: ClientDataResponse = + sender.send(ClientDataMessage::Details).await.unwrap(); + + let from = if let Details(details) = sender_details { + details.uuid + } else { + ClientDetails::default().uuid + }; + + let client_details: Vec = collection.await; + let pos = client_details.iter().position(|i| i.uuid == to); + if let Some(pos) = pos { + client_addr[pos] + .send(ClientMessage::ClientlySentMessage { content, from }) + .await + .expect("TODO: panic message"); + } + } + }); + + ctx.spawn(fut); + } + + pub(crate) fn send_global_message_request( + &self, + ctx: &mut Context, + sender: WeakAddr, + content: String, + ) { + println!("[ClientManager] sending message to client"); + use crate::client_management::client::ClientMessage::GloballySentMessage; + + let client_addr: Vec> = + self.clients.iter().map(|(_, v)| v).cloned().collect(); + + if let Some(sender) = sender.upgrade() { + let cm = self.chat_manager.clone(); + + let snd1 = sender.clone(); + let snd2 = sender; + + let cont1 = content.clone(); + let cont2 = content; + + let fut = wrap_future(async move { + println!("[ClientManager] sending to all clients"); + let details: ClientDataResponse = + snd1.send(ClientDataMessage::Details).await.unwrap(); + + let from = if let Details(details) = details { + details.uuid + } else { + ClientDetails::default().uuid + }; + + let collection = tokio_stream::iter(client_addr) + .then(move |addr| { + addr.send(GloballySentMessage { + content: cont1.clone(), + from, + }) + }) + .collect(); + let _: Vec<_> = collection.await; + }); + + let chat_manager_fut = wrap_future(async move { + println!("[ClientManager] storing in chat manager"); + let details: ClientDataResponse = + snd2.send(ClientDataMessage::Details).await.unwrap(); + + let from = if let Details(details) = details { + details.uuid + } else { + ClientDetails::default().uuid + }; + + let _ = cm.send(ChatManagerMessage::AddMessage(from, cont2)).await; + }); + ctx.spawn(fut); + ctx.spawn(chat_manager_fut); + } + } + + fn add_client( + &mut self, + ctx: &mut Context, + uuid: Uuid, + addr: Addr, + ) { + println!("[ClientManager] adding client"); + use crate::prelude::messages::ObservableMessage::Subscribe; + let recp = ctx.address().recipient::(); + println!("[ClientManager] sending subscribe message to client"); + addr.do_send(Subscribe(recp.downgrade())); + self.clients.insert(uuid, addr); + } + + fn remove_client(&mut self, ctx: &mut Context, uuid: Uuid) { + println!("[ClientManager] removing client"); + use crate::prelude::messages::ObservableMessage::Unsubscribe; + let recp = ctx.address().recipient::(); + if let Some(addr) = self.clients.remove(&uuid) { + println!("[ClientManager] sending unsubscribe message to client"); + addr.do_send(Unsubscribe(recp.downgrade())); + } + } + + fn disconnect_client( + &mut self, + ctx: &mut Context, + uuid: Uuid, + ) { + println!("[ClientManager] disconnecting client"); + use crate::prelude::messages::ObservableMessage::Unsubscribe; + let recp = ctx.address().recipient::(); + if let Some(addr) = self.clients.remove(&uuid) { + addr.do_send(Unsubscribe(recp.downgrade())); + } + } +} + +impl Actor for ClientManager { + type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + println!("[ClientManager] started"); + } +} + +impl Handler for ClientManager { + type Result = (); + fn handle( + &mut self, + msg: ClientManagerMessage, + ctx: &mut Self::Context, + ) -> Self::Result { + use ClientManagerMessage::{AddClient, RemoveClient}; + match msg { + // todo: Add subscription to the client. + AddClient(uuid, addr) => self.add_client(ctx, uuid, addr), + // todo: remove subscription to client. + RemoveClient(uuid) => self.remove_client(ctx, uuid), + } + } +} + +impl Handler for ClientManager { + type Result = (); + + fn handle( + &mut self, + msg: ClientObservableMessage, + ctx: &mut Self::Context, + ) -> Self::Result { + use crate::client_management::client::ClientObservableMessage::{ + Disconnecting, + GetClients, + GetGlobalMessages, + GlobalMessage, + Message, + }; + match msg { + Message(sender, to, content) => { + self.send_message_request(ctx, sender, to, content) + } + GlobalMessage(sender, content) => { + self.send_global_message_request(ctx, sender, content) + } + GetClients(sender) => self.send_client_list(ctx, sender), + GetGlobalMessages(sender) => self.send_global_messages(ctx, sender), + Disconnecting(uuid) => self.disconnect_client(ctx, uuid), + } + } +} + +impl Handler for ClientManager { + type Result = ClientManagerDataResponse; + + fn handle( + &mut self, + msg: ClientManagerDataMessage, + _ctx: &mut Self::Context, + ) -> Self::Result { + match msg { + ClientManagerDataMessage::ClientCount => { + ClientCount(self.clients.values().count()) + } + ClientManagerDataMessage::Clients => { + Clients(self.clients.values().map(|a| a.downgrade()).collect()) + } + } + } +} diff --git a/server/src/client_management/messages.rs b/server/src/client_management/messages.rs new file mode 100644 index 0000000..067dd85 --- /dev/null +++ b/server/src/client_management/messages.rs @@ -0,0 +1,32 @@ +use actix::{Addr, Message, MessageResponse, WeakAddr}; +use uuid::Uuid; + +use crate::client_management::{client::Client, ClientManager}; + +#[derive(Message)] +#[rtype(result = "()")] +pub(crate) enum ClientManagerMessage { + AddClient(Uuid, Addr), + #[allow(dead_code)] + RemoveClient(Uuid), +} + +#[derive(Message)] +#[rtype(result = "()")] +pub(crate) enum ClientManagerOutput { + #[allow(dead_code)] + UpdateRequest(Addr), +} + +#[derive(Message)] +#[rtype(result = "ClientManagerDataResponse")] +pub enum ClientManagerDataMessage { + ClientCount, + Clients, +} + +#[derive(MessageResponse)] +pub enum ClientManagerDataResponse { + ClientCount(usize), + Clients(Vec>), +} diff --git a/server/src/client_management/mod.rs b/server/src/client_management/mod.rs new file mode 100644 index 0000000..5e69979 --- /dev/null +++ b/server/src/client_management/mod.rs @@ -0,0 +1,28 @@ +//! Contains code that handles the lifecycle of connected clients +//! +//! This collects all parts used by the client manager actor +//! +//! It's responsibility is: +//! - to handle client to client communication. +//! - to handle server to client communication. +//! - to handler client lifecycle events such as dicconection. + +mod chat_manager; +pub mod client; +mod client_manager; +mod messages; + +#[allow(unused_imports)] +use chat_manager::{ + ChatManager, + ChatManagerDataMessage, + ChatManagerDataResponse, + ChatManagerMessage, +}; +pub(crate) use client_manager::ClientManager; +pub(crate) use messages::{ + ClientManagerDataMessage, + ClientManagerDataResponse, + ClientManagerMessage, + ClientManagerOutput, +}; diff --git a/server/src/client_manager.rs b/server/src/client_manager.rs deleted file mode 100644 index a3f4d51..0000000 --- a/server/src/client_manager.rs +++ /dev/null @@ -1,114 +0,0 @@ -// use crate::lib::server::ServerMessages; -use foundation::prelude::IPreemptive; -use std::collections::HashMap; -use std::mem::replace; -use std::sync::Arc; -use std::sync::Mutex; - -use crossbeam_channel::{unbounded, Receiver, Sender}; -use uuid::Uuid; - -use crate::client::Client; -use crate::messages::ClientMessage; -use crate::messages::ClientMgrMessage; -use crate::messages::ServerMessage; -use foundation::prelude::IMessagable; - -/// # ClientManager -/// This struct manages all connected users -#[derive(Debug)] -pub struct ClientManager { - clients: Mutex>>, - - server_channel: Mutex>, - - sender: Sender, - receiver: Receiver, -} - -impl ClientManager { - pub fn new(server_channel: Sender) -> Arc { - let (sender, receiver) = unbounded(); - - Arc::new(ClientManager { - clients: Mutex::default(), - - server_channel: Mutex::new(server_channel), - - sender, - receiver, - }) - } -} - -impl IMessagable> for ClientManager { - fn send_message(&self, msg: ClientMgrMessage) { - self.sender.send(msg).unwrap(); - } - fn set_sender(&self, sender: Sender) { - let mut server_lock = self.server_channel.lock().unwrap(); - let _ = replace(&mut *server_lock, sender); - } -} - -impl IPreemptive for ClientManager { - fn run(arc: &Arc) { - loop { - std::thread::sleep(std::time::Duration::from_secs(1)); - - if !arc.receiver.is_empty() { - for message in arc.receiver.try_iter() { - println!("[Client manager]: recieved message: {:?}", message); - use ClientMgrMessage::{Add, Remove, SendMessage, SendClients}; - - match message { - Add(client) => { - println!("[Client Manager]: adding new client"); - Client::start(&client); - let mut lock = arc.clients.lock().unwrap(); - if lock.insert(client.uuid, client).is_none() { - println!("value is new"); - } - }, - Remove(uuid) => { - println!("[Client Manager]: removing client: {:?}", &uuid); - if let Some(client) = - arc.clients.lock().unwrap().remove(&uuid) - { - client.send_message(ClientMessage::Disconnect); - } - }, - SendMessage { to, from, content } => { - let lock = arc.clients.lock().unwrap(); - if let Some(client) = lock.get(&to) { - client.send_message(ClientMessage::Message { - from, - content, - }) - } - }, - SendClients {to} => { - let lock = arc.clients.lock().unwrap(); - if let Some(client) = lock.get(&to) { - let clients_vec: Vec> = lock.values().cloned().collect(); - - client.send_message(ClientMessage::Update { - clients: clients_vec, - }) - } - }, - - - #[allow(unreachable_patterns)] - _ => println!("[Client manager]: not implemented"), - } - } - } - } - } - - fn start(arc: &Arc) { - let arc = arc.clone(); - std::thread::spawn(move || ClientManager::run(&arc)); - } -} diff --git a/server/src/config_manager/arg_parser.rs b/server/src/config_manager/arg_parser.rs new file mode 100644 index 0000000..05519a3 --- /dev/null +++ b/server/src/config_manager/arg_parser.rs @@ -0,0 +1,14 @@ +use clap::Parser; + +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +pub struct Arguments { + #[clap(short, long, value_parser = clap::value_parser!(u16).range(1..))] + pub port: Option, + + #[clap(short, long, value_parser)] + pub name: Option, + + #[clap(short, long, value_parser)] + pub owner: Option, +} diff --git a/server/src/config_manager/builder.rs b/server/src/config_manager/builder.rs new file mode 100644 index 0000000..e279fce --- /dev/null +++ b/server/src/config_manager/builder.rs @@ -0,0 +1,32 @@ +use actix::{Actor, Addr}; + +use crate::config_manager::{arg_parser::Arguments, ConfigManager}; + +pub(super) struct Builder { + pub(super) file_path: String, + pub(super) args: Option, +} + +impl Builder { + pub(super) fn new() -> Self { + Self { + file_path: "./config_file.toml".to_owned(), + args: None, + } + } + + #[allow(dead_code)] + pub fn config_path(mut self, path: impl Into) -> Self { + self.file_path = path.into(); + self + } + + pub fn args(mut self, args: Arguments) -> Self { + self.args.replace(args); + self + } + + pub(super) fn build(self) -> Addr { + ConfigManager::from(self).start() + } +} diff --git a/server/src/config_manager/config_manager.rs b/server/src/config_manager/config_manager.rs new file mode 100644 index 0000000..58d888b --- /dev/null +++ b/server/src/config_manager/config_manager.rs @@ -0,0 +1,175 @@ +use std::{ + collections::BTreeMap, + fs::{File, OpenOptions}, + io::Read, + sync::Once, +}; + +use actix::{Actor, Addr, Context, Handler, Recipient}; +use clap::Parser; +use toml::Value; + +use crate::{ + config_manager::{ + arg_parser::Arguments, + builder::Builder, + messages::{ + ConfigManagerDataMessage, + ConfigManagerDataResponse, + ConfigManagerOutput, + }, + types::ConfigValue::{Dict, Number, String as ConfigString}, + ConfigValue, + }, + prelude::messages::ObservableMessage, +}; + +static mut SHARED: Option> = None; +static INIT: Once = Once::new(); + +#[allow(dead_code)] +pub(crate) struct ConfigManager { + file: File, + stored: ConfigValue, + root: ConfigValue, + subscribers: Vec>>, +} + +// static methods +impl ConfigManager { + pub fn shared() -> Addr { + INIT.call_once(|| { + let builder = Self::create().args(Arguments::parse()).build(); + unsafe { SHARED = Some(builder) } + }); + unsafe { SHARED.clone().unwrap() } + } + + pub(super) fn create() -> Builder { + Builder::new() + } +} + +// instance methods +impl ConfigManager { + pub fn get_value(&self, key: String) -> Option { + if let Dict(dict) = &self.root { + dict.get(&key).cloned() + } else { + None + } + } + + pub fn set_value( + &mut self, + key: String, + value: Option, + ) -> Option { + value.and_then(|value| { + if let (Dict(stored), Dict(root)) = (&mut self.stored, &mut self.root) { + stored.insert(key.clone(), value.clone()); + root.insert(key.clone(), value.clone()); + Some(value) + } else { + None + } + }) + } + + // this doesn't work for now + pub fn soft_set_value( + &mut self, + key: String, + value: Option, + ) -> Option { + value.and_then(|value| { + if let Dict(root) = &mut self.root { + root.insert(key, value.clone()); + Some(value) + } else { + None + } + }) + } +} + +impl Actor for ConfigManager { + type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + println!("[ConfigManager] starting"); + println!("[ConfigManager] started"); + } +} + +impl Handler for ConfigManager { + type Result = ConfigManagerDataResponse; + + fn handle( + &mut self, + msg: ConfigManagerDataMessage, + _ctx: &mut Self::Context, + ) -> Self::Result { + use ConfigManagerDataResponse::{GotValue, SetValue, SoftSetValue}; + + match msg { + ConfigManagerDataMessage::GetValue(val) => GotValue(self.get_value(val)), + ConfigManagerDataMessage::SetValue(key, value) => { + SetValue(key.clone(), self.set_value(key, value)) + } + ConfigManagerDataMessage::SoftSetValue(key, value) => { + SoftSetValue(key.clone(), self.soft_set_value(key, value)) + } + } + } +} + +impl From for ConfigManager { + fn from(builder: Builder) -> Self { + println!("got args: {:#?}", builder.args); + + let mut file = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .open(builder.file_path) + .ok() + .unwrap(); + + let mut output = String::new(); + file + .read_to_string(&mut output) + .expect("failed to read from file"); + + let stored = output + .parse::() + .map(|v| v.into()) + .ok() + .unwrap_or_else(|| Dict(BTreeMap::new())); + + println!("[ConfigManager] got stored: {:?}", stored); + + let mut root = stored.clone(); + if let Dict(root) = &mut root { + builder.args.map(|v| { + v.port + .map(|p| root.insert("Network.Port".to_owned(), Number(p.into()))); + + v.name.map(|n| { + root.insert("Server.Name".to_owned(), ConfigString(n.into())) + }); + + v.owner.map(|o| { + root.insert("Server.Owner".to_owned(), ConfigString(o.into())) + }); + }); + } + + Self { + file, + root, + stored, + subscribers: Vec::default(), + } + } +} diff --git a/server/src/config_manager/messages.rs b/server/src/config_manager/messages.rs new file mode 100644 index 0000000..6552415 --- /dev/null +++ b/server/src/config_manager/messages.rs @@ -0,0 +1,27 @@ +use actix::{Message, MessageResponse}; + +use crate::config_manager::types::ConfigValue; + +#[derive(Message, Debug)] +#[rtype(result = "()")] +pub enum ConfigManagerOutput { + #[allow(dead_code)] + ConfigUpdated(String, ConfigValue), +} + +#[derive(Message, Debug)] +#[rtype(result = "ConfigManagerDataResponse")] +pub enum ConfigManagerDataMessage { + GetValue(String), + #[allow(dead_code)] + SetValue(String, Option), + #[allow(dead_code)] + SoftSetValue(String, Option), +} + +#[derive(MessageResponse, Debug)] +pub enum ConfigManagerDataResponse { + GotValue(Option), + SetValue(String, Option), + SoftSetValue(String, Option), +} diff --git a/server/src/config_manager/mod.rs b/server/src/config_manager/mod.rs new file mode 100644 index 0000000..15096ea --- /dev/null +++ b/server/src/config_manager/mod.rs @@ -0,0 +1,16 @@ +//! # config_manager +//! This module contains all the code that deals with server configuration. +//! It tries to implement a singleton actor, that will be fetchable globaly. + +pub mod arg_parser; +mod builder; +mod config_manager; +mod messages; +mod types; + +pub(crate) use config_manager::ConfigManager; +pub(crate) use messages::{ + ConfigManagerDataMessage, + ConfigManagerDataResponse, +}; +pub(crate) use types::ConfigValue; diff --git a/server/src/config_manager/types.rs b/server/src/config_manager/types.rs new file mode 100644 index 0000000..4a9f8b0 --- /dev/null +++ b/server/src/config_manager/types.rs @@ -0,0 +1,51 @@ +use std::collections::BTreeMap; + +use toml::value::Value; + +/// # ConfigValue +/// Each value type that can be used within a config file. +/// gets used when reading and writing to a config file. +#[derive(Clone, Debug)] +pub enum ConfigValue { + Dict(BTreeMap), + Array(Vec), + String(String), + Number(i64), + Float(f64), + Bool(bool), +} + +impl From for Value { + fn from(v: ConfigValue) -> Self { + match v { + ConfigValue::Dict(dict) => { + Value::Table(dict.into_iter().map(|(k, v)| (k, v.into())).collect()) + } + ConfigValue::Array(arr) => { + Value::Array(arr.into_iter().map(|v| v.into()).collect()) + } + ConfigValue::String(s) => Value::String(s), + ConfigValue::Number(n) => Value::Integer(n), + ConfigValue::Float(f) => Value::Float(f), + ConfigValue::Bool(b) => Value::Boolean(b), + } + } +} + +impl From for ConfigValue { + fn from(v: Value) -> Self { + match v { + Value::Table(dict) => ConfigValue::Dict( + dict.into_iter().map(|(k, v)| (k, v.into())).collect(), + ), + Value::Array(arr) => { + ConfigValue::Array(arr.into_iter().map(|v| v.into()).collect()) + } + Value::String(s) => ConfigValue::String(s), + Value::Integer(n) => ConfigValue::Number(n), + Value::Float(f) => ConfigValue::Float(f), + Value::Boolean(b) => ConfigValue::Bool(b), + Value::Datetime(d) => ConfigValue::String(d.to_string()), + } + } +} diff --git a/server/src/lua/builder.rs b/server/src/lua/builder.rs new file mode 100644 index 0000000..c07e559 --- /dev/null +++ b/server/src/lua/builder.rs @@ -0,0 +1,32 @@ +use actix::{Addr, WeakAddr}; + +use crate::{ + client_management::ClientManager, + lua::lua_manager::LuaManager, + network::NetworkManager, + Server, +}; + +pub struct Builder { + pub(super) server: WeakAddr, + pub(super) network_manager: WeakAddr, + pub(super) client_manager: WeakAddr, +} + +impl Builder { + pub(super) fn new( + server: WeakAddr, + network_manager: WeakAddr, + client_manager: WeakAddr, + ) -> Self { + Builder { + server, + network_manager, + client_manager, + } + } + + pub(crate) fn build(self) -> Addr { + Addr::from(self) + } +} diff --git a/server/src/lua/lua_manager.rs b/server/src/lua/lua_manager.rs new file mode 100644 index 0000000..28d7faa --- /dev/null +++ b/server/src/lua/lua_manager.rs @@ -0,0 +1,81 @@ +//! # lua_manager.rs +//! +//! Holds the LuaManger struct and implements it's methods + +use actix::{fut::wrap_future, Actor, Addr, AsyncContext, Context, WeakAddr}; +use mlua::{Lua, Thread}; + +use crate::{ + client_management::ClientManager, + lua::builder::Builder, + network::NetworkManager, + scripting::scriptable_server::ScriptableServer, + Server, +}; + +/// # LuaManager +/// Holds common server objects +/// todo: change to weak references +pub struct LuaManager { + pub(super) server: WeakAddr, + pub(super) _network_manager: WeakAddr, + pub(super) _client_manager: WeakAddr, +} + +impl LuaManager { + pub fn create( + server: WeakAddr, + network_manager: WeakAddr, + client_manager: WeakAddr, + ) -> Builder { + Builder::new(server, network_manager, client_manager) + } + + fn create_lua(&self) -> Lua { + let engine = Lua::new(); + let server = ScriptableServer::from(self.server.clone()); + + let api = engine.create_table().unwrap(); + api.set::<&str, ScriptableServer>("server", server).unwrap(); + + engine.globals().set("chat", api).unwrap(); + engine + } +} + +impl Actor for LuaManager { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + let engine = self.create_lua(); + + ctx.spawn(wrap_future(async move { + let coroutine: Thread = engine + .load( + r#" + coroutine.create(function () + print("hello lua") + print(chat.server:name()) + end) + "#, + ) + .eval() + .unwrap(); + let coroutine = coroutine.into_async::<(), ()>(()); + coroutine.await.expect("TODO: panic message"); + })); + } +} + +// by implementing it for the addr type, +// we enforce the actor model on the consumer of the api. +impl From for Addr { + fn from(b: Builder) -> Addr { + LuaManager { + server: b.server, + _network_manager: b.network_manager, + _client_manager: b.client_manager, + } + .start() + } +} diff --git a/server/src/lua/mod.rs b/server/src/lua/mod.rs new file mode 100644 index 0000000..e75a513 --- /dev/null +++ b/server/src/lua/mod.rs @@ -0,0 +1,4 @@ +mod builder; +mod lua_manager; + +pub use lua_manager::LuaManager; diff --git a/server/src/main.rs b/server/src/main.rs index dfc409f..a4b3ea1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,29 +1,23 @@ -pub mod client; -pub mod client_manager; -pub mod messages; -pub mod network_manager; -pub mod server; +//! This is the main module of the actix server. +//! It starts the server and sleeps for the remainder of the program -use clap::{App, Arg}; +pub(crate) mod client_management; +pub(crate) mod config_manager; +pub(crate) mod lua; +pub(crate) mod network; +pub(crate) mod prelude; +pub(crate) mod rhai; +pub(crate) mod scripting; +pub(crate) mod server; -use foundation::prelude::IPreemptive; use server::Server; +use tokio::time::{sleep, Duration}; -fn main() { - let _args = App::new("--rust chat server--") - .version("0.1.5") - .author("Mitchel Hardie , Michael Bailey ") - .about("this is a chat server developed in rust, depending on the version one of two implementations will be used") - .arg( - Arg::with_name("config") - .short("p") - .long("port") - .value_name("PORT") - .help("sets the port the server runs on.") - .takes_value(true)) - .get_matches(); - - let server = Server::new(); - - Server::run(&server); +/// The main function +#[actix::main()] +async fn main() { + let _init = Server::create().build(); + loop { + sleep(Duration::from_millis(1000)).await; + } } diff --git a/server/src/messages.rs b/server/src/messages.rs deleted file mode 100644 index f5d2e11..0000000 --- a/server/src/messages.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::sync::Arc; -use uuid::Uuid; - -use crate::client::Client; - -#[derive(Debug)] -pub enum ClientMessage { - Message { from: Uuid, content: String }, - - Update {clients: Vec>}, - - Disconnect, -} - -#[derive(Debug)] -pub enum ClientMgrMessage { - Remove(Uuid), - Add(Arc), - SendClients {to: Uuid}, - SendMessage { - from: Uuid, - to: Uuid, - content: String, - }, -} - -#[derive(Debug)] -pub enum ServerMessage { - ClientConnected(Arc), - ClientSendMessage { - from: Uuid, - to: Uuid, - content: String, - }, - ClientDisconnected(Uuid), - ClientUpdate(Uuid), -} diff --git a/server/src/network/connection/actor.rs b/server/src/network/connection/actor.rs new file mode 100644 index 0000000..98dbeb1 --- /dev/null +++ b/server/src/network/connection/actor.rs @@ -0,0 +1,262 @@ +use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc, time::Duration}; + +use actix::{ + clock::timeout, + fut::wrap_future, + Actor, + ActorContext, + ActorFutureExt, + Addr, + AsyncContext, + Context, + Handler, + SpawnHandle, + WeakRecipient, +}; +use futures::{future::join_all, stream::Buffered, Future, FutureExt}; +use tokio::{ + io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}, + net::TcpStream, + sync::Mutex, +}; + +use super::{ConnectionMessage, ConnectionObservableOutput}; +use crate::{ + network::connection::messages::ConnectionPrivateMessage, + prelude::messages::ObservableMessage, +}; + +/// # Connection +/// This manages a TcpStream for a given connection. +/// +/// ## Fields +/// - read_half: A temporary store fr the read half of the connection. +/// - write_half: The write half of the connection. +/// - address: The socket address of the conneciton. +/// - observers: A list of observers to events created by the connection. +/// - loop_future: the future holding the receiving loop. +pub struct Connection { + write_half: Arc>>, + address: SocketAddr, + observers: Vec>, +} + +impl Connection { + /// Creates a new Conneciton actor from a Tokio TcpStream, + /// and start's its execution. + /// returns: the Addr of the connection. + pub(crate) fn new(stream: TcpStream, address: SocketAddr) -> Addr { + let (read_half, write_half) = split(stream); + let addr = Connection { + write_half: Arc::new(Mutex::new(write_half)), + address, + observers: Vec::new(), + } + .start(); + addr.do_send(ConnectionPrivateMessage::DoRead(BufReader::new(read_half))); + addr + } + + #[inline] + fn broadcast( + &self, + ctx: &mut ::Context, + data: ConnectionObservableOutput, + ) { + let futs: Vec + Send>>> = self + .observers + .iter() + .cloned() + .map(|r| { + let data = data.clone(); + async move { + if let Some(r) = r.upgrade() { + let _ = r.send(data).await; + } + } + .boxed() + }) + .collect(); + let _ = ctx.spawn(wrap_future(async { + join_all(futs).await; + })); + } + + #[inline] + fn do_read( + &mut self, + ctx: &mut ::Context, + mut buf_reader: BufReader>, + ) { + let address = self.address; + let weak_addr = ctx.address().downgrade(); + + let read_fut = async move { + let dur = Duration::from_millis(100); + let mut buffer_string: String = Default::default(); + + let read_fut = buf_reader.read_line(&mut buffer_string); + let Ok(Ok(len)) = timeout(dur, read_fut).await else { + println!("[Connection] timeout reached"); + if let Some(addr) = weak_addr.upgrade() { + addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader)); + } + return; + }; + + if len == 0 { + println!("[Connection] readline returned 0"); + return; + } + + if let Some(addr) = weak_addr.upgrade() { + let _ = addr + .send(ConnectionPrivateMessage::Broadcast( + ConnectionObservableOutput::RecvData( + addr.downgrade(), + address, + buffer_string.clone(), + ), + )) + .await; + } + + if let Some(addr) = weak_addr.upgrade() { + addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader)); + } + }; + ctx.spawn(wrap_future(read_fut)); + } +} + +impl Actor for Connection { + type Context = Context; + + /// runs when the actor is started. + /// takes out eh read_half ad turns it into a buffered reader + /// then eneters loop readling lines from the tcp stream + fn started(&mut self, ctx: &mut Self::Context) { + println!("[Connection] started"); + } + + fn stopped(&mut self, ctx: &mut Self::Context) { + use ConnectionObservableOutput::ConnectionClosed; + println!("[Connection] stopped"); + for recp in self.observers.iter() { + if let Some(recp) = recp.upgrade() { + recp.do_send(ConnectionClosed(ctx.address().downgrade())) + } + } + } +} + +impl Handler> for Connection { + type Result = (); + fn handle( + &mut self, + msg: ObservableMessage, + _ctx: &mut Self::Context, + ) -> >>::Result{ + use ObservableMessage::{Subscribe, Unsubscribe}; + match msg { + Subscribe(r) => { + println!("[Connection] adding subscriber"); + self.observers.push(r); + } + Unsubscribe(r) => { + println!("[Connection] removing subscriber"); + let r = r.upgrade(); + self.observers = self + .observers + .clone() + .into_iter() + .filter(|a| a.upgrade() != r) + .collect(); + } + }; + } +} + +impl Handler for Connection { + type Result = (); + fn handle( + &mut self, + msg: ConnectionMessage, + ctx: &mut Self::Context, + ) -> Self::Result { + use ConnectionMessage::{CloseConnection, SendData}; + let writer = Arc::downgrade(&self.write_half); + + match msg { + SendData(d) => { + ctx.spawn(wrap_future(async move { + let Some(writer) = writer.upgrade() else { + return; + }; + + println!("[Connection] sending data"); + let mut lock = writer.lock().await; + let mut buffer = Vec::new(); + let _ = writeln!(&mut buffer, "{}", d.as_str()); + let _ = lock.write_all(&buffer).await; + })); + } + CloseConnection => ctx.stop(), + }; + } +} + +// impl Handler for Connection { +// type Result = (); +// fn handle(&mut self, msg: SelfMessage, ctx: &mut Self::Context) -> Self::Result { +// use ConnectionObservableOutput::RecvData; +// use SelfMessage::UpdateObserversWithData; +// match msg { +// UpdateObserversWithData(data) => { +// let send = ctx.address(); +// let addr = self.address; +// // this is a mess +// let futs: Vec + Send>>> = self +// .observers +// .iter() +// .cloned() +// .map(|r| { +// let send = send.clone(); +// let data = data.clone(); +// async move { +// let _ = r.send(RecvData(send, addr, data)).await; +// } +// .boxed() +// }) +// .collect(); +// let _ = ctx.spawn(wrap_future(async { +// join_all(futs).await; +// })); +// } +// }; +// } +// } + +impl Handler for Connection { + type Result = (); + + fn handle( + &mut self, + msg: ConnectionPrivateMessage, + ctx: &mut Self::Context, + ) -> Self::Result { + use ConnectionPrivateMessage::Broadcast; + match msg { + Broadcast(data) => self.broadcast(ctx, data), + ConnectionPrivateMessage::DoRead(buf_reader) => { + self.do_read(ctx, buf_reader) + } + }; + } +} + +impl Drop for Connection { + fn drop(&mut self) { + println!("[Connection] Dropping value") + } +} diff --git a/server/src/network/connection/messages.rs b/server/src/network/connection/messages.rs new file mode 100644 index 0000000..c8005ce --- /dev/null +++ b/server/src/network/connection/messages.rs @@ -0,0 +1,31 @@ +use std::net::SocketAddr; + +use actix::{Message, WeakAddr}; +use tokio::{ + io::{BufReader, ReadHalf}, + net::TcpStream, +}; + +use crate::prelude::actors::Connection; + +/// This is a message that can be sent to the Connection. +#[derive(Message)] +#[rtype(result = "()")] +pub(crate) enum ConnectionMessage { + SendData(String), + CloseConnection, +} + +#[derive(Message, Clone)] +#[rtype(result = "()")] +pub(crate) enum ConnectionObservableOutput { + RecvData(WeakAddr, SocketAddr, String), + ConnectionClosed(WeakAddr), +} + +#[derive(Message)] +#[rtype(result = "()")] +pub(super) enum ConnectionPrivateMessage { + Broadcast(ConnectionObservableOutput), + DoRead(BufReader>), +} diff --git a/server/src/network/connection/mod.rs b/server/src/network/connection/mod.rs new file mode 100644 index 0000000..4ef6ddd --- /dev/null +++ b/server/src/network/connection/mod.rs @@ -0,0 +1,5 @@ +mod actor; +mod messages; + +pub(crate) use actor::Connection; +pub(crate) use messages::{ConnectionMessage, ConnectionObservableOutput}; diff --git a/server/src/network/connection_initiator/actor.rs b/server/src/network/connection_initiator/actor.rs new file mode 100644 index 0000000..cb7e10c --- /dev/null +++ b/server/src/network/connection_initiator/actor.rs @@ -0,0 +1,171 @@ +use std::net::SocketAddr; + +use actix::{ + Actor, + ActorContext, + Addr, + AsyncContext, + Context, + Handler, + WeakAddr, + WeakRecipient, +}; +use foundation::{ + messages::{ + client::{ClientStreamOut, ClientStreamOut::Error}, + network::{NetworkSockIn, NetworkSockOut}, + }, + ClientDetails, +}; +use serde_json::{from_str, to_string}; + +use crate::{ + network::InitiatorOutput, + prelude::{ + actors::Connection, + messages::{ + ConnectionMessage, + ConnectionObservableOutput, + ObservableMessage, + }, + }, +}; + +/// # ConnectionInitiator +/// Handles the initiatin of a new connection. +/// +/// This will do one of two things: +/// - Create a new client and send it to the network manager. +/// - Request the eserver info and send it to the connection. +pub struct ConnectionInitiator { + delegate: WeakRecipient, + connection: Addr, +} + +impl ConnectionInitiator { + pub(crate) fn new( + delegate: WeakRecipient, + connection: Addr, + ) -> Addr { + ConnectionInitiator { + connection, + delegate, + } + .start() + } + + fn handle_request( + &mut self, + sender: WeakAddr, + ctx: &mut ::Context, + _address: SocketAddr, + data: String, + ) { + use InitiatorOutput::{ClientRequest, InfoRequest}; + use NetworkSockIn::{Connect, Info}; + + let msg = from_str::(data.as_str()); + if let Err(e) = msg.as_ref() { + println!("[ConnectionInitiator] error decoding message {}", e); + self.error(ctx, sender); + return; + } + let msg = msg.unwrap(); + + println!("[ConnectionInitiator] matching request"); + if let (Some(delegate), Some(sender)) = + (self.delegate.upgrade(), sender.upgrade()) + { + match msg { + Info => { + delegate.do_send(InfoRequest(ctx.address().downgrade(), sender)) + } + Connect { + uuid, + username, + address, + } => delegate.do_send(ClientRequest( + ctx.address().downgrade(), + sender, + ClientDetails { + uuid, + username, + address, + public_key: None, + }, + )), + }; + ctx.stop(); + } + } + + fn error( + &mut self, + ctx: &mut ::Context, + sender: WeakAddr, + ) { + use ConnectionMessage::{CloseConnection, SendData}; + if let Some(sender) = sender.upgrade() { + sender.do_send(SendData( + to_string::(&Error { + msg: "Error in connection initiator?".to_owned(), + }) + .unwrap(), + )); + sender.do_send(CloseConnection); + } + ctx.stop() + } +} + +impl Actor for ConnectionInitiator { + type Context = Context; + + /// on start initiate the protocol. + /// also add self as a subscriber to the connection. + fn started(&mut self, ctx: &mut Self::Context) { + use ConnectionMessage::SendData; + use NetworkSockOut::Request; + use ObservableMessage::Subscribe; + + println!("[ConnectionInitiator] started"); + + self + .connection + .do_send(Subscribe(ctx.address().recipient().downgrade())); + + self + .connection + .do_send(SendData(to_string(&Request).unwrap())); + } + + /// once stopped remove self from the connection subscribers + fn stopped(&mut self, ctx: &mut Self::Context) { + use ObservableMessage::Unsubscribe; + println!("[ConnectionInitiator] stopped"); + self + .connection + .do_send(Unsubscribe(ctx.address().recipient().downgrade())); + } +} + +impl Handler for ConnectionInitiator { + type Result = (); + fn handle( + &mut self, + msg: ConnectionObservableOutput, + ctx: &mut Self::Context, + ) -> Self::Result { + use ConnectionObservableOutput::RecvData; + + if let RecvData(sender, addr, data) = msg { + self.handle_request(sender, ctx, addr, data) + } + } +} + +impl Drop for ConnectionInitiator { + fn drop(&mut self) { + println!("[ConnectionInitiator] Dropping value") + } +} diff --git a/server/src/network/connection_initiator/messages.rs b/server/src/network/connection_initiator/messages.rs new file mode 100644 index 0000000..561c70b --- /dev/null +++ b/server/src/network/connection_initiator/messages.rs @@ -0,0 +1,15 @@ +use actix::{Addr, Message, WeakAddr}; +use foundation::ClientDetails; + +use crate::prelude::actors::{Connection, ConnectionInitiator}; + +#[derive(Message)] +#[rtype(result = "()")] +pub(crate) enum InitiatorOutput { + InfoRequest(WeakAddr, Addr), + ClientRequest( + WeakAddr, + Addr, + ClientDetails, + ), +} diff --git a/server/src/network/connection_initiator/mod.rs b/server/src/network/connection_initiator/mod.rs new file mode 100644 index 0000000..07e1a19 --- /dev/null +++ b/server/src/network/connection_initiator/mod.rs @@ -0,0 +1,5 @@ +mod actor; +mod messages; + +pub(crate) use actor::ConnectionInitiator; +pub(crate) use messages::InitiatorOutput; diff --git a/server/src/network/listener/mod.rs b/server/src/network/listener/mod.rs new file mode 100644 index 0000000..7d254cf --- /dev/null +++ b/server/src/network/listener/mod.rs @@ -0,0 +1,108 @@ +use std::net::{SocketAddr, ToSocketAddrs}; + +use actix::{ + fut::wrap_future, + Actor, + Addr, + AsyncContext, + Context, + Handler, + Message, + SpawnHandle, + WeakRecipient, +}; +use tokio::net::TcpListener; + +use crate::network::connection::Connection; + +#[derive(Message)] +#[rtype(result = "()")] +pub(super) enum ListenerMessage { + StartListening, + StopListening, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub(super) enum ListenerOutput { + NewConnection(Addr), +} + +pub(super) struct NetworkListener { + address: SocketAddr, + delegate: WeakRecipient, + looper: Option, +} + +impl NetworkListener { + pub(crate) fn new( + address: T, + delegate: WeakRecipient, + ) -> Addr { + NetworkListener { + address: address + .to_socket_addrs() + .unwrap() + .collect::>()[0], + delegate, + looper: None, + } + .start() + } + + /// called when the actor is to start listening + fn start_listening(&mut self, ctx: &mut ::Context) { + println!("[NetworkListener] started listening"); + let addr = self.address; + let delegate = self.delegate.clone(); + ctx.spawn(wrap_future(async move { + use ListenerOutput::NewConnection; + let listener = TcpListener::bind(addr).await.unwrap(); + while let Ok((stream, addr)) = listener.accept().await { + println!("[NetworkListener] accepted socket"); + let conn = Connection::new(stream, addr); + + let Some(delegate) = delegate.upgrade() else { + break; + }; + + delegate.do_send(NewConnection(conn)) + } + })); + } + + /// called when the actor is to stop listening + fn stop_listening(&mut self, ctx: &mut ::Context) { + println!("[NetworkListener] stopped listening"); + if let Some(fut) = self.looper.take() { + ctx.cancel_future(fut); + } + } +} + +impl Actor for NetworkListener { + type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + println!("[NetworkListener] started"); + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + println!("[NetworkListener] stopped"); + } +} + +impl Handler for NetworkListener { + type Result = (); + fn handle( + &mut self, + msg: ListenerMessage, + ctx: &mut ::Context, + ) -> Self::Result { + use ListenerMessage::{StartListening, StopListening}; + match msg { + StartListening => self.start_listening(ctx), + StopListening => self.stop_listening(ctx), + } + } +} diff --git a/server/src/network/mod.rs b/server/src/network/mod.rs new file mode 100644 index 0000000..6e429da --- /dev/null +++ b/server/src/network/mod.rs @@ -0,0 +1,48 @@ +#![doc = r"# Network + +This module contains network code for the server. + +This includes: +- The network manager: For that handles all server network connections. +- The network listener: For listening for connections on a port. +- The conneciton: An abstraction over sockets sockets, for actix. +- The connection initiator: For initiating new connections to the server + +## Diagrams + +```mermaid +sequenceDiagram + Server->>NetworkManager: creates + NetworkManager->>NetworkListener: create + NetworkManager->>+NetworkListener: start listening + + loop async tcp listen + NetworkListener->>NetworkListener: check for new connections + end + + NetworkListener->>Connection: create from socket + NetworkListener->>NetworkManager: new connection + NetworkManager->>Server: new connection + + Server->>ConnectionInitiator: create with connection +```"] + +mod connection; +mod connection_initiator; +mod listener; +mod network_manager; + +pub(crate) use connection::{ + Connection, + ConnectionMessage, + ConnectionObservableOutput, +}; +pub(crate) use connection_initiator::{ConnectionInitiator, InitiatorOutput}; +// use listener::{ListenerMessage, ListenerOutput, NetworkListener}; +pub(crate) use network_manager::{ + NetworkDataMessage, + NetworkDataOutput, + NetworkManager, + NetworkMessage, + NetworkOutput, +}; diff --git a/server/src/network/network_manager/actor.rs b/server/src/network/network_manager/actor.rs new file mode 100644 index 0000000..1ca6621 --- /dev/null +++ b/server/src/network/network_manager/actor.rs @@ -0,0 +1,250 @@ +use actix::{ + fut::wrap_future, + Actor, + ActorFutureExt, + Addr, + AsyncContext, + Context, + Handler, + WeakAddr, + WeakRecipient, +}; +use foundation::ClientDetails; + +use crate::{ + config_manager::{ConfigManager, ConfigManagerDataMessage, ConfigValue}, + network::{ + listener::{ListenerMessage, ListenerOutput, NetworkListener}, + network_manager::{ + messages::{NetworkMessage, NetworkOutput}, + Builder, + }, + Connection, + ConnectionInitiator, + InitiatorOutput, + NetworkDataMessage, + NetworkDataOutput, + }, +}; + +/// # NetworkManager +/// this struct will handle all networking functionality. +/// +pub struct NetworkManager { + config_manager: WeakAddr, + listener_addr: Option>, + delegate: WeakRecipient, + initiators: Vec>, +} + +impl NetworkManager { + pub fn new(delegate: WeakRecipient) -> Addr { + NetworkManager { + listener_addr: None, + delegate, + initiators: Vec::new(), + config_manager: ConfigManager::shared().downgrade(), + } + .start() + } + + pub fn create(delegate: WeakRecipient) -> Builder { + Builder::new(delegate) + } + + fn start_listener(&mut self, _ctx: &mut ::Context) { + use ListenerMessage::StartListening; + + println!("[NetworkManager] got Listen message"); + + if let Some(addr) = self.listener_addr.as_ref() { + addr.do_send(StartListening); + } + } + + fn stop_listener(&mut self, _ctx: &mut ::Context) { + use ListenerMessage::StopListening; + if let Some(addr) = self.listener_addr.as_ref() { + addr.do_send(StopListening); + } + } + + /// Handles a new connection from the Listener. + /// This creates a new ConnectionInitaliser. + /// This completes the first part of the protocol. + #[inline] + fn new_connection( + &mut self, + ctx: &mut ::Context, + connection: Addr, + ) { + println!("[NetworkManager] Got new connection"); + + let init = ConnectionInitiator::new( + ctx.address().recipient().downgrade(), + connection, + ); + self.initiators.push(init); + } + + #[inline] + fn remove_initiator(&mut self, sender: WeakAddr) { + if let Some(sender) = sender.upgrade() { + let index = self.initiators.iter().position(|i| *i == sender).unwrap(); + println!("[NetworkManager] removed initiator at:{}", index); + let _ = self.initiators.remove(index); + } + } + + /// handles a initiator client request + /// this will, forward the conenction and client details + /// to the server actor to be dispatched to the appropriate + /// manager + #[inline] + fn client_request( + &mut self, + _ctx: &mut ::Context, + sender: WeakAddr, + connection: Addr, + client_details: ClientDetails, + ) { + use NetworkOutput::NewClient; + println!("[NetworkManager] recieved client request"); + if let Some(delegate) = self.delegate.upgrade() { + delegate.do_send(NewClient(connection, client_details)); + } + self.remove_initiator(sender); + } + + /// This sends the connection to the server + /// which will in turn take over the protocol by sending + /// the servers infomation. + #[inline] + fn info_request( + &mut self, + _ctx: &mut ::Context, + sender: WeakAddr, + connection: Addr, + ) { + use NetworkOutput::InfoRequested; + println!("[NetworkManager] Got recieved info request"); + if let Some(delegate) = self.delegate.upgrade() { + delegate.do_send(InfoRequested(connection)); + } + self.remove_initiator(sender); + } +} + +impl Actor for NetworkManager { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + println!("[NetworkManager] Starting"); + let config_mgr = self.config_manager.clone().upgrade(); + + if let Some(config_mgr) = config_mgr { + let fut = wrap_future(config_mgr.send( + ConfigManagerDataMessage::GetValue("Network.Port".to_owned()), + )) + .map( + |out, actor: &mut NetworkManager, ctx: &mut Context| { + use crate::config_manager::ConfigManagerDataResponse::GotValue; + + println!("[NetworkManager] got config manager value {:?}", out); + + let recipient = ctx.address().recipient(); + + let port = if let Ok(GotValue(Some(ConfigValue::Number(port)))) = out + { + port + } else { + 5600 + }; + println!("[NetworkManager] got port: {:?}", port); + let nl = NetworkListener::new( + format!("0.0.0.0:{}", port), + recipient.downgrade(), + ); + nl.do_send(ListenerMessage::StartListening); + actor.listener_addr.replace(nl); + }, + ); + ctx.spawn(fut); + println!("[NetworkManager] Finished Starting"); + } + } +} + +impl Handler for NetworkManager { + type Result = (); + fn handle( + &mut self, + msg: NetworkMessage, + ctx: &mut ::Context, + ) -> >::Result { + use NetworkMessage::{StartListening, StopListening}; + match msg { + StartListening => self.start_listener(ctx), + StopListening => self.stop_listener(ctx), + } + } +} + +impl Handler for NetworkManager { + type Result = NetworkDataOutput; + + fn handle( + &mut self, + msg: NetworkDataMessage, + _ctx: &mut Self::Context, + ) -> Self::Result { + match msg { + NetworkDataMessage::IsListening => { + NetworkDataOutput::IsListening(self.listener_addr.is_some()) + } + } + } +} + +impl Handler for NetworkManager { + type Result = (); + fn handle( + &mut self, + msg: ListenerOutput, + ctx: &mut Self::Context, + ) -> Self::Result { + use ListenerOutput::NewConnection; + match msg { + NewConnection(connection) => self.new_connection(ctx, connection), + }; + } +} + +impl Handler for NetworkManager { + type Result = (); + fn handle( + &mut self, + msg: InitiatorOutput, + ctx: &mut Self::Context, + ) -> Self::Result { + use InitiatorOutput::{ClientRequest, InfoRequest}; + match msg { + ClientRequest(sender, addr, client_details) => { + self.client_request(ctx, sender, addr, client_details) + } + InfoRequest(sender, addr) => self.info_request(ctx, sender, addr), + } + } +} + +impl From for NetworkManager { + fn from(builder: Builder) -> Self { + Self { + listener_addr: None, + delegate: builder.delegate, + + initiators: Vec::default(), + config_manager: ConfigManager::shared().downgrade(), + } + } +} diff --git a/server/src/network/network_manager/builder.rs b/server/src/network/network_manager/builder.rs new file mode 100644 index 0000000..2656ffc --- /dev/null +++ b/server/src/network/network_manager/builder.rs @@ -0,0 +1,20 @@ +use actix::{Actor, Addr, WeakRecipient}; + +use crate::network::{ + network_manager::messages::NetworkOutput, + NetworkManager, +}; + +pub struct Builder { + pub(super) delegate: WeakRecipient, +} + +impl Builder { + pub(super) fn new(delegate: WeakRecipient) -> Self { + Self { delegate } + } + + pub fn build(self) -> Addr { + NetworkManager::from(self).start() + } +} diff --git a/server/src/network/network_manager/messages.rs b/server/src/network/network_manager/messages.rs new file mode 100644 index 0000000..8c740b2 --- /dev/null +++ b/server/src/network/network_manager/messages.rs @@ -0,0 +1,29 @@ +use actix::{Addr, Message, MessageResponse}; +use foundation::ClientDetails; + +use crate::network::Connection; + +#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)] +#[rtype(result = "()")] +pub enum NetworkMessage { + StartListening, + StopListening, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub enum NetworkOutput { + NewClient(Addr, ClientDetails), + InfoRequested(Addr), +} + +#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)] +#[rtype(result = "NetworkDataOutput")] +pub enum NetworkDataMessage { + IsListening, +} + +#[derive(MessageResponse)] +pub enum NetworkDataOutput { + IsListening(bool), +} diff --git a/server/src/network/network_manager/mod.rs b/server/src/network/network_manager/mod.rs new file mode 100644 index 0000000..ddfbf1b --- /dev/null +++ b/server/src/network/network_manager/mod.rs @@ -0,0 +1,16 @@ +//! # network_manager +//! This module contains the network manager actor +//! it's role involves handling new oncomming network connections + +mod actor; +mod builder; +mod messages; + +pub(crate) use actor::NetworkManager; +pub(crate) use builder::*; +pub(crate) use messages::{ + NetworkDataMessage, + NetworkDataOutput, + NetworkMessage, + NetworkOutput, +}; diff --git a/server/src/network_manager.rs b/server/src/network_manager.rs deleted file mode 100644 index 5e450ac..0000000 --- a/server/src/network_manager.rs +++ /dev/null @@ -1,132 +0,0 @@ -use foundation::prelude::IPreemptive; -use std::io::BufRead; -use std::io::BufReader; -use std::io::BufWriter; -use std::io::Write; -use std::net::TcpListener; -use std::sync::Arc; -use std::thread; - -use crossbeam_channel::Sender; - -use crate::client::Client; -use crate::messages::ServerMessage; -use foundation::messages::network::{NetworkSockIn, NetworkSockOut}; - -pub struct NetworkManager { - listener: TcpListener, - server_channel: Sender, -} - -impl NetworkManager { - pub fn new( - port: String, - server_channel: Sender, - ) -> Arc { - let mut address = "0.0.0.0:".to_string(); - address.push_str(&port); - - let listener = TcpListener::bind(address).expect("Could not bind to address"); - - Arc::new(NetworkManager { - listener, - server_channel, - }) - } -} - -impl IPreemptive for NetworkManager { - fn run(_: &Arc) {} - - fn start(arc: &Arc) { - let arc = arc.clone(); - std::thread::spawn(move || { - // fetch new connections and add them to the client queue - for connection in arc.listener.incoming() { - println!("[NetworkManager]: New Connection!"); - match connection { - Ok(stream) => { - let server_channel = arc.server_channel.clone(); - - // create readers - let mut reader = BufReader::new(stream.try_clone().unwrap()); - let mut writer = BufWriter::new(stream.try_clone().unwrap()); - - let _handle = thread::Builder::new() - .name("NetworkJoinThread".to_string()) - .spawn(move || { - let mut out_buffer: Vec = Vec::new(); - let mut in_buffer: String = String::new(); - - // send request message to connection - - let _ = writeln!( - out_buffer, - "{}", - serde_json::to_string(&NetworkSockOut::Request) - .unwrap() - ); - - let _ = writer.write_all(&out_buffer); - let _ = writer.flush(); - - // try get response - let res = reader.read_line(&mut in_buffer); - if res.is_err() { - return; - } - - //match the response - if let Ok(request) = - serde_json::from_str::(&in_buffer) - { - match request { - NetworkSockIn::Info => { - // send back server info to the connection - writer - .write_all( - serde_json::to_string( - &NetworkSockOut::GotInfo { - server_name: "oof", - server_owner: "michael", - }, - ) - .unwrap() - .as_bytes(), - ) - .unwrap(); - writer.write_all(b"\n").unwrap(); - writer.flush().unwrap(); - } - NetworkSockIn::Connect { - uuid, - username, - address, - } => { - // create client and send to server - let new_client = Client::new( - uuid, - username, - address, - stream.try_clone().unwrap(), - server_channel.clone(), - ); - server_channel - .send(ServerMessage::ClientConnected( - new_client, - )) - .unwrap_or_default(); - } - } - } - }); - } - Err(e) => { - println!("[Network manager]: error getting stream: {:?}", e); - continue; - } - } - } - }); - } -} diff --git a/server/src/plugin/Plugin.rs b/server/src/plugin/Plugin.rs new file mode 100644 index 0000000..83ba7d0 --- /dev/null +++ b/server/src/plugin/Plugin.rs @@ -0,0 +1,35 @@ +use crate::plugin::WeakPluginInterface; +use foundation::event::Event; +use std::fmt::Debug; +use std::sync::Arc; + +use crate::plugin::plugin_details::PluginDetails; +use std::sync::Arc; + +/// # Plugin +/// Type alias for plugin objects. +pub type Plugin = Arc>; + +/// # GetPluginFn +/// This defines the type for getting the plugin struct from a +pub type GetPluginFn = fn() -> Plugin; + +/// # Plugin +/// This trait defines an interface for plugins to implement. +/// +/// ## Methods +/// - details: This returns the details about the plugin. +/// - init: Defines the initialisation routine for the plugin. +/// - run: defines a routine to be ran like a thread by the plugin manager. +/// - deinit: Defines the deinitalisation routine for the plugin +#[async_trait::async_trait] +pub trait IPlugin: Send + Sync + Debug { + fn details(&self) -> PluginDetails; + fn on_event(&self, event: Event); + + fn set_interface(&self, interface: WeakPluginInterface); + + fn init(&self); + async fn run(&self); + fn deinit(&self); +} diff --git a/server/src/plugin/mod.rs b/server/src/plugin/mod.rs new file mode 100644 index 0000000..0e83d97 --- /dev/null +++ b/server/src/plugin/mod.rs @@ -0,0 +1,12 @@ +mod plugin; +mod plugin_details; +mod plugin_entry; +mod plugin_interface; +mod plugin_manager; +mod plugin_permissions; + +pub use plugin::{IPlugin, Plugin}; +pub use plugin_details::PluginDetails; +pub(crate) use plugin_interface::PluginInterface; +pub use plugin_interface::WeakPluginInterface; +pub(crate) use plugin_manager::{PluginManager, PluginManagerMessage}; diff --git a/server/src/plugin/plugin_entry.rs b/server/src/plugin/plugin_entry.rs new file mode 100644 index 0000000..66cb569 --- /dev/null +++ b/server/src/plugin/plugin_entry.rs @@ -0,0 +1,171 @@ +use crate::plugin::plugin_interface::IPluginInterface; +use crate::plugin::PluginInterface; +use foundation::event::Event; + +use crate::event_type::EventType; + +use foundation::event::EventResult; +use foundation::event::IResponder; +use serde::{Deserialize, Serialize}; +use std::sync::Weak; + +use futures::channel::oneshot::Receiver; + +use crate::plugin::plugin::Plugin; +use crate::plugin::plugin_entry::PluginExecutionState::{Paused, Running, Stopped}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; +use tokio::time::sleep; + +pub(crate) type PluginEntryObj = Arc; + +#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum PluginPermission { + Read, + Write, + ReadWrite, + None, +} + +#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub(crate) enum PluginExecutionState { + Running, + Paused, + Stopped, +} + +/// # PluginEntry +/// a wrapper for plugins loaded into the server. +/// Used to provide an api for the plugin to use. +/// Also acts as gatekeeper to server data with permissions. +#[derive(Debug)] +pub(crate) struct PluginEntry +where + T: Sync + Send, +{ + server_permission: PluginPermission, + network_permission: PluginPermission, + client_manager_permission: PluginPermission, + client_permission: PluginPermission, + + state: Arc>, + + plugin: Plugin>, +} + +impl PluginEntry +where + T: Sync + Send, +{ + pub fn new(plugin: Plugin) -> Arc> { + let entry = Arc::new(PluginEntry { + server_permission: PluginPermission::None, + network_permission: PluginPermission::None, + client_manager_permission: PluginPermission::None, + client_permission: PluginPermission::None, + + state: Arc::new(Mutex::new(Stopped)), + + plugin: plugin.clone(), + }); + + let entry_ref = entry.clone() as PluginInterface; + + plugin.set_interface(Arc::downgrade(&entry_ref)); + entry + } + + pub(crate) async fn getState(&self) -> PluginExecutionState { + *self.state.lock().await + } + + pub fn start(&self) { + let cont = self.plugin.clone(); + let state = self.state.clone(); + tokio::spawn(async move { + let local_state = state.clone(); + let mut lock = local_state.lock().await; + match *lock { + Running => (), + Paused => { + *lock = Running; + } + Stopped => { + tokio::spawn(async move { + cont.init(); + let mut lock = state.lock().await; + *lock = Running; + loop { + match *lock { + Running => cont.run().await, + Paused => sleep(Duration::new(1, 0)).await, + Stopped => break, + } + } + cont.deinit() + }); + } + } + }); + } + + pub fn pause(&self) { + let state = self.state.clone(); + tokio::spawn(async move { + let mut lock = state.lock().await; + match *lock { + Running => { + *lock = Paused; + } + Paused => (), + Stopped => (), + } + }); + } + + pub fn stop(&self) { + let state = self.state.clone(); + tokio::spawn(async move { + let mut lock = state.lock().await; + match *lock { + Running => { + *lock = Stopped; + } + Paused => { + *lock = Stopped; + } + Stopped => (), + } + }); + } +} + +impl IPluginInterface for PluginEntry { + fn send_event(&self, _event: Event) -> Receiver { + todo!() + } +} + +impl IResponder> for PluginEntry { + fn on_event(&self, event: Event) { + use EventType::{ClientAdded, Custom, NewConnection}; + use PluginPermission::{None, Read, ReadWrite, Write}; + + match ( + &event.r#type, + &self.network_permission, + &self.client_manager_permission, + &self.client_permission, + &self.server_permission, + ) { + (NewConnection, Read | ReadWrite, _, _, _) => self.plugin.on_event(event), + (ClientAdded(id), _, Read | ReadWrite, _, _) => self.plugin.on_event(event), + (Custom("ping"), _, _, _, _) => println!("[PluginEntry:on_event] Ping!"), + _ => println!("[PluginEntry:on_event] not handled"), + }; + } + fn get_next(&self) -> Option>> { + todo!() + } +} diff --git a/server/src/plugin/plugin_interface.rs b/server/src/plugin/plugin_interface.rs new file mode 100644 index 0000000..7b92009 --- /dev/null +++ b/server/src/plugin/plugin_interface.rs @@ -0,0 +1,25 @@ +use foundation::event::Event; +use foundation::event::EventResult; +use foundation::event::IResponder; +use std::fmt::Debug; +use std::sync::Arc; +use std::sync::Weak; + +use futures::channel::oneshot::Receiver; + +pub type WeakPluginInterface +where + T: Sync + Send, += Weak>; + +pub(crate) type PluginInterface +where + T: Sync + Send, += Arc>; + +pub trait IPluginInterface: IResponder + Send + Sync + Debug +where + T: Sync + Send, +{ + fn send_event(&self, event: Event) -> Receiver; +} diff --git a/server/src/plugin/plugin_manager.rs b/server/src/plugin/plugin_manager.rs new file mode 100644 index 0000000..d7530dd --- /dev/null +++ b/server/src/plugin/plugin_manager.rs @@ -0,0 +1,109 @@ +use std::fs::Metadata; +use std::{io::Error, mem, sync::Arc}; + +use libloading::Library; + +use tokio::fs::{create_dir, read_dir, DirEntry}; +use tokio::sync::mpsc::Sender; +use tokio::sync::Mutex; + +use futures::future::join_all; + +use crate::plugin::plugin::GetPluginFn; +use crate::plugin::plugin_entry::{PluginEntry, PluginEntryObj}; + +pub enum PluginManagerMessage { + None, +} + +/// # PluginManager +/// This struct handles the loading and unloading of plugins in the server +/// +/// ## Attributes +/// - plugins: A [Vec] of all loaded plugins +/// - server_channel: A [Sender] +pub struct PluginManager +where + Out: From + Send, +{ + #[allow(dead_code)] + plugins: Mutex>, + + #[allow(dead_code)] + server_channel: Mutex>, +} + +impl PluginManager +where + Out: From + Send, +{ + /// Creates a new plugin manager with sender. + pub fn new(channel: Sender) -> Arc { + Arc::new(Self { + plugins: Mutex::new(Vec::new()), + server_channel: Mutex::new(channel), + }) + } + + /// Starts loading plugins from the plugins directory. + /// If this directory isn't found then create it get created. + pub async fn load(&self) -> Result<(), Error> { + println!("[PluginManager]: loading plugins"); + println!( + "[PluginManager]: from: {}", + std::env::current_dir().unwrap().to_string_lossy() + ); + + if let Ok(mut plugins) = read_dir("./plugins").await { + // Todo: - make this concurrent + let mut plugin_vec = vec![]; + while let Some(next) = plugins.next_entry().await? { + println!("{:?}", next); + plugin_vec.push(next) + } + + // get all entries by extension + let entries: Vec = plugin_vec + .into_iter() + .filter(|item| item.path().extension().unwrap_or_default() == "dylib") + .collect(); + + // get entry metadata + #[allow(clippy::needless_collect)] // This is a false positive. Collect is needed here + let metadata: Vec = join_all(entries.iter().map(|item| item.metadata())) + .await + .into_iter() + .filter_map(|item| item.ok()) + .collect(); + + // convert correct ones to plugins + let plugins: Vec = entries + .into_iter() + .zip(metadata.into_iter()) + .filter(|(_item, meta)| meta.is_file()) + .map(|item| item.0) + .map(|item| unsafe { + let lib = Library::new(item.path()).unwrap(); + let plugin_fn = lib.get::>("get_plugin".as_ref()).unwrap(); + PluginEntry::new(plugin_fn()) + }) + .collect(); + + println!("[PluginManager:load] got plugins: {:?}", plugins); + + let mut self_vec = self.plugins.lock().await; + let _ = mem::replace(&mut *self_vec, plugins); + } else { + create_dir("./plugins").await?; + } + + self + .plugins + .lock() + .await + .iter() + .for_each(|item| item.start()); + + Ok(()) + } +} diff --git a/server/src/plugin/plugin_permissions.rs b/server/src/plugin/plugin_permissions.rs new file mode 100644 index 0000000..e69de29 diff --git a/server/src/prelude/mod.rs b/server/src/prelude/mod.rs new file mode 100644 index 0000000..6c2d7f9 --- /dev/null +++ b/server/src/prelude/mod.rs @@ -0,0 +1,29 @@ +//! # prelude +//! A module that coalesces different types into one module of defined structure + +mod observer; + +#[allow(unused_imports)] +pub mod actors { + //! exports all actors used in the program. + pub use crate::server::Server; + pub(crate) use crate::{ + client_management::{client::Client, ClientManager}, + network::{Connection, ConnectionInitiator, NetworkManager}, + }; +} + +#[allow(unused_imports)] +pub mod messages { + //! exports all messages used in the program. + pub(crate) use super::observer::ObservableMessage; + pub(crate) use crate::{ + client_management::{ClientManagerMessage, ClientManagerOutput}, + network::{ + ConnectionMessage, + ConnectionObservableOutput, + NetworkMessage, + NetworkOutput, + }, + }; +} diff --git a/server/src/prelude/observer.rs b/server/src/prelude/observer.rs new file mode 100644 index 0000000..dd8334c --- /dev/null +++ b/server/src/prelude/observer.rs @@ -0,0 +1,17 @@ +//! # observer.rs +//! crates a message type for the observer pattern. + +use actix::{Message, WeakRecipient}; + +/// # ObservableMessage +/// represents common messages for observers +#[derive(Message)] +#[rtype(result = "()")] +pub enum ObservableMessage +where + M: Message + Send, + M::Result: Send, +{ + Subscribe(WeakRecipient), + Unsubscribe(WeakRecipient), +} diff --git a/server/src/rhai/builder.rs b/server/src/rhai/builder.rs new file mode 100644 index 0000000..04239a9 --- /dev/null +++ b/server/src/rhai/builder.rs @@ -0,0 +1,62 @@ +use actix::{Actor, Addr, WeakAddr}; +use rhai::{Engine, Scope}; + +use crate::{ + client_management::ClientManager, + network::NetworkManager, + rhai::rhai_manager::RhaiManager, + Server, +}; + +pub struct Builder { + engine: Engine, + server: WeakAddr, + network_manager: WeakAddr, + client_manager: WeakAddr, + scope: Scope<'static>, +} + +impl Builder { + pub(super) fn new( + server: WeakAddr, + network_manager: WeakAddr, + client_manager: WeakAddr, + ) -> Self { + Builder { + engine: Engine::new(), + server, + network_manager, + client_manager, + scope: Default::default(), + } + } + + pub fn scope_object(mut self, name: &str, obj: T) -> Self + where + T: Clone, + { + self.engine.register_type::(); + self.scope.set_value(name, obj); + self + } + + // not sure what this is for? + // pub fn scope_fn(mut self, name: &str, func: F) -> Self + // where + // F: RegisterNativeFunction, + // { + // self.engine.register_fn(name, func); + // self + // } + + pub(crate) fn build(self) -> Addr { + RhaiManager { + engine: self.engine, + _scope: self.scope, + _server: self.server, + _network_manager: self.network_manager, + _client_manager: self.client_manager, + } + .start() + } +} diff --git a/server/src/rhai/mod.rs b/server/src/rhai/mod.rs new file mode 100644 index 0000000..7c00b43 --- /dev/null +++ b/server/src/rhai/mod.rs @@ -0,0 +1,4 @@ +mod builder; +mod rhai_manager; + +pub use rhai_manager::RhaiManager; diff --git a/server/src/rhai/rhai_manager.rs b/server/src/rhai/rhai_manager.rs new file mode 100644 index 0000000..cf1e75d --- /dev/null +++ b/server/src/rhai/rhai_manager.rs @@ -0,0 +1,47 @@ +use actix::{Actor, Context, WeakAddr}; +use rhai::{Engine, Scope}; + +use crate::{ + client_management::ClientManager, + network::NetworkManager, + rhai::builder::Builder, + Server, +}; + +pub struct RhaiManager { + pub(super) engine: Engine, + pub(super) _scope: Scope<'static>, + pub(super) _server: WeakAddr, + pub(super) _network_manager: WeakAddr, + pub(super) _client_manager: WeakAddr, +} + +impl RhaiManager { + pub fn create( + server: WeakAddr, + network_manager: WeakAddr, + client_manager: WeakAddr, + ) -> Builder { + Builder::new( + server.clone(), + network_manager.clone(), + client_manager.clone(), + ) + .scope_object("server", server) + } +} + +impl Actor for RhaiManager { + type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + self + .engine + .run( + r#" + print("hello rhai") + "#, + ) + .unwrap(); + } +} diff --git a/server/src/scripting/mod.rs b/server/src/scripting/mod.rs new file mode 100644 index 0000000..d984769 --- /dev/null +++ b/server/src/scripting/mod.rs @@ -0,0 +1,4 @@ +pub(crate) mod scriptable_client; +pub(crate) mod scriptable_client_manager; +pub(crate) mod scriptable_network_manager; +pub(crate) mod scriptable_server; diff --git a/server/src/scripting/scriptable_client.rs b/server/src/scripting/scriptable_client.rs new file mode 100644 index 0000000..b07ef98 --- /dev/null +++ b/server/src/scripting/scriptable_client.rs @@ -0,0 +1,60 @@ +use actix::Addr; +use mlua::{Error, UserData, UserDataMethods}; + +use crate::client_management::client::{ + Client, + ClientDataMessage, + ClientDataResponse, + ClientDataResponse::{Username, Uuid}, +}; + +#[derive(Clone)] +pub(crate) struct ScriptableClient { + addr: Addr, +} + +impl UserData for ScriptableClient { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method("username", |_lua, obj, ()| async move { + let name: Option = + obj.addr.send(ClientDataMessage::Username).await.ok(); + if let Some(Username(name)) = name { + Ok(name) + } else { + Err(Error::RuntimeError( + "Name returned null or other value".to_string(), + )) + } + }); + + methods.add_async_method("uuid", |_lua, obj, ()| async move { + let uuid: Option = + obj.addr.send(ClientDataMessage::Uuid).await.ok(); + if let Some(Uuid(uuid)) = uuid { + Ok(uuid.to_string()) + } else { + Err(Error::RuntimeError( + "Uuid returned null or other value".to_string(), + )) + } + }); + + methods.add_async_method("address", |_lua, obj, ()| async move { + let address: Option = + obj.addr.send(ClientDataMessage::Address).await.ok(); + if let Some(Username(address)) = address { + Ok(address) + } else { + Err(Error::RuntimeError( + "address returned null or other value".to_string(), + )) + } + }); + } +} + +impl From> for ScriptableClient { + fn from(addr: Addr) -> Self { + Self { addr } + } +} diff --git a/server/src/scripting/scriptable_client_manager.rs b/server/src/scripting/scriptable_client_manager.rs new file mode 100644 index 0000000..1bf3521 --- /dev/null +++ b/server/src/scripting/scriptable_client_manager.rs @@ -0,0 +1,43 @@ +use actix::Addr; +use mlua::{Error, UserData, UserDataMethods}; + +use crate::{ + client_management::{ + ClientManager, + ClientManagerDataMessage, + ClientManagerDataResponse::Clients, + }, + scripting::scriptable_client::ScriptableClient, +}; + +#[derive(Clone)] +pub(crate) struct ScriptableClientManager { + addr: Addr, +} + +impl UserData for ScriptableClientManager { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method("clients", |_lua, obj, ()| async move { + let res = obj.addr.send(ClientManagerDataMessage::Clients).await; + if let Ok(Clients(clients)) = res { + let clients: Vec = clients + .into_iter() + .filter_map(|a| a.upgrade()) + .map(ScriptableClient::from) + .collect(); + + Ok(clients) + } else { + Err(Error::RuntimeError( + "clients returned null or other value".to_string(), + )) + } + }) + } +} + +impl From> for ScriptableClientManager { + fn from(addr: Addr) -> Self { + Self { addr } + } +} diff --git a/server/src/scripting/scriptable_network_manager.rs b/server/src/scripting/scriptable_network_manager.rs new file mode 100644 index 0000000..d1ba282 --- /dev/null +++ b/server/src/scripting/scriptable_network_manager.rs @@ -0,0 +1,35 @@ +use actix::Addr; +use mlua::{Error, UserData, UserDataMethods}; + +use crate::network::{ + NetworkDataMessage, + NetworkDataOutput::IsListening, + NetworkManager, +}; + +#[derive(Clone)] +pub(crate) struct ScriptableNetworkManager { + addr: Addr, +} + +impl UserData for ScriptableNetworkManager { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method("Listening", |_lua, obj, ()| async move { + let is_listening = + obj.addr.send(NetworkDataMessage::IsListening).await.ok(); + if let Some(IsListening(is_listening)) = is_listening { + Ok(is_listening) + } else { + Err(Error::RuntimeError( + "Uuid returned null or other value".to_string(), + )) + } + }); + } +} + +impl From> for ScriptableNetworkManager { + fn from(addr: Addr) -> Self { + Self { addr } + } +} diff --git a/server/src/scripting/scriptable_server.rs b/server/src/scripting/scriptable_server.rs new file mode 100644 index 0000000..02265b4 --- /dev/null +++ b/server/src/scripting/scriptable_server.rs @@ -0,0 +1,55 @@ +use actix::WeakAddr; +use mlua::{Error, UserData, UserDataMethods}; + +use crate::server::{ServerDataResponse::Name, *}; + +#[derive(Clone)] +pub(crate) struct ScriptableServer { + pub(super) addr: WeakAddr, +} + +impl UserData for ScriptableServer { + fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method("name", |_lua, obj, ()| async move { + let Some(send_fut) = obj.addr.upgrade().map(|addr| addr.send(ServerDataMessage::Name)) else { + return Err(Error::RuntimeError( + "[ScriptableServer:name] Server doesn't exist. Dunno how you got here".to_string(), + )) + }; + + let name: Option = send_fut.await.ok(); + + let Some(Name(name)) = name else { + return Err(Error::RuntimeError( + "[ScriptableServer:name] Name returned nil".to_string(), + )) + }; + + Ok(name) + }); + + methods.add_async_method("owner", |_lua, obj, ()| async move { + let Some(send_fut) = obj.addr.upgrade().map(|addr| addr.send(ServerDataMessage::Owner)) else { + return Err(Error::RuntimeError( + "[ScriptableServer:owner] Server doesn't exist. Dunno how you got here".to_string(), + )) + }; + + let owner: Option = send_fut.await.ok(); + + let Some(Name(owner)) = owner else { + return Err(Error::RuntimeError( + "[ScriptableServer:owner] Owner returned nil".to_string(), + )) + }; + + Ok(owner) + }); + } +} + +impl From> for ScriptableServer { + fn from(addr: WeakAddr) -> Self { + Self { addr } + } +} diff --git a/server/src/server.rs b/server/src/server.rs deleted file mode 100644 index 2e7d7ec..0000000 --- a/server/src/server.rs +++ /dev/null @@ -1,83 +0,0 @@ -use std::sync::Arc; - -use crossbeam_channel::{unbounded, Receiver}; -use uuid::Uuid; - -use crate::client_manager::ClientManager; -use crate::messages::ClientMgrMessage; -use crate::messages::ServerMessage; -use crate::network_manager::NetworkManager; -use foundation::prelude::ICooperative; -use foundation::prelude::IMessagable; -use foundation::prelude::IPreemptive; - -/// # ServerMessages -/// This is used internally to send messages to the server to be dispatched -#[derive(Debug)] -pub enum ServerMessages { - ClientConnected(Arc), - ClientDisconnected(Uuid), -} - -pub struct Server { - client_manager: Arc, - network_manager: Arc, - - receiver: Receiver, -} - -impl Server { - pub fn new() -> Arc { - let (sender, receiver) = unbounded(); - - Arc::new(Server { - client_manager: ClientManager::new(sender.clone()), - - network_manager: NetworkManager::new("5600".to_string(), sender), - receiver, - }) - } -} - -impl ICooperative for Server { - fn tick(&self) { - use ClientMgrMessage::{Add, Remove, SendMessage}; - - // handle new messages loop - if !self.receiver.is_empty() { - for message in self.receiver.try_iter() { - println!("[server]: received message {:?}", &message); - match message { - ServerMessage::ClientConnected(client) => { - self.client_manager.send_message(Add(client)) - } - ServerMessage::ClientDisconnected(uuid) => { - println!("disconnecting client {:?}", uuid); - self.client_manager.send_message(Remove(uuid)); - } - ServerMessage::ClientSendMessage { from, to, content } => self - .client_manager - .send_message(SendMessage { from, to, content }), - ServerMessage::ClientUpdate (_uuid) => println!("not implemented"), - } - } - } - } -} - -impl IPreemptive for Server { - fn run(arc: &std::sync::Arc) { - // start services - NetworkManager::start(&arc.network_manager); - ClientManager::start(&arc.client_manager); - loop { - arc.tick(); - } - } - - fn start(arc: &std::sync::Arc) { - let arc = arc.clone(); - // start thread - std::thread::spawn(move || Server::run(&arc)); - } -} diff --git a/server/src/server/builder.rs b/server/src/server/builder.rs new file mode 100644 index 0000000..b994842 --- /dev/null +++ b/server/src/server/builder.rs @@ -0,0 +1,31 @@ +use actix::{Actor, Addr}; + +use super::*; + +pub struct ServerBuilder { + pub(super) name: String, + pub(super) owner: String, +} + +impl<'rhai> ServerBuilder { + pub(super) fn new() -> Self { + Self { + name: "".into(), + owner: "".into(), + } + } + + pub fn name(mut self, name: String) -> Self { + self.name = name; + self + } + + pub fn owner(mut self, owner: String) -> Self { + self.owner = owner; + self + } + + pub fn build(self) -> Addr { + Server::from(self).start() + } +} diff --git a/server/src/server/messages.rs b/server/src/server/messages.rs new file mode 100644 index 0000000..215b3ff --- /dev/null +++ b/server/src/server/messages.rs @@ -0,0 +1,21 @@ +use actix::{Addr, Message, MessageResponse}; + +use crate::{client_management::ClientManager, network::NetworkManager}; + +#[derive(Message, Clone)] +#[rtype(result = "ServerDataResponse")] +pub enum ServerDataMessage { + Name, + Owner, + ClientManager, + NetworkManager, +} + +#[derive(MessageResponse, Clone)] +pub enum ServerDataResponse { + Name(String), + Port(u16), + Owner(String), + ClientManager(Option>), + NetworkManager(Option>), +} diff --git a/server/src/server/mod.rs b/server/src/server/mod.rs new file mode 100644 index 0000000..6e7470f --- /dev/null +++ b/server/src/server/mod.rs @@ -0,0 +1,13 @@ +//! # actix_server +//! this holds the server actor +//! the server acts as teh main actor +//! and supervisor to the actor system. + +mod server; + +mod builder; +mod messages; + +pub use builder::ServerBuilder; +pub use messages::*; +pub use server::Server; diff --git a/server/src/server/server.rs b/server/src/server/server.rs new file mode 100644 index 0000000..9529aa9 --- /dev/null +++ b/server/src/server/server.rs @@ -0,0 +1,211 @@ +//! This crate holds the implementations and functions for the server +//! including server boot procedures + +use actix::{ + fut::wrap_future, + Actor, + ActorFutureExt, + Addr, + AsyncContext, + Context, + Handler, +}; +use foundation::{messages::network::NetworkSockOut::GotInfo, ClientDetails}; + +use crate::{ + client_management::{ + client::Client, + ClientManager, + ClientManagerMessage::AddClient, + ClientManagerOutput, + }, + config_manager::{ + ConfigManager, + ConfigManagerDataMessage, + ConfigManagerDataResponse, + ConfigValue, + }, + lua::LuaManager, + network::{ + Connection, + ConnectionMessage::{CloseConnection, SendData}, + NetworkManager, + NetworkOutput, + NetworkOutput::{InfoRequested, NewClient}, + }, + prelude::messages::NetworkMessage, + rhai::RhaiManager, + server::{builder, ServerBuilder, ServerDataMessage, ServerDataResponse}, +}; + +/// This struct is the main actor of the server. +/// all other actors are ran through here. +pub struct Server { + name: String, + owner: String, + + network_manager: Option>, + client_manager: Option>, + rhai_manager: Option>, + lua_manager: Option>, +} + +impl Server { + pub(crate) fn create() -> builder::ServerBuilder { + ServerBuilder::new() + } + + pub(crate) fn client_request( + &mut self, + _ctx: &mut ::Context, + addr: Addr, + details: ClientDetails, + ) { + if let Some(mgr) = self.client_manager.as_ref() { + let client = Client::new(addr, details.clone()); + mgr.do_send(AddClient(details.uuid, client)); + } + } + + pub(crate) fn info_request( + &mut self, + ctx: &mut ::Context, + sender: Addr, + ) { + let fut = wrap_future( + sender.send(SendData( + serde_json::to_string(&GotInfo { + server_name: self.name.clone(), + server_owner: self.owner.clone(), + }) + .expect("Failed to serialise"), + )), + ) + // equivalent to using .then() in js + .map(move |_out, _act: &mut Self, _ctx| { + sender.do_send(CloseConnection); + }); + ctx.spawn(fut); + } +} + +impl Actor for Server { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + use ConfigManagerDataMessage::GetValue; + use ConfigManagerDataResponse::GotValue; + + let addr = ctx.address().downgrade(); + + let nm = NetworkManager::create(addr.clone().recipient()).build(); + let cm = ClientManager::new(addr.recipient()); + let rm = RhaiManager::create( + ctx.address().downgrade(), + nm.downgrade(), + cm.downgrade(), + ) + .build(); + let lm = LuaManager::create( + ctx.address().downgrade(), + nm.downgrade(), + cm.downgrade(), + ) + .build(); + + self.network_manager.replace(nm.clone()); + self.client_manager.replace(cm.clone()); + self.rhai_manager.replace(rm); + self.lua_manager.replace(lm); + + nm.do_send(NetworkMessage::StartListening); + + let name_fut = wrap_future( + ConfigManager::shared().send(GetValue("Server.Name".to_owned())), + ) + .map(|out, actor: &mut Server, _ctx| { + if let Ok(GotValue(Some(ConfigValue::String(val)))) = out { + actor.name = val + } + }); + + let owner_fut = wrap_future( + ConfigManager::shared().send(GetValue("Server.Owner".to_owned())), + ) + .map(|out, actor: &mut Server, _ctx| { + if let Ok(GotValue(Some(ConfigValue::String(val)))) = out { + actor.owner = val + } + }); + + ctx.spawn(name_fut); + ctx.spawn(owner_fut); + } +} + +impl Handler for Server { + type Result = ServerDataResponse; + + fn handle( + &mut self, + msg: ServerDataMessage, + _ctx: &mut Self::Context, + ) -> Self::Result { + println!("[Server] got data message"); + match msg { + ServerDataMessage::Name => ServerDataResponse::Name(self.name.clone()), + ServerDataMessage::Owner => ServerDataResponse::Owner(self.owner.clone()), + ServerDataMessage::ClientManager => { + ServerDataResponse::ClientManager(self.client_manager.clone()) + } + ServerDataMessage::NetworkManager => { + ServerDataResponse::NetworkManager(self.network_manager.clone()) + } + } + } +} + +impl Handler for Server { + type Result = (); + fn handle( + &mut self, + msg: NetworkOutput, + ctx: &mut Self::Context, + ) -> Self::Result { + println!("[ServerActor] received message"); + match msg { + // This uses promise like funcionality to queue + // a set of async operations, + // so they occur in the right order + InfoRequested(sender) => self.info_request(ctx, sender), + // A new client is to be added + NewClient(addr, details) => self.client_request(ctx, addr, details), + }; + } +} + +impl Handler for Server { + type Result = (); + + fn handle( + &mut self, + _msg: ClientManagerOutput, + _ctx: &mut Self::Context, + ) -> Self::Result { + todo!() + } +} + +impl From for Server { + fn from(builder: ServerBuilder) -> Self { + Server { + name: builder.name, + owner: builder.owner, + + network_manager: None, + client_manager: None, + rhai_manager: None, + lua_manager: None, + } + } +}