From d56a7209c235ef7517217724b6912020b8db1692 Mon Sep 17 00:00:00 2001 From: michael-bailey Date: Mon, 3 Aug 2020 12:39:50 +0100 Subject: [PATCH] Update server_profile.rs + added thread spawning. +added channels for server communication outside of the thread. + added main loop process. ~ changes the tcp listener to be non blocking. + added loop labels to break out of. + impl drop for the server that signals the thread to break. --- src/server/server_profile.rs | 283 ++++++++++++++++++++--------------- 1 file changed, 164 insertions(+), 119 deletions(-) diff --git a/src/server/server_profile.rs b/src/server/server_profile.rs index 0446524..ba264a6 100644 --- a/src/server/server_profile.rs +++ b/src/server/server_profile.rs @@ -1,35 +1,71 @@ extern crate regex; +extern crate rayon; -use crate::server::client::client_profile::Client; -use crate::server::commands::{Commands}; +use crate::{ + server::{ + client::client_profile::Client, + commands::{Commands} + } +}; +use std::{ + sync::{Arc, Mutex}, + net::{TcpStream, TcpListener}, + collections::HashMap, + io::prelude::*, + thread, + io +}; +use log::info; + +use crossbeam_channel::{Sender, Receiver, unbounded}; use rust_chat_server::ThreadPool; -use std::net::{TcpStream, TcpListener}; -use std::sync::{Arc, Mutex}; -use crossbeam_channel::Sender; -use parking_lot::FairMutex; -use std::collections::HashMap; -use dashmap::DashMap; -use std::io::prelude::*; -use regex::Regex; +use zeroize::Zeroize; +use std::time::Duration; + +#[derive(Debug)] +pub enum ServerMessages { + #[allow(dead_code)] + RequestUpdate(String), + #[allow(dead_code)] + RequestInfo(String, String), + #[allow(dead_code)] + RequestDisconnect(String), + #[allow(dead_code)] + Shutdown, +} + +// MARK: - server struct + +pub struct Server { + pub name: String, + pub address: String, + pub author: String, + + connected_clients: Arc>>, -pub struct Server<'z> { - name: &'z str, - address: &'z str, - author: &'z str, - connected_clients: Arc>>>, thread_pool: ThreadPool, + + sender: Sender, + receiver: Receiver, + } // MARK: - server implemetation -impl<'z> Server<'z> { - pub fn new(name: &'z str, address: &'z str, author: &'z str) -> Self { +impl Server { + pub fn new(name: &str, address: &str, author: &str) -> Self { + let (sender, receiver) = unbounded(); + Self { - name: name, - address: address, - author: author, + name: name.to_string(), + address: address.to_string(), + author: author.to_string(), connected_clients: Arc::new(Mutex::new(HashMap::new())), thread_pool: ThreadPool::new(16), + + + sender, + receiver, } } @@ -37,62 +73,108 @@ impl<'z> Server<'z> { self.address.to_string() } - pub fn start(&'static self) { - let listener = TcpListener::bind(self.get_address()).unwrap(); + pub fn start(&self) -> Result<(), io::Error>{ + info!("server: starting server..."); + // clone elements for thread + let client_map = self.connected_clients.clone(); + let sender = self.sender.clone(); + let receiver = self.receiver.clone(); + + let server_details = (self.name.clone(), self.author.clone(), self.address.clone()); + + // set up listener and buffer + let listener = TcpListener::bind(self.get_address())?; + listener.set_nonblocking(true); + let mut buffer = [0; 1024]; - loop { - if let Ok((mut stream, addr)) = listener.accept() { - println!("Server: new connection, {}", addr); + info!("server: spawning threads"); + thread::Builder::new().name("Server Thread".to_string()).spawn(move || { + 'outer: loop { + // get messages from the servers channel. + info!("server: getting messages"); + for i in receiver.try_iter() { + match i { + ServerMessages::Shutdown => { + // TODO: implement disconnecting all clients and shutting down the server + info!("server: shutting down..."); - let request = Commands::Request(None); - //request.to_string(); - self.transmit_data(&stream, &request.to_string().as_str()); - - stream.read(&mut buffer).unwrap(); - - let incoming_message = String::from(String::from_utf8_lossy(&buffer)); - let command = Commands::from(incoming_message); - match command { - Commands::Connect(Some(data)) => { - let uuid = data.get("uuid").unwrap(); - let username = data.get("name").unwrap(); - let address = data.get("host").unwrap(); - - let stream = Arc::new(stream); - let mut client = Client::new(self, stream, &uuid, &username, &address); - - let mut clients_hashmap = self.connected_clients.lock().unwrap(); - clients_hashmap.insert(uuid.to_string(), client.get_transmitter().clone()); - std::mem::drop(clients_hashmap); - - self.thread_pool.execute(move || { - client.handle_connection(); - }); - - let params: HashMap = [(String::from("name"), username.clone()), (String::from("host"), address.clone()), (String::from("uuid"), uuid.clone())].iter().cloned().collect(); - let new_client = Commands::Client(Some(params)); - - self.update_all_clients(new_client); - }, - Commands::Info(None) => { - let mut params: HashMap = HashMap::new(); - params.insert(String::from("name"), self.name.to_string().clone()); - params.insert(String::from("owner"), self.author.to_string().clone()); - - let command = Commands::Info(Some(params)); - - self.transmit_data(&stream, command.to_string().as_str()); - }, - _ => { - println!("Invalid command!"); - self.transmit_data(&stream, Commands::Error(None).to_string().as_str()); - }, + break 'outer; + }, + _ => {} + } } - } - } + + info!("server: checking for new connections"); + if let Ok((mut stream, addr)) = listener.accept() { + stream.set_read_timeout(Some(Duration::from_millis(100))).unwrap(); + + let request = Commands::Request(None); + //request.to_string(); + stream.write_all(&request.to_string().as_bytes()); + stream.flush(); + stream.read(&mut buffer).unwrap(); + + let incoming_message = String::from(String::from_utf8_lossy(&buffer)); + let command = Commands::from(incoming_message); + // clears the buffer. + buffer.zeroize(); + + match command { + Commands::Connect(Some(data)) => { + let uuid = data.get("uuid").unwrap(); + let username = data.get("name").unwrap(); + let address = data.get("host").unwrap(); + + info!("{}", format!("Server: new Client connection: addr = {}", address )); + + let client = Client::new(stream, sender.clone(), uuid.clone(), username.clone(), address.clone()); + + client_map.lock().unwrap().insert(uuid.to_string(), client); + + let params: HashMap = [(String::from("name"), username.clone()), (String::from("host"), address.clone()), (String::from("uuid"), uuid.clone())].iter().cloned().collect(); + let new_client = Commands::Client(Some(params)); + + client_map.lock().unwrap().iter().map(|(k, v)| v.sender.send(new_client.clone())); + }, + Commands::Info(None) => { + info!("Server: info requested"); + let mut params: HashMap = HashMap::new(); + params.insert(String::from("name"), server_details.0.clone()); + params.insert(String::from("owner"), server_details.1.clone()); + + let command = Commands::Info(Some(params)); + + stream.write_all(&command.to_string().as_bytes()); + stream.flush(); + }, + _ => { + info!("Server: Invalid command sent"); + stream.write_all(Commands::Error(None).to_string().as_bytes()); + stream.flush(); + }, + } + } + // TODO: end - + + // handle each client for messages + info!("server: handing control to clients"); + for (_k, v) in client_map.lock().unwrap().iter() { + v.handle_connection(); + } + } + info!("server: stopped") + }); + info!("server: started"); + Ok(()) } + pub fn stop(&self) { + info!("server: sending stop message"); + self.sender.send(ServerMessages::Shutdown); + } + + #[allow(dead_code)] pub fn get_info(&self, tx: Sender) { let mut params: HashMap = HashMap::new(); params.insert(String::from("name"), self.name.to_string().clone()); @@ -102,10 +184,11 @@ impl<'z> Server<'z> { tx.send(command).unwrap(); } + #[allow(dead_code)] pub fn update_all_clients(&self, command: Commands){ let clients = self.connected_clients.lock().unwrap(); - for tx in clients.values(){ - tx.send(command.clone()).unwrap(); + for client in clients.values(){ + client.sender.send(command.clone()).unwrap(); } } @@ -121,53 +204,15 @@ impl<'z> Server<'z> { stream.write(data.to_string().as_bytes()).unwrap(); stream.flush().unwrap(); } +} - //deprecated - /* - pub fn tokenize(&self, incoming_message: &str) -> Result{ - let command_regex = Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9\-\+\[\]{}_=/]+|("(.*?)")+)"###).unwrap(); - - if command_regex.is_match(incoming_message){ - let command = self.match_command(&incoming_message.to_string()); - let command = match command{ - ClientCommands::Connect(mut addons) => { - self.regex_data(&command_regex, &incoming_message.replace("!connect: ", ""), &mut addons); - ClientCommands::Connect(addons) - }, - ClientCommands::ClientInfo(mut addons) => { - self.regex_data(&command_regex, &incoming_message.replace("!clientInfo: ", ""), &mut addons); - ClientCommands::ClientInfo(addons) - }, - _ => { - println!("no addons"); - command - }, - }; - Ok(command) - } else { - Err("data did not match regex!") - } - } - - - fn match_command(&self, command: &String) -> ClientCommands{ - match command{ - _ if command.starts_with("!info:") => ClientCommands::Info, - _ if command.starts_with("!connect:") => ClientCommands::Connect(HashMap::new()), - _ if command.starts_with("!disconnect:") => ClientCommands::Disconnect, - _ if command.starts_with("!clientUpdate:") => ClientCommands::ClientUpdate, - _ if command.starts_with("!clientInfo:") => ClientCommands::ClientInfo(HashMap::new()), - _ => ClientCommands::Unknown, - } - } - */ - - fn regex_data(&self, command_regex: &Regex, data: &str, command_addons: &mut HashMap){ - for figure in command_regex.find_iter(data){ - let segment = figure.as_str().to_string(); - let contents: Vec<&str> = segment.split(":").collect(); - println!("key: {}, value: {}", contents[0].to_string(), contents[1].to_string()); - command_addons.insert(contents[0].to_string(), contents[1].to_string()); - } +impl Drop for Server { + fn drop(&mut self) { + println!("server dropped"); + let _ = self.sender.send(ServerMessages::Shutdown); } } + +struct ServerDelegate { + +} \ No newline at end of file