From f43ceb07df0f4048426930a5acd84f74cb79c4a4 Mon Sep 17 00:00:00 2001 From: michael bailey Date: Tue, 13 Apr 2021 21:54:20 +0100 Subject: [PATCH 01/30] Create rust.yml (#3) + added a workflow file for CI --- .github/workflows/rust.yml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .github/workflows/rust.yml diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..edf72fb --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,25 @@ +name: Rust + +on: + push: + branches: [ ref-method ] + pull_request: + branches: [ master ] + +env: + CARGO_TERM_COLOR: always + +jobs: + build: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + + steps: + - uses: actions/checkout@v2 + - name: check + run: cargo check --verbose + - name: Build + run: cargo build --verbose + -- 2.40.1 From 71b77de447ffde434fbdfe3dbb744ab53f710848 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Wed, 21 Apr 2021 13:14:57 +0000 Subject: [PATCH 02/30] adding user update support --- Cargo.toml | 2 +- foundation/src/lib.rs | 8 ++--- foundation/src/messages/client.rs | 2 +- rustfmt.toml | 2 +- server/Cargo.toml | 4 +-- server/src/client.rs | 53 ++++++++++++++++--------------- server/src/client_manager.rs | 37 +++++++++++---------- server/src/main.rs | 25 ++++++++------- server/src/messages.rs | 18 ++++++++--- server/src/network_manager.rs | 14 +++----- server/src/server.rs | 12 ++++--- 11 files changed, 94 insertions(+), 83 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 00e10ba..be07e27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,4 +4,4 @@ members = [ 'server', 'client', 'serverctl' -] \ No newline at end of file +] diff --git a/foundation/src/lib.rs b/foundation/src/lib.rs index 3ff3748..97dca2e 100644 --- a/foundation/src/lib.rs +++ b/foundation/src/lib.rs @@ -6,7 +6,7 @@ use uuid::Uuid; #[derive(Deserialize, Serialize, Debug, Clone)] pub struct ClientDetails { - pub uuid: Uuid, - pub username: String, - pub address: String, -} \ No newline at end of file + pub uuid: Uuid, + pub username: String, + pub address: String, +} diff --git a/foundation/src/messages/client.rs b/foundation/src/messages/client.rs index cabe3bc..2944a31 100644 --- a/foundation/src/messages/client.rs +++ b/foundation/src/messages/client.rs @@ -25,7 +25,7 @@ pub enum ClientStreamOut { UserMessage { from: Uuid, content: String }, GlobalMessage { content: String }, - ConnectedClients {clients: Vec}, + ConnectedClients { clients: Vec }, Disconnected, } diff --git a/rustfmt.toml b/rustfmt.toml index 779de58..544999c 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,2 +1,2 @@ hard_tabs = true -max_width = 90 \ No newline at end of file +max_width = 100 \ No newline at end of file diff --git a/server/Cargo.toml b/server/Cargo.toml index a91dd21..0cbca46 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -13,6 +13,6 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" crossbeam = "0.8.0" crossbeam-channel = "0.5.0" +zeroize = "1.1.0" -[dependencies.foundation] -path = '../foundation' \ No newline at end of file +foundation = {path = '../foundation'} \ No newline at end of file diff --git a/server/src/client.rs b/server/src/client.rs index d5e1efd..bfd9bee 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -13,10 +13,11 @@ use std::sync::Mutex; use crossbeam_channel::{unbounded, Receiver, Sender}; use serde::Serialize; use uuid::Uuid; +use zeroize::Zeroize; -use foundation::ClientDetails; use foundation::messages::client::{ClientStreamIn, ClientStreamOut}; use foundation::prelude::IMessagable; +use foundation::ClientDetails; /// # Client /// This struct represents a connected user. @@ -117,7 +118,7 @@ impl IPreemptive for Client { let _ = std::thread::Builder::new() .name(format!("client thread recv [{:?}]", &arc.uuid)) .spawn(move || { - use ClientMessage::{Disconnect}; + use ClientMessage::Disconnect; let arc = arc1; let mut buffer = String::new(); @@ -131,6 +132,7 @@ impl IPreemptive for Client { } let command = serde_json::from_str::(buffer.as_str()); + println!("[Client {:?}]: recieved {}", arc.uuid, &buffer); match command { Ok(ClientStreamIn::Disconnect) => { println!("[Client {:?}]: Disconnect recieved", &arc.uuid); @@ -138,10 +140,7 @@ impl IPreemptive for Client { break 'main; } Ok(ClientStreamIn::SendMessage { to, content }) => { - println!( - "[Client {:?}]: send message to: {:?}", - &arc.uuid, &to - ); + println!("[Client {:?}]: send message to: {:?}", &arc.uuid, &to); let lock = arc.server_channel.lock().unwrap(); let sender = lock.as_ref().unwrap(); let _ = sender.send(ServerMessage::ClientSendMessage { @@ -150,8 +149,14 @@ impl IPreemptive for Client { content, }); } + Ok(ClientStreamIn::Update) => { + let lock = arc.server_channel.lock().unwrap(); + let sender = lock.as_ref().unwrap(); + let _ = sender.send(ServerMessage::ClientUpdate { to: arc.uuid }); + } _ => println!("[Client {:?}]: command not found", &arc.uuid), } + buffer.zeroize(); } println!("[Client {:?}] exited thread 1", &arc.uuid); }); @@ -175,7 +180,7 @@ impl IPreemptive for Client { 'main: loop { for message in arc.output.iter() { - use ClientMessage::{Disconnect,Message, Update}; + use ClientMessage::{Disconnect, Message, SendClients}; println!("[Client {:?}]: {:?}", &arc.uuid, message); match message { Disconnect => { @@ -184,35 +189,33 @@ impl IPreemptive for Client { .unwrap() .as_mut() .unwrap() - .send(ServerMessage::ClientDisconnected(arc.uuid)) + .send(ServerMessage::ClientDisconnected { id: arc.uuid }) .unwrap(); break 'main; } Message { from, content } => { - let _ = writeln!( - buffer, - "{}", - serde_json::to_string( - &ClientStreamOut::UserMessage { from, content } - ) - .unwrap() - ); + let msg = &ClientStreamOut::UserMessage { from, content }; + let _ = writeln!(buffer, "{}", serde_json::to_string(msg).unwrap()); let _ = writer.write_all(&buffer); let _ = writer.flush(); } - Update {clients} => { - let client_details_vec: Vec = clients.iter().map(|client| &client.details).cloned().collect(); - let _ = writeln!( - buffer, - "{}", - serde_json::to_string( - &ClientStreamOut::ConnectedClients {clients: client_details_vec} - ).unwrap() - ); + SendClients { clients } => { + let client_details_vec: Vec = clients + .iter() + .map(|client| &client.details) + .cloned() + .collect(); + + let msg = &ClientStreamOut::ConnectedClients { + clients: client_details_vec, + }; + + let _ = writeln!(buffer, "{}", serde_json::to_string(msg).unwrap()); let _ = writer.write_all(&buffer); let _ = writer.flush(); } } + buffer.zeroize(); } } println!("[Client {:?}]: exited thread 2", &arc.uuid); diff --git a/server/src/client_manager.rs b/server/src/client_manager.rs index a3f4d51..0c0cbef 100644 --- a/server/src/client_manager.rs +++ b/server/src/client_manager.rs @@ -39,6 +39,13 @@ impl ClientManager { receiver, }) } + + fn send_to_client(&self, id: &Uuid, msg: ClientMessage) { + let lock = self.clients.lock().unwrap(); + if let Some(client) = lock.get(id) { + client.send_message(msg) + } + } } impl IMessagable> for ClientManager { @@ -59,7 +66,7 @@ impl IPreemptive for ClientManager { if !arc.receiver.is_empty() { for message in arc.receiver.try_iter() { println!("[Client manager]: recieved message: {:?}", message); - use ClientMgrMessage::{Add, Remove, SendMessage, SendClients}; + use ClientMgrMessage::{Add, Remove, SendClients, SendMessage}; match message { Add(client) => { @@ -69,35 +76,27 @@ impl IPreemptive for ClientManager { if lock.insert(client.uuid, client).is_none() { println!("value is new"); } - }, + } Remove(uuid) => { println!("[Client Manager]: removing client: {:?}", &uuid); - if let Some(client) = - arc.clients.lock().unwrap().remove(&uuid) - { + if let Some(client) = arc.clients.lock().unwrap().remove(&uuid) { client.send_message(ClientMessage::Disconnect); } - }, + } SendMessage { to, from, content } => { + arc.send_to_client(&to, ClientMessage::Message { from, content }) + } + SendClients { to } => { let lock = arc.clients.lock().unwrap(); if let Some(client) = lock.get(&to) { - client.send_message(ClientMessage::Message { - from, - content, - }) - } - }, - SendClients {to} => { - let lock = arc.clients.lock().unwrap(); - if let Some(client) = lock.get(&to) { - let clients_vec: Vec> = lock.values().cloned().collect(); + let clients_vec: Vec> = + lock.values().cloned().collect(); - client.send_message(ClientMessage::Update { + client.send_message(ClientMessage::SendClients { clients: clients_vec, }) } - }, - + } #[allow(unreachable_patterns)] _ => println!("[Client manager]: not implemented"), diff --git a/server/src/main.rs b/server/src/main.rs index dfc409f..e90c6ac 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -11,17 +11,20 @@ use server::Server; fn main() { let _args = App::new("--rust chat server--") - .version("0.1.5") - .author("Mitchel Hardie , Michael Bailey ") - .about("this is a chat server developed in rust, depending on the version one of two implementations will be used") - .arg( - Arg::with_name("config") - .short("p") - .long("port") - .value_name("PORT") - .help("sets the port the server runs on.") - .takes_value(true)) - .get_matches(); + .version("0.1.5") + .author("Mitchel Hardie , Michael Bailey ") + .about( + "this is a chat server developed in rust, depending on the version one of two implementations will be used", + ) + .arg( + Arg::with_name("config") + .short("p") + .long("port") + .value_name("PORT") + .help("sets the port the server runs on.") + .takes_value(true), + ) + .get_matches(); let server = Server::new(); diff --git a/server/src/messages.rs b/server/src/messages.rs index f5d2e11..2db937f 100644 --- a/server/src/messages.rs +++ b/server/src/messages.rs @@ -7,7 +7,7 @@ use crate::client::Client; pub enum ClientMessage { Message { from: Uuid, content: String }, - Update {clients: Vec>}, + SendClients { clients: Vec> }, Disconnect, } @@ -16,7 +16,9 @@ pub enum ClientMessage { pub enum ClientMgrMessage { Remove(Uuid), Add(Arc), - SendClients {to: Uuid}, + SendClients { + to: Uuid, + }, SendMessage { from: Uuid, to: Uuid, @@ -26,12 +28,18 @@ pub enum ClientMgrMessage { #[derive(Debug)] pub enum ServerMessage { - ClientConnected(Arc), + ClientConnected { + client: Arc, + }, ClientSendMessage { from: Uuid, to: Uuid, content: String, }, - ClientDisconnected(Uuid), - ClientUpdate(Uuid), + ClientDisconnected { + id: Uuid, + }, + ClientUpdate { + to: Uuid, + }, } diff --git a/server/src/network_manager.rs b/server/src/network_manager.rs index 5e450ac..5a1e0ee 100644 --- a/server/src/network_manager.rs +++ b/server/src/network_manager.rs @@ -19,10 +19,7 @@ pub struct NetworkManager { } impl NetworkManager { - pub fn new( - port: String, - server_channel: Sender, - ) -> Arc { + pub fn new(port: String, server_channel: Sender) -> Arc { let mut address = "0.0.0.0:".to_string(); address.push_str(&port); @@ -63,8 +60,7 @@ impl IPreemptive for NetworkManager { let _ = writeln!( out_buffer, "{}", - serde_json::to_string(&NetworkSockOut::Request) - .unwrap() + serde_json::to_string(&NetworkSockOut::Request).unwrap() ); let _ = writer.write_all(&out_buffer); @@ -112,9 +108,9 @@ impl IPreemptive for NetworkManager { server_channel.clone(), ); server_channel - .send(ServerMessage::ClientConnected( - new_client, - )) + .send(ServerMessage::ClientConnected { + client: new_client, + }) .unwrap_or_default(); } } diff --git a/server/src/server.rs b/server/src/server.rs index 2e7d7ec..cc62ec6 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -48,17 +48,19 @@ impl ICooperative for Server { for message in self.receiver.try_iter() { println!("[server]: received message {:?}", &message); match message { - ServerMessage::ClientConnected(client) => { + ServerMessage::ClientConnected { client } => { self.client_manager.send_message(Add(client)) } - ServerMessage::ClientDisconnected(uuid) => { - println!("disconnecting client {:?}", uuid); - self.client_manager.send_message(Remove(uuid)); + ServerMessage::ClientDisconnected { id } => { + println!("disconnecting client {:?}", id); + self.client_manager.send_message(Remove(id)); } ServerMessage::ClientSendMessage { from, to, content } => self .client_manager .send_message(SendMessage { from, to, content }), - ServerMessage::ClientUpdate (_uuid) => println!("not implemented"), + ServerMessage::ClientUpdate { to } => self + .client_manager + .send_message(ClientMgrMessage::SendClients { to }), } } } -- 2.40.1 From 137ef3d3b1f0ceb2d78a567dc1991d943779f066 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Fri, 23 Apr 2021 15:14:45 +0000 Subject: [PATCH 03/30] Adding public key storage --- foundation/Cargo.toml | 3 +++ foundation/src/lib.rs | 2 ++ server/Cargo.toml | 2 ++ 3 files changed, 7 insertions(+) diff --git a/foundation/Cargo.toml b/foundation/Cargo.toml index a20e1ab..52bad92 100644 --- a/foundation/Cargo.toml +++ b/foundation/Cargo.toml @@ -22,4 +22,7 @@ url = "2.2.0" uuid = {version = "0.8", features = ["serde", "v4"]} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +ring = "0.16.20" +sodiumoxide = "0.2.6" + diff --git a/foundation/src/lib.rs b/foundation/src/lib.rs index 97dca2e..af64154 100644 --- a/foundation/src/lib.rs +++ b/foundation/src/lib.rs @@ -3,10 +3,12 @@ pub mod prelude; use serde::{Deserialize, Serialize}; use uuid::Uuid; +// use ring::signature::RsaPublicKeyComponents; #[derive(Deserialize, Serialize, Debug, Clone)] pub struct ClientDetails { pub uuid: Uuid, pub username: String, pub address: String, + // pub public_key: Option>, } diff --git a/server/Cargo.toml b/server/Cargo.toml index 0cbca46..d8d320c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -14,5 +14,7 @@ serde_json = "1.0" crossbeam = "0.8.0" crossbeam-channel = "0.5.0" zeroize = "1.1.0" +ring = "0.16.20" +sodiumoxide = "0.2.6" foundation = {path = '../foundation'} \ No newline at end of file -- 2.40.1 From 614e73b78803059c13a796f71dcecb1befc3b13d Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Sun, 25 Apr 2021 13:35:38 +0100 Subject: [PATCH 04/30] replaced duplicate fields --- foundation/Cargo.toml | 5 +--- foundation/src/lib.rs | 12 ++++++++-- server/Cargo.toml | 3 +-- server/src/client.rs | 45 ++++++++++++++---------------------- server/src/client_manager.rs | 2 +- 5 files changed, 30 insertions(+), 37 deletions(-) diff --git a/foundation/Cargo.toml b/foundation/Cargo.toml index 52bad92..b3f43a5 100644 --- a/foundation/Cargo.toml +++ b/foundation/Cargo.toml @@ -22,7 +22,4 @@ url = "2.2.0" uuid = {version = "0.8", features = ["serde", "v4"]} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -ring = "0.16.20" -sodiumoxide = "0.2.6" - - +openssl = "0.10" \ No newline at end of file diff --git a/foundation/src/lib.rs b/foundation/src/lib.rs index af64154..248ce04 100644 --- a/foundation/src/lib.rs +++ b/foundation/src/lib.rs @@ -3,12 +3,20 @@ pub mod prelude; use serde::{Deserialize, Serialize}; use uuid::Uuid; -// use ring::signature::RsaPublicKeyComponents; + +/** + * #ClientDetails. + * This defines the fileds a client would want to send when connecitng + * uuid: the unique id of the user. + * username: the users user name. + * address: the ip address of the connected user. + * public_key: the public key used when sending messages to the user. + */ #[derive(Deserialize, Serialize, Debug, Clone)] pub struct ClientDetails { pub uuid: Uuid, pub username: String, pub address: String, - // pub public_key: Option>, + pub public_key: Option>, } diff --git a/server/Cargo.toml b/server/Cargo.toml index d8d320c..ba6ee6a 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -14,7 +14,6 @@ serde_json = "1.0" crossbeam = "0.8.0" crossbeam-channel = "0.5.0" zeroize = "1.1.0" -ring = "0.16.20" -sodiumoxide = "0.2.6" +openssl = "0.10.33" foundation = {path = '../foundation'} \ No newline at end of file diff --git a/server/src/client.rs b/server/src/client.rs index bfd9bee..e3590c0 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -23,9 +23,7 @@ use foundation::ClientDetails; /// This struct represents a connected user. /// /// ## Attrubutes -/// - uuid: The id of the connected user. -/// - username: The username of the connected user. -/// - address: The the address of the connected client. +/// - details: store of the clients infomation. /// /// - stream: The socket for the connected client. /// - stream_reader: the buffered reader used to receive messages @@ -33,9 +31,6 @@ use foundation::ClientDetails; /// - owner: An optional reference to the owning object. #[derive(Debug, Serialize)] pub struct Client { - pub uuid: Uuid, - username: String, - address: String, pub details: ClientDetails, // non serializable @@ -73,14 +68,11 @@ impl Client { let in_stream = stream.try_clone().unwrap(); Arc::new(Client { - username: username.clone(), - uuid: Uuid::parse_str(&uuid).expect("invalid id"), - address: address.clone(), - details: ClientDetails { uuid: Uuid::parse_str(&uuid).expect("invalid id"), username, address, + public_key: None }, server_channel: Mutex::new(Some(server_channel)), @@ -116,7 +108,7 @@ impl IPreemptive for Client { // read thread let _ = std::thread::Builder::new() - .name(format!("client thread recv [{:?}]", &arc.uuid)) + .name(format!("client thread recv [{:?}]", &arc.details.uuid)) .spawn(move || { use ClientMessage::Disconnect; let arc = arc1; @@ -132,19 +124,19 @@ impl IPreemptive for Client { } let command = serde_json::from_str::(buffer.as_str()); - println!("[Client {:?}]: recieved {}", arc.uuid, &buffer); + println!("[Client {:?}]: recieved {}", arc.details.uuid, &buffer); match command { Ok(ClientStreamIn::Disconnect) => { - println!("[Client {:?}]: Disconnect recieved", &arc.uuid); + println!("[Client {:?}]: Disconnect recieved", &arc.details.uuid); arc.send_message(Disconnect); break 'main; } Ok(ClientStreamIn::SendMessage { to, content }) => { - println!("[Client {:?}]: send message to: {:?}", &arc.uuid, &to); + println!("[Client {:?}]: send message to: {:?}", &arc.details.uuid, &to); let lock = arc.server_channel.lock().unwrap(); let sender = lock.as_ref().unwrap(); let _ = sender.send(ServerMessage::ClientSendMessage { - from: arc.uuid, + from: arc.details.uuid, to, content, }); @@ -152,18 +144,18 @@ impl IPreemptive for Client { Ok(ClientStreamIn::Update) => { let lock = arc.server_channel.lock().unwrap(); let sender = lock.as_ref().unwrap(); - let _ = sender.send(ServerMessage::ClientUpdate { to: arc.uuid }); + let _ = sender.send(ServerMessage::ClientUpdate { to: arc.details.uuid }); } - _ => println!("[Client {:?}]: command not found", &arc.uuid), + _ => println!("[Client {:?}]: command not found", &arc.details.uuid), } buffer.zeroize(); } - println!("[Client {:?}] exited thread 1", &arc.uuid); + println!("[Client {:?}] exited thread 1", &arc.details.uuid); }); // write thread let _ = std::thread::Builder::new() - .name(format!("client thread msg [{:?}]", &arc.uuid)) + .name(format!("client thread msg [{:?}]", &arc.details.uuid)) .spawn(move || { let arc = arc2; let mut writer_lock = arc.stream_writer.lock().unwrap(); @@ -181,7 +173,7 @@ impl IPreemptive for Client { 'main: loop { for message in arc.output.iter() { use ClientMessage::{Disconnect, Message, SendClients}; - println!("[Client {:?}]: {:?}", &arc.uuid, message); + println!("[Client {:?}]: {:?}", &arc.details.uuid, message); match message { Disconnect => { arc.server_channel @@ -189,7 +181,7 @@ impl IPreemptive for Client { .unwrap() .as_mut() .unwrap() - .send(ServerMessage::ClientDisconnected { id: arc.uuid }) + .send(ServerMessage::ClientDisconnected { id: arc.details.uuid }) .unwrap(); break 'main; } @@ -218,7 +210,7 @@ impl IPreemptive for Client { buffer.zeroize(); } } - println!("[Client {:?}]: exited thread 2", &arc.uuid); + println!("[Client {:?}]: exited thread 2", &arc.details.uuid); }); } @@ -232,14 +224,11 @@ impl Default for Client { fn default() -> Self { let (sender, reciever) = unbounded(); Client { - username: "generic_client".to_string(), - uuid: Uuid::new_v4(), - address: "127.0.0.1".to_string(), - details: ClientDetails { uuid: Uuid::new_v4(), username: "generic_client".to_string(), address: "127.0.0.1".to_string(), + public_key: None }, output: reciever, @@ -258,7 +247,7 @@ impl Default for Client { // MARK: - used for sorting. impl PartialEq for Client { fn eq(&self, other: &Self) -> bool { - self.uuid == other.uuid + self.details.uuid == other.details.uuid } } @@ -272,7 +261,7 @@ impl PartialOrd for Client { impl Ord for Client { fn cmp(&self, other: &Self) -> Ordering { - self.uuid.cmp(&other.uuid) + self.details.uuid.cmp(&other.details.uuid) } } diff --git a/server/src/client_manager.rs b/server/src/client_manager.rs index 0c0cbef..51b6721 100644 --- a/server/src/client_manager.rs +++ b/server/src/client_manager.rs @@ -73,7 +73,7 @@ impl IPreemptive for ClientManager { println!("[Client Manager]: adding new client"); Client::start(&client); let mut lock = arc.clients.lock().unwrap(); - if lock.insert(client.uuid, client).is_none() { + if lock.insert(client.details.uuid, client).is_none() { println!("value is new"); } } -- 2.40.1 From 8ebfbb0a704d74d5d25d0e9caceaa9a73bede80a Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Sat, 10 Jul 2021 14:02:04 +0100 Subject: [PATCH 05/30] changed messaegs to include a type property --- foundation/src/messages/client.rs | 2 ++ foundation/src/messages/network.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/foundation/src/messages/client.rs b/foundation/src/messages/client.rs index 2944a31..0488c58 100644 --- a/foundation/src/messages/client.rs +++ b/foundation/src/messages/client.rs @@ -8,6 +8,7 @@ use uuid::Uuid; /// This uses the serde library to transform to and from json. /// #[derive(Serialize, Deserialize)] +#[serde(tag = "type")] pub enum ClientStreamIn { Connected, @@ -19,6 +20,7 @@ pub enum ClientStreamIn { } #[derive(Serialize, Deserialize)] +#[serde(tag = "type")] pub enum ClientStreamOut { Connected, diff --git a/foundation/src/messages/network.rs b/foundation/src/messages/network.rs index 98a2683..3a9aad6 100644 --- a/foundation/src/messages/network.rs +++ b/foundation/src/messages/network.rs @@ -1,6 +1,7 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] +#[serde(tag = "type")] pub enum NetworkSockIn { Info, Connect { @@ -11,6 +12,7 @@ pub enum NetworkSockIn { } #[derive(Serialize, Deserialize)] +#[serde(tag = "type")] pub enum NetworkSockOut<'a> { Request, -- 2.40.1 From 0ed2c5a290ffac864732c64302e456ac655ce2f8 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Sat, 10 Jul 2021 14:03:06 +0100 Subject: [PATCH 06/30] added encryption example --- foundation/src/encryption/mod.rs | 32 ++++++++++++++++++++++++++++++++ foundation/src/lib.rs | 3 +++ 2 files changed, 35 insertions(+) create mode 100644 foundation/src/encryption/mod.rs diff --git a/foundation/src/encryption/mod.rs b/foundation/src/encryption/mod.rs new file mode 100644 index 0000000..352dc52 --- /dev/null +++ b/foundation/src/encryption/mod.rs @@ -0,0 +1,32 @@ +use openssl::symm::{Cipher, Crypter, Mode}; +use openssl::sha::sha256; + + +#[cfg(test)] +mod test { + use openssl::symm::{Cipher, Crypter, Mode}; + use openssl::sha::sha256; + + #[test] + fn testEncryption() { + let plaintext = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.".as_bytes(); + let key = sha256(b"This is a key"); + let IV = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; + + let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV)); + let mut ciphertext = vec![0u8; 1024]; + let cipherlen = encrypter.unwrap().update(plaintext, ciphertext.as_mut_slice()).unwrap(); + + let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV)); + let mut decrypted = vec![0u8; 1024]; + decrypter.unwrap().update(&ciphertext[..cipherlen], decrypted.as_mut_slice()).unwrap(); + + println!("{:?}", plaintext); + println!("{:?}", ciphertext.as_slice()); + println!("{:?}", decrypted.as_slice()); + + println!("{:?}", plaintext.len()); + println!("{:?}", ciphertext.len()); + println!("{:?}", decrypted.len()); + } +} \ No newline at end of file diff --git a/foundation/src/lib.rs b/foundation/src/lib.rs index 248ce04..f277e13 100644 --- a/foundation/src/lib.rs +++ b/foundation/src/lib.rs @@ -1,5 +1,6 @@ pub mod messages; pub mod prelude; +pub mod encryption; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -20,3 +21,5 @@ pub struct ClientDetails { pub address: String, pub public_key: Option>, } + + -- 2.40.1 From 596dd0db0500a7c43f57f2d26eecd5d6e82b2119 Mon Sep 17 00:00:00 2001 From: michael bailey Date: Tue, 3 Aug 2021 21:54:04 +0100 Subject: [PATCH 07/30] Update README.md (#16) * Update README.md + added feature, todo and goals section. * Update README.md --- README.md | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index de85e4c..260be15 100644 --- a/README.md +++ b/README.md @@ -1 +1,32 @@ -# rust-chat-server \ No newline at end of file +# Rust-chat-server + +A Chat server writen in rust to allow communication between peers. + +--- + +## Features: +- implemented: + - json based API. + - Server introspection. + - Peer discovery. + - sending messages to connected clients. + - +- todo: + - Encryption to server. + - server to server meshing. + - asynchronous client managment instead of threaded approach. + +## Goals: +- Learn the rust programming lanaguage. + - Ownership: how that affects normal programming styles. + - Borrowing and references: how this affects shared state. + - Lifetimes: how this affects data retention and sharing. +- Learn how to create networked programs. + - Application level protocol: how to get two programs to communicate via TCP sockets. + - Socket handling: Discovering ways to handle multiple socket connections without affecting performance. +- Learn common encryption protocols. + - Adding support for encrypted sockets. + - Pros and cons of symetric and asymetric encryption. + - resolving common encryption flaws + +> Questions: For questions please add a issue with the question label. It will eventually be responded to -- 2.40.1 From 2f8677710ae4324d1c195744613e18215ecf3480 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Fri, 30 Jul 2021 11:50:08 +0100 Subject: [PATCH 08/30] Moved threads to tokio async --- foundation/src/encryption/mod.rs | 41 +++-- foundation/src/lib.rs | 5 +- server/Cargo.toml | 2 + server/src/client.rs | 289 +++++++++++++------------------ server/src/client_manager.rs | 128 +++++++------- server/src/main.rs | 11 +- server/src/network_manager.rs | 178 ++++++++----------- server/src/server.rs | 89 +++++----- 8 files changed, 334 insertions(+), 409 deletions(-) diff --git a/foundation/src/encryption/mod.rs b/foundation/src/encryption/mod.rs index 352dc52..cc05424 100644 --- a/foundation/src/encryption/mod.rs +++ b/foundation/src/encryption/mod.rs @@ -1,32 +1,37 @@ -use openssl::symm::{Cipher, Crypter, Mode}; -use openssl::sha::sha256; - +// use openssl::sha::sha256; +// use openssl::symm::{Cipher, Crypter, Mode}; #[cfg(test)] mod test { - use openssl::symm::{Cipher, Crypter, Mode}; use openssl::sha::sha256; + use openssl::symm::{Cipher, Crypter, Mode}; #[test] fn testEncryption() { let plaintext = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.".as_bytes(); - let key = sha256(b"This is a key"); + let key = sha256(b"This is a key"); let IV = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; - let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV)); - let mut ciphertext = vec![0u8; 1024]; - let cipherlen = encrypter.unwrap().update(plaintext, ciphertext.as_mut_slice()).unwrap(); + let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV)); + let mut ciphertext = vec![0u8; 1024]; + let cipherlen = encrypter + .unwrap() + .update(plaintext, ciphertext.as_mut_slice()) + .unwrap(); - let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV)); - let mut decrypted = vec![0u8; 1024]; - decrypter.unwrap().update(&ciphertext[..cipherlen], decrypted.as_mut_slice()).unwrap(); + let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV)); + let mut decrypted = vec![0u8; 1024]; + decrypter + .unwrap() + .update(&ciphertext[..cipherlen], decrypted.as_mut_slice()) + .unwrap(); - println!("{:?}", plaintext); - println!("{:?}", ciphertext.as_slice()); - println!("{:?}", decrypted.as_slice()); + println!("{:?}", plaintext); + println!("{:?}", ciphertext.as_slice()); + println!("{:?}", decrypted.as_slice()); - println!("{:?}", plaintext.len()); - println!("{:?}", ciphertext.len()); - println!("{:?}", decrypted.len()); + println!("{:?}", plaintext.len()); + println!("{:?}", ciphertext.len()); + println!("{:?}", decrypted.len()); } -} \ No newline at end of file +} diff --git a/foundation/src/lib.rs b/foundation/src/lib.rs index f277e13..e1b3daa 100644 --- a/foundation/src/lib.rs +++ b/foundation/src/lib.rs @@ -1,11 +1,10 @@ +pub mod encryption; pub mod messages; pub mod prelude; -pub mod encryption; use serde::{Deserialize, Serialize}; use uuid::Uuid; - /** * #ClientDetails. * This defines the fileds a client would want to send when connecitng @@ -21,5 +20,3 @@ pub struct ClientDetails { pub address: String, pub public_key: Option>, } - - diff --git a/server/Cargo.toml b/server/Cargo.toml index ba6ee6a..a32a144 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -15,5 +15,7 @@ crossbeam = "0.8.0" crossbeam-channel = "0.5.0" zeroize = "1.1.0" openssl = "0.10.33" +tokio = { version = "1.9.0", features = ["full"] } +futures = "0.3.16" foundation = {path = '../foundation'} \ No newline at end of file diff --git a/server/src/client.rs b/server/src/client.rs index e3590c0..d12e0c5 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,56 +1,47 @@ -use crate::messages::ClientMessage; -use crate::messages::ServerMessage; -use foundation::prelude::IPreemptive; -use std::cmp::Ordering; -use std::io::BufRead; -use std::io::Write; -use std::io::{BufReader, BufWriter}; -use std::mem::replace; -use std::net::TcpStream; use std::sync::Arc; -use std::sync::Mutex; +use std::cmp::Ordering; +use std::fmt::Write; -use crossbeam_channel::{unbounded, Receiver, Sender}; -use serde::Serialize; use uuid::Uuid; + use zeroize::Zeroize; -use foundation::messages::client::{ClientStreamIn, ClientStreamOut}; -use foundation::prelude::IMessagable; +use futures::lock::Mutex; + +use tokio::task; +use tokio::io::{ReadHalf, WriteHalf}; +use tokio::sync::mpsc::{Sender, Receiver, channel}; +use tokio::io::{AsyncBufReadExt, BufReader, AsyncWriteExt}; + +use crate::messages::ClientMessage; +use crate::messages::ServerMessage; + use foundation::ClientDetails; +use foundation::messages::client::{ClientStreamIn, ClientStreamOut}; /// # Client /// This struct represents a connected user. /// -/// ## Attrubutes +/// ## Attributes /// - details: store of the clients infomation. /// /// - stream: The socket for the connected client. /// - stream_reader: the buffered reader used to receive messages /// - stream_writer: the buffered writer used to send messages /// - owner: An optional reference to the owning object. -#[derive(Debug, Serialize)] +#[derive(Debug)] pub struct Client { pub details: ClientDetails, - // non serializable - #[serde(skip)] - server_channel: Mutex>>, + // server send channel + server_channel: Mutex>, - #[serde(skip)] - input: Sender, + // object channels + tx: Sender, + rx: Mutex>, - #[serde(skip)] - output: Receiver, - - #[serde(skip)] - stream: Mutex>, - - #[serde(skip)] - stream_reader: Mutex>>, - - #[serde(skip)] - stream_writer: Mutex>>, + stream_rx: Mutex>>, + stream_tx: Mutex>, } // client funciton implmentations @@ -59,13 +50,11 @@ impl Client { uuid: String, username: String, address: String, - stream: TcpStream, + stream_rx: BufReader>, + stream_tx: WriteHalf, server_channel: Sender, ) -> Arc { - let (sender, receiver) = unbounded(); - - let out_stream = stream.try_clone().unwrap(); - let in_stream = stream.try_clone().unwrap(); + let (sender, receiver) = channel(1024); Arc::new(Client { details: ClientDetails { @@ -75,172 +64,134 @@ impl Client { public_key: None }, - server_channel: Mutex::new(Some(server_channel)), + server_channel: Mutex::new(server_channel), - input: sender, - output: receiver, + tx: sender, + rx: Mutex::new(receiver), - stream: Mutex::new(Some(stream)), - - stream_reader: Mutex::new(Some(BufReader::new(in_stream))), - stream_writer: Mutex::new(Some(BufWriter::new(out_stream))), + stream_rx: Mutex::new(stream_rx), + stream_tx: Mutex::new(stream_tx), }) } -} -impl IMessagable> for Client { - fn send_message(&self, msg: ClientMessage) { - self.input - .send(msg) - .expect("failed to send message to client."); - } - fn set_sender(&self, sender: Sender) { - let mut server_lock = self.server_channel.lock().unwrap(); - let _ = replace(&mut *server_lock, Some(sender)); - } -} + pub fn start(self: &Arc) { -// cooperative multitasking implementation -impl IPreemptive for Client { - fn run(arc: &Arc) { - let arc1 = arc.clone(); - let arc2 = arc.clone(); + let t1_client = self.clone(); + let t2_client = self.clone(); - // read thread - let _ = std::thread::Builder::new() - .name(format!("client thread recv [{:?}]", &arc.details.uuid)) - .spawn(move || { - use ClientMessage::Disconnect; - let arc = arc1; + // client stream read task + tokio::spawn(async move { + use ClientMessage::Disconnect; + + let client = t1_client; + + let mut lock = client.stream_tx.lock().await; + let mut buffer = String::new(); + + // tell client that is is now connected + let _ = writeln!(buffer, "{}", + serde_json::to_string(&ClientStreamOut::Connected).unwrap() + ); + + let _ = lock.write_all(&buffer.as_bytes()); + let _ = lock.flush().await; + + drop(lock); + + loop { + let mut stream_reader = client.stream_rx.lock().await; let mut buffer = String::new(); - let mut reader_lock = arc.stream_reader.lock().unwrap(); - let reader = reader_lock.as_mut().unwrap(); - 'main: while let Ok(size) = reader.read_line(&mut buffer) { - if size == 0 { - arc.send_message(Disconnect); - break 'main; - } + if let Ok(_size) = stream_reader.read_line(&mut buffer).await { let command = serde_json::from_str::(buffer.as_str()); - println!("[Client {:?}]: recieved {}", arc.details.uuid, &buffer); + println!("[Client {:?}]: recieved {}", client.details.uuid, &buffer); + match command { Ok(ClientStreamIn::Disconnect) => { - println!("[Client {:?}]: Disconnect recieved", &arc.details.uuid); - arc.send_message(Disconnect); - break 'main; + println!("[Client {:?}]: Disconnect recieved", &client.details.uuid); + client.send_message(Disconnect).await; + return; } Ok(ClientStreamIn::SendMessage { to, content }) => { - println!("[Client {:?}]: send message to: {:?}", &arc.details.uuid, &to); - let lock = arc.server_channel.lock().unwrap(); - let sender = lock.as_ref().unwrap(); - let _ = sender.send(ServerMessage::ClientSendMessage { - from: arc.details.uuid, + println!("[Client {:?}]: send message to: {:?}", &client.details.uuid, &to); + let lock = client.server_channel.lock().await; + let _ = lock.send(ServerMessage::ClientSendMessage { + from: client.details.uuid, to, content, }); } Ok(ClientStreamIn::Update) => { - let lock = arc.server_channel.lock().unwrap(); - let sender = lock.as_ref().unwrap(); - let _ = sender.send(ServerMessage::ClientUpdate { to: arc.details.uuid }); + let lock = client.server_channel.lock().await; + let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid }); } - _ => println!("[Client {:?}]: command not found", &arc.details.uuid), + _ => println!("[Client {:?}]: command not found", &client.details.uuid), } buffer.zeroize(); } - println!("[Client {:?}] exited thread 1", &arc.details.uuid); - }); + println!("[Client {:?}] exited thread 1", &client.details.uuid); + } + }); - // write thread - let _ = std::thread::Builder::new() - .name(format!("client thread msg [{:?}]", &arc.details.uuid)) - .spawn(move || { - let arc = arc2; - let mut writer_lock = arc.stream_writer.lock().unwrap(); - let writer = writer_lock.as_mut().unwrap(); - let mut buffer: Vec = Vec::new(); + // client channel read thread + tokio::spawn(async move { + use ClientMessage::{Disconnect, Message, SendClients}; - let _ = writeln!( - buffer, - "{}", - serde_json::to_string(&ClientStreamOut::Connected).unwrap() - ); - let _ = writer.write_all(&buffer); - let _ = writer.flush(); + let client = t2_client; - 'main: loop { - for message in arc.output.iter() { - use ClientMessage::{Disconnect, Message, SendClients}; - println!("[Client {:?}]: {:?}", &arc.details.uuid, message); - match message { - Disconnect => { - arc.server_channel - .lock() - .unwrap() - .as_mut() - .unwrap() - .send(ServerMessage::ClientDisconnected { id: arc.details.uuid }) - .unwrap(); - break 'main; - } - Message { from, content } => { - let msg = &ClientStreamOut::UserMessage { from, content }; - let _ = writeln!(buffer, "{}", serde_json::to_string(msg).unwrap()); - let _ = writer.write_all(&buffer); - let _ = writer.flush(); - } - SendClients { clients } => { - let client_details_vec: Vec = clients - .iter() - .map(|client| &client.details) - .cloned() - .collect(); + loop { + let mut channel = client.rx.lock().await; + let mut buffer = String::new(); - let msg = &ClientStreamOut::ConnectedClients { - clients: client_details_vec, - }; + let message = channel.recv().await.unwrap(); + drop(channel); - let _ = writeln!(buffer, "{}", serde_json::to_string(msg).unwrap()); - let _ = writer.write_all(&buffer); - let _ = writer.flush(); - } - } - buffer.zeroize(); + println!("[Client {:?}]: {:?}", &client.details.uuid, message); + match message { + Disconnect => { + let lock = client.server_channel.lock().await; + let _ = lock.send(ServerMessage::ClientDisconnected { id: client.details.uuid }).await; + return + } + Message { from, content } => { + let msg = ClientStreamOut::UserMessage { from, content }; + let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap()); + + let mut stream = client.stream_tx.lock().await; + + let _ = stream.write_all(&buffer.as_bytes()); + let _ = stream.flush().await; + + drop(stream); + } + SendClients { clients } => { + let client_details_vec: Vec = clients + .iter() + .map(|client| &client.details) + .cloned() + .collect(); + + let msg = ClientStreamOut::ConnectedClients { + clients: client_details_vec, + }; + + let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap()); + + let mut stream = client.stream_tx.lock().await; + + + let _ = stream.write_all(&buffer.as_bytes()); + let _ = stream.flush().await; } } - println!("[Client {:?}]: exited thread 2", &arc.details.uuid); - }); + } + }); } - fn start(arc: &Arc) { - Client::run(arc) - } -} - -// default value implementation -impl Default for Client { - fn default() -> Self { - let (sender, reciever) = unbounded(); - Client { - details: ClientDetails { - uuid: Uuid::new_v4(), - username: "generic_client".to_string(), - address: "127.0.0.1".to_string(), - public_key: None - }, - - output: reciever, - input: sender, - - server_channel: Mutex::new(None), - - stream: Mutex::new(None), - - stream_reader: Mutex::new(None), - stream_writer: Mutex::new(None), - } + pub async fn send_message(self: &Arc, msg: ClientMessage) { + let _ = self.tx.send(msg).await; } } diff --git a/server/src/client_manager.rs b/server/src/client_manager.rs index 51b6721..4c962b1 100644 --- a/server/src/client_manager.rs +++ b/server/src/client_manager.rs @@ -1,18 +1,15 @@ -// use crate::lib::server::ServerMessages; -use foundation::prelude::IPreemptive; use std::collections::HashMap; -use std::mem::replace; use std::sync::Arc; -use std::sync::Mutex; -use crossbeam_channel::{unbounded, Receiver, Sender}; use uuid::Uuid; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::task; +use futures::lock::Mutex; use crate::client::Client; use crate::messages::ClientMessage; use crate::messages::ClientMgrMessage; use crate::messages::ServerMessage; -use foundation::prelude::IMessagable; /// # ClientManager /// This struct manages all connected users @@ -22,92 +19,85 @@ pub struct ClientManager { server_channel: Mutex>, - sender: Sender, - receiver: Receiver, + tx: Sender, + rx: Mutex>, } impl ClientManager { pub fn new(server_channel: Sender) -> Arc { - let (sender, receiver) = unbounded(); + let (tx, rx) = channel(1024); Arc::new(ClientManager { clients: Mutex::default(), server_channel: Mutex::new(server_channel), - sender, - receiver, + tx, + rx: Mutex::new(rx), }) } - fn send_to_client(&self, id: &Uuid, msg: ClientMessage) { - let lock = self.clients.lock().unwrap(); - if let Some(client) = lock.get(id) { - client.send_message(msg) - } - } -} + pub fn start(self: &Arc) { -impl IMessagable> for ClientManager { - fn send_message(&self, msg: ClientMgrMessage) { - self.sender.send(msg).unwrap(); - } - fn set_sender(&self, sender: Sender) { - let mut server_lock = self.server_channel.lock().unwrap(); - let _ = replace(&mut *server_lock, sender); - } -} + let client_manager = self.clone(); -impl IPreemptive for ClientManager { - fn run(arc: &Arc) { - loop { - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::spawn(async move { - if !arc.receiver.is_empty() { - for message in arc.receiver.try_iter() { - println!("[Client manager]: recieved message: {:?}", message); - use ClientMgrMessage::{Add, Remove, SendClients, SendMessage}; + use ClientMgrMessage::{Add, Remove, SendClients, SendMessage}; - match message { - Add(client) => { - println!("[Client Manager]: adding new client"); - Client::start(&client); - let mut lock = arc.clients.lock().unwrap(); - if lock.insert(client.details.uuid, client).is_none() { - println!("value is new"); - } + loop { + let mut receiver = client_manager.rx.lock().await; + let message = receiver.recv().await.unwrap(); + + println!("[Client manager]: recieved message: {:?}", message); + + match message { + Add(client) => { + println!("[Client Manager]: adding new client"); + client.start(); + let mut lock = client_manager.clients.lock().await; + if lock.insert(client.details.uuid, client).is_none() { + println!("value is new"); } - Remove(uuid) => { - println!("[Client Manager]: removing client: {:?}", &uuid); - if let Some(client) = arc.clients.lock().unwrap().remove(&uuid) { - client.send_message(ClientMessage::Disconnect); - } - } - SendMessage { to, from, content } => { - arc.send_to_client(&to, ClientMessage::Message { from, content }) - } - SendClients { to } => { - let lock = arc.clients.lock().unwrap(); - if let Some(client) = lock.get(&to) { - let clients_vec: Vec> = - lock.values().cloned().collect(); - - client.send_message(ClientMessage::SendClients { - clients: clients_vec, - }) - } - } - - #[allow(unreachable_patterns)] - _ => println!("[Client manager]: not implemented"), } + Remove(uuid) => { + println!("[Client Manager]: removing client: {:?}", &uuid); + if let Some(client) = client_manager.clients.lock().await.remove(&uuid) { + client.send_message(ClientMessage::Disconnect).await; + } + } + SendMessage { to, from, content } => { + client_manager.send_to_client(&to, ClientMessage::Message { from, content }).await; + } + SendClients { to } => { + let lock = client_manager.clients.lock().await; + if let Some(client) = lock.get(&to) { + let clients_vec: Vec> = + lock.values().cloned().collect(); + + client.send_message(ClientMessage::SendClients { + clients: clients_vec, + }).await + } + } + #[allow(unreachable_patterns)] + _ => println!("[Client manager]: not implemented"), } } + }); + } + + async fn send_to_client(self: &Arc, id: &Uuid, msg: ClientMessage) { + let lock = self.clients.lock().await; + if let Some(client) = lock.get(&id) { + client.clone().send_message(msg).await; } } - fn start(arc: &Arc) { - let arc = arc.clone(); - std::thread::spawn(move || ClientManager::run(&arc)); + pub async fn send_message( + self: Arc, + message: ClientMgrMessage) + { + let _ = self.tx.send(message).await; } } diff --git a/server/src/main.rs b/server/src/main.rs index e90c6ac..904040f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -4,12 +4,14 @@ pub mod messages; pub mod network_manager; pub mod server; +use std::io; + use clap::{App, Arg}; -use foundation::prelude::IPreemptive; use server::Server; -fn main() { +#[tokio::main] +async fn main() -> io::Result<()> { let _args = App::new("--rust chat server--") .version("0.1.5") .author("Mitchel Hardie , Michael Bailey ") @@ -26,7 +28,8 @@ fn main() { ) .get_matches(); - let server = Server::new(); + let server = Server::new().unwrap(); - Server::run(&server); + server.start().await; + Ok(()) } diff --git a/server/src/network_manager.rs b/server/src/network_manager.rs index 5a1e0ee..bcdc612 100644 --- a/server/src/network_manager.rs +++ b/server/src/network_manager.rs @@ -1,128 +1,104 @@ -use foundation::prelude::IPreemptive; -use std::io::BufRead; -use std::io::BufReader; -use std::io::BufWriter; -use std::io::Write; -use std::net::TcpListener; use std::sync::Arc; -use std::thread; +use std::io::Write; -use crossbeam_channel::Sender; +use tokio::task; +use tokio::net::TcpListener; +use tokio::sync::mpsc::Sender; +use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader}; use crate::client::Client; use crate::messages::ServerMessage; use foundation::messages::network::{NetworkSockIn, NetworkSockOut}; pub struct NetworkManager { - listener: TcpListener, + address: String, server_channel: Sender, } impl NetworkManager { - pub fn new(port: String, server_channel: Sender) -> Arc { - let mut address = "0.0.0.0:".to_string(); - address.push_str(&port); - - let listener = TcpListener::bind(address).expect("Could not bind to address"); - + pub fn new(_port: String, server_channel: Sender) -> Arc { Arc::new(NetworkManager { - listener, + address: "0.0.0.0:5600".to_string(), server_channel, }) } -} -impl IPreemptive for NetworkManager { - fn run(_: &Arc) {} + pub fn start(self: &Arc) { - fn start(arc: &Arc) { - let arc = arc.clone(); - std::thread::spawn(move || { - // fetch new connections and add them to the client queue - for connection in arc.listener.incoming() { - println!("[NetworkManager]: New Connection!"); - match connection { - Ok(stream) => { - let server_channel = arc.server_channel.clone(); + let network_manager = self.clone(); - // create readers - let mut reader = BufReader::new(stream.try_clone().unwrap()); - let mut writer = BufWriter::new(stream.try_clone().unwrap()); + tokio::spawn(async move { + let listener = TcpListener::bind(network_manager.address.clone()).await.unwrap(); - let _handle = thread::Builder::new() - .name("NetworkJoinThread".to_string()) - .spawn(move || { - let mut out_buffer: Vec = Vec::new(); - let mut in_buffer: String = String::new(); + loop { + let (connection, _) = listener.accept().await.unwrap(); + let (rd, mut wd) = io::split(connection); + + let mut reader = BufReader::new(rd); + let server_channel = network_manager.server_channel.clone(); - // send request message to connection + task::spawn(async move { + let mut out_buffer: Vec = Vec::new(); + let mut in_buffer: String = String::new(); - let _ = writeln!( - out_buffer, - "{}", - serde_json::to_string(&NetworkSockOut::Request).unwrap() + // write request + let a = serde_json::to_string(&NetworkSockOut::Request).unwrap(); + println!("{:?}", &a); + let _ = writeln!( + out_buffer, + "{}", + a + ); + + let _ = wd.write_all(&out_buffer).await; + let _ = wd.flush().await; + + // get response + let _ = reader.read_line(&mut in_buffer).await.unwrap(); + + //match the response + if let Ok(request) = + serde_json::from_str::(&in_buffer) + { + match request { + NetworkSockIn::Info => { + // send back server info to the connection + let _ = wd.write_all( + serde_json::to_string( + &NetworkSockOut::GotInfo { + server_name: "oof", + server_owner: "michael", + }, + ) + .unwrap() + .as_bytes(), + ).await; + let _ = wd.write_all(b"\n").await; + let _ = wd.flush().await; + } + NetworkSockIn::Connect { + uuid, + username, + address, + } => { + // create client and send to server + let new_client = Client::new( + uuid, + username, + address, + reader, + wd, + server_channel.clone(), ); - - let _ = writer.write_all(&out_buffer); - let _ = writer.flush(); - - // try get response - let res = reader.read_line(&mut in_buffer); - if res.is_err() { - return; - } - - //match the response - if let Ok(request) = - serde_json::from_str::(&in_buffer) - { - match request { - NetworkSockIn::Info => { - // send back server info to the connection - writer - .write_all( - serde_json::to_string( - &NetworkSockOut::GotInfo { - server_name: "oof", - server_owner: "michael", - }, - ) - .unwrap() - .as_bytes(), - ) - .unwrap(); - writer.write_all(b"\n").unwrap(); - writer.flush().unwrap(); - } - NetworkSockIn::Connect { - uuid, - username, - address, - } => { - // create client and send to server - let new_client = Client::new( - uuid, - username, - address, - stream.try_clone().unwrap(), - server_channel.clone(), - ); - server_channel - .send(ServerMessage::ClientConnected { - client: new_client, - }) - .unwrap_or_default(); - } - } - } - }); + let _ = server_channel + .send(ServerMessage::ClientConnected { + client: new_client, + }).await; + } + } } - Err(e) => { - println!("[Network manager]: error getting stream: {:?}", e); - continue; - } - } + }); } - }); + }); } } diff --git a/server/src/server.rs b/server/src/server.rs index cc62ec6..2bdcb62 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -1,15 +1,15 @@ use std::sync::Arc; -use crossbeam_channel::{unbounded, Receiver}; +// use crossbeam_channel::{unbounded, Receiver}; use uuid::Uuid; +use tokio::task; +use tokio::sync::mpsc::{channel, Receiver}; +use futures::lock::Mutex; use crate::client_manager::ClientManager; use crate::messages::ClientMgrMessage; use crate::messages::ServerMessage; use crate::network_manager::NetworkManager; -use foundation::prelude::ICooperative; -use foundation::prelude::IMessagable; -use foundation::prelude::IPreemptive; /// # ServerMessages /// This is used internally to send messages to the server to be dispatched @@ -19,67 +19,68 @@ pub enum ServerMessages { ClientDisconnected(Uuid), } +/// # Server +/// authors: @michael-bailey, @Mitch161 +/// This Represents a server instance. +/// it is componsed of a client manager and a network manager +/// pub struct Server { client_manager: Arc, network_manager: Arc, - - receiver: Receiver, + receiver: Mutex>, } impl Server { - pub fn new() -> Arc { - let (sender, receiver) = unbounded(); + /// Create a new server object + pub fn new() -> Result, Box> { + let (sender, receiver) = channel(1024); - Arc::new(Server { - client_manager: ClientManager::new(sender.clone()), - - network_manager: NetworkManager::new("5600".to_string(), sender), - receiver, - }) + Ok( + Arc::new( + Server { + client_manager: ClientManager::new(sender.clone()), + network_manager: NetworkManager::new("5600".to_string(), sender), + receiver: Mutex::new(receiver), + } + ) + ) } -} -impl ICooperative for Server { - fn tick(&self) { + pub async fn start(self: &Arc) { + + // start client manager and network manager + self.network_manager.clone().start(); + self.client_manager.clone().start(); + + // clone block items + let server = self.clone(); + + use ClientMgrMessage::{Add, Remove, SendMessage}; - // handle new messages loop - if !self.receiver.is_empty() { - for message in self.receiver.try_iter() { + loop { + let mut lock = server.receiver.lock().await; + if let Some(message) = lock.recv().await { println!("[server]: received message {:?}", &message); + match message { ServerMessage::ClientConnected { client } => { - self.client_manager.send_message(Add(client)) + server.client_manager.clone() + .send_message(Add(client)).await } ServerMessage::ClientDisconnected { id } => { println!("disconnecting client {:?}", id); - self.client_manager.send_message(Remove(id)); + server.client_manager.clone().send_message(Remove(id)).await; } - ServerMessage::ClientSendMessage { from, to, content } => self - .client_manager - .send_message(SendMessage { from, to, content }), - ServerMessage::ClientUpdate { to } => self - .client_manager - .send_message(ClientMgrMessage::SendClients { to }), + ServerMessage::ClientSendMessage { from, to, content } => server + .client_manager.clone() + .send_message(SendMessage { from, to, content }).await, + ServerMessage::ClientUpdate { to } => server + .client_manager.clone() + .send_message(ClientMgrMessage::SendClients { to }).await, } } } } } -impl IPreemptive for Server { - fn run(arc: &std::sync::Arc) { - // start services - NetworkManager::start(&arc.network_manager); - ClientManager::start(&arc.client_manager); - loop { - arc.tick(); - } - } - - fn start(arc: &std::sync::Arc) { - let arc = arc.clone(); - // start thread - std::thread::spawn(move || Server::run(&arc)); - } -} -- 2.40.1 From 14495e1b273a3d6af677de939600136fcfb74cf2 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Fri, 30 Jul 2021 11:50:08 +0100 Subject: [PATCH 09/30] Moved threads to tokio async --- foundation/src/messages/client.rs | 2 ++ server/src/client.rs | 26 ++++++++++++++++++-------- server/src/client_manager.rs | 11 ++++++++--- server/src/messages.rs | 8 ++++++++ server/src/server.rs | 4 +++- 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/foundation/src/messages/client.rs b/foundation/src/messages/client.rs index 0488c58..51ebf92 100644 --- a/foundation/src/messages/client.rs +++ b/foundation/src/messages/client.rs @@ -30,4 +30,6 @@ pub enum ClientStreamOut { ConnectedClients { clients: Vec }, Disconnected, + + Error, } diff --git a/server/src/client.rs b/server/src/client.rs index d12e0c5..360c701 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -8,7 +8,6 @@ use zeroize::Zeroize; use futures::lock::Mutex; -use tokio::task; use tokio::io::{ReadHalf, WriteHalf}; use tokio::sync::mpsc::{Sender, Receiver, channel}; use tokio::io::{AsyncBufReadExt, BufReader, AsyncWriteExt}; @@ -121,23 +120,27 @@ impl Client { from: client.details.uuid, to, content, - }); + }).await; } Ok(ClientStreamIn::Update) => { + println!("[Client {:?}]: update received", &client.details.uuid); let lock = client.server_channel.lock().await; - let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid }); + let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid }).await; + } + _ => { + println!("[Client {:?}]: command not found", &client.details.uuid); + let lock = client.server_channel.lock().await; + let _ = lock.send(ServerMessage::ClientError { to: client.details.uuid }).await; } - _ => println!("[Client {:?}]: command not found", &client.details.uuid), } buffer.zeroize(); } - println!("[Client {:?}] exited thread 1", &client.details.uuid); } }); // client channel read thread tokio::spawn(async move { - use ClientMessage::{Disconnect, Message, SendClients}; + use ClientMessage::{Disconnect, Message, SendClients, Error}; let client = t2_client; @@ -161,7 +164,7 @@ impl Client { let mut stream = client.stream_tx.lock().await; - let _ = stream.write_all(&buffer.as_bytes()); + let _ = stream.write_all(&buffer.as_bytes()).await; let _ = stream.flush().await; drop(stream); @@ -181,8 +184,15 @@ impl Client { let mut stream = client.stream_tx.lock().await; + let _ = stream.write_all(&buffer.as_bytes()).await; + let _ = stream.flush().await; + }, + Error => { + let _ = writeln!(buffer, "{}", serde_json::to_string(&ClientStreamOut::Error).unwrap()); - let _ = stream.write_all(&buffer.as_bytes()); + let mut stream = client.stream_tx.lock().await; + + let _ = stream.write_all(&buffer.as_bytes()).await; let _ = stream.flush().await; } } diff --git a/server/src/client_manager.rs b/server/src/client_manager.rs index 4c962b1..b7a6333 100644 --- a/server/src/client_manager.rs +++ b/server/src/client_manager.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use uuid::Uuid; use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::task; use futures::lock::Mutex; use crate::client::Client; @@ -43,7 +42,7 @@ impl ClientManager { tokio::spawn(async move { - use ClientMgrMessage::{Add, Remove, SendClients, SendMessage}; + use ClientMgrMessage::{Add, Remove, SendClients, SendMessage, SendError}; loop { let mut receiver = client_manager.rx.lock().await; @@ -79,7 +78,13 @@ impl ClientManager { clients: clients_vec, }).await } - } + }, + SendError { to } => { + let lock = client_manager.clients.lock().await; + if let Some(client) = lock.get(&to) { + client.send_message(ClientMessage::Error).await + } + }, #[allow(unreachable_patterns)] _ => println!("[Client manager]: not implemented"), } diff --git a/server/src/messages.rs b/server/src/messages.rs index 2db937f..f703171 100644 --- a/server/src/messages.rs +++ b/server/src/messages.rs @@ -10,6 +10,8 @@ pub enum ClientMessage { SendClients { clients: Vec> }, Disconnect, + + Error, } #[derive(Debug)] @@ -24,6 +26,9 @@ pub enum ClientMgrMessage { to: Uuid, content: String, }, + SendError { + to: Uuid, + } } #[derive(Debug)] @@ -42,4 +47,7 @@ pub enum ServerMessage { ClientUpdate { to: Uuid, }, + ClientError { + to: Uuid + } } diff --git a/server/src/server.rs b/server/src/server.rs index 2bdcb62..9d72786 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -2,7 +2,6 @@ use std::sync::Arc; // use crossbeam_channel::{unbounded, Receiver}; use uuid::Uuid; -use tokio::task; use tokio::sync::mpsc::{channel, Receiver}; use futures::lock::Mutex; @@ -78,6 +77,9 @@ impl Server { ServerMessage::ClientUpdate { to } => server .client_manager.clone() .send_message(ClientMgrMessage::SendClients { to }).await, + ServerMessage::ClientError { to } => server + .client_manager.clone() + .send_message(ClientMgrMessage::SendError {to}).await, } } } -- 2.40.1 From 029e00144b8fbfd8e983390e0d6b3b03c897f869 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Tue, 3 Aug 2021 17:49:27 +0100 Subject: [PATCH 10/30] removed old rust encryption module --- server/src/network/mod.rs | 0 server/src/prelude.rs | 12 ++++++++++++ 2 files changed, 12 insertions(+) create mode 100644 server/src/network/mod.rs create mode 100644 server/src/prelude.rs diff --git a/server/src/network/mod.rs b/server/src/network/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/server/src/prelude.rs b/server/src/prelude.rs new file mode 100644 index 0000000..ec3834b --- /dev/null +++ b/server/src/prelude.rs @@ -0,0 +1,12 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use serde::{Serialize,Deserialize}; + + +#[async_trait] +trait Sender<'de, TMessage: Deserialize<'de> + Serialize> { + async fn send(self: &Arc, message: TMessage) -> Result<(), std::io::Error>; + async fn recv(self: &Arc) -> Result; +} -- 2.40.1 From 127d88cf867596a1779c5de411aa8b6b8dd515eb Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Tue, 3 Aug 2021 17:51:03 +0100 Subject: [PATCH 11/30] added encryption test + added openssl elliptic diffie-hellman test to server module --- server/src/encryption.rs | 61 ++++++++++++++++++++++++++++++++++++++++ server/src/main.rs | 1 + 2 files changed, 62 insertions(+) create mode 100644 server/src/encryption.rs diff --git a/server/src/encryption.rs b/server/src/encryption.rs new file mode 100644 index 0000000..52f0efb --- /dev/null +++ b/server/src/encryption.rs @@ -0,0 +1,61 @@ +#[cfg(test)] +use std::borrow::Borrow; + +use openssl::ec::*; +// use std::ops::Deref; +use openssl::nid::Nid; +use openssl::ec::EcKey; +use openssl::pkey::PKey; +use openssl::sha::sha256; +use openssl::derive::Deriver; +use openssl::bn::BigNumContext; +use openssl::symm::{Cipher, Crypter, Mode}; + +#[test] +pub fn test_aes() { + + let ec_group1 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); + let ec_group2 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); + + let eckey1 = EcKey::generate(ec_group1.as_ref()).unwrap(); + let eckey2 = EcKey::generate(ec_group2.as_ref()).unwrap(); + + let pkey1 = PKey::from_ec_key(eckey1).unwrap(); + let pkey2 = PKey::from_ec_key(eckey2).unwrap(); + + let pem1 = pkey1.public_key_to_pem().unwrap(); + let pem2 = pkey2.public_key_to_pem().unwrap(); + + let pub1 = PKey::public_key_from_pem(&pem1).unwrap(); + let pub2 = PKey::public_key_from_pem(&pem2).unwrap(); + + let mut deriver1 = Deriver::new(pkey1.as_ref()).expect("deriver1 failed"); + let mut deriver2 = Deriver::new(pkey2.as_ref()).expect("deriver2 failed"); + + deriver1.set_peer(pub2.as_ref()).unwrap(); + deriver2.set_peer(pub1.as_ref()).unwrap(); + + let shared1 = deriver1.derive_to_vec().unwrap(); + let shared2 = deriver2.derive_to_vec().unwrap(); + + println!("shared1: {:?}", &shared1); + println!("shared2: {:?}", &shared2); + + assert_eq!(shared1, shared2); + + let plaintext = b"This is a message"; + let key = sha256(&shared1); + let iv = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; + + let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(iv)); + let mut ciphertext = vec![0u8; 1024]; + let cipherlen = encrypter.unwrap().update(plaintext, ciphertext.as_mut_slice()).unwrap(); + + let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(iv)); + let mut decrypted = vec![0u8; 1024]; + decrypter.unwrap().update(&ciphertext[..cipherlen], decrypted.as_mut_slice()).unwrap(); + + println!("plaintext: {:?}", plaintext); + println!("ciphertext: {:?}", &ciphertext[0..plaintext.len()]); + println!("decryptedtext: {:?}", &decrypted[0..plaintext.len()]); +} \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index 904040f..eaf477b 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -3,6 +3,7 @@ pub mod client_manager; pub mod messages; pub mod network_manager; pub mod server; +pub mod encryption; use std::io; -- 2.40.1 From 1b53b96645029022c0cc064909dc4519bcdc590a Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Tue, 3 Aug 2021 19:42:47 +0100 Subject: [PATCH 12/30] fixed unused import warning --- server/src/encryption.rs | 106 +++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 50 deletions(-) diff --git a/server/src/encryption.rs b/server/src/encryption.rs index 52f0efb..d91a1ca 100644 --- a/server/src/encryption.rs +++ b/server/src/encryption.rs @@ -1,61 +1,67 @@ + + + #[cfg(test)] -use std::borrow::Borrow; +mod test { + use openssl::ec::*; + use openssl::nid::Nid; + use openssl::ec::EcKey; + use openssl::pkey::PKey; + use openssl::sha::sha256; + use openssl::derive::Deriver; + use openssl::symm::{ + Cipher, + Crypter, + Mode + }; -use openssl::ec::*; -// use std::ops::Deref; -use openssl::nid::Nid; -use openssl::ec::EcKey; -use openssl::pkey::PKey; -use openssl::sha::sha256; -use openssl::derive::Deriver; -use openssl::bn::BigNumContext; -use openssl::symm::{Cipher, Crypter, Mode}; + #[test] + pub fn test_aes() { -#[test] -pub fn test_aes() { + let ec_group1 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); + let ec_group2 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); - let ec_group1 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); - let ec_group2 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); + let eckey1 = EcKey::generate(ec_group1.as_ref()).unwrap(); + let eckey2 = EcKey::generate(ec_group2.as_ref()).unwrap(); + + let pkey1 = PKey::from_ec_key(eckey1).unwrap(); + let pkey2 = PKey::from_ec_key(eckey2).unwrap(); - let eckey1 = EcKey::generate(ec_group1.as_ref()).unwrap(); - let eckey2 = EcKey::generate(ec_group2.as_ref()).unwrap(); + let pem1 = pkey1.public_key_to_pem().unwrap(); + let pem2 = pkey2.public_key_to_pem().unwrap(); + + let pub1 = PKey::public_key_from_pem(&pem1).unwrap(); + let pub2 = PKey::public_key_from_pem(&pem2).unwrap(); + + let mut deriver1 = Deriver::new(pkey1.as_ref()).expect("deriver1 failed"); + let mut deriver2 = Deriver::new(pkey2.as_ref()).expect("deriver2 failed"); + + deriver1.set_peer(pub2.as_ref()).unwrap(); + deriver2.set_peer(pub1.as_ref()).unwrap(); + + let shared1 = deriver1.derive_to_vec().unwrap(); + let shared2 = deriver2.derive_to_vec().unwrap(); + + println!("shared1: {:?}", &shared1); + println!("shared2: {:?}", &shared2); + + assert_eq!(shared1, shared2); - let pkey1 = PKey::from_ec_key(eckey1).unwrap(); - let pkey2 = PKey::from_ec_key(eckey2).unwrap(); + let plaintext = b"This is a message"; + let key = sha256(&shared1); + let iv = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; - let pem1 = pkey1.public_key_to_pem().unwrap(); - let pem2 = pkey2.public_key_to_pem().unwrap(); + let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(iv)); + let mut ciphertext = vec![0u8; 1024]; + let cipherlen = encrypter.unwrap().update(plaintext, ciphertext.as_mut_slice()).unwrap(); - let pub1 = PKey::public_key_from_pem(&pem1).unwrap(); - let pub2 = PKey::public_key_from_pem(&pem2).unwrap(); + let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(iv)); + let mut decrypted = vec![0u8; 1024]; + decrypter.unwrap().update(&ciphertext[..cipherlen], decrypted.as_mut_slice()).unwrap(); - let mut deriver1 = Deriver::new(pkey1.as_ref()).expect("deriver1 failed"); - let mut deriver2 = Deriver::new(pkey2.as_ref()).expect("deriver2 failed"); + println!("plaintext: {:?}", plaintext); + println!("ciphertext: {:?}", &ciphertext[0..plaintext.len()]); + println!("decryptedtext: {:?}", &decrypted[0..plaintext.len()]); + } - deriver1.set_peer(pub2.as_ref()).unwrap(); - deriver2.set_peer(pub1.as_ref()).unwrap(); - - let shared1 = deriver1.derive_to_vec().unwrap(); - let shared2 = deriver2.derive_to_vec().unwrap(); - - println!("shared1: {:?}", &shared1); - println!("shared2: {:?}", &shared2); - - assert_eq!(shared1, shared2); - - let plaintext = b"This is a message"; - let key = sha256(&shared1); - let iv = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; - - let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(iv)); - let mut ciphertext = vec![0u8; 1024]; - let cipherlen = encrypter.unwrap().update(plaintext, ciphertext.as_mut_slice()).unwrap(); - - let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(iv)); - let mut decrypted = vec![0u8; 1024]; - decrypter.unwrap().update(&ciphertext[..cipherlen], decrypted.as_mut_slice()).unwrap(); - - println!("plaintext: {:?}", plaintext); - println!("ciphertext: {:?}", &ciphertext[0..plaintext.len()]); - println!("decryptedtext: {:?}", &decrypted[0..plaintext.len()]); } \ No newline at end of file -- 2.40.1 From 4e91c4d6600364d0f0abcc39b8e4f5f686773600 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Wed, 4 Aug 2021 23:42:41 +0100 Subject: [PATCH 13/30] added clone trait to network Messages --- foundation/src/messages/client.rs | 4 ++-- foundation/src/messages/network.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/foundation/src/messages/client.rs b/foundation/src/messages/client.rs index 51ebf92..dfb603b 100644 --- a/foundation/src/messages/client.rs +++ b/foundation/src/messages/client.rs @@ -7,7 +7,7 @@ use uuid::Uuid; /// This enum defined the message that a client can receive from the server /// This uses the serde library to transform to and from json. /// -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[serde(tag = "type")] pub enum ClientStreamIn { Connected, @@ -19,7 +19,7 @@ pub enum ClientStreamIn { Disconnect, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[serde(tag = "type")] pub enum ClientStreamOut { Connected, diff --git a/foundation/src/messages/network.rs b/foundation/src/messages/network.rs index 3a9aad6..6a14abc 100644 --- a/foundation/src/messages/network.rs +++ b/foundation/src/messages/network.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[serde(tag = "type")] pub enum NetworkSockIn { Info, @@ -11,7 +11,7 @@ pub enum NetworkSockIn { }, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[serde(tag = "type")] pub enum NetworkSockOut<'a> { Request, -- 2.40.1 From bee9617206a1873350ec5754f719de4b87552dac Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Wed, 4 Aug 2021 23:42:59 +0100 Subject: [PATCH 14/30] Update Cargo.toml + added async trait crate --- server/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/server/Cargo.toml b/server/Cargo.toml index a32a144..8ac20d3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -17,5 +17,6 @@ zeroize = "1.1.0" openssl = "0.10.33" tokio = { version = "1.9.0", features = ["full"] } futures = "0.3.16" +async-trait = "0.1.51" foundation = {path = '../foundation'} \ No newline at end of file -- 2.40.1 From fb43ad45fcbd513088d510b677aaa93bdf0b85f0 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Wed, 4 Aug 2021 23:46:26 +0100 Subject: [PATCH 15/30] Created sender objects + added StreamMessageSender trait + added SocketSender struct + added StreamMessageSender implementation +added new function + added new network module to main --- server/src/main.rs | 2 ++ server/src/network/mod.rs | 62 +++++++++++++++++++++++++++++++++++++++ server/src/prelude.rs | 9 +++--- 3 files changed, 69 insertions(+), 4 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index eaf477b..221008b 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,8 +2,10 @@ pub mod client; pub mod client_manager; pub mod messages; pub mod network_manager; +pub mod network; pub mod server; pub mod encryption; +pub mod prelude; use std::io; diff --git a/server/src/network/mod.rs b/server/src/network/mod.rs index e69de29..4deb858 100644 --- a/server/src/network/mod.rs +++ b/server/src/network/mod.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; +use std::io::Write; +use std::io::Error; + +use async_trait::async_trait; +use serde::Serialize; +use serde::de::DeserializeOwned; +use tokio::io::split; +use tokio::sync::Mutex; +use tokio::io::ReadHalf; +use tokio::io::BufReader; +use tokio::io::WriteHalf; +use tokio::net::TcpStream; +use tokio::io::AsyncWriteExt; +use tokio::io::AsyncBufReadExt; + +use crate::prelude::StreamMessageSender; + +#[derive(Debug)] +pub struct SocketSender { + stream_tx: Mutex>, + stream_rx: Mutex>>, +} + +impl SocketSender { + pub fn new(connection: TcpStream) -> Arc { + let (rd, wd) = split(connection); + let reader = BufReader::new(rd); + + Arc::new(SocketSender { + stream_tx: Mutex::new(wd), + stream_rx: Mutex::new(reader), + }) + } +} + +#[async_trait] +impl StreamMessageSender for SocketSender { + async fn send + (self: &Arc, message: TOutMessage) -> Result<(), Error> + { + let mut out_buffer: Vec = Vec::new(); + let message_string = serde_json::to_string(&message)?; + writeln!(out_buffer, "{}", message_string)?; + let mut lock = self.stream_tx.lock().await; + lock.write_all(&out_buffer).await?; + lock.flush().await?; + Ok(()) + } + + async fn recv<'de, TInMessage: DeserializeOwned + Send> + (self: &Arc) -> Result + { + let mut in_buffer = String::new(); + let mut lock = self.stream_rx.lock().await; + lock.read_line(&mut in_buffer).await?; + let message: TInMessage = serde_json::from_str(&in_buffer) + .expect("[StreamMessageSender:recv] deserialisation failed"); + + Ok(message) + } +} \ No newline at end of file diff --git a/server/src/prelude.rs b/server/src/prelude.rs index ec3834b..e9b8154 100644 --- a/server/src/prelude.rs +++ b/server/src/prelude.rs @@ -2,11 +2,12 @@ use std::sync::Arc; use async_trait::async_trait; -use serde::{Serialize,Deserialize}; +use serde::Serialize; +use serde::de::DeserializeOwned; #[async_trait] -trait Sender<'de, TMessage: Deserialize<'de> + Serialize> { - async fn send(self: &Arc, message: TMessage) -> Result<(), std::io::Error>; - async fn recv(self: &Arc) -> Result; +pub trait StreamMessageSender { + async fn send(self: &Arc, message: TOutMessage) -> Result<(), std::io::Error>; + async fn recv<'de, TInMessage: DeserializeOwned + Send>(self: &Arc) -> Result; } -- 2.40.1 From e6d087b4d83d274f8f133f0e3e24e0f3416359bb Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Wed, 4 Aug 2021 23:46:50 +0100 Subject: [PATCH 16/30] Converted structs to use sender struct --- server/src/client.rs | 143 ++++++++++++---------------------- server/src/client_manager.rs | 2 +- server/src/network_manager.rs | 57 ++++---------- 3 files changed, 65 insertions(+), 137 deletions(-) diff --git a/server/src/client.rs b/server/src/client.rs index 360c701..c21225c 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,19 +1,14 @@ use std::sync::Arc; use std::cmp::Ordering; -use std::fmt::Write; use uuid::Uuid; - -use zeroize::Zeroize; - use futures::lock::Mutex; - -use tokio::io::{ReadHalf, WriteHalf}; use tokio::sync::mpsc::{Sender, Receiver, channel}; -use tokio::io::{AsyncBufReadExt, BufReader, AsyncWriteExt}; +use crate::network::SocketSender; use crate::messages::ClientMessage; use crate::messages::ServerMessage; +use crate::prelude::StreamMessageSender; use foundation::ClientDetails; use foundation::messages::client::{ClientStreamIn, ClientStreamOut}; @@ -39,8 +34,7 @@ pub struct Client { tx: Sender, rx: Mutex>, - stream_rx: Mutex>>, - stream_tx: Mutex>, + socket_sender: Arc, } // client funciton implmentations @@ -49,8 +43,7 @@ impl Client { uuid: String, username: String, address: String, - stream_rx: BufReader>, - stream_tx: WriteHalf, + socket_sender: Arc, server_channel: Sender, ) -> Arc { let (sender, receiver) = channel(1024); @@ -64,12 +57,11 @@ impl Client { }, server_channel: Mutex::new(server_channel), + socket_sender, tx: sender, rx: Mutex::new(receiver), - stream_rx: Mutex::new(stream_rx), - stream_tx: Mutex::new(stream_tx), }) } @@ -80,60 +72,39 @@ impl Client { // client stream read task tokio::spawn(async move { - use ClientMessage::Disconnect; let client = t1_client; - let mut lock = client.stream_tx.lock().await; - let mut buffer = String::new(); - - // tell client that is is now connected - let _ = writeln!(buffer, "{}", - serde_json::to_string(&ClientStreamOut::Connected).unwrap() - ); - - let _ = lock.write_all(&buffer.as_bytes()); - let _ = lock.flush().await; - - drop(lock); + client.socket_sender.send::(ClientStreamOut::Connected).await.expect("error"); loop { - let mut stream_reader = client.stream_rx.lock().await; - let mut buffer = String::new(); - - if let Ok(_size) = stream_reader.read_line(&mut buffer).await { - - let command = serde_json::from_str::(buffer.as_str()); - println!("[Client {:?}]: recieved {}", client.details.uuid, &buffer); - - match command { - Ok(ClientStreamIn::Disconnect) => { - println!("[Client {:?}]: Disconnect recieved", &client.details.uuid); - client.send_message(Disconnect).await; - return; - } - Ok(ClientStreamIn::SendMessage { to, content }) => { - println!("[Client {:?}]: send message to: {:?}", &client.details.uuid, &to); - let lock = client.server_channel.lock().await; - let _ = lock.send(ServerMessage::ClientSendMessage { - from: client.details.uuid, - to, - content, - }).await; - } - Ok(ClientStreamIn::Update) => { - println!("[Client {:?}]: update received", &client.details.uuid); - let lock = client.server_channel.lock().await; - let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid }).await; - } - _ => { - println!("[Client {:?}]: command not found", &client.details.uuid); - let lock = client.server_channel.lock().await; - let _ = lock.send(ServerMessage::ClientError { to: client.details.uuid }).await; - } + let command = client.socket_sender.recv::().await; + match command { + Ok(ClientStreamIn::Disconnect) => { + println!("[Client {:?}]: Disconnect recieved", &client.details.uuid); + client.send_message(Disconnect).await; + return; + } + Ok(ClientStreamIn::SendMessage { to, content }) => { + println!("[Client {:?}]: send message to: {:?}", &client.details.uuid, &to); + let lock = client.server_channel.lock().await; + let _ = lock.send(ServerMessage::ClientSendMessage { + from: client.details.uuid, + to, + content, + }).await; + } + Ok(ClientStreamIn::Update) => { + println!("[Client {:?}]: update received", &client.details.uuid); + let lock = client.server_channel.lock().await; + let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid }).await; + } + _ => { + println!("[Client {:?}]: command not found", &client.details.uuid); + let lock = client.server_channel.lock().await; + let _ = lock.send(ServerMessage::ClientError { to: client.details.uuid }).await; } - buffer.zeroize(); } } }); @@ -146,7 +117,6 @@ impl Client { loop { let mut channel = client.rx.lock().await; - let mut buffer = String::new(); let message = channel.recv().await.unwrap(); drop(channel); @@ -158,43 +128,26 @@ impl Client { let _ = lock.send(ServerMessage::ClientDisconnected { id: client.details.uuid }).await; return } - Message { from, content } => { - let msg = ClientStreamOut::UserMessage { from, content }; - let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap()); - - let mut stream = client.stream_tx.lock().await; - - let _ = stream.write_all(&buffer.as_bytes()).await; - let _ = stream.flush().await; - - drop(stream); - } + Message { from, content } => + client.socket_sender.send::( + ClientStreamOut::UserMessage { from, content } + ).await.expect("error sending message"), + SendClients { clients } => { - let client_details_vec: Vec = clients - .iter() - .map(|client| &client.details) - .cloned() - .collect(); + let client_details_vec: Vec = + clients.iter().map(|client| &client.details) + .cloned().collect(); - let msg = ClientStreamOut::ConnectedClients { - clients: client_details_vec, - }; - - let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap()); - - let mut stream = client.stream_tx.lock().await; - - let _ = stream.write_all(&buffer.as_bytes()).await; - let _ = stream.flush().await; + client.socket_sender.send::( + ClientStreamOut::ConnectedClients { + clients: client_details_vec, + } + ).await.expect("error sending message"); }, - Error => { - let _ = writeln!(buffer, "{}", serde_json::to_string(&ClientStreamOut::Error).unwrap()); - - let mut stream = client.stream_tx.lock().await; - - let _ = stream.write_all(&buffer.as_bytes()).await; - let _ = stream.flush().await; - } + Error => + client.socket_sender.send::( + ClientStreamOut::Error + ).await.expect("error sending message"), } } }); diff --git a/server/src/client_manager.rs b/server/src/client_manager.rs index b7a6333..d750306 100644 --- a/server/src/client_manager.rs +++ b/server/src/client_manager.rs @@ -94,7 +94,7 @@ impl ClientManager { async fn send_to_client(self: &Arc, id: &Uuid, msg: ClientMessage) { let lock = self.clients.lock().await; - if let Some(client) = lock.get(&id) { + if let Some(client) = lock.get(id) { client.clone().send_message(msg).await; } } diff --git a/server/src/network_manager.rs b/server/src/network_manager.rs index bcdc612..39f58c5 100644 --- a/server/src/network_manager.rs +++ b/server/src/network_manager.rs @@ -1,13 +1,12 @@ use std::sync::Arc; -use std::io::Write; -use tokio::task; use tokio::net::TcpListener; use tokio::sync::mpsc::Sender; -use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader}; use crate::client::Client; +use crate::network::SocketSender; use crate::messages::ServerMessage; +use crate::prelude::StreamMessageSender; use foundation::messages::network::{NetworkSockIn, NetworkSockOut}; pub struct NetworkManager { @@ -32,49 +31,26 @@ impl NetworkManager { loop { let (connection, _) = listener.accept().await.unwrap(); - let (rd, mut wd) = io::split(connection); - - let mut reader = BufReader::new(rd); + let stream_sender = SocketSender::new(connection); let server_channel = network_manager.server_channel.clone(); - task::spawn(async move { - let mut out_buffer: Vec = Vec::new(); - let mut in_buffer: String = String::new(); + tokio::spawn(async move { - // write request - let a = serde_json::to_string(&NetworkSockOut::Request).unwrap(); - println!("{:?}", &a); - let _ = writeln!( - out_buffer, - "{}", - a - ); + stream_sender.send::(NetworkSockOut::Request) + .await.expect("failed to send message"); - let _ = wd.write_all(&out_buffer).await; - let _ = wd.flush().await; - - // get response - let _ = reader.read_line(&mut in_buffer).await.unwrap(); - - //match the response - if let Ok(request) = - serde_json::from_str::(&in_buffer) + if let Ok(request) = + stream_sender.recv::().await { + match request { NetworkSockIn::Info => { - // send back server info to the connection - let _ = wd.write_all( - serde_json::to_string( - &NetworkSockOut::GotInfo { - server_name: "oof", - server_owner: "michael", - }, - ) - .unwrap() - .as_bytes(), - ).await; - let _ = wd.write_all(b"\n").await; - let _ = wd.flush().await; + stream_sender.send( + NetworkSockOut::GotInfo { + server_name: "oof", + server_owner: "michael", + } + ).await.expect("failed to send got info"); } NetworkSockIn::Connect { uuid, @@ -86,8 +62,7 @@ impl NetworkManager { uuid, username, address, - reader, - wd, + stream_sender, server_channel.clone(), ); let _ = server_channel -- 2.40.1 From 85f02e553afc2c0f65032836aeef2a0d7c2b4f78 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Fri, 6 Aug 2021 17:39:38 +0100 Subject: [PATCH 17/30] Update mod.rs + added custom debug message - removed debug derive --- server/src/network/mod.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/server/src/network/mod.rs b/server/src/network/mod.rs index 4deb858..d1b9e7b 100644 --- a/server/src/network/mod.rs +++ b/server/src/network/mod.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::io::Write; use std::io::Error; +use std::fmt::Debug; use async_trait::async_trait; use serde::Serialize; @@ -16,7 +17,7 @@ use tokio::io::AsyncBufReadExt; use crate::prelude::StreamMessageSender; -#[derive(Debug)] + pub struct SocketSender { stream_tx: Mutex>, stream_rx: Mutex>>, @@ -59,4 +60,12 @@ impl StreamMessageSender for SocketSender { Ok(message) } +} + +impl Debug for SocketSender { + + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) + -> std::result::Result<(), std::fmt::Error> { + write!(f, "[SocketSender]") + } } \ No newline at end of file -- 2.40.1 From 312c7bde9f54b48f55f2969e7686a55da69f6392 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Fri, 6 Aug 2021 17:40:58 +0100 Subject: [PATCH 18/30] Update mod.rs + added custom type for a function vector + added vector for sending and receiving + added functions to push and pop from the transformation stack. --- server/src/network/mod.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/server/src/network/mod.rs b/server/src/network/mod.rs index d1b9e7b..512d5c0 100644 --- a/server/src/network/mod.rs +++ b/server/src/network/mod.rs @@ -17,10 +17,14 @@ use tokio::io::AsyncBufReadExt; use crate::prelude::StreamMessageSender; +type TransformerVec = Vec &[u8]>; pub struct SocketSender { stream_tx: Mutex>, stream_rx: Mutex>>, + + send_transformer: Mutex, + recv_transformer: Mutex, } impl SocketSender { @@ -31,8 +35,35 @@ impl SocketSender { Arc::new(SocketSender { stream_tx: Mutex::new(wd), stream_rx: Mutex::new(reader), + + send_transformer: Mutex::new(Vec::new()), + recv_transformer: Mutex::new(Vec::new()), }) } + + pub async fn push_layer( + self: &Arc, + send_func: fn(&[u8]) -> &[u8], + recv_func: fn(&[u8]) -> &[u8], + ) { + let mut send_lock = self.send_transformer.lock().await; + let mut recv_lock = self.recv_transformer.lock().await; + send_lock.push(send_func); + recv_lock.reverse(); + recv_lock.push(recv_func); + recv_lock.reverse(); + } + + pub async fn pop_layer(self: &Arc,) { + let mut send_lock = self.send_transformer.lock().await; + let mut recv_lock = self.recv_transformer.lock().await; + + let _ = send_lock.pop(); + + recv_lock.reverse(); + let _ = recv_lock.pop(); + recv_lock.reverse(); + } } #[async_trait] -- 2.40.1 From 8f7fd76817b660e79b54072760a3e3bde10b7e8a Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Fri, 6 Aug 2021 20:40:17 +0100 Subject: [PATCH 19/30] Update encryption.rs + added function that created encrypt and decrypt functions + added transformer function type definition --- server/src/encryption.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/server/src/encryption.rs b/server/src/encryption.rs index d91a1ca..c442350 100644 --- a/server/src/encryption.rs +++ b/server/src/encryption.rs @@ -1,5 +1,37 @@ +use openssl::symm::{ + Cipher, + Crypter, + Mode +}; +type TransformerFn = dyn Fn(&[u8]) -> Vec; +#[allow(clippy::clone_on_copy)] +pub fn create_encryption_transformers(key: Vec, iv: &[u8; 32]) + -> (Box,Box) +{ + // clone vecs + let key1 = key.clone(); + let key2 = key.clone(); + + let iv1 = iv.clone(); + let iv2 = iv.clone(); + + ( + Box::new(move |plain_text| { + let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key1, Some(&iv1)); + let mut ciphertext = vec![0u8; 1024]; + let _cipherlen = encrypter.unwrap().update(plain_text, &mut ciphertext).unwrap(); + ciphertext + }), + Box::new(move |cipher_text| { + let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key2, Some(&iv2)); + let mut plain_text = vec![0u8; 1024]; + decrypter.unwrap().update(cipher_text, &mut plain_text).unwrap(); + plain_text + }) + ) +} #[cfg(test)] mod test { -- 2.40.1 From 9f63f8c2f11092c56207731be57ddcb8c2679d3d Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Fri, 6 Aug 2021 21:07:19 +0100 Subject: [PATCH 20/30] Update encryption.rs ~ moved key derivation to separate function + added function to test transformer functions --- server/src/encryption.rs | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/server/src/encryption.rs b/server/src/encryption.rs index c442350..789c114 100644 --- a/server/src/encryption.rs +++ b/server/src/encryption.rs @@ -47,9 +47,9 @@ mod test { Mode }; - #[test] - pub fn test_aes() { + use super::create_encryption_transformers; + fn create_shared() -> Vec { let ec_group1 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); let ec_group2 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); @@ -71,29 +71,46 @@ mod test { deriver1.set_peer(pub2.as_ref()).unwrap(); deriver2.set_peer(pub1.as_ref()).unwrap(); - let shared1 = deriver1.derive_to_vec().unwrap(); - let shared2 = deriver2.derive_to_vec().unwrap(); + deriver1.derive_to_vec().unwrap() + } - println!("shared1: {:?}", &shared1); - println!("shared2: {:?}", &shared2); + #[test] + pub fn test_transformer_functions() { + let shared = create_shared(); - assert_eq!(shared1, shared2); + let (en, de) = create_encryption_transformers(shared, b"12345678901234561234561234567765"); + + let message = b"Hello world"; + + let cipher_text = (*en)(message); + let decrypted_text = (*de)(&cipher_text); + + assert_eq!(&decrypted_text[0..message.len()], message); + } + + #[test] + pub fn test_aes() { + + let shared = create_shared(); let plaintext = b"This is a message"; - let key = sha256(&shared1); + let key = sha256(&shared); let iv = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(iv)); let mut ciphertext = vec![0u8; 1024]; - let cipherlen = encrypter.unwrap().update(plaintext, ciphertext.as_mut_slice()).unwrap(); + let cipherlen = encrypter.unwrap().update(plaintext, &mut ciphertext).unwrap(); let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(iv)); let mut decrypted = vec![0u8; 1024]; - decrypter.unwrap().update(&ciphertext[..cipherlen], decrypted.as_mut_slice()).unwrap(); + decrypter.unwrap().update(&ciphertext[..cipherlen], &mut decrypted).unwrap(); println!("plaintext: {:?}", plaintext); println!("ciphertext: {:?}", &ciphertext[0..plaintext.len()]); println!("decryptedtext: {:?}", &decrypted[0..plaintext.len()]); - } + let test: &[u8] = &decrypted; + + assert_eq!(&test[0..plaintext.len()], plaintext); + } } \ No newline at end of file -- 2.40.1 From 3dfc99a2d9056d0d3719e139c0358c012559f622 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Fri, 6 Aug 2021 21:07:58 +0100 Subject: [PATCH 21/30] removed old encryption module - removed old encryption file in foundation - removed reference from lib.rs --- foundation/src/encryption/mod.rs | 37 -------------------------------- foundation/src/lib.rs | 1 - 2 files changed, 38 deletions(-) delete mode 100644 foundation/src/encryption/mod.rs diff --git a/foundation/src/encryption/mod.rs b/foundation/src/encryption/mod.rs deleted file mode 100644 index cc05424..0000000 --- a/foundation/src/encryption/mod.rs +++ /dev/null @@ -1,37 +0,0 @@ -// use openssl::sha::sha256; -// use openssl::symm::{Cipher, Crypter, Mode}; - -#[cfg(test)] -mod test { - use openssl::sha::sha256; - use openssl::symm::{Cipher, Crypter, Mode}; - - #[test] - fn testEncryption() { - let plaintext = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.".as_bytes(); - let key = sha256(b"This is a key"); - let IV = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; - - let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV)); - let mut ciphertext = vec![0u8; 1024]; - let cipherlen = encrypter - .unwrap() - .update(plaintext, ciphertext.as_mut_slice()) - .unwrap(); - - let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV)); - let mut decrypted = vec![0u8; 1024]; - decrypter - .unwrap() - .update(&ciphertext[..cipherlen], decrypted.as_mut_slice()) - .unwrap(); - - println!("{:?}", plaintext); - println!("{:?}", ciphertext.as_slice()); - println!("{:?}", decrypted.as_slice()); - - println!("{:?}", plaintext.len()); - println!("{:?}", ciphertext.len()); - println!("{:?}", decrypted.len()); - } -} diff --git a/foundation/src/lib.rs b/foundation/src/lib.rs index e1b3daa..6bf0240 100644 --- a/foundation/src/lib.rs +++ b/foundation/src/lib.rs @@ -1,4 +1,3 @@ -pub mod encryption; pub mod messages; pub mod prelude; -- 2.40.1 From 7a0f92510b24dd6e5df43bb3fa4d3081b4f4c3bf Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Fri, 6 Aug 2021 21:08:18 +0100 Subject: [PATCH 22/30] renamed socket sender ~ renamed socket sender to socket handler --- server/src/client.rs | 6 +++--- server/src/network/mod.rs | 10 +++++----- server/src/network_manager.rs | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/server/src/client.rs b/server/src/client.rs index c21225c..0318bcb 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -5,7 +5,7 @@ use uuid::Uuid; use futures::lock::Mutex; use tokio::sync::mpsc::{Sender, Receiver, channel}; -use crate::network::SocketSender; +use crate::network::SocketHandler; use crate::messages::ClientMessage; use crate::messages::ServerMessage; use crate::prelude::StreamMessageSender; @@ -34,7 +34,7 @@ pub struct Client { tx: Sender, rx: Mutex>, - socket_sender: Arc, + socket_sender: Arc, } // client funciton implmentations @@ -43,7 +43,7 @@ impl Client { uuid: String, username: String, address: String, - socket_sender: Arc, + socket_sender: Arc, server_channel: Sender, ) -> Arc { let (sender, receiver) = channel(1024); diff --git a/server/src/network/mod.rs b/server/src/network/mod.rs index 512d5c0..e94c056 100644 --- a/server/src/network/mod.rs +++ b/server/src/network/mod.rs @@ -19,7 +19,7 @@ use crate::prelude::StreamMessageSender; type TransformerVec = Vec &[u8]>; -pub struct SocketSender { +pub struct SocketHandler { stream_tx: Mutex>, stream_rx: Mutex>>, @@ -27,12 +27,12 @@ pub struct SocketSender { recv_transformer: Mutex, } -impl SocketSender { +impl SocketHandler { pub fn new(connection: TcpStream) -> Arc { let (rd, wd) = split(connection); let reader = BufReader::new(rd); - Arc::new(SocketSender { + Arc::new(SocketHandler { stream_tx: Mutex::new(wd), stream_rx: Mutex::new(reader), @@ -67,7 +67,7 @@ impl SocketSender { } #[async_trait] -impl StreamMessageSender for SocketSender { +impl StreamMessageSender for SocketHandler { async fn send (self: &Arc, message: TOutMessage) -> Result<(), Error> { @@ -93,7 +93,7 @@ impl StreamMessageSender for SocketSender { } } -impl Debug for SocketSender { +impl Debug for SocketHandler { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { diff --git a/server/src/network_manager.rs b/server/src/network_manager.rs index 39f58c5..7d5e478 100644 --- a/server/src/network_manager.rs +++ b/server/src/network_manager.rs @@ -4,7 +4,7 @@ use tokio::net::TcpListener; use tokio::sync::mpsc::Sender; use crate::client::Client; -use crate::network::SocketSender; +use crate::network::SocketHandler; use crate::messages::ServerMessage; use crate::prelude::StreamMessageSender; use foundation::messages::network::{NetworkSockIn, NetworkSockOut}; @@ -31,7 +31,7 @@ impl NetworkManager { loop { let (connection, _) = listener.accept().await.unwrap(); - let stream_sender = SocketSender::new(connection); + let stream_sender = SocketHandler::new(connection); let server_channel = network_manager.server_channel.clone(); tokio::spawn(async move { -- 2.40.1 From 1e173586f1ceb55b967252110a3e55d9367eb50d Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Sat, 7 Aug 2021 17:20:33 +0100 Subject: [PATCH 23/30] Update mod.rs + added dummy tokio server. + added async test for socket sender with no transformers. --- server/src/network/mod.rs | 55 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/server/src/network/mod.rs b/server/src/network/mod.rs index e94c056..c622d7d 100644 --- a/server/src/network/mod.rs +++ b/server/src/network/mod.rs @@ -99,4 +99,59 @@ impl Debug for SocketHandler { -> std::result::Result<(), std::fmt::Error> { write!(f, "[SocketSender]") } +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use tokio::net::TcpStream; + use tokio::net::TcpListener; + use tokio::time::sleep; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + use super::SocketHandler; + use crate::prelude::StreamMessageSender; + + async fn start_server() { + let listener = TcpListener::bind("127.0.0.1:5600").await.expect("failed to create listener"); + let mut buf = [0; 1024]; + + loop { + let (mut socket, _) = listener.accept().await.expect("failed to accept connection"); + + tokio::spawn(async move { + let n = match socket.read(&mut buf).await { + // socket closed + Ok(n) if n == 0 => return, + Ok(n) => n, + Err(e) => { + println!("failed to read from socket; err = {:?}", e); + return; + } + }; + + // Write the data back + if let Err(e) = socket.write_all(&buf[0..n]).await { + println!("failed to write to socket; err = {:?}", e); + return; + } + }); + } + } + + #[tokio::test] + async fn test_socket_sender() { + tokio::spawn(start_server()); + + let socket = TcpStream::connect("localhost:5600").await.expect("failed to connect"); + + sleep(Duration::from_secs(1)).await; + + let handle = SocketHandler::new(socket); + let _ = handle.send::(true).await; + let message = handle.recv::().await.unwrap(); + + assert!(message); + } } \ No newline at end of file -- 2.40.1 From ab1a2f7e77058c192a8c3c01f0b7bd408dfe2e37 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Thu, 12 Aug 2021 17:35:17 +0100 Subject: [PATCH 24/30] Update Cargo.toml + added async crates to foundation --- foundation/Cargo.toml | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/foundation/Cargo.toml b/foundation/Cargo.toml index b3f43a5..a0c361d 100644 --- a/foundation/Cargo.toml +++ b/foundation/Cargo.toml @@ -9,17 +9,14 @@ edition = "2018" [dependencies] regex = "1" -crossbeam = "0.8.0" -crossbeam-channel = "0.5.0" -crossbeam-queue = "0.3.1" -parking_lot = "0.11.1" -dashmap = "4.0.2" -rayon = "1.3.1" -zeroize = "1.1.0" -crossterm = "0.19.0" -log = "0.4" url = "2.2.0" -uuid = {version = "0.8", features = ["serde", "v4"]} -serde = { version = "1.0", features = ["derive"] } +openssl = "0.10" +base64 = "0.13.0" +zeroize = "1.1.0" serde_json = "1.0" -openssl = "0.10" \ No newline at end of file +futures = "0.3.16" +async-trait = "0.1.51" + +serde = { version = "1.0", features = ["derive"] } +tokio = { version = "1.9.0", features = ["full"] } +uuid = {version = "0.8", features = ["serde", "v4"]} \ No newline at end of file -- 2.40.1 From 235eecda5e671ad6b9c3ad8282e9cfa875de55c0 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Thu, 12 Aug 2021 17:42:27 +0100 Subject: [PATCH 25/30] moved encryption ~ moved encryption module to foundation + created function to generate shared secrets --- foundation/src/encryption/helpers.rs | 30 +++++++ foundation/src/encryption/mod.rs | 97 ++++++++++++++++++++++ server/src/encryption.rs | 116 --------------------------- 3 files changed, 127 insertions(+), 116 deletions(-) create mode 100644 foundation/src/encryption/helpers.rs create mode 100644 foundation/src/encryption/mod.rs delete mode 100644 server/src/encryption.rs diff --git a/foundation/src/encryption/helpers.rs b/foundation/src/encryption/helpers.rs new file mode 100644 index 0000000..4209cae --- /dev/null +++ b/foundation/src/encryption/helpers.rs @@ -0,0 +1,30 @@ +use openssl::derive::Deriver; +use openssl::ec::EcGroup; +use openssl::ec::EcKey; +use openssl::nid::Nid; +use openssl::pkey::PKey; + +pub fn create_test_shared() -> Vec { + let ec_group1 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); + let ec_group2 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); + + let eckey1 = EcKey::generate(ec_group1.as_ref()).unwrap(); + let eckey2 = EcKey::generate(ec_group2.as_ref()).unwrap(); + + let pkey1 = PKey::from_ec_key(eckey1).unwrap(); + let pkey2 = PKey::from_ec_key(eckey2).unwrap(); + + let pem1 = pkey1.public_key_to_pem().unwrap(); + let pem2 = pkey2.public_key_to_pem().unwrap(); + + let pub1 = PKey::public_key_from_pem(&pem1).unwrap(); + let pub2 = PKey::public_key_from_pem(&pem2).unwrap(); + + let mut deriver1 = Deriver::new(pkey1.as_ref()).expect("deriver1 failed"); + let mut deriver2 = Deriver::new(pkey2.as_ref()).expect("deriver2 failed"); + + deriver1.set_peer(pub2.as_ref()).unwrap(); + deriver2.set_peer(pub1.as_ref()).unwrap(); + + deriver1.derive_to_vec().unwrap() +} \ No newline at end of file diff --git a/foundation/src/encryption/mod.rs b/foundation/src/encryption/mod.rs new file mode 100644 index 0000000..0196d52 --- /dev/null +++ b/foundation/src/encryption/mod.rs @@ -0,0 +1,97 @@ +pub mod helpers; + +use crate::prelude::TransformerFn; +use openssl::symm::{Cipher, Crypter, Mode}; + +#[allow(clippy::clone_on_copy)] +pub fn create_encryption_transformers( + key: Vec, + iv: &[u8; 32], +) -> (TransformerFn, TransformerFn) { + // clone vecs + let key1 = key.clone(); + let key2 = key.clone(); + + let iv1 = iv.clone(); + let iv2 = iv.clone(); + + ( + Box::new(move |plain_text| { + println!("[encryptor_fn] plain_text:{:?}", plain_text); + let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key1, Some(&iv1)); + let mut ciphertext = vec![0u8; 128]; + let _cipherlen = encrypter + .unwrap() + .update(plain_text, &mut ciphertext) + .unwrap(); + ciphertext + }), + Box::new(move |cipher_text| { + println!("[decryptor_fn] cipher_text:{:?}", cipher_text); + let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key2, Some(&iv2)); + let mut plain_text = vec![0u8; 128]; + decrypter + .unwrap() + .update(cipher_text, &mut plain_text) + .unwrap(); + plain_text + }), + ) +} + +#[cfg(test)] +mod test { + use openssl::sha::sha256; + use openssl::symm::{Cipher, Crypter, Mode}; + + use super::create_encryption_transformers; + use super::helpers::create_test_shared; + + #[test] + pub fn test_transformer_functions() { + let shared = create_test_shared(); + + let (en, de) = create_encryption_transformers(shared, b"12345678901234561234561234567765"); + + let message = b"Hello world"; + + let cipher_text = (*en)(message); + + assert_ne!(&cipher_text[0..message.len()], message); + + let decrypted_text = (*de)(&cipher_text); + + assert_eq!(&decrypted_text[0..message.len()], message); + } + + #[test] + pub fn test_aes() { + let shared = create_test_shared(); + + let plaintext = b"This is a message"; + let key = sha256(&shared); + let iv = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; + + let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(iv)); + let mut ciphertext = vec![0u8; 1024]; + let cipherlen = encrypter + .unwrap() + .update(plaintext, &mut ciphertext) + .unwrap(); + + let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(iv)); + let mut decrypted = vec![0u8; 1024]; + decrypter + .unwrap() + .update(&ciphertext[..cipherlen], &mut decrypted) + .unwrap(); + + println!("plaintext: {:?}", plaintext); + println!("ciphertext: {:?}", &ciphertext[0..plaintext.len()]); + println!("decryptedtext: {:?}", &decrypted[0..plaintext.len()]); + + let test: &[u8] = &decrypted; + + assert_eq!(&test[0..plaintext.len()], plaintext); + } +} diff --git a/server/src/encryption.rs b/server/src/encryption.rs deleted file mode 100644 index 789c114..0000000 --- a/server/src/encryption.rs +++ /dev/null @@ -1,116 +0,0 @@ -use openssl::symm::{ - Cipher, - Crypter, - Mode -}; - -type TransformerFn = dyn Fn(&[u8]) -> Vec; - -#[allow(clippy::clone_on_copy)] -pub fn create_encryption_transformers(key: Vec, iv: &[u8; 32]) - -> (Box,Box) -{ - // clone vecs - let key1 = key.clone(); - let key2 = key.clone(); - - let iv1 = iv.clone(); - let iv2 = iv.clone(); - - ( - Box::new(move |plain_text| { - let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key1, Some(&iv1)); - let mut ciphertext = vec![0u8; 1024]; - let _cipherlen = encrypter.unwrap().update(plain_text, &mut ciphertext).unwrap(); - ciphertext - }), - Box::new(move |cipher_text| { - let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key2, Some(&iv2)); - let mut plain_text = vec![0u8; 1024]; - decrypter.unwrap().update(cipher_text, &mut plain_text).unwrap(); - plain_text - }) - ) -} - -#[cfg(test)] -mod test { - use openssl::ec::*; - use openssl::nid::Nid; - use openssl::ec::EcKey; - use openssl::pkey::PKey; - use openssl::sha::sha256; - use openssl::derive::Deriver; - use openssl::symm::{ - Cipher, - Crypter, - Mode - }; - - use super::create_encryption_transformers; - - fn create_shared() -> Vec { - let ec_group1 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); - let ec_group2 = EcGroup::from_curve_name(Nid::SECP256K1).unwrap(); - - let eckey1 = EcKey::generate(ec_group1.as_ref()).unwrap(); - let eckey2 = EcKey::generate(ec_group2.as_ref()).unwrap(); - - let pkey1 = PKey::from_ec_key(eckey1).unwrap(); - let pkey2 = PKey::from_ec_key(eckey2).unwrap(); - - let pem1 = pkey1.public_key_to_pem().unwrap(); - let pem2 = pkey2.public_key_to_pem().unwrap(); - - let pub1 = PKey::public_key_from_pem(&pem1).unwrap(); - let pub2 = PKey::public_key_from_pem(&pem2).unwrap(); - - let mut deriver1 = Deriver::new(pkey1.as_ref()).expect("deriver1 failed"); - let mut deriver2 = Deriver::new(pkey2.as_ref()).expect("deriver2 failed"); - - deriver1.set_peer(pub2.as_ref()).unwrap(); - deriver2.set_peer(pub1.as_ref()).unwrap(); - - deriver1.derive_to_vec().unwrap() - } - - #[test] - pub fn test_transformer_functions() { - let shared = create_shared(); - - let (en, de) = create_encryption_transformers(shared, b"12345678901234561234561234567765"); - - let message = b"Hello world"; - - let cipher_text = (*en)(message); - let decrypted_text = (*de)(&cipher_text); - - assert_eq!(&decrypted_text[0..message.len()], message); - } - - #[test] - pub fn test_aes() { - - let shared = create_shared(); - - let plaintext = b"This is a message"; - let key = sha256(&shared); - let iv = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; - - let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(iv)); - let mut ciphertext = vec![0u8; 1024]; - let cipherlen = encrypter.unwrap().update(plaintext, &mut ciphertext).unwrap(); - - let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(iv)); - let mut decrypted = vec![0u8; 1024]; - decrypter.unwrap().update(&ciphertext[..cipherlen], &mut decrypted).unwrap(); - - println!("plaintext: {:?}", plaintext); - println!("ciphertext: {:?}", &ciphertext[0..plaintext.len()]); - println!("decryptedtext: {:?}", &decrypted[0..plaintext.len()]); - - let test: &[u8] = &decrypted; - - assert_eq!(&test[0..plaintext.len()], plaintext); - } -} \ No newline at end of file -- 2.40.1 From 7d759f152b44d99912730349f6e671c2c8be066b Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Thu, 12 Aug 2021 17:47:42 +0100 Subject: [PATCH 26/30] Create helpers.rs + added struct to mock and async stream --- foundation/src/helpers.rs | 115 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 foundation/src/helpers.rs diff --git a/foundation/src/helpers.rs b/foundation/src/helpers.rs new file mode 100644 index 0000000..93e838e --- /dev/null +++ b/foundation/src/helpers.rs @@ -0,0 +1,115 @@ +use std::pin::Pin; +use std::io::Error; +use std::task::Poll; +use std::sync::Mutex; +use std::task::Context; + +use tokio::io::ReadBuf; +use tokio::io::{AsyncRead, AsyncWrite}; + +pub struct BufferStream { + buffer: Mutex>, +} + +impl BufferStream { + pub fn new() -> BufferStream { + BufferStream { + buffer: Mutex::new(Vec::new()), + } + } +} + +impl AsyncRead for BufferStream { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context, + buf: &mut ReadBuf<'_> + ) -> Poll> { + let mut lock = self.buffer.lock().unwrap(); + + let a = if buf.remaining() < lock.len() {buf.remaining()} else {lock.len()}; + + buf.put_slice(&lock[..a]); + + *lock = Vec::from(&lock[a..]); + + Poll::Ready(Ok(())) + } +} + +impl AsyncWrite for BufferStream { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8] + ) -> Poll> { + let mut lock = self.buffer.lock().unwrap(); + lock.extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_> + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context<'_> + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + +#[cfg(test)] +mod test { + + use tokio::io::split; + use tokio::io::AsyncWriteExt; + + use crate::helpers::BufferStream; + use tokio::io::AsyncReadExt; + + #[tokio::test] + async fn test_reading_and_writing() { + let stream = BufferStream::new(); + + let (mut rd, mut wd) = split(stream); + + let _ = wd.write_all(b"1010").await; + + let mut buf: [u8; 4] = [0; 4]; + + let _ = rd.read(&mut buf[..]).await; + + println!("[test_reading_and_writing] {:?}", &buf[..]); + + assert_eq!(b"1010", &buf[..]); + } + + + #[tokio::test] + async fn test_reading_small() { + let stream = BufferStream::new(); + + let (mut rd, mut wd) = split(stream); + + let _ = wd.write_all(b"10100101").await; + + let mut buf: [u8; 4] = [0; 4]; + + let _ = rd.read(&mut buf[..]).await; + + println!("[test_reading_and_writing] {:?}", &buf[..]); + + assert_eq!(b"1010", &buf[..]); + + let _ = rd.read(&mut buf[..]).await; + + println!("[test_reading_and_writing] {:?}", &buf[..]); + + assert_eq!(b"0101", &buf[..]); + } +} \ No newline at end of file -- 2.40.1 From 5615a8b7a8ad017221e10d983edb003409ce054a Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Thu, 12 Aug 2021 17:48:03 +0100 Subject: [PATCH 27/30] Update lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit forgot this file ¯\_(ツ)_/¯ --- foundation/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/foundation/src/lib.rs b/foundation/src/lib.rs index 6bf0240..1458ccf 100644 --- a/foundation/src/lib.rs +++ b/foundation/src/lib.rs @@ -1,5 +1,8 @@ +pub mod encryption; pub mod messages; pub mod prelude; +pub mod network; +pub mod helpers; use serde::{Deserialize, Serialize}; use uuid::Uuid; -- 2.40.1 From a39e43396f57b09fff91c2dd4fcc06f37614c095 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Thu, 12 Aug 2021 17:51:50 +0100 Subject: [PATCH 28/30] Update prelude.rs also should have added this earlier --- foundation/src/prelude.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/foundation/src/prelude.rs b/foundation/src/prelude.rs index 92ff1d7..6709dbe 100644 --- a/foundation/src/prelude.rs +++ b/foundation/src/prelude.rs @@ -1,15 +1,19 @@ use std::sync::Arc; -pub trait IMessagable { - fn send_message(&self, msg: TMessage); - fn set_sender(&self, sender: TSender); -} +use async_trait::async_trait; -pub trait ICooperative { - fn tick(&self); -} +use serde::de::DeserializeOwned; +use serde::Serialize; -pub trait IPreemptive { - fn run(arc: &Arc); - fn start(arc: &Arc); -} +pub type TransformerFn = Box Vec + Send + Sync>; + +#[async_trait] +pub trait StreamMessageSender { + async fn send( + self: &Arc, + message: TOutMessage, + ) -> Result<(), std::io::Error>; + async fn recv<'de, TInMessage: DeserializeOwned + Send>( + self: &Arc, + ) -> Result; +} \ No newline at end of file -- 2.40.1 From dc164fc325fa8bf0703ba7713fb1955755a45d1e Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Thu, 12 Aug 2021 18:01:56 +0100 Subject: [PATCH 29/30] Create mod.rs + added socket handler to manage socket connections and message passing + added tests + added transformer functionality to manipulate bytes --- foundation/src/network/mod.rs | 184 ++++++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 foundation/src/network/mod.rs diff --git a/foundation/src/network/mod.rs b/foundation/src/network/mod.rs new file mode 100644 index 0000000..5c9bf4b --- /dev/null +++ b/foundation/src/network/mod.rs @@ -0,0 +1,184 @@ +use tokio::io::AsyncWrite; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use std::fmt::Debug; +use std::io::Error; +use std::io::Write; +use std::sync::Arc; + +use async_trait::async_trait; +use serde::de::DeserializeOwned; +use serde::Serialize; +use tokio::io::split; +use tokio::io::AsyncBufReadExt; +use tokio::io::AsyncWriteExt; +use tokio::io::ReadHalf; +use tokio::io::WriteHalf; +use tokio::io::{BufReader}; +use tokio::sync::Mutex; + +use crate::prelude::StreamMessageSender; +use crate::prelude::TransformerFn; + +type TransformerVec = Vec; + +pub struct SocketHandler +where + T: AsyncRead + AsyncWrite + Send +{ + stream_tx: Mutex>, + stream_rx: Mutex>>, + + send_transformer: Mutex, + recv_transformer: Mutex, +} + +impl SocketHandler +where + T: AsyncReadExt + AsyncWriteExt + Send +{ + pub fn new(connection: T) -> Arc { + let (rd, wd) = split(connection); + let reader = BufReader::new(rd); + + Arc::new(SocketHandler { + stream_tx: Mutex::new(wd), + stream_rx: Mutex::new(reader), + + send_transformer: Mutex::new(Vec::new()), + recv_transformer: Mutex::new(Vec::new()), + }) + } + + pub async fn push_layer(self: &Arc, send_func: TransformerFn, recv_func: TransformerFn) { + let mut send_lock = self.send_transformer.lock().await; + let mut recv_lock = self.recv_transformer.lock().await; + send_lock.push(send_func); + recv_lock.push(recv_func); + } + + pub async fn pop_layer(self: &Arc) { + let mut send_lock = self.send_transformer.lock().await; + let mut recv_lock = self.recv_transformer.lock().await; + let _ = send_lock.pop(); + let _ = recv_lock.pop(); + } +} + +#[async_trait] +impl StreamMessageSender for SocketHandler +where + T: AsyncReadExt + AsyncWriteExt + Send +{ + async fn send( + self: &Arc, + message: TOutMessage, + ) -> Result<(), Error> { + let message_string = serde_json::to_string(&message)?; + let mut out_buffer = Vec::from(message_string); + let message_length = out_buffer.len(); + println!("[SocketHandler:send] message_length:{:?}", &message_length); + + println!("[SocketHandler:send] message_before: {:?}", &out_buffer); + + let transformers = self.send_transformer.lock().await; + let iter = transformers.iter(); + + for func in iter { + let transform = (**func)(&out_buffer); + out_buffer.clear(); + out_buffer.extend_from_slice(&transform); + } + + let data = base64::encode(&out_buffer[..message_length]); + + println!("[SocketHandler:send] message_encode_base64: {:?}", &data); + + out_buffer.clear(); + + writeln!(out_buffer, "{}", data)?; + + println!("[SocketHandler:send] message_out: {:?}", &out_buffer); + + let mut lock = self.stream_tx.lock().await; + lock.write_all(&out_buffer).await?; + lock.flush().await?; + Ok(()) + } + + async fn recv<'de, TInMessage: DeserializeOwned + Send>( + self: &Arc, + ) -> Result { + let mut in_buffer = String::new(); + let mut lock = self.stream_rx.lock().await; + let mut length = lock.read_line(&mut in_buffer).await.unwrap(); + in_buffer.pop(); + println!("[SocketHandler:recv] message_in: {:?}", &in_buffer); + + let mut in_buffer = base64::decode(in_buffer).unwrap(); + println!("[SocketHandler:recv] message_decoded_base64: {:?}", &in_buffer); + + length = in_buffer.len(); + + let transformers = self.recv_transformer.lock().await; + let iter = transformers.iter().rev(); + for func in iter { + let transform = (**func)(&in_buffer); + in_buffer.clear(); + in_buffer.extend_from_slice(&transform[..length]); + } + println!("[SocketHandler:recv] message_after_transoformed: {:?}", &in_buffer); + + let in_buffer = String::from_utf8(in_buffer).unwrap(); + let message: TInMessage = serde_json::from_str(&in_buffer).unwrap(); + Ok(message) + } +} + +impl Debug for SocketHandler +where + T: AsyncReadExt + AsyncWriteExt + Send +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + write!(f, "[SocketSender]") + } +} + +#[cfg(test)] +mod test { + use crate::helpers::BufferStream; + use super::SocketHandler; + + use crate::prelude::StreamMessageSender; + use crate::encryption::helpers::create_test_shared; + use crate::encryption::create_encryption_transformers; + + #[tokio::test] + async fn test_socket_sender() { + + let stream = BufferStream::new(); + + let handle = SocketHandler::new(stream); + let _ = handle.send::(true).await.unwrap(); + let message = handle.recv::().await.unwrap(); + + assert!(message); + } + + #[tokio::test] + async fn test_socket_sender_with_encryption() { + + let stream = BufferStream::new(); + + let shared = create_test_shared(); + let (en, de) = create_encryption_transformers(shared, b"12345678901234567890123456789011"); + let handle = SocketHandler::new(stream); + + handle.push_layer(en, de).await; + + handle.send::(true).await.unwrap(); + let message = handle.recv::().await.unwrap(); + + assert!(message); + } +} -- 2.40.1 From 030bd8ca535a86c8a8fe99e0575cc6e2797ac879 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Thu, 12 Aug 2021 18:04:41 +0100 Subject: [PATCH 30/30] reformatted files --- server/src/client.rs | 111 ++++++++++++++--------- server/src/client_manager.rs | 34 ++++--- server/src/main.rs | 8 +- server/src/messages.rs | 6 +- server/src/network/mod.rs | 165 ++++++++++++++++++++-------------- server/src/network_manager.rs | 42 ++++----- server/src/prelude.rs | 14 ++- server/src/server.rs | 60 +++++++------ 8 files changed, 254 insertions(+), 186 deletions(-) diff --git a/server/src/client.rs b/server/src/client.rs index 0318bcb..278feaf 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,18 +1,20 @@ -use std::sync::Arc; +use tokio::net::TcpStream; use std::cmp::Ordering; +use std::sync::Arc; use uuid::Uuid; use futures::lock::Mutex; -use tokio::sync::mpsc::{Sender, Receiver, channel}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; -use crate::network::SocketHandler; -use crate::messages::ClientMessage; -use crate::messages::ServerMessage; -use crate::prelude::StreamMessageSender; use foundation::ClientDetails; +use foundation::network::SocketHandler; +use foundation::prelude::StreamMessageSender; use foundation::messages::client::{ClientStreamIn, ClientStreamOut}; +use crate::messages::ClientMessage; +use crate::messages::ServerMessage; + /// # Client /// This struct represents a connected user. /// @@ -34,7 +36,7 @@ pub struct Client { tx: Sender, rx: Mutex>, - socket_sender: Arc, + socket_sender: Arc>, } // client funciton implmentations @@ -43,7 +45,7 @@ impl Client { uuid: String, username: String, address: String, - socket_sender: Arc, + socket_sender: Arc>, server_channel: Sender, ) -> Arc { let (sender, receiver) = channel(1024); @@ -53,7 +55,7 @@ impl Client { uuid: Uuid::parse_str(&uuid).expect("invalid id"), username, address, - public_key: None + public_key: None, }, server_channel: Mutex::new(server_channel), @@ -61,12 +63,10 @@ impl Client { tx: sender, rx: Mutex::new(receiver), - }) } pub fn start(self: &Arc) { - let t1_client = self.clone(); let t2_client = self.clone(); @@ -76,7 +76,11 @@ impl Client { let client = t1_client; - client.socket_sender.send::(ClientStreamOut::Connected).await.expect("error"); + client + .socket_sender + .send::(ClientStreamOut::Connected) + .await + .expect("error"); loop { let command = client.socket_sender.recv::().await; @@ -87,23 +91,36 @@ impl Client { return; } Ok(ClientStreamIn::SendMessage { to, content }) => { - println!("[Client {:?}]: send message to: {:?}", &client.details.uuid, &to); + println!( + "[Client {:?}]: send message to: {:?}", + &client.details.uuid, &to + ); let lock = client.server_channel.lock().await; - let _ = lock.send(ServerMessage::ClientSendMessage { - from: client.details.uuid, - to, - content, - }).await; + let _ = lock + .send(ServerMessage::ClientSendMessage { + from: client.details.uuid, + to, + content, + }) + .await; } Ok(ClientStreamIn::Update) => { println!("[Client {:?}]: update received", &client.details.uuid); let lock = client.server_channel.lock().await; - let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid }).await; + let _ = lock + .send(ServerMessage::ClientUpdate { + to: client.details.uuid, + }) + .await; } _ => { println!("[Client {:?}]: command not found", &client.details.uuid); let lock = client.server_channel.lock().await; - let _ = lock.send(ServerMessage::ClientError { to: client.details.uuid }).await; + let _ = lock + .send(ServerMessage::ClientError { + to: client.details.uuid, + }) + .await; } } } @@ -111,7 +128,7 @@ impl Client { // client channel read thread tokio::spawn(async move { - use ClientMessage::{Disconnect, Message, SendClients, Error}; + use ClientMessage::{Disconnect, Error, Message, SendClients}; let client = t2_client; @@ -125,32 +142,42 @@ impl Client { match message { Disconnect => { let lock = client.server_channel.lock().await; - let _ = lock.send(ServerMessage::ClientDisconnected { id: client.details.uuid }).await; - return + let _ = lock + .send(ServerMessage::ClientDisconnected { + id: client.details.uuid, + }) + .await; + return; } - Message { from, content } => - client.socket_sender.send::( - ClientStreamOut::UserMessage { from, content } - ).await.expect("error sending message"), - - SendClients { clients } => { - let client_details_vec: Vec = - clients.iter().map(|client| &client.details) - .cloned().collect(); + Message { from, content } => client + .socket_sender + .send::(ClientStreamOut::UserMessage { from, content }) + .await + .expect("error sending message"), - client.socket_sender.send::( - ClientStreamOut::ConnectedClients { + SendClients { clients } => { + let client_details_vec: Vec = clients + .iter() + .map(|client| &client.details) + .cloned() + .collect(); + + client + .socket_sender + .send::(ClientStreamOut::ConnectedClients { clients: client_details_vec, - } - ).await.expect("error sending message"); - }, - Error => - client.socket_sender.send::( - ClientStreamOut::Error - ).await.expect("error sending message"), + }) + .await + .expect("error sending message"); + } + Error => client + .socket_sender + .send::(ClientStreamOut::Error) + .await + .expect("error sending message"), } } - }); + }); } pub async fn send_message(self: &Arc, msg: ClientMessage) { diff --git a/server/src/client_manager.rs b/server/src/client_manager.rs index d750306..7e8d8c3 100644 --- a/server/src/client_manager.rs +++ b/server/src/client_manager.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; use std::sync::Arc; -use uuid::Uuid; -use tokio::sync::mpsc::{channel, Receiver, Sender}; use futures::lock::Mutex; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use uuid::Uuid; use crate::client::Client; use crate::messages::ClientMessage; @@ -37,19 +37,17 @@ impl ClientManager { } pub fn start(self: &Arc) { - let client_manager = self.clone(); tokio::spawn(async move { - - use ClientMgrMessage::{Add, Remove, SendClients, SendMessage, SendError}; + use ClientMgrMessage::{Add, Remove, SendClients, SendError, SendMessage}; loop { let mut receiver = client_manager.rx.lock().await; let message = receiver.recv().await.unwrap(); println!("[Client manager]: recieved message: {:?}", message); - + match message { Add(client) => { println!("[Client Manager]: adding new client"); @@ -66,25 +64,28 @@ impl ClientManager { } } SendMessage { to, from, content } => { - client_manager.send_to_client(&to, ClientMessage::Message { from, content }).await; + client_manager + .send_to_client(&to, ClientMessage::Message { from, content }) + .await; } SendClients { to } => { let lock = client_manager.clients.lock().await; if let Some(client) = lock.get(&to) { - let clients_vec: Vec> = - lock.values().cloned().collect(); + let clients_vec: Vec> = lock.values().cloned().collect(); - client.send_message(ClientMessage::SendClients { - clients: clients_vec, - }).await + client + .send_message(ClientMessage::SendClients { + clients: clients_vec, + }) + .await } - }, + } SendError { to } => { let lock = client_manager.clients.lock().await; if let Some(client) = lock.get(&to) { client.send_message(ClientMessage::Error).await } - }, + } #[allow(unreachable_patterns)] _ => println!("[Client manager]: not implemented"), } @@ -99,10 +100,7 @@ impl ClientManager { } } - pub async fn send_message( - self: Arc, - message: ClientMgrMessage) - { + pub async fn send_message(self: Arc, message: ClientMgrMessage) { let _ = self.tx.send(message).await; } } diff --git a/server/src/main.rs b/server/src/main.rs index 221008b..b6b69b4 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,11 +1,9 @@ pub mod client; pub mod client_manager; -pub mod messages; pub mod network_manager; -pub mod network; -pub mod server; -pub mod encryption; pub mod prelude; +pub mod server; +pub mod messages; use std::io; @@ -34,5 +32,5 @@ async fn main() -> io::Result<()> { let server = Server::new().unwrap(); server.start().await; - Ok(()) + Ok(()) } diff --git a/server/src/messages.rs b/server/src/messages.rs index f703171..b9306c2 100644 --- a/server/src/messages.rs +++ b/server/src/messages.rs @@ -28,7 +28,7 @@ pub enum ClientMgrMessage { }, SendError { to: Uuid, - } + }, } #[derive(Debug)] @@ -48,6 +48,6 @@ pub enum ServerMessage { to: Uuid, }, ClientError { - to: Uuid - } + to: Uuid, + }, } diff --git a/server/src/network/mod.rs b/server/src/network/mod.rs index c622d7d..b6e420f 100644 --- a/server/src/network/mod.rs +++ b/server/src/network/mod.rs @@ -1,23 +1,24 @@ -use std::sync::Arc; -use std::io::Write; -use std::io::Error; use std::fmt::Debug; +use std::io::Error; +use std::io::Write; +use std::sync::Arc; use async_trait::async_trait; -use serde::Serialize; use serde::de::DeserializeOwned; +use serde::Serialize; use tokio::io::split; -use tokio::sync::Mutex; -use tokio::io::ReadHalf; -use tokio::io::BufReader; -use tokio::io::WriteHalf; -use tokio::net::TcpStream; -use tokio::io::AsyncWriteExt; use tokio::io::AsyncBufReadExt; +use tokio::io::AsyncWriteExt; +use tokio::io::ReadHalf; +use tokio::io::WriteHalf; +use tokio::io::{BufReader, BufWriter}; +use tokio::net::TcpStream; +use tokio::sync::Mutex; use crate::prelude::StreamMessageSender; +use crate::prelude::TransformerFn; -type TransformerVec = Vec &[u8]>; +type TransformerVec = Vec; pub struct SocketHandler { stream_tx: Mutex>, @@ -41,117 +42,147 @@ impl SocketHandler { }) } - pub async fn push_layer( - self: &Arc, - send_func: fn(&[u8]) -> &[u8], - recv_func: fn(&[u8]) -> &[u8], - ) { + pub async fn push_layer(self: &Arc, send_func: TransformerFn, recv_func: TransformerFn) { let mut send_lock = self.send_transformer.lock().await; let mut recv_lock = self.recv_transformer.lock().await; send_lock.push(send_func); - recv_lock.reverse(); recv_lock.push(recv_func); - recv_lock.reverse(); } - pub async fn pop_layer(self: &Arc,) { + pub async fn pop_layer(self: &Arc) { let mut send_lock = self.send_transformer.lock().await; let mut recv_lock = self.recv_transformer.lock().await; - let _ = send_lock.pop(); - - recv_lock.reverse(); let _ = recv_lock.pop(); - recv_lock.reverse(); } } #[async_trait] impl StreamMessageSender for SocketHandler { - async fn send - (self: &Arc, message: TOutMessage) -> Result<(), Error> - { + async fn send( + self: &Arc, + message: TOutMessage, + ) -> Result<(), Error> { let mut out_buffer: Vec = Vec::new(); let message_string = serde_json::to_string(&message)?; writeln!(out_buffer, "{}", message_string)?; + + println!("[SocketHandler:send] message_before: {:?}", &out_buffer); + + let transformers = self.send_transformer.lock().await; + let iter = transformers.iter(); + + for func in iter { + out_buffer = (**func)(&out_buffer); + } + + println!("[SocketHandler:send] message_after: {:?}", &out_buffer); + let mut lock = self.stream_tx.lock().await; lock.write_all(&out_buffer).await?; lock.flush().await?; Ok(()) } - async fn recv<'de, TInMessage: DeserializeOwned + Send> - (self: &Arc) -> Result - { + async fn recv<'de, TInMessage: DeserializeOwned + Send>( + self: &Arc, + ) -> Result { let mut in_buffer = String::new(); let mut lock = self.stream_rx.lock().await; lock.read_line(&mut in_buffer).await?; - let message: TInMessage = serde_json::from_str(&in_buffer) - .expect("[StreamMessageSender:recv] deserialisation failed"); + + println!("[SocketHandler:recv] message_before: {:?}", &in_buffer); + + let transformers = self.recv_transformer.lock().await; + let iter = transformers.iter(); + + let mut in_buffer = in_buffer.into_bytes(); + + for func in iter { + in_buffer = (**func)(&in_buffer); + } + + println!("[SocketHandler:recv] message_after: {:?}", &in_buffer); + + let in_buffer = String::from_utf8(in_buffer).expect("invalid utf_8"); + + let message: TInMessage = serde_json::from_str(&in_buffer).unwrap(); Ok(message) } } impl Debug for SocketHandler { - - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) - -> std::result::Result<(), std::fmt::Error> { - write!(f, "[SocketSender]") + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + write!(f, "[SocketSender]") } } #[cfg(test)] mod test { + use tokio::runtime::Runtime; + use std::sync::Once; use std::time::Duration; - use tokio::net::TcpStream; - use tokio::net::TcpListener; - use tokio::time::sleep; + use tokio::task; use tokio::io::{AsyncReadExt, AsyncWriteExt}; - + use tokio::net::TcpListener; + use tokio::net::TcpStream; + use tokio::time::sleep; + use super::SocketHandler; + use crate::helpers::start_server; + use crate::helpers::create_test_shared; use crate::prelude::StreamMessageSender; + use crate::encryption::create_encryption_transformers; - async fn start_server() { - let listener = TcpListener::bind("127.0.0.1:5600").await.expect("failed to create listener"); - let mut buf = [0; 1024]; - loop { - let (mut socket, _) = listener.accept().await.expect("failed to accept connection"); + static SERVER_INIT: Once = Once::new(); - tokio::spawn(async move { - let n = match socket.read(&mut buf).await { - // socket closed - Ok(n) if n == 0 => return, - Ok(n) => n, - Err(e) => { - println!("failed to read from socket; err = {:?}", e); - return; - } - }; - - // Write the data back - if let Err(e) = socket.write_all(&buf[0..n]).await { - println!("failed to write to socket; err = {:?}", e); - return; - } + fn setup() { + SERVER_INIT.call_once(|| { + std::thread::spawn(|| { + let rt = Runtime::new().unwrap(); + rt.block_on(start_server()) + }); - } + }) } #[tokio::test] - async fn test_socket_sender() { - tokio::spawn(start_server()); + async fn test_socket_sender() { + setup(); + task::spawn(start_server()); - let socket = TcpStream::connect("localhost:5600").await.expect("failed to connect"); + let socket = TcpStream::connect("localhost:5600") + .await + .expect("failed to connect"); - sleep(Duration::from_secs(1)).await; - let handle = SocketHandler::new(socket); let _ = handle.send::(true).await; let message = handle.recv::().await.unwrap(); assert!(message); } -} \ No newline at end of file + + #[tokio::test] + async fn test_socket_sender_with_encryption() { + setup(); + task::spawn(start_server()); + + let socket = TcpStream::connect("localhost:5600") + .await + .unwrap(); + + let shared = create_test_shared(); + let (en, de) = create_encryption_transformers(shared, b"12345678901234567890123456789011"); + let handle = SocketHandler::new(socket); + + handle.push_layer(en, de).await; + + let _ = handle.send::(true).await; + let message = handle.recv::().await.unwrap(); + + assert!(message); + } +} diff --git a/server/src/network_manager.rs b/server/src/network_manager.rs index 7d5e478..7329656 100644 --- a/server/src/network_manager.rs +++ b/server/src/network_manager.rs @@ -3,11 +3,12 @@ use std::sync::Arc; use tokio::net::TcpListener; use tokio::sync::mpsc::Sender; -use crate::client::Client; -use crate::network::SocketHandler; -use crate::messages::ServerMessage; -use crate::prelude::StreamMessageSender; +use foundation::prelude::StreamMessageSender; use foundation::messages::network::{NetworkSockIn, NetworkSockOut}; +use foundation::network::SocketHandler; + +use crate::client::Client; +use crate::messages::ServerMessage; pub struct NetworkManager { address: String, @@ -23,11 +24,12 @@ impl NetworkManager { } pub fn start(self: &Arc) { - let network_manager = self.clone(); tokio::spawn(async move { - let listener = TcpListener::bind(network_manager.address.clone()).await.unwrap(); + let listener = TcpListener::bind(network_manager.address.clone()) + .await + .unwrap(); loop { let (connection, _) = listener.accept().await.unwrap(); @@ -35,22 +37,21 @@ impl NetworkManager { let server_channel = network_manager.server_channel.clone(); tokio::spawn(async move { + stream_sender + .send::(NetworkSockOut::Request) + .await + .expect("failed to send message"); - stream_sender.send::(NetworkSockOut::Request) - .await.expect("failed to send message"); - - if let Ok(request) = - stream_sender.recv::().await - { - + if let Ok(request) = stream_sender.recv::().await { match request { NetworkSockIn::Info => { - stream_sender.send( - NetworkSockOut::GotInfo { + stream_sender + .send(NetworkSockOut::GotInfo { server_name: "oof", server_owner: "michael", - } - ).await.expect("failed to send got info"); + }) + .await + .expect("failed to send got info"); } NetworkSockIn::Connect { uuid, @@ -66,14 +67,13 @@ impl NetworkManager { server_channel.clone(), ); let _ = server_channel - .send(ServerMessage::ClientConnected { - client: new_client, - }).await; + .send(ServerMessage::ClientConnected { client: new_client }) + .await; } } } }); } - }); + }); } } diff --git a/server/src/prelude.rs b/server/src/prelude.rs index e9b8154..1242c64 100644 --- a/server/src/prelude.rs +++ b/server/src/prelude.rs @@ -2,12 +2,18 @@ use std::sync::Arc; use async_trait::async_trait; -use serde::Serialize; use serde::de::DeserializeOwned; - +use serde::Serialize; #[async_trait] pub trait StreamMessageSender { - async fn send(self: &Arc, message: TOutMessage) -> Result<(), std::io::Error>; - async fn recv<'de, TInMessage: DeserializeOwned + Send>(self: &Arc) -> Result; + async fn send( + self: &Arc, + message: TOutMessage, + ) -> Result<(), std::io::Error>; + async fn recv<'de, TInMessage: DeserializeOwned + Send>( + self: &Arc, + ) -> Result; } + +pub type TransformerFn = Box Vec + Send + Sync>; diff --git a/server/src/server.rs b/server/src/server.rs index 9d72786..cb26327 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -1,9 +1,9 @@ use std::sync::Arc; // use crossbeam_channel::{unbounded, Receiver}; -use uuid::Uuid; -use tokio::sync::mpsc::{channel, Receiver}; use futures::lock::Mutex; +use tokio::sync::mpsc::{channel, Receiver}; +use uuid::Uuid; use crate::client_manager::ClientManager; use crate::messages::ClientMgrMessage; @@ -22,7 +22,7 @@ pub enum ServerMessages { /// authors: @michael-bailey, @Mitch161 /// This Represents a server instance. /// it is componsed of a client manager and a network manager -/// +/// pub struct Server { client_manager: Arc, network_manager: Arc, @@ -34,19 +34,14 @@ impl Server { pub fn new() -> Result, Box> { let (sender, receiver) = channel(1024); - Ok( - Arc::new( - Server { - client_manager: ClientManager::new(sender.clone()), - network_manager: NetworkManager::new("5600".to_string(), sender), - receiver: Mutex::new(receiver), - } - ) - ) + Ok(Arc::new(Server { + client_manager: ClientManager::new(sender.clone()), + network_manager: NetworkManager::new("5600".to_string(), sender), + receiver: Mutex::new(receiver), + })) } pub async fn start(self: &Arc) { - // start client manager and network manager self.network_manager.clone().start(); self.client_manager.clone().start(); @@ -54,7 +49,6 @@ impl Server { // clone block items let server = self.clone(); - use ClientMgrMessage::{Add, Remove, SendMessage}; loop { @@ -64,25 +58,39 @@ impl Server { match message { ServerMessage::ClientConnected { client } => { - server.client_manager.clone() - .send_message(Add(client)).await + server + .client_manager + .clone() + .send_message(Add(client)) + .await } ServerMessage::ClientDisconnected { id } => { println!("disconnecting client {:?}", id); server.client_manager.clone().send_message(Remove(id)).await; } - ServerMessage::ClientSendMessage { from, to, content } => server - .client_manager.clone() - .send_message(SendMessage { from, to, content }).await, - ServerMessage::ClientUpdate { to } => server - .client_manager.clone() - .send_message(ClientMgrMessage::SendClients { to }).await, - ServerMessage::ClientError { to } => server - .client_manager.clone() - .send_message(ClientMgrMessage::SendError {to}).await, + ServerMessage::ClientSendMessage { from, to, content } => { + server + .client_manager + .clone() + .send_message(SendMessage { from, to, content }) + .await + } + ServerMessage::ClientUpdate { to } => { + server + .client_manager + .clone() + .send_message(ClientMgrMessage::SendClients { to }) + .await + } + ServerMessage::ClientError { to } => { + server + .client_manager + .clone() + .send_message(ClientMgrMessage::SendError { to }) + .await + } } } } } } - -- 2.40.1