added command functions for channels

This commit is contained in:
Mitchell 2020-07-13 21:51:23 +01:00
parent e6f1b22782
commit fbf4e7ddbf
2 changed files with 110 additions and 103 deletions

View File

@ -1,6 +1,6 @@
extern crate regex; extern crate regex;
use crate::server::commands::{ClientCommands, ServerCommands}; use crate::server::commands::{ClientCommands, ServerCommands, Commands};
use crate::server::server_profile::Server; use crate::server::server_profile::Server;
use std::net::{Shutdown, TcpStream}; use std::net::{Shutdown, TcpStream};
@ -14,19 +14,20 @@ use std::time::Duration;
use regex::Regex; use regex::Regex;
#[derive(Clone)] #[derive(Clone)]
pub struct Client{ pub struct Client<'client_lifetime>{
connected: bool, connected: bool,
stream: Arc<TcpStream>, stream: Arc<TcpStream>,
uuid: String, uuid: String,
username: String, username: String,
address: String, address: String,
tx_channel: Sender<ServerCommands>, server: &'client_lifetime Server<'client_lifetime>,
rx_channel: Receiver<ServerCommands>, tx_channel: Sender<Commands>,
rx_channel: Receiver<Commands>,
} }
impl Client{ impl <'a>Client{
pub fn new(stream: Arc<TcpStream>, uuid: &String, username: &String, address: &String) -> Client{ pub fn <'a>new(server: &Server, stream: Arc<TcpStream>, uuid: &String, username: &String, address: &String) -> Client<'a>{
let (tx_channel, rx_channel): (Sender<ServerCommands>, Receiver<ServerCommands>) = unbounded(); let (tx_channel, rx_channel): (Sender<Commands>, Receiver<Commands>) = unbounded();
Client{ Client{
connected: true, connected: true,
@ -39,11 +40,11 @@ impl Client{
} }
} }
pub fn get_stream(&self) -> &TcpStream{ fn get_stream(&self) -> &TcpStream{
&self.stream &self.stream
} }
pub fn get_transmitter(&self) -> &Sender<ServerCommands>{ pub fn get_transmitter(&self) -> &Sender<Commands>{
&self.tx_channel &self.tx_channel
} }
@ -59,20 +60,27 @@ impl Client{
&self.address &self.address
} }
pub fn disconnect(&mut self){ pub fn handle_connection(&self){
self.stream.shutdown(Shutdown::Both).expect("shutdown call failed"); self.stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
self.connected = false;
}
pub fn handle_connection(&mut self, server: &Server, clients_ref: &Arc<Mutex<HashMap<String, Client>>>){
self.stream.set_read_timeout(Some(Duration::from_millis(1000))).unwrap();
let mut buffer = [0; 1024]; let mut buffer = [0; 1024];
while self.connected { while self.connected {
match self.rx_channel.try_recv(){ match self.rx_channel.try_recv(){
/*command is on the channel*/ /*command is on the channel*/
Ok(command) => { Ok(command) => {
command.execute(self, &mut buffer); match command{
Commands::Info(Some(params)) => {
self.get_stream().write_all(command.to_string);
},
Commands::Disconnect(None) => {
},
Commands::ClientRemove(Some(params)) => {},
Commands::Client(Some(params)) => {},
Commands::Success(data) => {},
_ => {},
}
}, },
/*sender disconnected*/ /*sender disconnected*/
Err(TryRecvError::Disconnected) => {}, Err(TryRecvError::Disconnected) => {},
@ -116,12 +124,17 @@ impl Client{
clients_hashmap.insert(uuid, self.clone()); clients_hashmap.insert(uuid, self.clone());
std::mem::drop(clients_hashmap); std::mem::drop(clients_hashmap);
let new_client = ServerCommands::Client(data.clone()); let new_client = Commands::Client(data.clone());
server.update_all_clients(&new_client); server.update_all_clients(&new_client);
self.transmit_success(&String::from("")); self.transmit_success(&String::from(""));
} }
pub fn disconnect(&mut self){
self.stream.shutdown(Shutdown::Both).expect("shutdown call failed");
self.connected = false;
}
pub fn transmit_data(&self, data: &str){ pub fn transmit_data(&self, data: &str){
println!("Transmitting..."); println!("Transmitting...");
println!("data: {}", data); println!("data: {}", data);
@ -132,17 +145,18 @@ impl Client{
pub fn confirm_success(&self, buffer: &mut [u8; 1024], data: &String){ pub fn confirm_success(&self, buffer: &mut [u8; 1024], data: &String){
let success_regex = Regex::new(r###"!success:"###).unwrap(); let success_regex = Regex::new(r###"!success:"###).unwrap();
//let mut failing = true;
//while failing{ let _ = match self.get_stream().read(&mut *buffer).unwrap() {
self.get_stream().read(&mut *buffer).unwrap(); Err(error) => self.transmit_error(&String::from("")),
let incoming_message = String::from_utf8_lossy(&buffer[..]); Ok(success) => {
if success_regex.is_match(&incoming_message){ let incoming_message = String::from_utf8_lossy(&buffer[..]);
println!("success"); if success_regex.is_match(&incoming_message){
//failing = false; println!("success");
}else{ }else{
self.transmit_error(&String::from("")); self.transmit_error(&String::from(""));
} }
//} },
};
} }
pub fn transmit_success(&self, data: &String){ pub fn transmit_success(&self, data: &String){

View File

@ -1,7 +1,7 @@
extern crate regex; extern crate regex;
use crate::server::client::client_profile::Client; use crate::server::client::client_profile::Client;
use crate::server::commands::{ClientCommands, ServerCommands}; use crate::server::commands::{ClientCommands, ServerCommands, Commands};
use rust_chat_server::ThreadPool; use rust_chat_server::ThreadPool;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -14,20 +14,23 @@ use dashmap::DashMap;
use std::io::prelude::*; use std::io::prelude::*;
use regex::Regex; use regex::Regex;
pub struct Server{ pub struct Server<'server_lifetime> {
name: String, name: String,
address: String, address: String,
author: String, author: String,
connected_clients: Arc<Mutex<HashMap<String,Client>>>, connected_clients: Arc<Mutex<HashMap<String,Client<'server_lifetime>>>>,
thread_pool: ThreadPool,
} }
// MARK: - server implemetation
impl Server{ impl Server{
pub fn new(name: &String, address: &String, author: &String, connected_clients: &Arc<Mutex<HashMap<String,Client>>>) -> Server{ pub fn new<'server_lifetime>(name: &String, address: &String, author: &String) -> Server<'server_lifetime>{
Server{ Server{
name: name.to_string(), name: name.to_string(),
address: address.to_string(), address: address.to_string(),
author: author.to_string(), author: author.to_string(),
connected_clients: Arc::clone(&connected_clients), connected_clients: Arc::new(Mutex::new(HashMap::new())),
thread_pool: ThreadPool::new(16)
} }
} }
@ -35,81 +38,71 @@ impl Server{
&self.address &self.address
} }
pub fn get_info(&self) -> String{ pub fn start(&self) {
let mut server_details = "".to_string(); let listener = TcpListener::bind(self.get_address()).unwrap();
server_details.push_str(&"name:".to_string());
server_details.push_str(&self.name);
server_details.push_str(&" owner:".to_string());
server_details.push_str(&self.author);
server_details
}
pub fn establish_connection(&self, mut stream: TcpStream) -> Result<Client, bool>{
/*let listener = TcpListener::bind(self.address.clone()).unwrap();
let pool = ThreadPool::new(10);
//let (tx,rx): (Sender<Arc<Barrier>>, Receiver<Arc<Barrier>>) = unbounded();
//let (clock_tx, _) = (tx.clone(), rx.clone());
//stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
loop{
if let Ok((mut stream, addr)) = listener.accept(){
println!("Connected: {}", addr);
let connected_clients_ref = Arc::clone(&self.connected_clients);
let request = String::from("?request:");
self.transmit_data(&stream, &request);
pool.execute(move || {*/
let mut client_connection: Result<Client, bool> = Err(true);
let mut buffer = [0; 1024]; let mut buffer = [0; 1024];
let request = String::from("?request:"); //stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
self.transmit_data(&stream, &request); loop {
if let Ok((mut stream, addr)) = listener.accept() {
println!("Connected: {}", addr);
let mut stream = Arc::new(stream); let request = Commands::Request(None);
while client_connection.is_err(){ self.transmit_data(&stream, request.to_string().as_str());
Arc::get_mut(&mut stream).unwrap().read(&mut buffer).unwrap();
let incoming_message = String::from_utf8_lossy(&buffer[..]); stream.read(&mut buffer).unwrap();
let command = self.tokenize(&incoming_message);
client_connection = match command{
Ok(cmd) => {
match cmd{
ClientCommands::Connect(data) => {
//connecting = false;
let uuid = data.get("uuid").unwrap();
let username = data.get("name").unwrap();
let address = data.get("host").unwrap();
let stream = Arc::clone(&stream); let incoming_message = String::from_utf8_lossy(&buffer[..]);
let mut client = Client::new(stream, &uuid, &username, &address); let result = Commands::from_string(incoming_message.as_str());
client.connect(self, &self.connected_clients, &data); match result{
//cmd.execute(&mut client, self, &mut buffer, &self.connected_clients); Ok(command) => {
Ok(client) match command{
//client.handle_connection(self, &connected_clients_ref); Commands::Connect(Some(data)) => {
}, let uuid = data.get("uuid").unwrap();
ClientCommands::Info => { let username = data.get("name").unwrap();
let server_details = self.get_info(); let address = data.get("host").unwrap();
self.transmit_data(&stream, &server_details);
Err(true) let stream = Arc::clone(&stream);
}, let mut client = Client::new(self, stream, &uuid, &username, &address);
_ => {
println!("Invalid command!"); self.thread_pool.execute(move || {
Err(true) client.handle_connection();
}, });
}
}, let mut clients_hashmap = self.connected_clients.lock().unwrap();
Err(e) => { clients_hashmap.insert(uuid, client.clone());
println!("{}", e); },
Err(true) Commands::Info(None) => {
}, let params: HashMap<String, String> = HashMap::new();
}; params.insert("name", &self.name);
} params.insert("owner", &self.owner);
client_connection
/*}); let command = Commands::Info(Some(params));
self.transmit_data(&stream, command.to_string().as_str());
},
_ => {
println!("Invalid command!");
self.transmit_data(&stream, Commands::Error(None).to_string.as_str());
},
}
},
Err(e) => {
println!("error: {:?}", e);
self.transmit_data(&stream, Commands::Error(None).to_string.as_str());
},
}
} }
}*/ }
}
pub fn get_info(&self, tx: Sender<Commands>) {
let params: HashMap<String, String> = HashMap::new();
params.insert("name", &self.name);
params.insert("owner", &self.owner);
let command = Commands::Info(Some(params));
tx.send(command).unwrap();
} }
pub fn update_all_clients(&self, notification: &ServerCommands){ pub fn update_all_clients(&self, notification: &ServerCommands){