diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml deleted file mode 100644 index edf72fb..0000000 --- a/.github/workflows/rust.yml +++ /dev/null @@ -1,25 +0,0 @@ -name: Rust - -on: - push: - branches: [ ref-method ] - pull_request: - branches: [ master ] - -env: - CARGO_TERM_COLOR: always - -jobs: - build: - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-latest, macos-latest, windows-latest] - - steps: - - uses: actions/checkout@v2 - - name: check - run: cargo check --verbose - - name: Build - run: cargo build --verbose - diff --git a/.gitignore b/.gitignore index d422aab..ad414a9 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ Cargo.lock .vscode/launch.json *.cer *.pem +.vscode/settings.json diff --git a/Cargo.toml b/Cargo.toml index d430020..00e10ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,30 +1,7 @@ -[package] -name = "rust-chat-server" -version = "0.1.5" -authors = ["Mitchell "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -regex = "1" -crossbeam = "0.7" -crossbeam-channel = "0.4" -crossbeam-utils = "0.7" -crossbeam-queue = "0.2" -parking_lot = "0.10" -dashmap = "3.11.4" -rayon = "1.3.1" -zeroize = "1.1.0" -crossterm = "0.17.7" -clap = "3.0.0-beta.1" -log = "0.4" -cursive = { version = "0.15.0", default-features = false, features = ["crossterm-backend"]} -openssl = { version = "0.10", features = ["vendored"] } - - -[profile.dev] -opt-level = 0 - -[profile.release] -opt-level = 3 +[workspace] +members = [ + 'foundation', + 'server', + 'client', + 'serverctl' +] \ No newline at end of file diff --git a/LICENSE b/LICENSE index f288702..43fab77 100644 --- a/LICENSE +++ b/LICENSE @@ -42,7 +42,7 @@ know their rights. giving you legal permission to copy, distribute and/or modify it. For the developers' and authors' protection, the GPL clearly explains -that there is no warranty for this free software. For both users' and +that there is No warranty for this free software. For both users' and authors' sake, the GPL requires that modified versions be marked as changed, so that their problems will not be attributed erroneously to authors of previous versions. @@ -98,14 +98,14 @@ public, and in some countries other activities as well. To "convey" a work means any kind of propagation that enables other parties to make or receive copies. Mere interaction with a user through -a computer network, with no transfer of a copy, is not conveying. +a computer network, with No transfer of a copy, is not conveying. An interactive user interface displays "Appropriate Legal Notices" to the extent that it includes a convenient and prominently visible feature that (1) displays an appropriate copyright notice, and (2) -tells the user that there is no warranty for the work (except to the +tells the user that there is No warranty for the work (except to the extent that warranties are provided), that licensees may convey the -work under this License, and how to view a copy of this License. If +work under this License, and how to View a copy of this License. If the interface presents a list of user commands or options, such as a menu, a prominent item in the list meets this criterion. @@ -202,7 +202,7 @@ non-permissive terms added in accord with section 7 apply to the code; keep intact all notices of the absence of any warranty; and give all recipients a copy of this License along with the Program. - You may charge any price or no price for each copy that you convey, + You may charge any price or No price for each copy that you convey, and you may offer support or warranty protection for a fee. 5. Conveying Modified Source Versions. @@ -223,7 +223,7 @@ terms of section 4, provided that you also meet all of these conditions: License to anyone who comes into possession of a copy. This License will therefore apply, along with any applicable section 7 additional terms, to the whole of the work, and all its parts, - regardless of how they are packaged. This License gives no + regardless of how they are packaged. This License gives No permission to license the work in any other way, but it does not invalidate such permission if you have separately received it. @@ -258,13 +258,13 @@ in one of these ways: (including a physical distribution medium), accompanied by a written offer, valid for at least three years and valid for as long as you offer spare parts or customer support for that product - model, to give anyone who possesses the object code either (1) a + Model, to give anyone who possesses the object code either (1) a copy of the Corresponding Source for all the software in the product that is covered by this License, on a durable physical - medium customarily used for software interchange, for a price no + medium customarily used for software interchange, for a price No more than your reasonable cost of physically performing this conveying of source, or (2) access to copy the - Corresponding Source from a network server at no charge. + Corresponding Source from a network server at No charge. c) Convey individual copies of the object code with a copy of the written offer to provide the Corresponding Source. This @@ -274,7 +274,7 @@ in one of these ways: d) Convey the object code by offering access from a designated place (gratis or for a charge), and offer equivalent access to the - Corresponding Source in the same way through the same place at no + Corresponding Source in the same way through the same place at No further charge. You need not require recipients to copy the Corresponding Source along with the object code. If the place to copy the object code is a network server, the Corresponding Source @@ -287,7 +287,7 @@ in one of these ways: e) Convey the object code using peer-to-peer transmission, provided you inform other peers where the object code and Corresponding - Source of the work are being offered to the general public at no + Source of the work are being offered to the general public at No charge under subsection 6d. A separable portion of the object code, whose source code is excluded @@ -312,7 +312,7 @@ procedures, authorization keys, or other information required to install and execute modified versions of a covered work in that User Product from a modified version of its Corresponding Source. The information must suffice to ensure that the continued functioning of the modified object -code is in no case prevented or interfered with solely because +code is in No case prevented or interfered with solely because modification has been made. If you convey an object code work under this section in, or with, or @@ -337,7 +337,7 @@ protocols for communication across the network. Corresponding Source conveyed, and Installation Information provided, in accord with this section must be in a format that is publicly documented (and with an implementation available to the public in -source code form), and must require no special password or key for +source code form), and must require No special password or key for unpacking, reading or copying. 7. Additional Terms. @@ -582,7 +582,7 @@ public statement of acceptance of a version permanently authorizes you to choose that version for the Program. Later license versions may give you additional or different -permissions. However, no additional obligations are imposed on any +permissions. However, No additional obligations are imposed on any author or copyright holder as a result of your choosing to follow a later version. diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 0000000..bd28415 --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "client" +version = "0.1.0" +authors = ["michael-bailey "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/client/src/main.rs b/client/src/main.rs new file mode 100644 index 0000000..a30eb95 --- /dev/null +++ b/client/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/foundation/Cargo.toml b/foundation/Cargo.toml new file mode 100644 index 0000000..a20e1ab --- /dev/null +++ b/foundation/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "foundation" +version = "0.1.0" +authors = ["Mitchell ","michael-bailey "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] + +[dependencies] +regex = "1" +crossbeam = "0.8.0" +crossbeam-channel = "0.5.0" +crossbeam-queue = "0.3.1" +parking_lot = "0.11.1" +dashmap = "4.0.2" +rayon = "1.3.1" +zeroize = "1.1.0" +crossterm = "0.19.0" +log = "0.4" +url = "2.2.0" +uuid = {version = "0.8", features = ["serde", "v4"]} +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + diff --git a/foundation/src/lib.rs b/foundation/src/lib.rs new file mode 100644 index 0000000..3ff3748 --- /dev/null +++ b/foundation/src/lib.rs @@ -0,0 +1,12 @@ +pub mod messages; +pub mod prelude; + +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct ClientDetails { + pub uuid: Uuid, + pub username: String, + pub address: String, +} \ No newline at end of file diff --git a/foundation/src/messages/client.rs b/foundation/src/messages/client.rs new file mode 100644 index 0000000..cabe3bc --- /dev/null +++ b/foundation/src/messages/client.rs @@ -0,0 +1,31 @@ +use crate::ClientDetails; +use serde::{Deserialize, Serialize}; + +use uuid::Uuid; + +/// # ClientMessage +/// This enum defined the message that a client can receive from the server +/// This uses the serde library to transform to and from json. +/// +#[derive(Serialize, Deserialize)] +pub enum ClientStreamIn { + Connected, + + Update, + SendMessage { to: Uuid, content: String }, + SendGlobalMessage { content: String }, + + Disconnect, +} + +#[derive(Serialize, Deserialize)] +pub enum ClientStreamOut { + Connected, + + UserMessage { from: Uuid, content: String }, + GlobalMessage { content: String }, + + ConnectedClients {clients: Vec}, + + Disconnected, +} diff --git a/foundation/src/messages/mod.rs b/foundation/src/messages/mod.rs new file mode 100644 index 0000000..bd258a6 --- /dev/null +++ b/foundation/src/messages/mod.rs @@ -0,0 +1,2 @@ +pub mod client; +pub mod network; diff --git a/foundation/src/messages/network.rs b/foundation/src/messages/network.rs new file mode 100644 index 0000000..98a2683 --- /dev/null +++ b/foundation/src/messages/network.rs @@ -0,0 +1,22 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub enum NetworkSockIn { + Info, + Connect { + uuid: String, + username: String, + address: String, + }, +} + +#[derive(Serialize, Deserialize)] +pub enum NetworkSockOut<'a> { + Request, + + GotInfo { + server_name: &'a str, + server_owner: &'a str, + }, + Connecting, +} diff --git a/foundation/src/prelude.rs b/foundation/src/prelude.rs new file mode 100644 index 0000000..92ff1d7 --- /dev/null +++ b/foundation/src/prelude.rs @@ -0,0 +1,15 @@ +use std::sync::Arc; + +pub trait IMessagable { + fn send_message(&self, msg: TMessage); + fn set_sender(&self, sender: TSender); +} + +pub trait ICooperative { + fn tick(&self); +} + +pub trait IPreemptive { + fn run(arc: &Arc); + fn start(arc: &Arc); +} diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..779de58 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,2 @@ +hard_tabs = true +max_width = 90 \ No newline at end of file diff --git a/server/Cargo.toml b/server/Cargo.toml new file mode 100644 index 0000000..a91dd21 --- /dev/null +++ b/server/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "server" +version = "0.1.0" +authors = ["michael-bailey "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = "2.33.3" +uuid = {version = "0.8", features = ["serde", "v4"]} +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +crossbeam = "0.8.0" +crossbeam-channel = "0.5.0" + +[dependencies.foundation] +path = '../foundation' \ No newline at end of file diff --git a/server/src/client.rs b/server/src/client.rs new file mode 100644 index 0000000..d5e1efd --- /dev/null +++ b/server/src/client.rs @@ -0,0 +1,280 @@ +use crate::messages::ClientMessage; +use crate::messages::ServerMessage; +use foundation::prelude::IPreemptive; +use std::cmp::Ordering; +use std::io::BufRead; +use std::io::Write; +use std::io::{BufReader, BufWriter}; +use std::mem::replace; +use std::net::TcpStream; +use std::sync::Arc; +use std::sync::Mutex; + +use crossbeam_channel::{unbounded, Receiver, Sender}; +use serde::Serialize; +use uuid::Uuid; + +use foundation::ClientDetails; +use foundation::messages::client::{ClientStreamIn, ClientStreamOut}; +use foundation::prelude::IMessagable; + +/// # Client +/// This struct represents a connected user. +/// +/// ## Attrubutes +/// - uuid: The id of the connected user. +/// - username: The username of the connected user. +/// - address: The the address of the connected client. +/// +/// - stream: The socket for the connected client. +/// - stream_reader: the buffered reader used to receive messages +/// - stream_writer: the buffered writer used to send messages +/// - owner: An optional reference to the owning object. +#[derive(Debug, Serialize)] +pub struct Client { + pub uuid: Uuid, + username: String, + address: String, + pub details: ClientDetails, + + // non serializable + #[serde(skip)] + server_channel: Mutex>>, + + #[serde(skip)] + input: Sender, + + #[serde(skip)] + output: Receiver, + + #[serde(skip)] + stream: Mutex>, + + #[serde(skip)] + stream_reader: Mutex>>, + + #[serde(skip)] + stream_writer: Mutex>>, +} + +// client funciton implmentations +impl Client { + pub fn new( + uuid: String, + username: String, + address: String, + stream: TcpStream, + server_channel: Sender, + ) -> Arc { + let (sender, receiver) = unbounded(); + + let out_stream = stream.try_clone().unwrap(); + let in_stream = stream.try_clone().unwrap(); + + Arc::new(Client { + username: username.clone(), + uuid: Uuid::parse_str(&uuid).expect("invalid id"), + address: address.clone(), + + details: ClientDetails { + uuid: Uuid::parse_str(&uuid).expect("invalid id"), + username, + address, + }, + + server_channel: Mutex::new(Some(server_channel)), + + input: sender, + output: receiver, + + stream: Mutex::new(Some(stream)), + + stream_reader: Mutex::new(Some(BufReader::new(in_stream))), + stream_writer: Mutex::new(Some(BufWriter::new(out_stream))), + }) + } +} + +impl IMessagable> for Client { + fn send_message(&self, msg: ClientMessage) { + self.input + .send(msg) + .expect("failed to send message to client."); + } + fn set_sender(&self, sender: Sender) { + let mut server_lock = self.server_channel.lock().unwrap(); + let _ = replace(&mut *server_lock, Some(sender)); + } +} + +// cooperative multitasking implementation +impl IPreemptive for Client { + fn run(arc: &Arc) { + let arc1 = arc.clone(); + let arc2 = arc.clone(); + + // read thread + let _ = std::thread::Builder::new() + .name(format!("client thread recv [{:?}]", &arc.uuid)) + .spawn(move || { + use ClientMessage::{Disconnect}; + let arc = arc1; + + let mut buffer = String::new(); + let mut reader_lock = arc.stream_reader.lock().unwrap(); + let reader = reader_lock.as_mut().unwrap(); + + 'main: while let Ok(size) = reader.read_line(&mut buffer) { + if size == 0 { + arc.send_message(Disconnect); + break 'main; + } + + let command = serde_json::from_str::(buffer.as_str()); + match command { + Ok(ClientStreamIn::Disconnect) => { + println!("[Client {:?}]: Disconnect recieved", &arc.uuid); + arc.send_message(Disconnect); + break 'main; + } + Ok(ClientStreamIn::SendMessage { to, content }) => { + println!( + "[Client {:?}]: send message to: {:?}", + &arc.uuid, &to + ); + let lock = arc.server_channel.lock().unwrap(); + let sender = lock.as_ref().unwrap(); + let _ = sender.send(ServerMessage::ClientSendMessage { + from: arc.uuid, + to, + content, + }); + } + _ => println!("[Client {:?}]: command not found", &arc.uuid), + } + } + println!("[Client {:?}] exited thread 1", &arc.uuid); + }); + + // write thread + let _ = std::thread::Builder::new() + .name(format!("client thread msg [{:?}]", &arc.uuid)) + .spawn(move || { + let arc = arc2; + let mut writer_lock = arc.stream_writer.lock().unwrap(); + let writer = writer_lock.as_mut().unwrap(); + let mut buffer: Vec = Vec::new(); + + let _ = writeln!( + buffer, + "{}", + serde_json::to_string(&ClientStreamOut::Connected).unwrap() + ); + let _ = writer.write_all(&buffer); + let _ = writer.flush(); + + 'main: loop { + for message in arc.output.iter() { + use ClientMessage::{Disconnect,Message, Update}; + println!("[Client {:?}]: {:?}", &arc.uuid, message); + match message { + Disconnect => { + arc.server_channel + .lock() + .unwrap() + .as_mut() + .unwrap() + .send(ServerMessage::ClientDisconnected(arc.uuid)) + .unwrap(); + break 'main; + } + Message { from, content } => { + let _ = writeln!( + buffer, + "{}", + serde_json::to_string( + &ClientStreamOut::UserMessage { from, content } + ) + .unwrap() + ); + let _ = writer.write_all(&buffer); + let _ = writer.flush(); + } + Update {clients} => { + let client_details_vec: Vec = clients.iter().map(|client| &client.details).cloned().collect(); + let _ = writeln!( + buffer, + "{}", + serde_json::to_string( + &ClientStreamOut::ConnectedClients {clients: client_details_vec} + ).unwrap() + ); + let _ = writer.write_all(&buffer); + let _ = writer.flush(); + } + } + } + } + println!("[Client {:?}]: exited thread 2", &arc.uuid); + }); + } + + fn start(arc: &Arc) { + Client::run(arc) + } +} + +// default value implementation +impl Default for Client { + fn default() -> Self { + let (sender, reciever) = unbounded(); + Client { + username: "generic_client".to_string(), + uuid: Uuid::new_v4(), + address: "127.0.0.1".to_string(), + + details: ClientDetails { + uuid: Uuid::new_v4(), + username: "generic_client".to_string(), + address: "127.0.0.1".to_string(), + }, + + output: reciever, + input: sender, + + server_channel: Mutex::new(None), + + stream: Mutex::new(None), + + stream_reader: Mutex::new(None), + stream_writer: Mutex::new(None), + } + } +} + +// MARK: - used for sorting. +impl PartialEq for Client { + fn eq(&self, other: &Self) -> bool { + self.uuid == other.uuid + } +} + +impl Eq for Client {} + +impl PartialOrd for Client { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Client { + fn cmp(&self, other: &Self) -> Ordering { + self.uuid.cmp(&other.uuid) + } +} + +impl Drop for Client { + fn drop(&mut self) { + println!("[Client] dropped!"); + } +} diff --git a/server/src/client_manager.rs b/server/src/client_manager.rs new file mode 100644 index 0000000..a3f4d51 --- /dev/null +++ b/server/src/client_manager.rs @@ -0,0 +1,114 @@ +// use crate::lib::server::ServerMessages; +use foundation::prelude::IPreemptive; +use std::collections::HashMap; +use std::mem::replace; +use std::sync::Arc; +use std::sync::Mutex; + +use crossbeam_channel::{unbounded, Receiver, Sender}; +use uuid::Uuid; + +use crate::client::Client; +use crate::messages::ClientMessage; +use crate::messages::ClientMgrMessage; +use crate::messages::ServerMessage; +use foundation::prelude::IMessagable; + +/// # ClientManager +/// This struct manages all connected users +#[derive(Debug)] +pub struct ClientManager { + clients: Mutex>>, + + server_channel: Mutex>, + + sender: Sender, + receiver: Receiver, +} + +impl ClientManager { + pub fn new(server_channel: Sender) -> Arc { + let (sender, receiver) = unbounded(); + + Arc::new(ClientManager { + clients: Mutex::default(), + + server_channel: Mutex::new(server_channel), + + sender, + receiver, + }) + } +} + +impl IMessagable> for ClientManager { + fn send_message(&self, msg: ClientMgrMessage) { + self.sender.send(msg).unwrap(); + } + fn set_sender(&self, sender: Sender) { + let mut server_lock = self.server_channel.lock().unwrap(); + let _ = replace(&mut *server_lock, sender); + } +} + +impl IPreemptive for ClientManager { + fn run(arc: &Arc) { + loop { + std::thread::sleep(std::time::Duration::from_secs(1)); + + if !arc.receiver.is_empty() { + for message in arc.receiver.try_iter() { + println!("[Client manager]: recieved message: {:?}", message); + use ClientMgrMessage::{Add, Remove, SendMessage, SendClients}; + + match message { + Add(client) => { + println!("[Client Manager]: adding new client"); + Client::start(&client); + let mut lock = arc.clients.lock().unwrap(); + if lock.insert(client.uuid, client).is_none() { + println!("value is new"); + } + }, + Remove(uuid) => { + println!("[Client Manager]: removing client: {:?}", &uuid); + if let Some(client) = + arc.clients.lock().unwrap().remove(&uuid) + { + client.send_message(ClientMessage::Disconnect); + } + }, + SendMessage { to, from, content } => { + let lock = arc.clients.lock().unwrap(); + if let Some(client) = lock.get(&to) { + client.send_message(ClientMessage::Message { + from, + content, + }) + } + }, + SendClients {to} => { + let lock = arc.clients.lock().unwrap(); + if let Some(client) = lock.get(&to) { + let clients_vec: Vec> = lock.values().cloned().collect(); + + client.send_message(ClientMessage::Update { + clients: clients_vec, + }) + } + }, + + + #[allow(unreachable_patterns)] + _ => println!("[Client manager]: not implemented"), + } + } + } + } + } + + fn start(arc: &Arc) { + let arc = arc.clone(); + std::thread::spawn(move || ClientManager::run(&arc)); + } +} diff --git a/server/src/main.rs b/server/src/main.rs new file mode 100644 index 0000000..dfc409f --- /dev/null +++ b/server/src/main.rs @@ -0,0 +1,29 @@ +pub mod client; +pub mod client_manager; +pub mod messages; +pub mod network_manager; +pub mod server; + +use clap::{App, Arg}; + +use foundation::prelude::IPreemptive; +use server::Server; + +fn main() { + let _args = App::new("--rust chat server--") + .version("0.1.5") + .author("Mitchel Hardie , Michael Bailey ") + .about("this is a chat server developed in rust, depending on the version one of two implementations will be used") + .arg( + Arg::with_name("config") + .short("p") + .long("port") + .value_name("PORT") + .help("sets the port the server runs on.") + .takes_value(true)) + .get_matches(); + + let server = Server::new(); + + Server::run(&server); +} diff --git a/server/src/messages.rs b/server/src/messages.rs new file mode 100644 index 0000000..f5d2e11 --- /dev/null +++ b/server/src/messages.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; +use uuid::Uuid; + +use crate::client::Client; + +#[derive(Debug)] +pub enum ClientMessage { + Message { from: Uuid, content: String }, + + Update {clients: Vec>}, + + Disconnect, +} + +#[derive(Debug)] +pub enum ClientMgrMessage { + Remove(Uuid), + Add(Arc), + SendClients {to: Uuid}, + SendMessage { + from: Uuid, + to: Uuid, + content: String, + }, +} + +#[derive(Debug)] +pub enum ServerMessage { + ClientConnected(Arc), + ClientSendMessage { + from: Uuid, + to: Uuid, + content: String, + }, + ClientDisconnected(Uuid), + ClientUpdate(Uuid), +} diff --git a/server/src/network_manager.rs b/server/src/network_manager.rs new file mode 100644 index 0000000..5e450ac --- /dev/null +++ b/server/src/network_manager.rs @@ -0,0 +1,132 @@ +use foundation::prelude::IPreemptive; +use std::io::BufRead; +use std::io::BufReader; +use std::io::BufWriter; +use std::io::Write; +use std::net::TcpListener; +use std::sync::Arc; +use std::thread; + +use crossbeam_channel::Sender; + +use crate::client::Client; +use crate::messages::ServerMessage; +use foundation::messages::network::{NetworkSockIn, NetworkSockOut}; + +pub struct NetworkManager { + listener: TcpListener, + server_channel: Sender, +} + +impl NetworkManager { + pub fn new( + port: String, + server_channel: Sender, + ) -> Arc { + let mut address = "0.0.0.0:".to_string(); + address.push_str(&port); + + let listener = TcpListener::bind(address).expect("Could not bind to address"); + + Arc::new(NetworkManager { + listener, + server_channel, + }) + } +} + +impl IPreemptive for NetworkManager { + fn run(_: &Arc) {} + + fn start(arc: &Arc) { + let arc = arc.clone(); + std::thread::spawn(move || { + // fetch new connections and add them to the client queue + for connection in arc.listener.incoming() { + println!("[NetworkManager]: New Connection!"); + match connection { + Ok(stream) => { + let server_channel = arc.server_channel.clone(); + + // create readers + let mut reader = BufReader::new(stream.try_clone().unwrap()); + let mut writer = BufWriter::new(stream.try_clone().unwrap()); + + let _handle = thread::Builder::new() + .name("NetworkJoinThread".to_string()) + .spawn(move || { + let mut out_buffer: Vec = Vec::new(); + let mut in_buffer: String = String::new(); + + // send request message to connection + + let _ = writeln!( + out_buffer, + "{}", + serde_json::to_string(&NetworkSockOut::Request) + .unwrap() + ); + + let _ = writer.write_all(&out_buffer); + let _ = writer.flush(); + + // try get response + let res = reader.read_line(&mut in_buffer); + if res.is_err() { + return; + } + + //match the response + if let Ok(request) = + serde_json::from_str::(&in_buffer) + { + match request { + NetworkSockIn::Info => { + // send back server info to the connection + writer + .write_all( + serde_json::to_string( + &NetworkSockOut::GotInfo { + server_name: "oof", + server_owner: "michael", + }, + ) + .unwrap() + .as_bytes(), + ) + .unwrap(); + writer.write_all(b"\n").unwrap(); + writer.flush().unwrap(); + } + NetworkSockIn::Connect { + uuid, + username, + address, + } => { + // create client and send to server + let new_client = Client::new( + uuid, + username, + address, + stream.try_clone().unwrap(), + server_channel.clone(), + ); + server_channel + .send(ServerMessage::ClientConnected( + new_client, + )) + .unwrap_or_default(); + } + } + } + }); + } + Err(e) => { + println!("[Network manager]: error getting stream: {:?}", e); + continue; + } + } + } + }); + } +} diff --git a/server/src/server.rs b/server/src/server.rs new file mode 100644 index 0000000..2e7d7ec --- /dev/null +++ b/server/src/server.rs @@ -0,0 +1,83 @@ +use std::sync::Arc; + +use crossbeam_channel::{unbounded, Receiver}; +use uuid::Uuid; + +use crate::client_manager::ClientManager; +use crate::messages::ClientMgrMessage; +use crate::messages::ServerMessage; +use crate::network_manager::NetworkManager; +use foundation::prelude::ICooperative; +use foundation::prelude::IMessagable; +use foundation::prelude::IPreemptive; + +/// # ServerMessages +/// This is used internally to send messages to the server to be dispatched +#[derive(Debug)] +pub enum ServerMessages { + ClientConnected(Arc), + ClientDisconnected(Uuid), +} + +pub struct Server { + client_manager: Arc, + network_manager: Arc, + + receiver: Receiver, +} + +impl Server { + pub fn new() -> Arc { + let (sender, receiver) = unbounded(); + + Arc::new(Server { + client_manager: ClientManager::new(sender.clone()), + + network_manager: NetworkManager::new("5600".to_string(), sender), + receiver, + }) + } +} + +impl ICooperative for Server { + fn tick(&self) { + use ClientMgrMessage::{Add, Remove, SendMessage}; + + // handle new messages loop + if !self.receiver.is_empty() { + for message in self.receiver.try_iter() { + println!("[server]: received message {:?}", &message); + match message { + ServerMessage::ClientConnected(client) => { + self.client_manager.send_message(Add(client)) + } + ServerMessage::ClientDisconnected(uuid) => { + println!("disconnecting client {:?}", uuid); + self.client_manager.send_message(Remove(uuid)); + } + ServerMessage::ClientSendMessage { from, to, content } => self + .client_manager + .send_message(SendMessage { from, to, content }), + ServerMessage::ClientUpdate (_uuid) => println!("not implemented"), + } + } + } + } +} + +impl IPreemptive for Server { + fn run(arc: &std::sync::Arc) { + // start services + NetworkManager::start(&arc.network_manager); + ClientManager::start(&arc.client_manager); + loop { + arc.tick(); + } + } + + fn start(arc: &std::sync::Arc) { + let arc = arc.clone(); + // start thread + std::thread::spawn(move || Server::run(&arc)); + } +} diff --git a/serverctl/Cargo.toml b/serverctl/Cargo.toml new file mode 100644 index 0000000..4293dee --- /dev/null +++ b/serverctl/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "serverctl" +version = "0.1.0" +authors = ["michael-bailey "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/serverctl/src/main.rs b/serverctl/src/main.rs new file mode 100644 index 0000000..a30eb95 --- /dev/null +++ b/serverctl/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/src/client_api/mod.rs b/src/client_api/mod.rs deleted file mode 100644 index 1af8627..0000000 --- a/src/client_api/mod.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::{net::TcpStream, io::{Write, Read}, io}; -use crate::{ - server::client::client_profile::Client, - commands::Commands, -}; -use std::time::Duration; -use zeroize::Zeroize; - -pub struct ClientApi { - socket: TcpStream, - addr: String, - - pub on_client_add_handle: fn(Client) -> (), - pub on_client_remove_handle: fn(String) -> (), -} - -impl ClientApi { - pub fn new(addr: &str) -> Result { - let socket = TcpStream::connect(addr)?; - - let on_add = |_client: Client| {println!("Client_api: Client added {:?}", _client)}; - let on_remove = |_uuid: String| {println!("Client_api: Client removed {}", _uuid)}; - let a = Self { - socket, - addr: addr.to_string(), - on_client_add_handle: on_add, - on_client_remove_handle: on_remove, - }; - Ok(a) - } - - pub fn set_on_client_add(&mut self, func: fn(Client) -> ()) { - self.on_client_add_handle = func; - } - - pub fn set_on_client_removed(&mut self, func: fn(String) -> ()) { - self.on_client_remove_handle = func; - } - - pub fn get_info(host: &str) -> Result { - let mut buffer: [u8; 1024] = [0; 1024]; - let addr = host.parse().unwrap(); - let mut stream = TcpStream::connect_timeout(&addr, Duration::from_millis(1000))?; - - let _ = stream.read(&mut buffer)?; - println!("data recieved: {:?}", &buffer[0..20]); - match Commands::from(&mut buffer) { - Commands::Request(None) => { - println!("zeroing"); - buffer.zeroize(); - println!("writing"); - let sending_command = Commands::Info(None).to_string(); - println!("sending string: {:?} as_bytes: {:?}", &sending_command, &sending_command.as_bytes()); - stream.write_all(sending_command.as_bytes())?; - stream.flush()?; - println!("reading"); - let bytes = stream.read(&mut buffer)?; - println!("new buffer size: {:?} contents: {:?}", bytes, &buffer[0..20]); - println!("commanding"); - Ok(Commands::from(String::from(String::from_utf8_lossy(&buffer)))) - }, - _ => { - Err(io::Error::new(io::ErrorKind::InvalidData, "the data was not expected")) - } - } - } - - pub fn get_clients(&self) { - - } -} diff --git a/src/commands/behaviors.rs b/src/commands/behaviors.rs deleted file mode 100644 index a5facd9..0000000 --- a/src/commands/behaviors.rs +++ /dev/null @@ -1,85 +0,0 @@ -struct Request {} - -struct Info {} - -struct Connect {} - -struct Disconnect {} - -struct ClientUpdate {} - -struct ClientInfo {} - -struct ClientRemove {} - -struct Client {} - -struct Success {} - -struct Error {} - -trait ClientRunnables { - fn client_execution(client: &Client); -} - -impl Runnables for Request { - fn run() { - } -} - -impl ClientRunnables for Info { - fn client_execution(client: &Client) { - let params = client.get_server_info(); - let command = Commands::Success(Some(params)); - - client.transmit_data(command.to_string().as_str()); - } -} - -impl Runnables for Connect { - fn run() { - } -} - -impl Runnables for Disconnect { - fn run() { - } -} - -impl ClientRunnables for ClientUpdate { - fn client_execution(client: &Client) { - let mut command = Commands::Success(None); - client.transmit_data(command.to_string().as_str()); - - let data: HashMap = [(String::from("uuid"), client.get_uuid())].iter().cloned().collect(); - let command = Commands::ClientUpdate(Some(data)); - - self.server.update_all_clients(self.uuid.as_str(), command); - - } -} - -impl Runnables for ClientInfo { - fn run() { - } -} - -impl Runnables for ClientRemove { - fn run() { - } -} - -impl Runnables for Client { - fn run() { - } -} - -impl Runnables for Success { - fn run() { - } -} - -impl Runnables for Error { - fn run() { - } -} diff --git a/src/commands/mod.rs b/src/commands/mod.rs deleted file mode 100644 index 60d4605..0000000 --- a/src/commands/mod.rs +++ /dev/null @@ -1,259 +0,0 @@ -use std::string::ToString; -use std::collections::HashMap; -use std::str::FromStr; - -use std::borrow::Borrow; -use regex::Regex; -use std::ops::Index; -use log::info; -use zeroize::Zeroize; -//use dashmap::DashMap; - -#[derive(Clone, Debug)] -pub enum Commands { - /* TODO: this is the new commands system but still needs work. - * Will be fixed soon, but continue with old version at the - * moment. - * - // Common fields: - executable: T, - params: Option>, - - // Variants: - Request {}, - Info {}, - - Connect {}, - Disconnect {}, - - ClientUpdate {}, - ClientInfo {}, - ClientRemove {}, - Client {}, - - Success {}, - Error {}, - */ - - Request(Option>), - Info(Option>), - - HeartBeat(Option>), - - Connect(Option>), - Disconnect(Option>), - - ClientUpdate(Option>), - ClientInfo(Option>), - ClientRemove(Option>), - Client(Option>), - - Success(Option>), - Error(Option>), -} - -#[derive(Debug)] -pub enum CommandParseError { - UnknownCommand, - NoString, -} - -/*trait Operations { - fn execute(&self); -}*/ - -impl Commands { - /*fn get_executable(&self) -> &T { - self.executable - } - - fn get_params(&self) -> &Option> { - self.params - }*/ - - fn compare_params(&self, params: &Option>, other_params: &Option>) -> bool { - match (params, other_params) { - (None, Some(_other_params)) => false, - (Some(_params), None) => false, - (None, None) => true, - (Some(params), Some(other_params)) => { - let mut result = false; - - if params.len() == other_params.len() { - for (key, value) in params.iter() { - if let Some(other_value) = other_params.get(key) { - if value != other_value { - result = false; - break; - } else { - result = true; - } - } - } - } - - result - }, - } - } -} - -/*impl Operations for Commands { - fn execute(&self) { - self.executable.run(); - } -}*/ - -impl PartialEq for Commands { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Commands::Request(params), Commands::Request(other_params)) => self.compare_params(¶ms, &other_params), - (Commands::Info(params), Commands::Info(other_params)) => self.compare_params(¶ms, &other_params), - (Commands::Connect(params), Commands::Connect(other_params)) => self.compare_params(¶ms, &other_params), - (Commands::Disconnect(params), Commands::Disconnect(other_params)) => self.compare_params(¶ms, &other_params), - (Commands::ClientUpdate(params), Commands::ClientUpdate(other_params)) => self.compare_params(¶ms, &other_params), - (Commands::ClientInfo(params), Commands::ClientInfo(other_params)) => self.compare_params(¶ms, &other_params), - (Commands::ClientRemove(params), Commands::ClientRemove(other_params)) => self.compare_params(¶ms, &other_params), - (Commands::Client(params), Commands::Client(other_params)) => self.compare_params(¶ms, &other_params), - (Commands::Success(params), Commands::Success(other_params)) => self.compare_params(¶ms, &other_params), - (Commands::Error(params), Commands::Error(other_params)) => self.compare_params(¶ms, &other_params), - _ => false, - } - } -} - - -impl ToString for Commands { - - fn to_string(&self) -> std::string::String { - let mut out_string = String::new(); - - let (command, parameters) = match self { - Commands::Request(arguments) => { ("!request:", arguments) }, - Commands::Info(arguments) => { ("!info:", arguments) }, - Commands::HeartBeat(arguments) => {("!heartbeat:", arguments)}, - Commands::Connect(arguments) => { ("!connect:", arguments) }, - Commands::Disconnect(arguments) => { ("!disconnect:", arguments) }, - Commands::ClientUpdate(arguments) => { ("!clientUpdate:", arguments) }, - Commands::ClientInfo(arguments) => { ("!clientInfo:", arguments) }, - Commands::ClientRemove(arguments) => { ("!clientRemove", arguments) } - Commands::Client(arguments) => { ("!client:", arguments) }, - Commands::Success(arguments) => { ("!success:", arguments) }, - Commands::Error(arguments) => { ("!error:", arguments) }, - }; - - out_string.push_str(command); - - if parameters.is_some() { - let hash_map = parameters.borrow().as_ref().unwrap(); - for (k, v) in hash_map.iter() { - out_string.push_str(" "); - out_string.push_str(k.as_str()); - out_string.push_str(":"); - - if v.contains(":") { - out_string.push_str(format!("\"{}\"",v.as_str()).as_str()) - } else { - out_string.push_str(v.as_str()); - } - } - } - out_string - } -} - -impl FromStr for Commands { - type Err = CommandParseError; - - fn from_str(data: &str) -> std::result::Result { - let regex = Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9@\-\+\[\]{}_=/.]+|("(.*?)")+)"###).unwrap(); - let mut iter = regex.find_iter(data); - let command_opt = iter.next(); - - if command_opt.is_none() { - return Err(CommandParseError::NoString); - } - let command = command_opt.unwrap().as_str(); - - - println!("command parsed to: {:?}", command); - - let mut map: HashMap = HashMap::new(); - - for i in iter { - let parameter = i.as_str().to_string(); - let parts:Vec<&str> = parameter.split(":").collect(); - - map.insert(parts.index(0).to_string(), parts.index(1).to_string()); - } - - let params = if map.capacity() > 0 {Some(map)} else { None }; - - Ok(match command { - "!request:" => Commands::Request(params), - "!info:" => Commands::Info(params), - - "!heartbeat:" => Commands::HeartBeat(params), - - "!connect:" => Commands::Connect(params), - "!disconnect:" => Commands::Disconnect(params), - - "!clientUpdate:" => Commands::ClientUpdate(params), - "!clientInfo:" => Commands::ClientInfo(params), - "!client:" => Commands::Client(params), - "!clientRemove:" => Commands::ClientRemove(params), - - "!success:" => Commands::Success(params), - "!error:" => Commands::Error(params), - - _ => Commands::Error(None), - }) - } -} - -impl From for Commands { - fn from(data: String) -> Self { - if let Ok(data) = data.as_str().parse() { - data - } else { - info!("Command: failed to parse with"); - Commands::Error(None) - } - } -} - -impl From<&mut [u8; 1024]> for Commands { - fn from(data: &mut [u8; 1024]) -> Self { - let incoming_message = String::from(String::from_utf8_lossy(data)); - data.zeroize(); - Commands::from(incoming_message) - } -} - -// TODO: check if unit tests still work -/*#[cfg(test)] -mod test_commands_v2 { - #![feature(test)] - use super::Commands; - use std::collections::HashMap; - use std::str::FromStr; - use super::CommandParseError; - - #[test] - fn test_creation_from_string() { - let command_result = Commands::from_str("!connect: name:bop host:127.0.0.1 uuid:123456-1234-1234-123456"); - } - - #[test] - fn test_to_string() { - - let mut a: HashMap = HashMap::new(); - a.insert("name".to_string(), "michael".to_string()); - a.insert("host".to_string(), "127.0.0.1".to_string()); - a.insert("uuid".to_string(), "123456-1234-1234-123456".to_string()); - - let command = Commands::Connect(Some(a)); - - println!("{:?}", command.to_string()) - } -}*/ diff --git a/src/lib.rs b/src/lib.rs deleted file mode 100644 index 4e15e7d..0000000 --- a/src/lib.rs +++ /dev/null @@ -1,104 +0,0 @@ -use std::thread; -use crossbeam::{unbounded , Sender, Receiver}; -use std::sync::Arc; -use std::sync::Mutex; - -enum Message { - NewJob(Job), - Terminate, -} - -#[derive(Debug)] -pub struct ThreadPool{ - workers: Vec, - sender: Sender, -} - -type Job = Box; - -impl ThreadPool{ - /// Create a new ThreadPool. - /// - /// The size is the number of threads in the pool. - /// - /// # Panics - /// - /// The `new` function will panic if the size is zero. - pub fn new(size: usize) -> ThreadPool { - assert!(size > 0); - - let (sender, receiver) = unbounded(); - - let receiver = Arc::new(Mutex::new(receiver)); - - let mut workers = Vec::with_capacity(size); - - for id in 0..size { - // create some threads and store them in the vector - workers.push(Worker::new(id, Arc::clone(&receiver))); - } - - ThreadPool { - workers, - sender, - } - } - - pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static { - let job = Box::new(f); - - self.sender.send(Message::NewJob(job)).unwrap(); - } -} - -#[derive(Debug)] -struct Worker { - id: usize, - thread: Option>, -} - -impl Worker { - fn new(id: usize, receiver: Arc>>) -> Worker { - let thread = thread::spawn(move || { - loop{ - let message = receiver.lock().unwrap().recv().unwrap(); - - match message { - Message::NewJob(job) => { - println!("Worker {} got a job; executing.", id); - job(); - }, - Message::Terminate => { - println!("Worker {} was told to terminate.", id); - break; - }, - } - } - }); - - Worker { - id, - thread: Some(thread), - } - } -} - -impl Drop for ThreadPool { - fn drop(&mut self) { - println!("Sending terminate message to all workers."); - - for _ in &mut self.workers { - self.sender.send(Message::Terminate).unwrap(); - } - - println!("Shutting down all workers."); - - for worker in &mut self.workers { - println!("Shutting down worker {}", worker.id); - - if let Some(thread) = worker.thread.take() { - thread.join().unwrap(); - } - } - } -} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 7f651d7..0000000 --- a/src/main.rs +++ /dev/null @@ -1,252 +0,0 @@ -mod client_api; -mod commands; -mod server; -mod lib; - -use cursive::{ - Cursive, - menu::*, - event::Key, - views::{ Dialog, TextView, LinearLayout, ListView, ResizedView, Panel }, - CursiveExt, - align::Align, - view::SizeConstraint, -}; -//use std::sync::Arc; -use std::time::Duration; -use std::sync::Arc; -use crossterm::ErrorKind; -use log::info; -use clap::{App, Arg}; - -use crate::server::server_profile::Server; - -fn main() -> Result<(), ErrorKind> { - let args = App::new("--rust chat server--") - .version("0.1.5") - .author("Mitchel Hardie , Michael Bailey ") - .about("this is a chat server developed in rust, depending on the version one of two implementations will be used") - .arg(Arg::with_name("graphical") - .short('g') - .takes_value(false) - .about("Enables graphical mode")) - .get_matches(); - - if args.is_present("graphical") { - let server = Server::new("Server-01", "0.0.0.0:6000", "noreply@email.com"); - let server_arc = Arc::new(server); - let s1 = server_arc.clone(); - let s2 = s1.clone(); - - cursive::logger::init(); - - info!("Main: init display"); - let mut display = Cursive::default(); - - info!("Main: setting up callbacks"); - display.add_global_callback(Key::Backspace, |s| s.quit()); - display.add_global_callback(Key::Tab, |s| s.toggle_debug_console()); - display.add_global_callback(Key::Esc, |s| s.select_menubar()); - - info!("Main: setting up menu bar"); - let _ = display.menubar() - .add_subtree("Server", - MenuTree::new() - .leaf("about", - |s| s.add_layer(about())) - .delimiter() - .leaf("quit", |s| s.quit())) - .add_subtree("File", - MenuTree::new() - .leaf("Start", move |_s| {let _ = s1.start();}) - .leaf("Stop", move |_s| {let _ = s2.stop();}) - .delimiter() - .leaf("Debug", |s| {s.toggle_debug_console();})); - info!("Main: entering loop"); - display.add_layer(control_panel()); - display.run(); - Ok(()) - } else { - let server = Server::new("Server-01", "0.0.0.0:6000", "noreply@email.com"); - - server.start()?; - loop {std::thread::sleep(Duration::from_secs(1));} - } -} - -fn about() -> Dialog { - Dialog::new() - .content(TextView::new("Rust-Chat-Server\nmade by\n Mitchell Hardie\nMichael Bailey\nMit Licence") - ).button("Close", |s| {let _ = s.pop_layer(); s.add_layer(control_panel())} ) -} - -#[allow(dead_code)] -fn launch_screen() -> Dialog { - Dialog::new() - .content(TextView::new("\ - Server. - * press for menu bar - * press for debug (FIXME) - * press to exit. - ").align(Align::center())) - .button("ok", |s| {s.pop_layer();}) -} - -fn control_panel() -> ResizedView> { - - let mut root = LinearLayout::horizontal(); - let mut left = LinearLayout::vertical(); - let mut right = ListView::new(); - right.add_child("test", TextView::new("")); - right.add_child("test", TextView::new("")); - right.add_delimiter(); - right.add_child("test", TextView::new("")); - right.add_child("test", TextView::new("")); - - left.add_child(TextView::new("Hello world")); - - root.add_child(ResizedView::new(SizeConstraint::Full, SizeConstraint::Full, Panel::new(left))); - root.add_child(ResizedView::new(SizeConstraint::Full, SizeConstraint::Full, Panel::new(right))); - ResizedView::new(SizeConstraint::Fixed(60), SizeConstraint::Fixed(18), Panel::new(root)) -} - -// MARK: - general testing zone -#[cfg(test)] -mod tests { - use crate::server::server_profile::Server; - use crate::client_api::ClientApi; - use std::collections::HashMap; - use crate::commands::Commands; - use std::{thread, time}; - use std::time::Duration; - - #[test] - fn test_server_info() { - // setup the server - let name = "Server-01"; - let address = "0.0.0.0:6000"; - let owner = "noreply@email.com"; - - let server = Server::new(name, address, owner); - let result = server.start(); - - assert_eq!(result.is_ok(), true); - - let dur = time::Duration::from_millis(1000); - thread::sleep(dur); - - let api = ClientApi::get_info("127.0.0.1:6000"); - assert_eq!(api.is_ok(), true); - if let Ok(api) = api { - println!("received: {:?}", api); - let mut map = HashMap::new(); - map.insert("name".to_string(), name.to_string()); - map.insert("owner".to_string(), owner.to_string()); - - let expected = Commands::Info(Some(map)); - println!("expected: {:?}", expected); - assert_eq!(api, expected); - } - } - - #[test] - fn test_server_connect() { - let name = "Server-01"; - let address = "0.0.0.0:6001"; - let owner = "noreply@email.com"; - - let server = Server::new(name, address, owner); - let _ = server.start().unwrap(); - - let api_result = ClientApi::new(address); - assert_eq!(api_result.is_ok(), true); - if let Ok(api) = api_result { - std::thread::sleep(std::time::Duration::from_secs(2)); - } - } -} - -#[cfg(test)] -mod crypto_tests { - use openssl::rsa::{Rsa, Padding}; - use openssl::ssl::{SslMethod, SslAcceptor, SslStream, SslFiletype, SslConnector, SslVerifyMode}; - use std::net::{TcpListener, TcpStream}; - use std::sync::Arc; - use std::thread; - use std::str; - - #[test] - // MARK: - working encryption example for rsa - fn gen_rsa() { - let rsa = Rsa::generate(1024).unwrap(); - - let ref1 = rsa.public_key_to_pem().unwrap(); - let ref2 = rsa.private_key_to_pem().unwrap(); - - let public = str::from_utf8(&ref1).unwrap().to_string(); - let private = str::from_utf8(&ref2).unwrap().to_string(); - - println!("public key size: {}", public.len()); - println!("{}", public); - - println!("private key size: {}", private.len()); - println!("{}", private); - - let data = b"this is a sentence"; - println!("before: {:?}", data); - - let mut buf = vec![0; rsa.size() as usize]; - let encrypted_len = rsa.private_encrypt(data, &mut buf, Padding::PKCS1).unwrap(); - println!("during: {:?}", &buf); - - let mut buf2 = vec![0; rsa.size() as usize]; - let _ = rsa.public_decrypt(&mut buf, &mut buf2, Padding::PKCS1).unwrap(); - println!("after: {:?}", &buf2); - } - - #[test] - fn tls_handshake() { - // spawn the server - thread::spawn(|| { - println!("creating acceptor"); - let mut acceptor = SslAcceptor::mozilla_modern(SslMethod::tls()).unwrap(); - acceptor.set_private_key_file("cert.pem", SslFiletype::PEM).unwrap(); - acceptor.set_certificate_chain_file("root.pem").unwrap(); - acceptor.check_private_key().unwrap(); - let acceptor = Arc::new(acceptor.build()); - - let listener = TcpListener::bind("0.0.0.0:6000").unwrap(); - - println!("entering loop"); - loop { - for stream in listener.incoming() { - println!("client accepted"); - match stream { - Ok(stream) => { - let acceptor = acceptor.clone(); - thread::spawn(move || { - let mut stream = acceptor.accept(stream).unwrap(); - - let mut buffer: [u8; 1024] = [0; 1024]; - - stream.ssl_read(&mut buffer).unwrap(); - let result = str::from_utf8(&buffer).unwrap(); - if buffer == "echo".as_bytes() { - let _ = stream.ssl_write("echo".as_bytes()).unwrap(); - } - }); - } - Err(e) => { /* connection failed */ } - } - } - } - }); - - let connector = SslConnector::builder(SslMethod::tls()).unwrap().build(); - - let stream = TcpStream::connect("localhost:6000").unwrap(); - let mut stream = connector.connect("127.0.0.1", stream).unwrap(); - - let _ = stream.ssl_write("echo".as_bytes()).unwrap(); - } -} \ No newline at end of file diff --git a/src/server/client/client_profile.rs b/src/server/client/client_profile.rs deleted file mode 100644 index e71db70..0000000 --- a/src/server/client/client_profile.rs +++ /dev/null @@ -1,221 +0,0 @@ -extern crate regex; - -use std::{ - sync::Arc, - sync::Mutex, - net::{Shutdown, TcpStream}, - io::prelude::*, - io::Error, - //collections::HashMap, - time::{Instant, Duration}, - io, -}; - -use crossbeam::{ - Sender, - Receiver, - TryRecvError, - unbounded -}; - -use openssl::rsa::Rsa; -use log::info; - -use crate::{ - server::{ - //server_profile::Server, - server_profile::ServerMessages, - }, - commands::Commands - -}; - -//use parking_lot::FairMutex; -//use dashmap::DashMap; - -#[derive(Debug)] -pub struct Client { - uuid: String, - username: String, - address: String, - - last_heartbeat: Arc>, - - stream_arc: Arc>, - - pub sender: Sender, - receiver: Receiver, - - server_sender: Sender, -} - -impl Client { - pub fn new(stream: TcpStream, server_sender: Sender, uuid: &str, username: &str, address: &str) -> Self { - let (sender, receiver): (Sender, Receiver) = unbounded(); - stream.set_read_timeout(Some(Duration::from_secs(1))).unwrap(); - - Client { - stream_arc: Arc::new(Mutex::new(stream)), - uuid: uuid.to_string(), - username: username.to_string(), - address: address.to_string(), - - sender, - receiver, - - server_sender, - - last_heartbeat: Arc::new(Mutex::new(Instant::now())), - } - } - - #[allow(dead_code)] - pub fn get_sender(&self) -> &Sender { - &self.sender - } - - #[allow(dead_code)] - pub fn get_uuid(&self) -> String { - self.uuid.clone() - } - - #[allow(dead_code)] - pub fn get_username(&self) -> String { - self.username.clone() - } - - #[allow(dead_code)] - pub fn get_address(&self) -> String { - self.address.clone() - } - - // TODO: - add heartbeat timer. - pub fn handle_connection(&mut self) { - let mut buffer = [0; 1024]; - - // TODO: - Check heartbeat - { - info!("heartbeat") - } - - info!("{}: handling connection", self.uuid); - match self.read_data(&mut buffer) { - Ok(command) => { - // match incomming commands - println!("command"); - match command { - Commands::Disconnect(None) => { - self.server_sender.send(ServerMessages::Disconnect(self.uuid.clone())).expect("sending message to server failed"); - self.stream_arc.lock().unwrap().shutdown(Shutdown::Both).expect("shutdown call failed"); - }, - Commands::HeartBeat(None) => { - *self.last_heartbeat.lock().unwrap() = Instant::now(); - self.transmit_data(Commands::Success(None).to_string().as_str()); - }, - Commands::ClientUpdate(None) => { - self.transmit_data(Commands::Success(None).to_string().as_str()); - let _ = self.server_sender.send(ServerMessages::RequestUpdate(self.stream_arc.clone())); - }, - Commands::ClientInfo(Some(params)) => { - let uuid = params.get("uuid").unwrap(); - let _ = self.server_sender.send(ServerMessages::RequestInfo(uuid.clone(), self.stream_arc.clone())); - }, - // TODO: may or may not be needed? - Commands::Error(None) => { - }, - _ => { - self.transmit_data(Commands::Error(None).to_string().as_str()); - }, - } - }, - Err(_) => { - // no data was read - }, - } - - println!("buffer"); - // test to see if there is anything for the client to receive from its channel - match self.receiver.try_recv() { - /*command is on the channel*/ - Ok(Commands::ClientRemove(Some(params))) => { - let mut retry: u8 = 3; - 'retry_loop1: loop { - if retry < 1 { - self.transmit_data(Commands::Error(None).to_string().as_str()); - break 'retry_loop1 - } else { - self.transmit_data(Commands::ClientRemove(Some(params.clone())).to_string().as_str()); - - if self.read_data(&mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) { - break 'retry_loop1; - } else { - retry -= 1; - } - } - } - }, - Ok(Commands::Client(Some(params))) => { - let mut retry: u8 = 3; - 'retry_loop2: loop { - if retry < 1 { - self.transmit_data(Commands::Error(None).to_string().as_str()); - break 'retry_loop2; - } else { - self.transmit_data(Commands::Client(Some(params.clone())).to_string().as_str()); - - if self.read_data(&mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) { - break 'retry_loop2; - } else { - retry -= 1; - } - } - } - - }, - /*no data available yet*/ - Err(TryRecvError::Empty) => {}, - _ => {}, - } - println!("---Client Thread Exit---"); - } - - // move into a drop perhaps - #[allow(dead_code)] - pub fn disconnect(&mut self){ - self.stream_arc.lock().unwrap().shutdown(Shutdown::Both).expect("shutdown call failed"); - } - - pub fn transmit_data(&self, data: &str) { - println!("Transmitting data: {}", data); - - let error_result = self.stream_arc.lock().unwrap().write_all(data.to_string().as_bytes()); - if let Some(error) = error_result.err(){ - match error.kind() { - // handle disconnections - io::ErrorKind::NotConnected => { - let _ = self.server_sender.send(ServerMessages::Disconnect(self.uuid.clone())); - }, - _ => { }, - } - } - } - - fn read_data(&mut self, buffer: &mut [u8; 1024]) -> Result { - let _ = self.stream_arc.lock().unwrap().read(buffer)?; - let command = Commands::from(buffer); - - Ok(command) - } - -} - -impl ToString for Client { - fn to_string(&self) -> std::string::String { todo!() } -} - -impl Drop for Client { - fn drop(&mut self) { - let _ = self.stream_arc.lock().unwrap().write_all(Commands::Disconnect(None).to_string().as_bytes()); - let _ = self.stream_arc.lock().unwrap().shutdown(Shutdown::Both); - } -} diff --git a/src/server/client/mod.rs b/src/server/client/mod.rs deleted file mode 100644 index c0ef8d2..0000000 --- a/src/server/client/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod client_profile; diff --git a/src/server/mod.rs b/src/server/mod.rs deleted file mode 100644 index de07bb5..0000000 --- a/src/server/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod client; -pub mod server_profile; diff --git a/src/server/server_profile.rs b/src/server/server_profile.rs deleted file mode 100644 index 1f2a07e..0000000 --- a/src/server/server_profile.rs +++ /dev/null @@ -1,607 +0,0 @@ -extern crate regex; -extern crate rayon; - -use crate::{ - server::{ - client::client_profile::Client, - - }, - commands::Commands -}; - -use std::{ - sync::{Arc, Mutex}, - net::{TcpStream, TcpListener}, - collections::HashMap, - io::prelude::*, - time::Duration, - io::Error, - thread, - io -}; - -use log::info; - -use crossbeam_channel::{Sender, Receiver, unbounded}; -use rust_chat_server::ThreadPool; -//use zeroize::Zeroize; -//use parking_lot::FairMutex; -//use dashmap::DashMap; -//use regex::Regex; - -#[derive(Debug)] -pub enum ServerMessages { - RequestUpdate(Arc>), - RequestInfo(String, Arc>), - Disconnect(String), - Shutdown, -} - -// MARK: - server struct -#[derive(Debug)] -pub struct Server { - name: Arc, - address: Arc, - author: Arc, - - connected_clients: Arc>>, - - thread_pool: ThreadPool, - - sender: Sender, - receiver: Receiver, -} - -// MARK: - server implemetation -impl Server { - pub fn new(name: &str, address: &str, author: &str) -> Self { - let (sender, receiver) = unbounded(); - - Self { - name: Arc::new(name.to_string()), - address: Arc::new(address.to_string()), - author: Arc::new(author.to_string()), - connected_clients: Arc::new(Mutex::new(HashMap::new())), - thread_pool: ThreadPool::new(16), - - sender, - receiver, - } - } - - #[allow(dead_code)] - pub fn get_name(&self) -> String { - self.name.to_string() - } - - #[allow(dead_code)] - pub fn get_address(&self) -> String { - self.address.to_string() - } - - #[allow(dead_code)] - pub fn get_author(&self) -> String { - self.author.to_string() - } - - pub fn start(&self) -> Result<(), io::Error>{ - println!("server: starting server..."); - - // MARK: - creating clones of the server property references - let name = self.name.clone(); - #[allow(dead_code)] - let address = self.address.clone(); - let author = self.author.clone(); - let connected_clients = self.connected_clients.clone(); - let sender = self.sender.clone(); - let receiver = self.receiver.clone(); - - // set up listener and buffer - let mut buffer = [0; 1024]; - let listener = TcpListener::bind(self.get_address())?; - listener.set_nonblocking(true)?; - - println!("server: spawning threads"); - let _ = thread::Builder::new().name("Server Thread".to_string()).spawn(move || { - - 'outer: loop { - std::thread::sleep(Duration::from_millis(100)); - - // get messages from the servers channel. - println!("server: getting messages"); - for i in receiver.try_iter() { - match i { - ServerMessages::Shutdown => { - // TODO: implement disconnecting all clients and shutting down the server. - println!("server: shutting down..."); - break 'outer; - }, - ServerMessages::RequestUpdate(stream_arc) => { - for (_k, v) in connected_clients.lock().unwrap().iter() { - let mut stream = stream_arc.lock().unwrap(); - let _ = Server::transmit_data(&mut stream, v.to_string().as_str()); - - if Server::read_data(&mut stream, &mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) { - println!("Success Confirmed"); - } else { - println!("no success read"); - let error = Commands::Error(None); - let _ = Server::transmit_data(&mut stream, error.to_string().as_str()); - } - } - }, - ServerMessages::RequestInfo(uuid, stream_arc) => { - let mut stream = stream_arc.lock().unwrap(); - - if let Some(client) = connected_clients.lock().unwrap().get(&uuid) { - let params: HashMap = [(String::from("uuid"), client.get_uuid()), (String::from("name"), client.get_username()), (String::from("host"), client.get_address())].iter().cloned().collect(); - let command = Commands::Success(Some(params)); - let _ = Server::transmit_data(&mut stream, command.to_string().as_str()); - } else { - let command = Commands::Success(None); - let _ = Server::transmit_data(&mut stream, command.to_string().as_str()); - } - }, - ServerMessages::Disconnect(uuid) => { - let mut clients = connected_clients.lock().unwrap(); - clients.remove(&uuid.to_string()); - let params: HashMap = [(String::from("uuid"), uuid)].iter().cloned().collect(); - let command = Commands::ClientRemove(Some(params)); - let _ = connected_clients.lock().unwrap().iter().map(move |(_k, v)| {v.get_sender().send(command.clone())}); - }, - } - } - - println!("server: checking for new connections"); - if let Ok((mut stream, _addr)) = listener.accept() { - stream.set_read_timeout(Some(Duration::from_millis(1000))).unwrap(); - let _ = stream.set_nonblocking(false); - - let request = Commands::Request(None); - let _ = Server::transmit_data(&mut stream, &request.to_string().as_str()); - - match Server::read_data(&mut stream, &mut buffer) { - Ok(command) => { - println!("Server: new connection sent - {:?}", command); - match command { - Commands::Connect(Some(data)) => { - let uuid = data.get("uuid").unwrap(); - let username = data.get("name").unwrap(); - let address = data.get("host").unwrap(); - - println!("{}", format!("Server: new Client connection: _addr = {}", address )); - - let client = Client::new(stream, sender.clone(), &uuid, &username, &address); - - connected_clients.lock().unwrap().insert(uuid.to_string(), client); - - let params: HashMap = [(String::from("name"), username.clone()), (String::from("host"), address.clone()), (String::from("uuid"), uuid.clone())].iter().cloned().collect(); - let new_client = Commands::Client(Some(params)); - - let _ = connected_clients.lock().unwrap().iter().map(|(_k, v)| v.sender.send(new_client.clone())); - }, - // TODO: - correct connection reset error when getting info. - Commands::Info(None) => { - println!("Server: info requested"); - let params: HashMap = [(String::from("name"), name.to_string().clone()), (String::from("owner"), author.to_string().clone())].iter().cloned().collect(); - let command = Commands::Info(Some(params)); - - let _ = Server::transmit_data(&mut stream, command.to_string().as_str()); - }, - _ => { - println!("Server: Invalid command sent"); - let _ = Server::transmit_data(&mut stream, Commands::Error(None).to_string().as_str()); - }, - } - }, - Err(_) => println!("ERROR: stream closed"), - } - } - // TODO: end - - - // handle each client for messages - println!("server: handing control to clients"); - for (_k, client) in connected_clients.lock().unwrap().iter_mut() { - client.handle_connection(); - } - } - println!("server: stopped"); - }); - println!("server: started"); - Ok(()) - } - - pub fn stop(&self) { - info!("server: sending stop message"); - let _ = self.sender.send(ServerMessages::Shutdown); - } - - fn transmit_data(stream: &mut TcpStream, data: &str) -> Result<(), Error>{ - println!("Transmitting..."); - println!("data: {}", data); - - /* - * This will throw an error and crash any thread, including the main thread, if - * the connection is lost before transmitting. Maybe change to handle any exceptions - * that may occur. - */ - let _ = stream.write(data.to_string().as_bytes())?; - stream.flush()?; - Ok(()) - } - - fn read_data(stream: &mut TcpStream, buffer: &mut [u8; 1024]) -> Result { - let _ = stream.read(buffer)?; - let command = Commands::from(buffer); - - Ok(command) - } -} - -impl ToString for Server { - fn to_string(&self) -> std::string::String { todo!() } -} - -impl Drop for Server { - fn drop(&mut self) { - println!("server dropped"); - let _ = self.sender.send(ServerMessages::Shutdown); - } -} - - -/* The new version of the server no long works with these unit - * tests. - * They will be fixed soon! - * TODO: fix unit tests - */ - - - -/*#[cfg(test)] -mod tests{ - use super::*; - use std::{thread, time}; - use std::sync::Once; - use std::time::Duration; - - lazy_static!{ - static ref SERVER_NAME: &'static str = "test"; - static ref SERVER_ADDRESS: &'static str = "0.0.0.0:6000"; - static ref SERVER_AUTHOR: &'static str = "test"; - static ref SERVER: Server<'static> = Server::new(&SERVER_NAME, &SERVER_ADDRESS, &SERVER_AUTHOR); - } - - static START: Once = Once::new(); - - /* - * These tests must be executed individually to ensure that no errors - * occur, this is due to the fact that the server is created everytime. - * Setup a system for the server to close after every test. - */ - fn setup_server(){ - unsafe{ - START.call_once(|| { - thread::spawn(|| { - SERVER.start(); - }); - }); - - let millis = time::Duration::from_millis(1000); - thread::sleep(millis); - } - } - - fn establish_client_connection(uuid: &str) -> TcpStream { - let mut buffer = [0; 1024]; - - let mut stream = TcpStream::connect("0.0.0.0:6000").unwrap(); - - let mut command = read_data(&stream, &mut buffer); - - assert_eq!(command, Commands::Request(None)); - - let msg: String = format!("!connect: uuid:{uuid} name:\"{name}\" host:\"{host}\"", uuid=uuid, name="alice", host="127.0.0.1"); - transmit_data(&stream, msg.as_str()); - - command = read_data(&stream, &mut buffer); - - assert_eq!(command, Commands::Success(None)); - - stream - } - - fn transmit_data(mut stream: &TcpStream, data: &str){ - stream.write(data.to_string().as_bytes()).unwrap(); - stream.flush().unwrap(); - } - - fn read_data(mut stream: &TcpStream, buffer: &mut [u8; 1024]) -> Commands { - match stream.read(buffer) { - Ok(_) => Commands::from(buffer), - Err(_) => Commands::Error(None), - } - } - - fn force_disconnect(mut stream: &TcpStream){ - let msg = "!disconnect:"; - transmit_data(&stream, msg); - } - - #[test] - fn test_server_connect(){ - let mut buffer = [0; 1024]; - - setup_server(); - - let mut stream = TcpStream::connect("0.0.0.0:6000").unwrap(); - - stream.read(&mut buffer).unwrap(); - let mut command = Commands::from(&mut buffer); - - assert_eq!(command, Commands::Request(None)); - - let msg = b"!connect: uuid:123456-1234-1234-123456 name:\"alice\" host:\"127.0.0.1\""; - stream.write(msg).unwrap(); - - stream.read(&mut buffer).unwrap(); - command = Commands::from(&mut buffer); - - assert_eq!(command, Commands::Success(None)); - - let msg = b"!disconnect:"; - stream.write(msg).unwrap(); - - let dur = time::Duration::from_millis(500); - thread::sleep(dur); - } - - #[test] - fn test_server_info(){ - let mut buffer = [0; 1024]; - - setup_server(); - - let mut stream = TcpStream::connect("0.0.0.0:6000").unwrap(); - - let command = read_data(&stream, &mut buffer); - - assert_eq!(command, Commands::Request(None)); - - let msg = "!info:"; - transmit_data(&stream, msg); - - let command = read_data(&stream, &mut buffer); - - let params: HashMap = [(String::from("name"), String::from("test")), (String::from("owner"), String::from("test"))].iter().cloned().collect(); - assert_eq!(command, Commands::Success(Some(params))); - } - - #[test] - fn test_client_info(){ - let mut buffer = [0; 1024]; - - setup_server(); - - let mut stream = establish_client_connection("1234-5542-2124-155"); - - let msg = "!info:"; - transmit_data(&stream, msg); - - let command = read_data(&stream, &mut buffer); - - let params: HashMap = [(String::from("name"), String::from("test")), (String::from("owner"), String::from("test"))].iter().cloned().collect(); - assert_eq!(command, Commands::Success(Some(params))); - - let msg = "!disconnect:"; - transmit_data(&stream, msg); - - let dur = time::Duration::from_millis(500); - thread::sleep(dur); - } - - #[test] - fn test_clientUpdate_solo(){ - let mut buffer = [0; 1024]; - - setup_server(); - - let mut stream = establish_client_connection("1222-555-6-7"); - - let msg = "!clientUpdate:"; - transmit_data(&stream, msg); - - let command = read_data(&stream, &mut buffer); - - assert_eq!(command, Commands::Success(None)); - - let msg = "!disconnect:"; - transmit_data(&stream, msg); - - let dur = time::Duration::from_millis(500); - thread::sleep(dur); - } - - - #[test] - fn test_clientUpdate_multi(){ - let mut buffer = [0; 1024]; - - setup_server(); - - let mut stream_one = establish_client_connection("0001-776-6-5"); - let mut stream_two = establish_client_connection("0010-776-6-5"); - let mut stream_three = establish_client_connection("0011-776-6-5"); - let mut stream_four = establish_client_connection("0100-776-6-5"); - - let client_uuids: [String; 3] = [String::from("0010-776-6-5"), String::from("0011-776-6-5"), String::from("0100-776-6-5")]; - let mut user_1 = true; - let mut user_2 = true; - let mut user_3 = true; - - for uuid in client_uuids.iter() { - let command = read_data(&stream_one, &mut buffer); - - if *uuid == String::from("0010-776-6-5") && user_1 { - let params: HashMap = [(String::from("uuid"), String::from("0010-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect(); - assert_eq!(command, Commands::Client(Some(params))); - - user_1 = false; - } else if *uuid == String::from("0011-776-6-5") && user_2 { - let params: HashMap = [(String::from("uuid"), String::from("0011-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect(); - assert_eq!(command, Commands::Client(Some(params))); - - user_2 = false; - } else if *uuid == String::from("0100-776-6-5") && user_3 { - let params: HashMap = [(String::from("uuid"), String::from("0100-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect(); - assert_eq!(command, Commands::Client(Some(params))); - - user_3 = false; - } else { - assert!(false); - } - let msg = "!success:"; - transmit_data(&stream_one, msg); - } - - stream_one.set_read_timeout(Some(Duration::from_millis(3000))).unwrap(); - let mut unsuccessful = true; - while unsuccessful { - let msg = "!clientUpdate:"; - transmit_data(&stream_one, msg); - - let command = read_data(&stream_one, &mut buffer); - match command.clone() { - Commands::Error(None) => println!("resending..."), - _ => { - assert_eq!(command, Commands::Success(None)); - unsuccessful = false; - }, - } - } - stream_one.set_read_timeout(None).unwrap(); - - for x in 0..3 { - let command = read_data(&stream_one, &mut buffer); - - let command_clone = command.clone(); - match command{ - Commands::Client(Some(params)) => { - let uuid = params.get("uuid").unwrap(); - - if *uuid == String::from("0010-776-6-5") { - let params: HashMap = [(String::from("uuid"), String::from("0010-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect(); - assert_eq!(command_clone, Commands::Client(Some(params))); - } else if *uuid == String::from("0011-776-6-5") { - let params: HashMap = [(String::from("uuid"), String::from("0011-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect(); - assert_eq!(command_clone, Commands::Client(Some(params))); - } else if *uuid == String::from("0100-776-6-5") { - let params: HashMap = [(String::from("uuid"), String::from("0100-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect(); - assert_eq!(command_clone, Commands::Client(Some(params))); - } else { - assert!(false); - } - }, - _ => assert!(false), - } - - let msg = "!success:"; - transmit_data(&stream_one, msg); - } - - let dur = time::Duration::from_millis(500); - thread::sleep(dur); - - let msg = "!disconnect:"; - transmit_data(&stream_one, msg); - transmit_data(&stream_two, msg); - transmit_data(&stream_three, msg); - transmit_data(&stream_four, msg); - - let dur = time::Duration::from_millis(500); - thread::sleep(dur); - } - - #[test] - fn test_clientInfo(){ - let mut buffer = [0; 1024]; - - setup_server(); - - let mut stream_one = establish_client_connection("0001-776-6-5"); - let mut stream_two = establish_client_connection("\"0010-776-6-5\""); - - let command = read_data(&stream_one, &mut buffer); - let params: HashMap = [(String::from("uuid"), String::from("\"0010-776-6-5\"")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect(); - assert_eq!(command, Commands::Client(Some(params))); - - let msg = "!success:"; - transmit_data(&stream_one, msg); - - - stream_one.set_read_timeout(Some(Duration::from_millis(3000))).unwrap(); - let mut unsuccessful = true; - while unsuccessful { - let msg = "!clientInfo: uuid:\"0010-776-6-5\""; - transmit_data(&stream_one, msg); - - let command = read_data(&stream_one, &mut buffer); - match command.clone() { - Commands::Error(None) => println!("resending..."), - _ => { - let params: HashMap = [(String::from("uuid"), String::from("\"0010-776-6-5\"")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect(); - assert_eq!(command, Commands::Success(Some(params))); - unsuccessful = false; - }, - } - } - stream_one.set_read_timeout(None).unwrap(); - - let msg = "!disconnect:"; - transmit_data(&stream_one, msg); - transmit_data(&stream_two, msg); - - let dur = time::Duration::from_millis(500); - thread::sleep(dur); - } - - #[test] - fn test_client_disconnect(){ - let mut buffer = [0; 1024]; - - setup_server(); - - let mut stream_one = establish_client_connection("0001-776-6-5"); - let mut stream_two = establish_client_connection("0010-776-6-5"); - - let command = read_data(&stream_one, &mut buffer); - let params: HashMap = [(String::from("uuid"), String::from("0010-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect(); - assert_eq!(command, Commands::Client(Some(params))); - - let msg = "!success:"; - transmit_data(&stream_one, msg); - - let msg = "!disconnect:"; - transmit_data(&stream_two, msg); - - let command = read_data(&stream_one, &mut buffer); - let params: HashMap = [(String::from("uuid"), String::from("0010-776-6-5"))].iter().cloned().collect(); - assert_eq!(command, Commands::Client(Some(params))); - - let msg = "!success:"; - transmit_data(&stream_one, msg); - - stream_one.set_read_timeout(Some(Duration::from_millis(2000))).unwrap(); - match stream_one.peek(&mut buffer) { - Ok(_) => assert!(false), - Err(_) => assert!(true), - } - stream_one.set_read_timeout(None).unwrap(); - - let msg = "!disconnect:"; - transmit_data(&stream_one, msg); - - let dur = time::Duration::from_millis(500); - thread::sleep(dur); - } -}*/