tmp fix with channel system

removed the sync thread system where every client would wait for each client to finish. New system is completely based of channels and messages sent between threads
This commit is contained in:
Mitchell 2020-07-12 21:47:32 +01:00
parent 1276aaa955
commit 69f192393d
11 changed files with 297 additions and 351 deletions

View File

@ -1,17 +1,35 @@
//mod client_management;
mod server;
use crate::server::client::client_profile::Client;
use crate::server::server_profile::Server;
use std::net::{TcpStream, TcpListener};
use rust_chat_server::ThreadPool;
use std::sync::{Arc, Barrier, Mutex};
use std::collections::HashMap;
use std::collections::VecDeque;
fn main(){
let server_name = String::from("Server-01");
let server_address = String::from("0.0.0.0:6000");
let server_author = String::from("nope@live.co.uk");
let connected_clients: Arc<Mutex<HashMap<String,Client>>> = Arc::new(Mutex::new(HashMap::new()));
let server = Server::new(&server_name, &server_address, &server_author);
server.start();
let server = Arc::new(Server::new(&server_name, &server_address, &server_author, &connected_clients));
//server.start();
let listener = TcpListener::bind(server.get_address()).unwrap();
let pool = ThreadPool::new(10);
//stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
loop{
if let Ok((mut stream, addr)) = listener.accept(){
println!("Connected: {}", addr);
let server = Arc::clone(&server);
let connected_clients = Arc::clone(&connected_clients);
pool.execute(move || {
match server.establish_connection(stream){
Ok(mut client) => client.handle_connection(&server, &connected_clients),
Err(error) => println!("---connction to client failed---"),
}
});
}
}
}

View File

@ -1,18 +1,17 @@
extern crate regex;
use crate::server::commands::Commands;
use crate::server::utility;
use crate::server::commands::{ClientCommands, ServerCommands};
use crate::server::server_profile::Server;
use std::collections::VecDeque;
use std::net::{Shutdown, TcpStream};
use std::sync::{Arc, Barrier, Mutex};
use std::rc::Rc;
use crossbeam_channel::{Receiver, TryRecvError};
use std::sync::{Arc, Mutex};
use crossbeam_channel::{Receiver, TryRecvError, unbounded, Sender};
use parking_lot::FairMutex;
use std::collections::HashMap;
use dashmap::DashMap;
use std::io::prelude::*;
use std::time::Duration;
use regex::Regex;
#[derive(Clone)]
pub struct Client{
@ -21,22 +20,32 @@ pub struct Client{
uuid: String,
username: String,
address: String,
tx_channel: Sender<ServerCommands>,
rx_channel: Receiver<ServerCommands>,
}
impl Client{
pub fn new(stream: Arc<TcpStream>, uuid: &String, username: &String, address: &String) -> Client{
let (tx_channel, rx_channel): (Sender<ServerCommands>, Receiver<ServerCommands>) = unbounded();
Client{
connected: true,
stream: stream,
uuid: uuid.to_string(),
username: username.to_string(),
address: address.to_string(),
tx_channel: tx_channel,
rx_channel: rx_channel,
}
}
pub fn get_stream(&self) -> &TcpStream{
&self.stream
}
pub fn get_transmitter(&self) -> &Sender<ServerCommands>{
&self.tx_channel
}
pub fn get_uuid(&self) -> &String{
&self.uuid
@ -55,53 +64,42 @@ impl Client{
self.connected = false;
}
pub fn handle_connection(&mut self, clients_ref: &Arc<Mutex<HashMap<String, Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>, client_rx: Receiver<Arc<Barrier>>){
//self.stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
pub fn handle_connection(&mut self, server: &Server, clients_ref: &Arc<Mutex<HashMap<String, Client>>>){
self.stream.set_read_timeout(Some(Duration::from_millis(1000))).unwrap();
let mut buffer = [0; 1024];
while self.connected {
if !message_queue.lock().is_empty() {
match client_rx.try_recv(){
/*handle our data*/
Ok(sync_group) => {
println!("data present");
let data = utility::format_data(message_queue);
let command = utility::match_outbound_command(&data.get("command").unwrap());
println!("waiting 1");
sync_group.wait();
println!("executing");
command.execute(&self, &mut buffer, &data);
println!("waiting 2");
sync_group.wait();
println!("client updated");
},
/*sender disconnected*/
Err(TryRecvError::Disconnected) => {},
/*no data available yet*/
Err(TryRecvError::Empty) => {},
}
match self.rx_channel.try_recv(){
/*command is on the channel*/
Ok(command) => {
command.execute(self, &mut buffer);
},
/*sender disconnected*/
Err(TryRecvError::Disconnected) => {},
/*no data available yet*/
Err(TryRecvError::Empty) => {},
}
match self.stream.peek(&mut buffer){
Ok(_) => {
//self.stream.lock().unwrap().read(&mut buffer).unwrap();
self.get_stream().read(&mut buffer).unwrap();
let incoming_message = String::from_utf8_lossy(&buffer[..]);
//let data: Vec<String> = utility::tokenize(&incoming_message);
let data: HashMap<String, String> = utility::tokenize(&incoming_message);
let command = server.tokenize(&incoming_message);
println!("Request: {}", incoming_message);
//println!("Data: {:?}", data);
//let command = utility::match_command(&data[0]);
let command = utility::match_command(&data.get("command").unwrap());
if match command{ Commands::Connect => true, _ => false,}{
println!("Error!");
} else {
println!("command executing...");
command.execute(self, &mut buffer, &data, clients_ref, message_queue);
match command{
Ok(cmd) => {
match cmd{
ClientCommands::Connect(_) => println!("Error!"),
_ => {
println!("command executing...");
cmd.execute(self, server, &mut buffer, clients_ref);
},
}
},
Err(e) => println!("{}", e),
}
},
Err(_) => {
@ -110,5 +108,58 @@ impl Client{
}
}
println!("---thread exit---");
}
pub fn connect(&self, server: &Server, connected_clients: &Arc<Mutex<HashMap<String, Client>>>, data: &HashMap<String, String>){
let mut clients_hashmap = connected_clients.lock().unwrap();
let uuid = self.get_uuid().to_string();
clients_hashmap.insert(uuid, self.clone());
std::mem::drop(clients_hashmap);
let new_client = ServerCommands::Client(data.clone());
server.update_all_clients(&new_client);
self.transmit_success(&String::from(""));
}
pub fn transmit_data(&self, data: &str){
println!("Transmitting...");
println!("data: {}", data);
self.get_stream().write(data.to_string().as_bytes()).unwrap();
self.get_stream().flush().unwrap();
}
pub fn confirm_success(&self, buffer: &mut [u8; 1024], data: &String){
let success_regex = Regex::new(r###"!success:"###).unwrap();
//let mut failing = true;
//while failing{
self.get_stream().read(&mut *buffer).unwrap();
let incoming_message = String::from_utf8_lossy(&buffer[..]);
if success_regex.is_match(&incoming_message){
println!("success");
//failing = false;
}else{
self.transmit_error(&String::from(""));
}
//}
}
pub fn transmit_success(&self, data: &String){
let mut success_message = "!success:".to_string();
if !data.is_empty(){
success_message.push_str(&" ".to_string());
success_message.push_str(&data.to_string());
}
self.transmit_data(&success_message);
}
fn transmit_error(&self, data: &String){
let mut error_message = "!error:".to_string();
if !data.is_empty(){
error_message.push_str(&" ".to_string());
error_message.push_str(&data.to_string());
}
self.transmit_data(&error_message);
}
}

View File

@ -1,5 +1,4 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;

View File

@ -1,11 +1,5 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
// send a list of all clients.
// client !clientupdate:
// server !client: name:"vobo" uuid:24653526-23464562-46346-3563563 host:"127.0.0.1"
// server !client: name:"bovo" uuid:24643526-23464562-46346-3563563 host:"127.0.0.1"
pub fn format_client_data(uuid: &String, client: &Client) -> String{
["!client: username:",client.get_username(), " uuid:", uuid, " host:\"", client.get_address(), "\""].concat()
}

View File

@ -1,5 +1,4 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;

View File

@ -1,5 +1,4 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;

View File

@ -1,12 +0,0 @@
pub fn get_server_info() -> String{
let mut server_details = "".to_string();
let server_name = String::from("Server-01");
let server_owner = String::from("mickyb18");
server_details.push_str(&"name:".to_string());
server_details.push_str(&server_name.to_string());
server_details.push_str(&" owner:".to_string());
server_details.push_str(&server_owner.to_string());
server_details
}

View File

@ -11,185 +11,111 @@ mod test;
mod message;
use crate::server::client::client_profile::Client;
use crate::server::utility;
use crate::server::server_profile::Server;
use std::collections::VecDeque;
use parking_lot::FairMutex;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
use std::io::{self, Read};
use std::net::TcpStream;
use std::time::Duration;
use dashmap::DashMap;
pub enum Commands{
#[derive(Clone)]
pub enum ClientCommands{
Info,
Connect,
Connect(HashMap<String, String>),
Disconnect,
ClientUpdate,
ClientInfo,
ClientInfo(HashMap<String, String>),
Unknown,
}
pub enum OutboundCommands{
Client,
ClientRemove,
#[derive(Clone)]
pub enum ServerCommands{
Client(HashMap<String, String>),
ClientRemove(HashMap<String, String>),
Unknown,
}
enum InboundReturns{
Success,
Error,
}
enum OutboundReturns{
Success,
Error,
}
impl Commands{
pub fn execute(&self, client: &mut Client, buffer: &mut [u8; 1024], data: &HashMap<String, String>, clients_ref: &Arc<Mutex<HashMap<String, Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>){
impl ClientCommands{
pub fn execute(&self, client: &mut Client, server: &Server, buffer: &mut [u8; 1024], connected_clients: &Arc<Mutex<HashMap<String, Client>>>){
let stream = client.get_stream();
match *self{
Commands::Info => {
let server_details = info::get_server_info();
match &*self{
ClientCommands::Info => {
let server_details = server.get_info();
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &server_details);
client.transmit_success(&server_details);
},
Commands::Connect => {
connect::add_client(clients_ref, client);
ClientCommands::Connect(data) => {
connect::add_client(connected_clients, client);
let mut message = "!client: username:".to_string();
message.push_str(&client.get_username().to_string());
message.push_str(&" host:".to_string());
message.push_str(&client.get_address().to_string());
message.push_str(&" uuid:".to_string());
message.push_str(&client.get_uuid().to_string());
message_queue.lock().push_back(message);
let new_client = ServerCommands::Client(data.clone());
server.update_all_clients(&new_client);
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &String::from(""));
client.transmit_success(&String::from(""));
},
Commands::Disconnect => {
disconnect::remove_client(clients_ref, client);
ClientCommands::Disconnect => {
disconnect::remove_client(connected_clients, client);
let mut message = "!clientRemove: uuid:".to_string();
message.push_str(&client.get_uuid().to_string());
message_queue.lock().push_back(message);
let mut data: HashMap<String, String> = HashMap::new();
data.insert("uuid".to_string(), client.get_uuid().to_string());
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &String::from(""));
let old_client = ServerCommands::ClientRemove(data);
server.update_all_clients(&old_client);
client.transmit_success(&String::from(""));
client.disconnect();
println!("disconnected!");
},
Commands::ClientUpdate => {
let in_success = InboundReturns::Success;
let clients_hashmap = clients_ref.lock().unwrap();
ClientCommands::ClientUpdate => {
let clients_hashmap = connected_clients.lock().unwrap();
for (key, value) in clients_hashmap.iter(){
let formatted_data = client_update::format_client_data(&key, &value);
utility::transmit_data(&stream, &formatted_data);
client.transmit_data(&formatted_data);
in_success.execute(&stream, buffer, &formatted_data);
client.confirm_success(buffer, &formatted_data);
}
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &String::from(""));
in_success.execute(&stream, buffer, &String::from("!success:"));
client.transmit_success(&String::from(""));
client.confirm_success(buffer, &String::from("!success:"));
},
Commands::ClientInfo => {
let requested_data = client_info::get_client_data(clients_ref, data);
utility::transmit_data(&stream, &requested_data);
ClientCommands::ClientInfo(data) => {
let requested_data = client_info::get_client_data(connected_clients, &data);
client.transmit_data(&requested_data);
},
Commands::Unknown => {
println!("Uknown Command!");
ClientCommands::Unknown => {
println!("Unknown Command");
},
}
}
}
impl OutboundCommands{
pub fn execute(&self, client: &Client, buffer: &mut [u8; 1024], data: &HashMap<String, String>){
let stream = client.get_stream();
match *self{
OutboundCommands::Client => {
impl ServerCommands{
pub fn execute(&self, client: &mut Client, buffer: &mut [u8; 1024]){
match &*self{
ServerCommands::Client(data) => {
let mut message = String::from("");
message.push_str(&data.get("command").unwrap());
message.push_str(&" username:");
message.push_str(&data.get("username").unwrap());
message.push_str(&"!client: name:");
message.push_str(&data.get("name").unwrap());
message.push_str(&" host:");
message.push_str(&data.get("host").unwrap());
message.push_str(&" uuid:");
message.push_str(&data.get("uuid").unwrap());
utility::transmit_data(&stream, &message);
client.transmit_data(&message);
let in_success = InboundReturns::Success;
in_success.execute(&stream, buffer, &message);
client.confirm_success(buffer, &message);
},
OutboundCommands::ClientRemove => {
ServerCommands::ClientRemove(data) => {
let mut message = String::from("");
message.push_str(&data.get("command").unwrap());
message.push_str(&" uuid:");
message.push_str(&"!client: uuid:");
message.push_str(&data.get("uuid").unwrap());
utility::transmit_data(&stream, &message);
client.transmit_data(&message);
let in_success = InboundReturns::Success;
in_success.execute(&stream, buffer, &message);
client.confirm_success(buffer, &message);
},
OutboundCommands::Unknown => {
ServerCommands::Unknown => {
println!("Unknown Command!");
},
}
}
}
impl InboundReturns{
pub fn execute(&self, mut stream: &TcpStream, buffer: &mut [u8; 1024], data: &String){
stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
match *self{
InboundReturns::Success => {
let mut failing = true;
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("Blocking...");
utility::transmit_data(stream, data);
},
_ => panic!("Fatal Error {}", e),
}
},
Ok(m) => {
println!("{:?}", m);
failing = false;
},
};
}
},
InboundReturns::Error => {},
}
}
}
impl OutboundReturns{
pub fn execute(&self, stream: &TcpStream, data: &String){
match *self{
OutboundReturns::Success => {
let mut message = "!success:".to_string();
if !data.is_empty(){
message.push_str(&" ".to_string());
message.push_str(&data.to_string());
}
utility::transmit_data(stream, &message);
},
OutboundReturns::Error => {},
}
}
}

View File

@ -1,5 +1,3 @@
pub mod client;
pub mod commands;
pub mod server_profile;
pub mod utility;

View File

@ -1,80 +1,55 @@
extern crate regex;
use crate::server::client::client_profile::Client;
use crate::server::commands::Commands;
use crate::server::utility;
use crate::server::commands::{ClientCommands, ServerCommands};
use rust_chat_server::ThreadPool;
//use crate::client_management::client_profile::Client;
//use crate::server::commands::Commands;
//use crate::server::commands::network;
use std::collections::VecDeque;
use std::net::TcpListener;
use std::net::{TcpStream, TcpListener};
use std::sync::{Arc, Barrier, Mutex};
use std::rc::Rc;
use crossbeam_channel::{unbounded, Sender, Receiver};
use parking_lot::FairMutex;
use std::collections::HashMap;
use dashmap::DashMap;
use std::io::prelude::*;
use std::thread;
use regex::Regex;
pub struct Server{
name: String,
address: String,
author: String,
connected_clients: Arc<Mutex<HashMap<String,Client>>>,
message_queue: Arc<FairMutex<VecDeque<String>>>,
}
impl Server{
pub fn new(name: &String, address: &String, author: &String) -> Server{
let connected_clients: Arc<Mutex<HashMap<String,Client>>> = Arc::new(Mutex::new(HashMap::new()));
let message_queue: Arc<FairMutex<VecDeque<String>>> = Arc::new(FairMutex::new(VecDeque::new()));
pub fn new(name: &String, address: &String, author: &String, connected_clients: &Arc<Mutex<HashMap<String,Client>>>) -> Server{
Server{
name: name.to_string(),
address: address.to_string(),
author: author.to_string(),
connected_clients: connected_clients,
message_queue: message_queue,
connected_clients: Arc::clone(&connected_clients),
}
}
pub fn start(&self){
let listener = TcpListener::bind(self.address.clone()).unwrap();
let pool = ThreadPool::new(10);
//let connected_clients = Arc::new(Mutex::new(self.connected_clients.clone()));
//let message_queue: Arc<FairMutex<VecDeque<String>>> = Arc::new(FairMutex::new(self.message_queue.clone()));
let (tx,rx): (Sender<Arc<Barrier>>, Receiver<Arc<Barrier>>) = unbounded();
let (clock_tx, _) = (tx.clone(), rx.clone());
pub fn get_address(&self) -> &String{
&self.address
}
thread::spawn({
let connected_clients = Arc::clone(&self.connected_clients);
let message_queue = Arc::clone(&self.message_queue);
move || {
loop{
let online_clients = connected_clients.lock().unwrap().len();
if !message_queue.lock().is_empty(){
println!("message on queue detected");
let sync_group = Arc::new(Barrier::new(online_clients+1));
println!("sending to threads... {}",online_clients);
for _ in 0..online_clients{
println!("thread");
clock_tx.send(sync_group.clone()).unwrap();
}
println!("all threads updated!");
sync_group.wait();
println!("data removed");
message_queue.lock().pop_front();
sync_group.wait();
println!("clock finished!");
}
}
}
});
pub fn get_info(&self) -> String{
let mut server_details = "".to_string();
server_details.push_str(&"name:".to_string());
server_details.push_str(&self.name);
server_details.push_str(&" owner:".to_string());
server_details.push_str(&self.author);
server_details
}
pub fn establish_connection(&self, mut stream: TcpStream) -> Result<Client, bool>{
/*let listener = TcpListener::bind(self.address.clone()).unwrap();
let pool = ThreadPool::new(10);
//let (tx,rx): (Sender<Arc<Barrier>>, Receiver<Arc<Barrier>>) = unbounded();
//let (clock_tx, _) = (tx.clone(), rx.clone());
//stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
loop{
@ -82,43 +57,119 @@ impl Server{
println!("Connected: {}", addr);
let connected_clients_ref = Arc::clone(&self.connected_clients);
let message_queue_ref = Arc::clone(&self.message_queue);
let (_ , client_rx) = (tx.clone(), rx.clone());
let request = String::from("?request:");
self.transmit_data(&stream, &request);
pool.execute(move || {
let mut buffer = [0; 1024];
let request = String::from("?request:");
utility::transmit_data(&stream, &request);
stream.read(&mut buffer).unwrap();
let stream = Arc::new(stream);
pool.execute(move || {*/
let mut client_connection: Result<Client, bool> = Err(true);
let mut buffer = [0; 1024];
let incoming_message = String::from_utf8_lossy(&buffer[..]);
let data: HashMap<String, String> = utility::tokenize(&incoming_message);
let command = utility::match_command(&data.get("command").unwrap());
if match command{ Commands::Connect => true, _ => false,}{
/*
* Change so that command is paassed in and then matches how to break the
* data up
*/
//let (uuid, username) = utility::extract_fields(&data);
let uuid = data.get("uuid").unwrap();
let username = data.get("username").unwrap();
let address = data.get("host").unwrap();
let request = String::from("?request:");
self.transmit_data(&stream, &request);
let mut client = Client::new(stream, &uuid, &username, &address);
command.execute(&mut client, &mut buffer, &data, &connected_clients_ref, &message_queue_ref);
client.handle_connection(&connected_clients_ref, &message_queue_ref, client_rx);
//process_connection(&stream, &clients_ref, &message_ref, &address.to_string(), client_rx);
}else{
//error
let mut stream = Arc::new(stream);
while client_connection.is_err(){
Arc::get_mut(&mut stream).unwrap().read(&mut buffer).unwrap();
let incoming_message = String::from_utf8_lossy(&buffer[..]);
let command = self.tokenize(&incoming_message);
client_connection = match command{
Ok(cmd) => {
match cmd{
ClientCommands::Connect(data) => {
//connecting = false;
let uuid = data.get("uuid").unwrap();
let username = data.get("name").unwrap();
let address = data.get("host").unwrap();
let stream = Arc::clone(&stream);
let mut client = Client::new(stream, &uuid, &username, &address);
client.connect(self, &self.connected_clients, &data);
//cmd.execute(&mut client, self, &mut buffer, &self.connected_clients);
Ok(client)
//client.handle_connection(self, &connected_clients_ref);
},
ClientCommands::Info => {
let server_details = self.get_info();
self.transmit_data(&stream, &server_details);
Err(true)
},
_ => {
println!("Invalid command!");
Err(true)
},
}
});
},
Err(e) => {
println!("{}", e);
Err(true)
},
};
}
client_connection
/*});
}
}*/
}
pub fn update_all_clients(&self, notification: &ServerCommands){
let clients = self.connected_clients.lock().unwrap();
for client in clients.values(){
let tx = client.get_transmitter();
tx.send(notification.clone()).unwrap();
}
}
fn transmit_data(&self, mut stream: &TcpStream, data: &str){
println!("Transmitting...");
println!("data: {}",data);
stream.write(data.to_string().as_bytes()).unwrap();
stream.flush().unwrap();
}
pub fn tokenize(&self, incoming_message: &str) -> Result<ClientCommands, &'static str>{
let command_regex = Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9\-\+\[\]{}_=/]+|("(.*?)")+)"###).unwrap();
if command_regex.is_match(incoming_message){
let command = self.match_command(&incoming_message.to_string());
let command = match command{
ClientCommands::Connect(mut addons) => {
self.regex_data(&command_regex, &incoming_message.replace("!connect: ", ""), &mut addons);
ClientCommands::Connect(addons)
},
ClientCommands::ClientInfo(mut addons) => {
self.regex_data(&command_regex, &incoming_message.replace("!clientInfo: ", ""), &mut addons);
ClientCommands::ClientInfo(addons)
},
_ => {
println!("no addons");
command
},
};
Ok(command)
} else {
Err("data did not match regex!")
}
}
fn match_command(&self, command: &String) -> ClientCommands{
match command{
_ if command.starts_with("!info:") => ClientCommands::Info,
_ if command.starts_with("!connect:") => ClientCommands::Connect(HashMap::new()),
_ if command.starts_with("!disconnect:") => ClientCommands::Disconnect,
_ if command.starts_with("!clientUpdate:") => ClientCommands::ClientUpdate,
_ if command.starts_with("!clientInfo:") => ClientCommands::ClientInfo(HashMap::new()),
_ => ClientCommands::Unknown,
}
}
fn regex_data(&self, command_regex: &Regex, data: &str, command_addons: &mut HashMap<String, String>){
for figure in command_regex.find_iter(data){
let segment = figure.as_str().to_string();
let contents: Vec<&str> = segment.split(":").collect();
println!("key: {}, value: {}", contents[0].to_string(), contents[1].to_string());
command_addons.insert(contents[0].to_string(), contents[1].to_string());
}
}
}

View File

@ -1,77 +0,0 @@
use std::net::TcpStream;
use std::io::Write;
use std::collections::HashMap;
use regex::Regex;
use crate::server::commands::{Commands, OutboundCommands};
use std::sync::Arc;
use parking_lot::FairMutex;
use std::collections::VecDeque;
pub fn transmit_data(mut stream: &TcpStream, data: &str){
println!("Transmitting...");
println!("data: {}",data);
stream.write(data.to_string().as_bytes()).unwrap();
stream.flush().unwrap();
}
pub fn tokenize(incoming_message: &str) -> HashMap<String, String>{
let mut data: HashMap<String, String> = HashMap::new();
for mat in Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9\-\+\[\]{}_=/]+|("(.*?)")+)"###).unwrap().find_iter(incoming_message){
if match match_command(&mat.as_str().to_string()) { Commands::Unknown => false, _ => true,} || match match_outbound_command(&mat.as_str().to_string()) { OutboundCommands::Unknown => false, _ => true,}{
data.insert("command".to_string(), mat.as_str().to_string());
}else{
let segment = mat.as_str().to_string();
let contents: Vec<&str> = segment.split(":").collect();
println!("key: {}, value: {}", contents[0].to_string(), contents[1].to_string());
data.insert(contents[0].to_string(), contents[1].to_string());
}
}
data
}
pub fn match_command(command: &String) -> Commands{
match command.as_str(){
"!info:" => Commands::Info,
"!connect:" => Commands::Connect,
"!disconnect:" => Commands::Disconnect,
"!clientUpdate:" => Commands::ClientUpdate,
"!clientInfo:" => Commands::ClientInfo,
_ => Commands::Unknown,
}
}
pub fn match_outbound_command(command: &String) -> OutboundCommands{
match command.as_str(){
"!client:" => OutboundCommands::Client,
"!clientRemove:" => OutboundCommands::ClientRemove,
_ => OutboundCommands::Unknown,
}
}
pub fn format_data(message_queue: &Arc<FairMutex<VecDeque<String>>>) -> HashMap<String, String>{
//copy data from queue
let locked_message_queue = message_queue.lock();
let message = locked_message_queue.get(0).unwrap();
println!("msg: {}", message);
tokenize(&message)
}
pub fn extract_fields(data: &Vec<String>) -> (String, String){
let mut uuid = String::from("");
let mut username = String::from("");
for field in data{
if field.contains("uuid:"){
let contents: Vec<&str> = field.split(":").collect();
uuid.push_str(contents[1]);
}else if field.contains("username:"){
let contents: Vec<&str> = field.split(":").collect();
username.push_str(contents[1]);
}
}
(uuid, username)
}