diff --git a/.gitignore b/.gitignore index d422aab..ad414a9 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ Cargo.lock .vscode/launch.json *.cer *.pem +.vscode/settings.json diff --git a/Cargo.toml b/Cargo.toml index 5227718..4d850dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,20 +8,20 @@ edition = "2018" [dependencies] regex = "1" -uuid = "0.8" -crossbeam = "0.7" -crossbeam-channel = "0.4" -crossbeam-utils = "0.7" -crossbeam-queue = "0.2" -parking_lot = "0.10" -dashmap = "3.11.4" +crossbeam = "0.8.0" +crossbeam-channel = "0.5.0" +crossbeam-queue = "0.3.1" +parking_lot = "0.11.1" +dashmap = "4.0.2" rayon = "1.3.1" zeroize = "1.1.0" -crossterm = "0.17.7" -clap = "3.0.0-beta.1" +crossterm = "0.19.0" +clap = "2.33.3" log = "0.4" -serde = { version = "1.0", features = ["derive"] } url = "2.2.0" +uuid = {version = "0.8", features = ["serde", "v4"]} +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" [profile.dev] diff --git a/src/commands/behaviors.rs b/src/lib/commands/behaviors.rs similarity index 100% rename from src/commands/behaviors.rs rename to src/lib/commands/behaviors.rs diff --git a/src/commands/mod.rs b/src/lib/commands/mod.rs similarity index 97% rename from src/commands/mod.rs rename to src/lib/commands/mod.rs index a4a50a2..6d88eee 100644 --- a/src/commands/mod.rs +++ b/src/lib/commands/mod.rs @@ -230,6 +230,14 @@ impl From<&mut [u8; 1024]> for Commands { } } +impl From<&mut Vec> for Commands { + fn from(data: &mut Vec) -> Self { + let incoming_message = String::from(String::from_utf8_lossy(data)); + data.zeroize(); + Commands::from(incoming_message) + } +} + // TODO: check if unit tests still work /*#[cfg(test)] mod test_commands_v2 { diff --git a/src/lib/foundation/mod.rs b/src/lib/foundation/mod.rs new file mode 100644 index 0000000..a5269c4 --- /dev/null +++ b/src/lib/foundation/mod.rs @@ -0,0 +1,7 @@ +pub trait IMessagable { + fn send_message(&self, msg: M); +} + +pub trait ICooperative { + fn tick(&self); +} \ No newline at end of file diff --git a/src/lib/mod.rs b/src/lib/mod.rs index 3dbcdd0..1e515f8 100644 --- a/src/lib/mod.rs +++ b/src/lib/mod.rs @@ -1,12 +1,14 @@ // pub mod commands; pub mod prelude; pub mod server; +pub mod foundation; +pub mod commands; use std::sync::Arc; use std::sync::Mutex; use std::thread; -use crossbeam::{unbounded, Receiver, Sender}; +use crossbeam_channel::{unbounded, Receiver, Sender}; enum Message { NewJob(Job), diff --git a/src/lib/server/client_management/client/mod.rs b/src/lib/server/client_management/client/mod.rs index 2401199..d0ace26 100644 --- a/src/lib/server/client_management/client/mod.rs +++ b/src/lib/server/client_management/client/mod.rs @@ -2,19 +2,42 @@ // pub mod client_v3; pub mod traits; -use serde::{Serialize, Deserialize}; -use std::net::TcpStream; -use std::sync::Weak; -use std::sync::Arc; -use uuid::Uuid; use std::cmp::Ordering; +use std::net::TcpStream; +use std::sync::Mutex; +use std::sync::Arc; +use std::io::{BufReader, BufWriter}; +use std::io::BufRead; -use super::ClientManager; -use traits::TClient; +use uuid::Uuid; +use serde::{Serialize, Deserialize}; +use crossbeam_channel::{Sender, Receiver, unbounded}; +use traits::IClient; +use crate::lib::foundation::{ICooperative, IMessagable}; +use crate::lib::server::ServerMessages; + +/// # ClientMessage +/// This enum defined the message that a client can receive from the server +/// This uses the serde library to transform to and from json. +#[derive(Serialize, Deserialize)] pub enum ClientMessage { - a, - b, + Disconnect {id: String}, + Update {id: String}, + + ServerMessage {id: String, msg: String}, + + NewMessage {id: String, from_user_id: String, msg: String}, + NewgroupMessage {id: String, from_group_id: String, from_user_id: String, msg: String}, +} + +/// # ClientSocketMessage +/// This enum defines a message that can be sent from a client to the server once connected +/// This uses the serde library to transform to and from json. +#[derive(Serialize, Deserialize)] +pub enum ClientSocketMessage { + Disconnect {id: String}, + SendMessage {id: String, to_user_id: String, msg: String} } /// # Client @@ -27,31 +50,129 @@ pub enum ClientMessage { /// /// - stream: The socket for the connected client. /// - owner: An optional reference to the owning object. -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize)] pub struct Client { - pub uuid: String, + pub uuid: Uuid, username: String, address: String, - #[serde(skip)] - stream: Option, + // non serializable + #[serde(skip)] + server_channel: Option>, #[serde(skip)] - owner: Option> + input: Sender, + + #[serde(skip)] + output: Receiver, + + #[serde(skip)] + stream: Mutex>, + + #[serde(skip)] + stream_reader: Mutex>>, + + #[serde(skip)] + stream_writer: Mutex>>, + } -impl TClient for Client { - fn new(uuid: Uuid, name: String, addr: String) -> Arc { todo!() } +// client funciton implmentations +impl IClient for Client { + fn new( + uuid: String, + username: String, + address: String, + stream: TcpStream, + server_channel: Sender + ) -> Arc { + let (sender, receiver) = unbounded(); - fn send(&self, bytes: Vec) -> Result<(), &str> { todo!() } + let out_stream = stream.try_clone().unwrap(); + let in_stream = stream.try_clone().unwrap(); + + Arc::new(Client { + username, + uuid: Uuid::parse_str(&uuid).expect("invalid id"), + address, + + server_channel: Some(server_channel), + + input: sender, + output: receiver, + + stream: Mutex::new(Some(stream)), + + stream_reader: Mutex::new(Some(BufReader::new(in_stream))), + stream_writer: Mutex::new(Some(BufWriter::new(out_stream))), + }) + } + + // MARK: - removeable + fn send(&self, _bytes: Vec) -> Result<(), &str> { todo!() } fn recv(&self) -> Option> { todo!() } - - fn send_msg(&self, msg: ClientMessage) -> Result<(), &str> { todo!() } - fn recv_msg(&self) -> Option { todo!() } - - fn tick(&self) { } + // Mark: end - } +impl IMessagable for Client{ + fn send_message(&self, msg: ClientMessage) { + self.input.send(msg).expect("failed to send message to client."); + } +} + +// cooperative multitasking implementation +impl ICooperative for Client { + fn tick(&self) { + // aquire locks (so value isn't dropped) + let mut reader_lock = self.stream_reader.lock().unwrap(); + let mut writer_lock = self.stream_writer.lock().unwrap(); + + // aquiring mutable buffers + let reader = reader_lock.as_mut().unwrap(); + let _writer = writer_lock.as_mut().unwrap(); + + // create buffer + let mut buffer = String::new(); + + // loop over all lines that have been sent. + while let Ok(_size) = reader.read_line(&mut buffer) { + let command = serde_json::from_str::(buffer.as_str()).unwrap(); + + match command { + ClientSocketMessage::Disconnect {id} => println!("got Disconnect from id: {:?}", id), + _ => println!("New command found"), + } + } + + + // handle incomming messages + + } +} + +// default value implementation +impl Default for Client { + fn default() -> Self { + let (sender, reciever) = unbounded(); + Client { + username: "generic_client".to_string(), + uuid: Uuid::new_v4(), + address: "127.0.0.1".to_string(), + + output: reciever, + input: sender, + + server_channel: None, + + stream: Mutex::new(None), + + stream_reader: Mutex::new(None), + stream_writer: Mutex::new(None), + } + } +} + +// MARK: - used for sorting. impl PartialEq for Client { fn eq(&self, other: &Self) -> bool { self.uuid == other.uuid @@ -61,14 +182,14 @@ impl PartialEq for Client { impl Eq for Client { } +impl PartialOrd for Client { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + impl Ord for Client { fn cmp(&self, other: &Self) -> Ordering { self.uuid.cmp(&other.uuid) } } - -impl PartialOrd for Client { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} \ No newline at end of file diff --git a/src/lib/server/client_management/client/traits.rs b/src/lib/server/client_management/client/traits.rs index 9129eb6..b0e3b23 100644 --- a/src/lib/server/client_management/client/traits.rs +++ b/src/lib/server/client_management/client/traits.rs @@ -1,6 +1,9 @@ use std::sync::Arc; +use std::net::TcpStream; -use uuid::Uuid; +use crossbeam_channel::Sender; + +use crate::lib::server::ServerMessages; /// # TClient /// This trait represents the methods that a client must implement @@ -12,14 +15,15 @@ use uuid::Uuid; /// - recv: if there is a message in the queue, returns the message /// - send_msg: sends a event message to the client /// - recv_msg: used by the client to receive and process event messages -pub trait TClient { - fn new(uuid: Uuid, name: String, addr: String) -> Arc; +pub trait IClient { + fn new( + uuid: String, + username: String, + address: String, + stream: TcpStream, + server_channel: Sender + ) -> Arc; fn send(&self, bytes: Vec) -> Result<(), &str>; fn recv(&self) -> Option>; - - fn send_msg(&self, msg: TClientMessage) -> Result<(), &str>; - fn recv_msg(&self) -> Option; - - fn tick(&self); } \ No newline at end of file diff --git a/src/lib/server/client_management/mod.rs b/src/lib/server/client_management/mod.rs index 8c3c640..206af7e 100644 --- a/src/lib/server/client_management/mod.rs +++ b/src/lib/server/client_management/mod.rs @@ -1,110 +1,102 @@ pub mod client; -mod traits; +pub mod traits; +// use crate::lib::server::ServerMessages; use std::sync::Arc; use std::sync::Mutex; -use std::sync::Weak; +use std::collections::HashMap; use crossbeam_channel::{unbounded, Receiver, Sender}; - use uuid::Uuid; use self::client::Client; use self::client::ClientMessage; -// use client::client_v3::Client; use self::traits::TClientManager; -use client::traits::TClient; +use crate::lib::server::ServerMessages; +use crate::lib::foundation::IMessagable; +use crate::lib::foundation::ICooperative; -enum ClientManagerMessages {} +enum ClientManagerMessages { + #[allow(dead_code)] + DropAll, + #[allow(dead_code)] + MessageClient, +} /// # ClientManager /// This struct manages all connected users +#[derive(Debug)] pub struct ClientManager { - clients: Mutex>>, + clients: Mutex>>, - weak_self: Mutex>>, + server_channel: Sender, sender: Sender, receiver: Receiver, } impl ClientManager { - pub fn new() -> Arc { + pub fn new(server_channel: Sender) -> Arc { - let channels = unbounded(); + let (sender, receiver) = unbounded(); - let manager_ref: Arc = Arc::new(ClientManager { + Arc::new(ClientManager { clients: Mutex::default(), - weak_self: Mutex::default(), + server_channel, - sender: channels.0, - receiver: channels.1, - }); - - // get the reference - { - let mut lock = manager_ref.weak_self.lock().unwrap(); - let tmp = manager_ref.clone(); - *lock = Some(Arc::downgrade(&tmp)); - } - - manager_ref.set_ref(manager_ref.clone()); - manager_ref - } - - pub fn get_ref(&self) -> Weak { - self.weak_self.lock().unwrap().clone().unwrap() - } - - fn set_ref(&self, reference: Arc) { - let mut lock = self.weak_self.lock().unwrap(); - *lock = Some(Arc::downgrade(&reference)); + sender, + receiver, + }) } } impl TClientManager for ClientManager { fn add_client(&self, client: std::sync::Arc) { - self.clients.lock().unwrap().push(client); + self.clients.lock().unwrap().insert(client.uuid, client); } fn remove_client(&self, uuid: Uuid) { - let uuid_str = uuid.to_string(); - let mut client_list = self.clients.lock().unwrap(); - client_list.sort(); - if let Ok(index) = client_list.binary_search_by(move |client| client.uuid.cmp(&uuid_str)) { - client_list.remove(index); - } + let _ = self.clients.lock().unwrap().remove(&uuid); } - fn message_client(&self, id: Uuid, msg: ClientMessage) -> Result<(), &str> { - let uuid_str = id.to_string(); - let mut client_list = self.clients.lock().unwrap(); - client_list.sort(); - if let Ok(index) = client_list.binary_search_by(move |client| client.uuid.cmp(&uuid_str)) { - if let Some(client) = client_list.get(index) { - let _ = client.send_msg(msg); - } - } - Ok(()) - } - - fn tick(&self) { - let client_list = self.clients.lock().unwrap(); - let _ = client_list.iter().map(|client| client.tick()); + fn send_message_to_client(&self, uuid: Uuid, msg: ClientMessage) { + let clients = self.clients.lock().unwrap(); + let client = clients.get(&uuid).unwrap(); + client.send_message(msg); } } +impl ICooperative for ClientManager { + fn tick(&self) { + + for message in self.receiver.iter() { + match message { + ClientManagerMessages::DropAll => { + println!("cannot drop all clients yet") + } + _ => println!("[Client Manager]: method not implemented") + } + } + + // allocate time for clients. + let clients = self.clients.lock().unwrap(); + let _ = clients.iter().map(|(_uuid, client)| client.tick()); + } +} + + #[cfg(test)] mod test { - use super::ClientManager; - use std::sync::Arc; + // use super::ClientManager; + // use std::sync::Arc; + // use crate::lib::Foundation::{IOwner}; #[test] fn test_get_ref() { - let client_manager = ClientManager::new(); - let _cm_ref = client_manager.get_ref(); - assert_eq!(Arc::weak_count(&client_manager), 2); + // let client_manager = ClientManager::new(); + // let _cm_ref = client_manager.get_ref(); + // assert_eq!(Arc::weak_count(&client_manager), 2); } #[test] diff --git a/src/lib/server/client_management/traits.rs b/src/lib/server/client_management/traits.rs index 06c6243..d2bc94a 100644 --- a/src/lib/server/client_management/traits.rs +++ b/src/lib/server/client_management/traits.rs @@ -1,3 +1,4 @@ +use crate::lib::server::client_management::client::ClientMessage; use std::sync::Arc; use uuid::Uuid; @@ -7,7 +8,6 @@ use uuid::Uuid; */ pub trait TClientManager { fn add_client(&self, client: Arc); - fn remove_client(&self, id: Uuid); - fn message_client(&self, id: Uuid, msg: TClientMessage) -> Result<(), &str>; - fn tick(&self, ); + fn remove_client(&self, uuid: Uuid); + fn send_message_to_client(&self, uuid: Uuid, msg: ClientMessage); } \ No newline at end of file diff --git a/src/lib/server/mod.rs b/src/lib/server/mod.rs index 5ab86fa..a2af064 100644 --- a/src/lib/server/mod.rs +++ b/src/lib/server/mod.rs @@ -1,5 +1,65 @@ pub mod client_management; -// pub mod server; -// pub mod server_v3; +pub mod network_manager; -pub struct _Server {} +use uuid::Uuid; +use crate::lib::server::network_manager::NetworkManager; +use std::sync::Arc; + +use crossbeam_channel::{Receiver, unbounded}; + +use crate::lib::server::client_management::ClientManager; +use crate::lib::server::client_management::traits::TClientManager; +use crate::lib::foundation::{ICooperative}; +use client_management::client::Client; + +/// # ServerMessages +/// This is used internally +#[derive(Debug)] +pub enum ServerMessages { + ClientConnected(Arc), + + #[allow(dead_code)] + ClientDisconnected(Uuid), +} + +pub struct Server { + client_manager: Arc, + network_manager: Arc, + + receiver: Receiver, +} + +impl Server { + pub fn new() -> Arc { + let (sender, receiver) = unbounded(); + + Arc::new(Server { + client_manager: ClientManager::new(sender.clone()), + + network_manager: NetworkManager::new("5600".to_string(), sender.clone()), + receiver, + }) + } +} + +impl ICooperative for Server{ + fn tick(&self) { + + // handle new messages loop + for message in self.receiver.try_iter() { + match message { + ServerMessages::ClientConnected(client) => { + self.client_manager.add_client(client); + }, + ServerMessages::ClientDisconnected(uuid) => { + self.client_manager.remove_client(uuid); + } + } + } + + // alocate time for other components + self.network_manager.tick(); + self.client_manager.tick(); + + } +} diff --git a/src/lib/server/network_manager/mod.rs b/src/lib/server/network_manager/mod.rs new file mode 100644 index 0000000..532b2a3 --- /dev/null +++ b/src/lib/server/network_manager/mod.rs @@ -0,0 +1,117 @@ +use crate::lib::server::Client; +use std::net::TcpListener; +use std::sync::Arc; +use std::io::BufReader; +use std::io::BufWriter; +use std::io::Write; +use std::io::BufRead; + +use serde::{Deserialize, Serialize}; +use crossbeam_channel::Sender; + +use crate::lib::server::ServerMessages; +use crate::lib::foundation::ICooperative; +use crate::lib::server::client_management::client::traits::IClient; + + +/// # NetworkSockIn +/// these messages can be sent by a client on connecting +#[derive(Serialize, Deserialize)] +enum NetworkSockIn { + Info, + Connect {uuid: String, username: String, address: String}, +} + +/// # NetworkSockOut +/// these messages are sent by the network manager on connecting and requesting +#[derive(Serialize, Deserialize)] +enum NetworkSockOut<'a> { + Request, + GotInfo {server_name: &'a str, server_owner: &'a str} +} + +// these are control signals from the server. +// pub enum NetworkMessages { + +// } + +pub struct NetworkManager { + listener: TcpListener, + server_channel: Sender, +} + +impl NetworkManager { + pub fn new( + port: String, + server_channel: Sender + ) -> Arc { + let mut address = "0.0.0.0:".to_string(); + address.push_str(&port); + + let listener = TcpListener::bind(address) + .expect("Could not bind to address"); + + Arc::new(NetworkManager { + listener, + server_channel, + }) + } +} + +impl ICooperative for NetworkManager { + fn tick(&self) { + // get all new connections + // handle each request + for connection in self.listener.incoming() { + if let Ok(stream) = connection { + + // create buffered writers + let mut reader = BufReader::new(stream.try_clone().unwrap()); + let mut writer = BufWriter::new(stream.try_clone().unwrap()); + + let mut buffer = String::new(); + + // request is always sent on new connection + writer.write_all( + serde_json::to_string(&NetworkSockOut::Request).unwrap().as_bytes() + ).unwrap_or_default(); + writer.write_all(b"\n").unwrap_or_default(); + writer.flush().unwrap_or_default(); + + // read the new request into a buffer + let res = reader.read_line(&mut buffer); + if res.is_err() {continue;} + + // turn into enum for pattern matching + if let Ok(request) = serde_json::from_str::(&buffer) { + // perform action based on the enum + match request { + NetworkSockIn::Info => { + writer.write_all( + serde_json::to_string( + &NetworkSockOut::GotInfo { + server_name: "oof", + server_owner: "michael" + } + ).unwrap().as_bytes() + ).unwrap(); + writer.flush().unwrap(); + } + NetworkSockIn::Connect { uuid, username, address } => { + let new_client = Client::new( + uuid, + username, + address, + stream.try_clone().unwrap(), + self.server_channel.clone() + ); + self.server_channel.send( + ServerMessages::ClientConnected(new_client) + ).unwrap_or_default(); + } + } + } + } + } + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 57abb62..1f92c55 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,9 @@ mod lib; use clap::{App, Arg}; + +use crate::lib::server::Server; +use crate::lib::foundation::ICooperative; fn main() { let _args = App::new("--rust chat server--") @@ -8,15 +11,19 @@ fn main() { .author("Mitchel Hardie , Michael Bailey ") .about("this is a chat server developed in rust, depending on the version one of two implementations will be used") .arg( - Arg::new("config") - .short('p') + Arg::with_name("config") + .short("p") .long("port") .value_name("PORT") - .about("sets the port the server listens on.") + .help("sets the port the server runs on.") .takes_value(true)) .get_matches(); - // creating the server object + let server = Server::new(); + + loop { + server.tick(); + } }