From fbf4e7ddbf09f35c982ec3c6d3ee89b4b5ebf305 Mon Sep 17 00:00:00 2001 From: Mitchell Date: Mon, 13 Jul 2020 21:51:23 +0100 Subject: [PATCH] added command functions for channels --- src/server/client/client_profile.rs | 72 ++++++++------ src/server/server_profile.rs | 141 +++++++++++++--------------- 2 files changed, 110 insertions(+), 103 deletions(-) diff --git a/src/server/client/client_profile.rs b/src/server/client/client_profile.rs index c34049d..0430c8c 100644 --- a/src/server/client/client_profile.rs +++ b/src/server/client/client_profile.rs @@ -1,6 +1,6 @@ extern crate regex; -use crate::server::commands::{ClientCommands, ServerCommands}; +use crate::server::commands::{ClientCommands, ServerCommands, Commands}; use crate::server::server_profile::Server; use std::net::{Shutdown, TcpStream}; @@ -14,19 +14,20 @@ use std::time::Duration; use regex::Regex; #[derive(Clone)] -pub struct Client{ +pub struct Client<'client_lifetime>{ connected: bool, stream: Arc, uuid: String, username: String, address: String, - tx_channel: Sender, - rx_channel: Receiver, + server: &'client_lifetime Server<'client_lifetime>, + tx_channel: Sender, + rx_channel: Receiver, } -impl Client{ - pub fn new(stream: Arc, uuid: &String, username: &String, address: &String) -> Client{ - let (tx_channel, rx_channel): (Sender, Receiver) = unbounded(); +impl <'a>Client{ + pub fn <'a>new(server: &Server, stream: Arc, uuid: &String, username: &String, address: &String) -> Client<'a>{ + let (tx_channel, rx_channel): (Sender, Receiver) = unbounded(); Client{ connected: true, @@ -39,11 +40,11 @@ impl Client{ } } - pub fn get_stream(&self) -> &TcpStream{ + fn get_stream(&self) -> &TcpStream{ &self.stream } - pub fn get_transmitter(&self) -> &Sender{ + pub fn get_transmitter(&self) -> &Sender{ &self.tx_channel } @@ -59,20 +60,27 @@ impl Client{ &self.address } - pub fn disconnect(&mut self){ - self.stream.shutdown(Shutdown::Both).expect("shutdown call failed"); - self.connected = false; - } - - pub fn handle_connection(&mut self, server: &Server, clients_ref: &Arc>>){ - self.stream.set_read_timeout(Some(Duration::from_millis(1000))).unwrap(); + pub fn handle_connection(&self){ + self.stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap(); let mut buffer = [0; 1024]; while self.connected { match self.rx_channel.try_recv(){ /*command is on the channel*/ Ok(command) => { - command.execute(self, &mut buffer); + match command{ + + Commands::Info(Some(params)) => { + self.get_stream().write_all(command.to_string); + }, + Commands::Disconnect(None) => { + + }, + Commands::ClientRemove(Some(params)) => {}, + Commands::Client(Some(params)) => {}, + Commands::Success(data) => {}, + _ => {}, + } }, /*sender disconnected*/ Err(TryRecvError::Disconnected) => {}, @@ -116,12 +124,17 @@ impl Client{ clients_hashmap.insert(uuid, self.clone()); std::mem::drop(clients_hashmap); - let new_client = ServerCommands::Client(data.clone()); + let new_client = Commands::Client(data.clone()); server.update_all_clients(&new_client); self.transmit_success(&String::from("")); } + pub fn disconnect(&mut self){ + self.stream.shutdown(Shutdown::Both).expect("shutdown call failed"); + self.connected = false; + } + pub fn transmit_data(&self, data: &str){ println!("Transmitting..."); println!("data: {}", data); @@ -132,17 +145,18 @@ impl Client{ pub fn confirm_success(&self, buffer: &mut [u8; 1024], data: &String){ let success_regex = Regex::new(r###"!success:"###).unwrap(); - //let mut failing = true; - //while failing{ - self.get_stream().read(&mut *buffer).unwrap(); - let incoming_message = String::from_utf8_lossy(&buffer[..]); - if success_regex.is_match(&incoming_message){ - println!("success"); - //failing = false; - }else{ - self.transmit_error(&String::from("")); - } - //} + + let _ = match self.get_stream().read(&mut *buffer).unwrap() { + Err(error) => self.transmit_error(&String::from("")), + Ok(success) => { + let incoming_message = String::from_utf8_lossy(&buffer[..]); + if success_regex.is_match(&incoming_message){ + println!("success"); + }else{ + self.transmit_error(&String::from("")); + } + }, + }; } pub fn transmit_success(&self, data: &String){ diff --git a/src/server/server_profile.rs b/src/server/server_profile.rs index 9cd6d8f..7a163c7 100644 --- a/src/server/server_profile.rs +++ b/src/server/server_profile.rs @@ -1,7 +1,7 @@ extern crate regex; use crate::server::client::client_profile::Client; -use crate::server::commands::{ClientCommands, ServerCommands}; +use crate::server::commands::{ClientCommands, ServerCommands, Commands}; use rust_chat_server::ThreadPool; use std::collections::VecDeque; @@ -14,20 +14,23 @@ use dashmap::DashMap; use std::io::prelude::*; use regex::Regex; -pub struct Server{ +pub struct Server<'server_lifetime> { name: String, address: String, author: String, - connected_clients: Arc>>, + connected_clients: Arc>>>, + thread_pool: ThreadPool, } +// MARK: - server implemetation impl Server{ - pub fn new(name: &String, address: &String, author: &String, connected_clients: &Arc>>) -> Server{ + pub fn new<'server_lifetime>(name: &String, address: &String, author: &String) -> Server<'server_lifetime>{ Server{ name: name.to_string(), address: address.to_string(), author: author.to_string(), - connected_clients: Arc::clone(&connected_clients), + connected_clients: Arc::new(Mutex::new(HashMap::new())), + thread_pool: ThreadPool::new(16) } } @@ -35,81 +38,71 @@ impl Server{ &self.address } - pub fn get_info(&self) -> String{ - let mut server_details = "".to_string(); - server_details.push_str(&"name:".to_string()); - server_details.push_str(&self.name); - server_details.push_str(&" owner:".to_string()); - server_details.push_str(&self.author); - - server_details - } - - pub fn establish_connection(&self, mut stream: TcpStream) -> Result{ - /*let listener = TcpListener::bind(self.address.clone()).unwrap(); - let pool = ThreadPool::new(10); - //let (tx,rx): (Sender>, Receiver>) = unbounded(); - //let (clock_tx, _) = (tx.clone(), rx.clone()); - - //stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap(); - loop{ - if let Ok((mut stream, addr)) = listener.accept(){ - println!("Connected: {}", addr); - - let connected_clients_ref = Arc::clone(&self.connected_clients); - let request = String::from("?request:"); - self.transmit_data(&stream, &request); - - pool.execute(move || {*/ - let mut client_connection: Result = Err(true); + pub fn start(&self) { + let listener = TcpListener::bind(self.get_address()).unwrap(); let mut buffer = [0; 1024]; - let request = String::from("?request:"); - self.transmit_data(&stream, &request); + //stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap(); + loop { + if let Ok((mut stream, addr)) = listener.accept() { + println!("Connected: {}", addr); - let mut stream = Arc::new(stream); - while client_connection.is_err(){ - Arc::get_mut(&mut stream).unwrap().read(&mut buffer).unwrap(); + let request = Commands::Request(None); + self.transmit_data(&stream, request.to_string().as_str()); - let incoming_message = String::from_utf8_lossy(&buffer[..]); - let command = self.tokenize(&incoming_message); - client_connection = match command{ - Ok(cmd) => { - match cmd{ - ClientCommands::Connect(data) => { - //connecting = false; - let uuid = data.get("uuid").unwrap(); - let username = data.get("name").unwrap(); - let address = data.get("host").unwrap(); + stream.read(&mut buffer).unwrap(); - let stream = Arc::clone(&stream); - let mut client = Client::new(stream, &uuid, &username, &address); - client.connect(self, &self.connected_clients, &data); - //cmd.execute(&mut client, self, &mut buffer, &self.connected_clients); - Ok(client) - //client.handle_connection(self, &connected_clients_ref); - }, - ClientCommands::Info => { - let server_details = self.get_info(); - self.transmit_data(&stream, &server_details); - Err(true) - }, - _ => { - println!("Invalid command!"); - Err(true) - }, - } - }, - Err(e) => { - println!("{}", e); - Err(true) - }, - }; - } - client_connection - /*}); + let incoming_message = String::from_utf8_lossy(&buffer[..]); + let result = Commands::from_string(incoming_message.as_str()); + match result{ + Ok(command) => { + match command{ + Commands::Connect(Some(data)) => { + let uuid = data.get("uuid").unwrap(); + let username = data.get("name").unwrap(); + let address = data.get("host").unwrap(); + + let stream = Arc::clone(&stream); + let mut client = Client::new(self, stream, &uuid, &username, &address); + + self.thread_pool.execute(move || { + client.handle_connection(); + }); + + let mut clients_hashmap = self.connected_clients.lock().unwrap(); + clients_hashmap.insert(uuid, client.clone()); + }, + Commands::Info(None) => { + let params: HashMap = HashMap::new(); + params.insert("name", &self.name); + params.insert("owner", &self.owner); + + let command = Commands::Info(Some(params)); + + self.transmit_data(&stream, command.to_string().as_str()); + }, + _ => { + println!("Invalid command!"); + self.transmit_data(&stream, Commands::Error(None).to_string.as_str()); + }, + } + }, + Err(e) => { + println!("error: {:?}", e); + self.transmit_data(&stream, Commands::Error(None).to_string.as_str()); + }, + } } - }*/ + } + } + + pub fn get_info(&self, tx: Sender) { + let params: HashMap = HashMap::new(); + params.insert("name", &self.name); + params.insert("owner", &self.owner); + + let command = Commands::Info(Some(params)); + tx.send(command).unwrap(); } pub fn update_all_clients(&self, notification: &ServerCommands){