diff --git a/src/server/ServerV3.rs b/src/server/ServerV3.rs new file mode 100644 index 0000000..6f5eb98 --- /dev/null +++ b/src/server/ServerV3.rs @@ -0,0 +1,281 @@ +use std::{sync::{Mutex, Arc}, net::{TcpStream, TcpListener}, collections::HashMap, io, io::{Write, Read}, thread}; +use crate::{ + server::client::clientV3::Client, + commands::Commands +}; +use crossbeam_channel::{Sender, Receiver, unbounded}; +use log::info; +use std::time::Duration; + +#[derive(Debug)] +pub enum ServerMessages { + RequestUpdate(Arc>), + RequestInfo(String, Arc>), + Disconnect(String), + Shutdown, +} + +pub enum ServerState { + starting, + started, + stopping, + stopped, +} + +// MARK: - server struct +pub struct Server { + pub name: String, + pub address: String, + pub owner: String, + + pub state: ServerState, + + connected_clients: HashMap, + + sender: Sender, + receiver: Receiver, + listener: Option, + + buffer: [u8; 1024], + + client_list_changed_handle: Box, + + // metrics + pub o2s_rqst: usize, + pub c2s_msgs: usize, + pub s2s_msgs: usize, + pub s2c_msgs: usize, +} + +// MARK: - server implemetation +impl Server { + pub fn new(name: &str, address: &str, author: &str) -> Result { + // creating server channels + let (sender, receiver) = unbounded(); + + Ok( + Self { + // server data + name: name.to_string(), + address: address.to_string(), + owner: author.to_string(), + connected_clients: HashMap::new(), + state: ServerState::ready, + + // messages & connections + sender, + receiver, + listener: None, + + buffer: [0; 1024], + + // event handles + client_list_changed_handle: Box::new(|_s| info!("Server: client list changed.")), + + // metrics + o2s_rqst: 0, + c2s_msgs: 0, + s2s_msgs: 0, + s2c_msgs: 0, + } + ) + } + + #[allow(dead_code)] + pub fn get_name(&self) -> String { + self.name.clone() + } + + #[allow(dead_code)] + pub fn get_address(&self) -> String { + self.address.clone() + } + + #[allow(dead_code)] + pub fn get_owner(&self) -> String { + self.owner.clone() + } + + pub fn tick(&mut self) { + + // check to see if this server is ready to execute things. + if self.state != ServerState::ready { + () + } + + // check for any server messages in the channel + println!("server: getting messages"); + for i in self.receiver.try_iter() { + match i { + // server calls + ServerMessages::Shutdown => { + self.s2s_msgs += 1; + + println!("server: shutting down..."); + + for (k, v) in self.connected_clients.iter() { + v.sender.send(Commands::Disconnect(None)); + } + self.state = ServerState::stopping; + }, + + // client requests + ServerMessages::RequestUpdate(stream_arc) => { + self.c2s_msgs += 1; + + for (_k, v) in self.connected_clients.iter() { + let mut stream = stream_arc.lock().unwrap(); + let _ = Server::send_data(&mut stream, v.to_string().as_str()); + let data = Server::recv_data(&mut stream, &mut self.buffer).unwrap_or(Commands::Error(None)); + + if data == Commands::Success(None) { + println!("Success Confirmed"); + } else { + println!("No success read"); + let error = Commands::Error(None); + let _ = Server::send_data(&mut stream, error.to_string().as_str()); + } + } + }, + + // client requests for info + ServerMessages::RequestInfo(uuid, stream_arc) => { + self.c2s_msgs += 1; + + let mut stream = stream_arc.lock().unwrap(); + + if let Some(client) = self.connected_clients.get(&uuid) { + + let params: HashMap = [ + (String::from("uuid"), client.get_uuid()), + (String::from("name"), client.get_username()), + (String::from("host"), client.get_address()) + ].iter().cloned().collect(); + + let command = Commands::Success(Some(params)); + let _ = Server::send_data(&mut stream, command.to_string().as_str()); + + } else { + let command = Commands::Success(None); + let _ = Server::send_data(&mut stream, command.to_string().as_str()); + } + }, + + // client disconnect requests + ServerMessages::Disconnect(uuid) => { + self.c2s_msgs += 1; + + self.connected_clients.remove(&uuid.to_string()); + + let params: HashMap = [(String::from("uuid"), uuid)].iter().cloned().collect(); + + let command = Commands::ClientRemove(Some(params)); + let _ = self.connected_clients.iter().map(move |(_k, v)| {v.get_sender().send(command.clone())}); + + }, + } + } + + println!("server: checking for new connections"); + if let Ok((mut stream, _addr)) = self.listener.accept() { + let _ = stream.set_read_timeout(Some(Duration::from_millis(1000))); + let _ = stream.set_nonblocking(false); + + let request = Commands::Request(None); + let _ = Server::send_data(&mut stream, &request.to_string().as_str()); + + match Server::recv_data(&mut stream, &mut self.buffer) { + + + Ok(Commands::Connect(Some(data))) => { + self.o2s_rqst += 1; + + let uuid = data.get("uuid").unwrap(); + let username = data.get("name").unwrap(); + let address = data.get("host").unwrap(); + + info!("{}", format!("Server: new client from {}", address )); + + let client = Client::new(stream, self.sender.clone(), &uuid, &username, &address); + + self.connected_clients.insert(uuid.to_string(), client); + + let params: HashMap = [(String::from("name"), username.clone()), (String::from("host"), address.clone()), (String::from("uuid"), uuid.clone())].iter().cloned().collect(); + let new_client = Commands::Client(Some(params)); + + let _ = self.connected_clients.iter().map( |(_k, v)| v.sender.send(new_client.clone())); + }, + + + Ok(Commands::Info(None)) => { + self.o2s_rqst += 1; + + println!("Server: info requested"); + let params: HashMap = [(String::from("name"), self.name.to_string().clone()), (String::from("owner"), self.owner.to_string().clone())].iter().cloned().collect(); + let command = Commands::Info(Some(params)); + + let _ = Server::send_data(&mut stream, command.to_string().as_str()); + }, + + Err(_) => println!("ERROR: stream closed"), + + // TODO: - correct connection reset error when getting info. + _ => { + println!("Server: Invalid command sent"); + let _ = Server::send_data(&mut stream, Commands::Error(None).to_string().as_str()); + }, + } + } + + println!("server: handing control to clients"); + for (_k, client) in self.connected_clients.iter_mut() { + client.handle_connection(); + } + } + + pub fn start(&mut self) -> Result<(), io::Error> { + + let listener = TcpListener::bind(self.address)?; + listener.set_nonblocking(true)?; + + self.listener = Some(listener); + } + + pub fn stop(&mut self) { + info!("server: sending stop message"); + let _ = self.sender.send(ServerMessages::Shutdown); + self.state = ServerState::stopping; + } + + fn send_data(stream: &mut TcpStream, data: &str) -> Result<(), io::Error>{ + println!("Transmitting..."); + println!("data: {}", data); + + /* + * This will throw an error and crash any thread, including the main thread, if + * the connection is lost before transmitting. Maybe change to handle any exceptions + * that may occur. + */ + let _ = stream.write(data.to_string().as_bytes())?; + stream.flush()?; + Ok(()) + } + + fn recv_data(stream: &mut TcpStream, buffer: &mut [u8; 1024]) -> Result { + let _ = stream.read(buffer)?; + let command = Commands::from(buffer); + + Ok(command) + } +} + +impl ToString for Server { + fn to_string(&self) -> std::string::String { todo!() } +} + +impl Drop for Server { + fn drop(&mut self) { + println!("server dropped"); + let _ = self.sender.send(ServerMessages::Shutdown); + } +} diff --git a/src/server/client/clientV3.rs b/src/server/client/clientV3.rs new file mode 100644 index 0000000..df257c3 --- /dev/null +++ b/src/server/client/clientV3.rs @@ -0,0 +1,217 @@ +extern crate regex; + +use std::{ + sync::Arc, + sync::Mutex, + net::{Shutdown, TcpStream}, + io::prelude::*, + io::Error, + //collections::HashMap, + time::{Instant, Duration}, + io, +}; + +use crossbeam_channel::{ + Sender, + Receiver, + TryRecvError, + unbounded +}; + +use log::info; + +use crate::{ + server::ServerV3::ServerMessages, + commands::Commands, +}; + + + +#[derive(Debug)] +pub struct Client { + uuid: String, + username: String, + address: String, + + last_heartbeat: Instant, + + stream: Arc>, + + pub sender: Sender, + receiver: Receiver, + + server_sender: Sender, +} + +impl Client { + pub fn new(stream: TcpStream, server_sender: Sender, uuid: &str, username: &str, address: &str) -> Self { + let (sender, receiver): (Sender, Receiver) = unbounded(); + stream.set_read_timeout(Some(Duration::from_secs(1))).unwrap(); + + Client { + stream: Arc::new(Mutex::new(stream)), + uuid: uuid.to_string(), + username: username.to_string(), + address: address.to_string(), + + sender, + receiver, + + server_sender, + + last_heartbeat: Instant::now(), + } + } + + #[allow(dead_code)] + pub fn get_sender(&self) -> &Sender { + &self.sender + } + + #[allow(dead_code)] + pub fn get_uuid(&self) -> String { + self.uuid.clone() + } + + #[allow(dead_code)] + pub fn get_username(&self) -> String { + self.username.clone() + } + + #[allow(dead_code)] + pub fn get_address(&self) -> String { + self.address.clone() + } + + // TODO: - add heartbeat timer. + pub fn handle_connection(&mut self) { + let mut buffer = [0; 1024]; + + // TODO: - Check heartbeat + { + //info!("heartbeat") + } + + info!("{}: handling connection", self.uuid); + match self.read_data(&mut buffer) { + + + Ok(Commands::Disconnect(None)) => { + self.server_sender.send(ServerMessages::Disconnect(self.uuid.clone())).expect("sending message to server failed"); + self.stream.lock().unwrap().shutdown(Shutdown::Both).expect("shutdown call failed"); + }, + + Ok(Commands::HeartBeat(None)) => { + self.last_heartbeat = Instant::now(); + self.send_data(Commands::Success(None).to_string().as_str()); + }, + + Ok(Commands::ClientUpdate(None)) => { + self.send_data(Commands::Success(None).to_string().as_str()); + let _ = self.server_sender.send(ServerMessages::RequestUpdate(self.stream.clone())); + }, + + Ok(Commands::ClientInfo(Some(params))) => { + let uuid = params.get("uuid").unwrap(); + let _ = self.server_sender.send(ServerMessages::RequestInfo(uuid.clone(), self.stream.clone())); + }, + + Ok(Commands::Error(None)) => { + self.send_data(Commands::Error(None).to_string().as_str()); + }, + + _ => { + self.send_data(Commands::Error(None).to_string().as_str()); + }, + + Err(_) => { + // No data was read + }, + } + + println!("buffer"); + // test to see if there is anything for the client to receive from its channel + match self.receiver.try_recv() { + /*command is on the channel*/ + Ok(Commands::ClientRemove(Some(params))) => { + let mut retry: u8 = 3; + 'retry_loop1: loop { + if retry < 1 { + self.send_data(Commands::Error(None).to_string().as_str()); + break 'retry_loop1 + } else { + self.send_data(Commands::ClientRemove(Some(params.clone())).to_string().as_str()); + + if self.read_data(&mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) { + break 'retry_loop1; + } else { + retry -= 1; + } + } + } + }, + Ok(Commands::Client(Some(params))) => { + let mut retry: u8 = 3; + 'retry_loop2: loop { + if retry < 1 { + self.send_data(Commands::Error(None).to_string().as_str()); + break 'retry_loop2; + } else { + self.send_data(Commands::Client(Some(params.clone())).to_string().as_str()); + + if self.read_data(&mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) { + break 'retry_loop2; + } else { + retry -= 1; + } + } + } + + }, + /*No data available yet*/ + Err(TryRecvError::Empty) => {}, + _ => {}, + } + println!("---Client Thread Exit---"); + } + + // move into a drop perhaps + #[allow(dead_code)] + pub fn disconnect(&mut self){ + self.stream.lock().unwrap().shutdown(Shutdown::Both).expect("shutdown call failed"); + } + + pub fn send_data(&self, data: &str) { + println!("Transmitting data: {}", data); + + let error_result = self.stream.lock().unwrap().write_all(data.to_string().as_bytes()); + if let Some(error) = error_result.err(){ + match error.kind() { + // handle disconnections + io::ErrorKind::NotConnected => { + let _ = self.server_sender.send(ServerMessages::Disconnect(self.uuid.clone())); + }, + _ => { }, + } + } + } + + fn read_data(&mut self, buffer: &mut [u8; 1024]) -> Result { + let _ = self.stream.lock().unwrap().read(buffer)?; + let command = Commands::from(buffer); + + Ok(command) + } + +} + +impl ToString for Client { + fn to_string(&self) -> std::string::String { todo!() } +} + +impl Drop for Client { + fn drop(&mut self) { + let _ = self.stream.lock().unwrap().write_all(Commands::Disconnect(None).to_string().as_bytes()); + let _ = self.stream.lock().unwrap().shutdown(Shutdown::Both); + } +}