Merge pull request #2 from Mitch161/channel-structure

Channel structure merge
This commit is contained in:
Mitch161 2020-08-06 12:46:26 +01:00 committed by GitHub
commit 6d3ffc5ae5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1175 additions and 457 deletions

4
.gitignore vendored
View File

@ -9,4 +9,6 @@ Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
.DS_Store
.idea
.idea
*.properties

View File

@ -15,6 +15,7 @@ crossbeam-utils = "0.7"
crossbeam-queue = "0.2"
dashmap = "3.11.4"
async-std = "1.6.2"
lazy_static = "1.4.0"
[profile.dev]
opt-level = 0

View File

@ -1,5 +1,5 @@
use std::thread;
use std::sync::mpsc;
use crossbeam::{unbounded , Sender, Receiver};
use std::sync::Arc;
use std::sync::Mutex;
@ -10,7 +10,7 @@ enum Message {
pub struct ThreadPool{
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
sender: Sender<Message>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
@ -26,7 +26,7 @@ impl ThreadPool{
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let (sender, receiver) = unbounded();
let receiver = Arc::new(Mutex::new(receiver));
@ -56,7 +56,7 @@ struct Worker {
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
fn new(id: usize, receiver: Arc<Mutex<Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || {
loop{
let message = receiver.lock().unwrap().recv().unwrap();

View File

@ -1,17 +1,23 @@
//mod client_management;
#[macro_use]
extern crate lazy_static;
mod server;
use crate::server::client::client_profile::Client;
use crate::server::server_profile::Server;
use std::collections::HashMap;
use std::collections::VecDeque;
fn main(){
fn main(){
lazy_static!{
static ref SERVER_NAME: &'static str = "Server-01";
static ref SERVER_ADDRESS: &'static str = "0.0.0.0:6000";
static ref SERVER_AUTHOR: &'static str = "noreply@email.com";
static ref SERVER: Server<'static> = Server::new(&SERVER_NAME, &SERVER_ADDRESS, &SERVER_AUTHOR);
}
/*
let server_name = String::from("Server-01");
let server_address = String::from("0.0.0.0:6000");
let server_author = String::from("nope@live.co.uk");
let server_author = String::from("noreply@email.com");
*/
let server = Server::new(&server_name, &server_address, &server_author);
server.start();
//let server = Server::new(server_name, server_address, server_author);
SERVER.start();
}

View File

@ -1,114 +1,258 @@
extern crate regex;
use crate::server::commands::Commands;
use crate::server::utility;
use crate::server::server_profile::Server;
use crate::server::commands::{Commands};
use std::collections::VecDeque;
use std::net::{Shutdown, TcpStream};
use std::sync::{Arc, Barrier, Mutex};
use std::rc::Rc;
use crossbeam_channel::{Receiver, TryRecvError};
use std::sync::Arc;
use parking_lot::FairMutex;
use std::collections::HashMap;
use dashmap::DashMap;
use std::io::prelude::*;
use std::time::Duration;
use std::io::Error;
use crossbeam::{Sender, Receiver, TryRecvError};
use crossbeam_channel::unbounded;
#[derive(Clone)]
pub struct Client{
pub struct Client<'a> {
connected: bool,
stream: Arc<TcpStream>,
uuid: String,
username: String,
address: String,
server: &'a Server<'a>,
tx_channel: Sender<Commands>,
rx_channel: Receiver<Commands>,
}
impl Client{
pub fn new(stream: Arc<TcpStream>, uuid: &String, username: &String, address: &String) -> Client{
Client{
impl<'a> Client<'a> {
pub fn new(server: &'a Server<'static>, stream: Arc<TcpStream>, uuid: &String, username: &String, address: &String) -> Self{
let (tx_channel, rx_channel): (Sender<Commands>, Receiver<Commands>) = unbounded();
Client {
connected: true,
stream: stream,
stream,
uuid: uuid.to_string(),
username: username.to_string(),
address: address.to_string(),
server,
tx_channel,
rx_channel,
}
}
pub fn get_stream(&self) -> &TcpStream{
#[allow(dead_code)]
fn get_stream(&self) -> &TcpStream{
&self.stream
}
#[allow(dead_code)]
pub fn get_transmitter(&self) -> &Sender<Commands>{
&self.tx_channel
}
#[allow(dead_code)]
pub fn get_uuid(&self) -> &String{
&self.uuid
}
#[allow(dead_code)]
pub fn get_username(&self) -> &String{
&self.username
}
#[allow(dead_code)]
pub fn get_address(&self) -> &String{
&self.address
}
pub fn handle_connection(&mut self){
self.stream.set_read_timeout(Some(Duration::from_millis(500))).unwrap();
//self.stream.set_nonblocking(true).expect("set_nonblocking call failed");
while self.connected {
match self.rx_channel.try_recv() {
/*command is on the channel*/
Ok(command) => {
let a = command.clone();
match command {
/*this might be useless*/
Commands::Info(Some(_params)) => {
self.transmit_data(a.to_string().as_str());
},
Commands::Disconnect(None) => {
},
Commands::ClientUpdate(Some(params)) => {
let uuid = params.get("uuid").unwrap();
let data: HashMap<String, String> = [(String::from("uuid"), self.uuid.clone()), (String::from("name"), self.username.clone()), (String::from("host"), self.address.clone())].iter().cloned().collect();
let command = Commands::Client(Some(data));
self.server.update_client(uuid.as_str(), &command);
},
Commands::ClientInfo(Some(params)) => {
let uuid = params.get("uuid").unwrap();
let data: HashMap<String, String> = [(String::from("uuid"), self.uuid.clone()), (String::from("name"), self.username.clone()), (String::from("host"), self.address.clone())].iter().cloned().collect();
let command = Commands::Success(Some(data));
self.server.update_client(uuid.as_str(), &command);
},
Commands::ClientRemove(Some(params)) => {
let command = Commands::Client(Some(params));
self.transmit_data(command.to_string().as_str());
self.confirm_success();
},
Commands::Client(Some(_params)) => {
self.transmit_data(a.to_string().as_str());
self.confirm_success();
},
Commands::Success(_params) => {
self.transmit_data(a.to_string().as_str());
},
_ => {
println!("---Invalid Channel Command---");
},
}
},
/*sender disconnected*/
Err(TryRecvError::Disconnected) => {},
/*no data available yet*/
Err(TryRecvError::Empty) => {},
}
/*
* if multiple commands are written to the stream before it reads, all the commands
* could be read at once, causing the program to ignore all commands after the firet
* one. Ethier make sure commands sent require a response before sending the next one
* or make a system to check for these issues.
*/
match self.read_data() {
Ok(command) => {
match command {
Commands::Info(None) => {
let params: HashMap<String, String> = [(String::from("name"), self.server.get_name()), (String::from("owner"), self.server.get_author())].iter().cloned().collect();
let command = Commands::Success(Some(params));
self.transmit_data(command.to_string().as_str());
},
Commands::Disconnect(None) => {
self.connected = false;
self.server.remove_client(self.uuid.as_str());
self.stream.shutdown(Shutdown::Both).expect("shutdown call failed");
let params: HashMap<String, String> = [(String::from("uuid"), self.uuid.clone())].iter().cloned().collect();
let command = Commands::ClientRemove(Some(params));
self.server.update_all_clients(self.uuid.as_str(), command);
},
Commands::ClientUpdate(None) => {
let mut command = Commands::Success(None);
self.transmit_data(command.to_string().as_str());
let data: HashMap<String, String> = [(String::from("uuid"), self.uuid.clone())].iter().cloned().collect();
command = Commands::ClientUpdate(Some(data));
self.server.update_all_clients(self.uuid.as_str(), command);
},
Commands::ClientInfo(Some(params)) => {
let uuid = params.get("uuid").unwrap();
let data: HashMap<String, String> = [(String::from("uuid"), self.uuid.clone())].iter().cloned().collect();
let command = Commands::ClientInfo(Some(data));
self.server.update_client(uuid.as_str(), &command);
},
Commands::Error(None) => {
},
_ => {
println!("---Invalid Command---");
},
}
},
Err(_) => {
//println!("no data read");
},
}
}
println!("---Thread Exit---");
}
pub fn transmit_data(&self, data: &str){
println!("Transmitting...");
println!("{} data: {}", self.uuid, data);
self.get_stream().write(data.to_string().as_bytes()).unwrap();
self.get_stream().flush().unwrap();
}
fn read_data(&self) -> Result<Commands, Error> {
let mut buffer = [0; 1024];
self.get_stream().read(&mut buffer)?;
let command = Commands::from(&buffer);
Ok(command)
}
fn confirm_success(&self){
//self.stream.set_nonblocking(false).expect("set_nonblocking call failed");
//self.stream.set_read_timeout(Some(Duration::from_millis(3000))).expect("set_read_timeout call failed");
match self.read_data() {
Ok(command) => {
match command {
Commands::Success(_params) => {
println!("Success Confirmed");
},
_ => {
let error = Commands::Error(None);
self.transmit_data(error.to_string().as_str());
},
}
},
Err(_) => {
println!("no success read");
let error = Commands::Error(None);
self.transmit_data(error.to_string().as_str());
},
}
//self.stream.set_read_timeout(None).expect("set_read_timeout call failed");
//self.stream.set_nonblocking(true).expect("set_nonblocking call failed");
}
#[deprecated(since="24.7.20", note="will be removed in future, please do not use!")]
#[allow(dead_code)]
pub fn disconnect(&mut self){
self.stream.shutdown(Shutdown::Both).expect("shutdown call failed");
self.connected = false;
}
pub fn handle_connection(&mut self, clients_ref: &Arc<Mutex<HashMap<String, Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>, client_rx: Receiver<Arc<Barrier>>){
//self.stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
let mut buffer = [0; 1024];
while self.connected {
if !message_queue.lock().is_empty() {
match client_rx.try_recv(){
/*handle our data*/
Ok(sync_group) => {
println!("data present");
let data = utility::format_data(message_queue);
let command = utility::match_outbound_command(&data.get("command").unwrap());
println!("waiting 1");
sync_group.wait();
println!("executing");
command.execute(&self, &mut buffer, &data);
println!("waiting 2");
sync_group.wait();
println!("client updated");
},
/*sender disconnected*/
Err(TryRecvError::Disconnected) => {},
/*no data available yet*/
Err(TryRecvError::Empty) => {},
}
}
match self.stream.peek(&mut buffer){
Ok(_) => {
//self.stream.lock().unwrap().read(&mut buffer).unwrap();
self.get_stream().read(&mut buffer).unwrap();
let incoming_message = String::from_utf8_lossy(&buffer[..]);
//let data: Vec<String> = utility::tokenize(&incoming_message);
let data: HashMap<String, String> = utility::tokenize(&incoming_message);
println!("Request: {}", incoming_message);
//println!("Data: {:?}", data);
//let command = utility::match_command(&data[0]);
let command = utility::match_command(&data.get("command").unwrap());
if match command{ Commands::Connect => true, _ => false,}{
println!("Error!");
} else {
println!("command executing...");
command.execute(self, &mut buffer, &data, clients_ref, message_queue);
}
},
Err(_) => {
println!("no data peeked");
},
}
#[deprecated(since="24.7.20", note="will be removed in future, please do not use!")]
#[allow(dead_code)]
pub fn transmit_success(&self, data: &String){
let mut success_message = "!success:".to_string();
if !data.is_empty(){
success_message.push_str(&" ".to_string());
success_message.push_str(&data.to_string());
}
println!("---thread exit---");
self.transmit_data(&success_message);
}
#[deprecated(since="24.7.20", note="will be removed in future, please do not use!")]
#[allow(dead_code)]
fn transmit_error(&self, data: &String){
let mut error_message = "!error:".to_string();
if !data.is_empty(){
error_message.push_str(&" ".to_string());
error_message.push_str(&data.to_string());
}
self.transmit_data(&error_message);
}
}

View File

@ -1,5 +1,4 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
/*use crate::server::client::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;
@ -27,4 +26,4 @@ pub fn get_client_data(clients_ref: &Arc<Mutex<HashMap<String, Client>>>, data:
},
None => String::from("client not online"),
}
}
}*/

View File

@ -1,11 +1,5 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
/*use crate::server::client::client_profile::Client;
// send a list of all clients.
// client !clientupdate:
// server !client: name:"vobo" uuid:24653526-23464562-46346-3563563 host:"127.0.0.1"
// server !client: name:"bovo" uuid:24643526-23464562-46346-3563563 host:"127.0.0.1"
pub fn format_client_data(uuid: &String, client: &Client) -> String{
["!client: username:",client.get_username(), " uuid:", uuid, " host:\"", client.get_address(), "\""].concat()
}
}*/

View File

@ -1,5 +1,4 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
/*use crate::server::client::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;
@ -9,5 +8,6 @@ use dashmap::DashMap;
pub fn add_client(clients_ref: &Arc<Mutex<HashMap<String, Client>>>, client: &Client){
let mut clients_hashmap = clients_ref.lock().unwrap();
let uuid = client.get_uuid().to_string();
clients_hashmap.insert(uuid, client.clone());
//clients_hashmap.insert(uuid, client.clone());
}
*/

View File

@ -1,5 +1,4 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
/*use crate::server::client::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;
@ -8,4 +7,4 @@ use std::collections::HashMap;
pub fn remove_client(clients_ref: &Arc<Mutex<HashMap<String, Client>>>, client: &Client){
let mut clients_hashmap = clients_ref.lock().unwrap();
clients_hashmap.remove(client.get_uuid()).unwrap();
}
}*/

View File

@ -1,12 +0,0 @@
pub fn get_server_info() -> String{
let mut server_details = "".to_string();
let server_name = String::from("Server-01");
let server_owner = String::from("mickyb18");
server_details.push_str(&"name:".to_string());
server_details.push_str(&server_name.to_string());
server_details.push_str(&" owner:".to_string());
server_details.push_str(&server_owner.to_string());
server_details
}

View File

@ -10,186 +10,187 @@ mod client;
mod test;
mod message;
use crate::server::client::client_profile::Client;
use crate::server::utility;
use std::collections::VecDeque;
use parking_lot::FairMutex;
use std::sync::Mutex;
use std::sync::Arc;
use std::string::ToString;
use std::collections::HashMap;
use std::io::{self, Read};
use std::net::TcpStream;
use std::time::Duration;
use dashmap::DashMap;
use std::borrow::Borrow;
use regex::Regex;
use std::ops::Index;
pub enum Commands{
Info,
Connect,
Disconnect,
ClientUpdate,
ClientInfo,
Unknown,
#[derive(Clone, Debug)]
pub enum Commands {
Request(Option<HashMap<String, String>>),
Info(Option<HashMap<String, String>>),
Connect(Option<HashMap<String, String>>),
Disconnect(Option<HashMap<String, String>>),
ClientUpdate(Option<HashMap<String, String>>),
ClientInfo(Option<HashMap<String, String>>),
ClientRemove(Option<HashMap<String, String>>),
Client(Option<HashMap<String, String>>),
Success(Option<HashMap<String, String>>),
Error(Option<HashMap<String, String>>),
}
pub enum OutboundCommands{
Client,
ClientRemove,
Unknown,
}
enum InboundReturns{
Success,
Error,
}
enum OutboundReturns{
Success,
Error,
}
impl Commands{
pub fn execute(&self, client: &mut Client, buffer: &mut [u8; 1024], data: &HashMap<String, String>, clients_ref: &Arc<Mutex<HashMap<String, Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>){
let stream = client.get_stream();
match *self{
Commands::Info => {
let server_details = info::get_server_info();
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &server_details);
},
Commands::Connect => {
connect::add_client(clients_ref, client);
let mut message = "!client: username:".to_string();
message.push_str(&client.get_username().to_string());
message.push_str(&" host:".to_string());
message.push_str(&client.get_address().to_string());
message.push_str(&" uuid:".to_string());
message.push_str(&client.get_uuid().to_string());
message_queue.lock().push_back(message);
impl Commands {
fn compare_params(&self, params: &Option<HashMap<String, String>>, other_params: &Option<HashMap<String, String>>) -> bool {
match (params, other_params) {
(None, Some(_other_params)) => false,
(Some(_params), None) => false,
(None, None) => true,
(Some(params), Some(other_params)) => {
let mut result = false;
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &String::from(""));
},
Commands::Disconnect => {
disconnect::remove_client(clients_ref, client);
let mut message = "!clientRemove: uuid:".to_string();
message.push_str(&client.get_uuid().to_string());
message_queue.lock().push_back(message);
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &String::from(""));
client.disconnect();
println!("disconnected!");
},
Commands::ClientUpdate => {
let in_success = InboundReturns::Success;
let clients_hashmap = clients_ref.lock().unwrap();
for (key, value) in clients_hashmap.iter(){
let formatted_data = client_update::format_client_data(&key, &value);
utility::transmit_data(&stream, &formatted_data);
in_success.execute(&stream, buffer, &formatted_data);
}
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &String::from(""));
in_success.execute(&stream, buffer, &String::from("!success:"));
},
Commands::ClientInfo => {
let requested_data = client_info::get_client_data(clients_ref, data);
utility::transmit_data(&stream, &requested_data);
},
Commands::Unknown => {
println!("Uknown Command!");
},
}
}
}
impl OutboundCommands{
pub fn execute(&self, client: &Client, buffer: &mut [u8; 1024], data: &HashMap<String, String>){
let stream = client.get_stream();
match *self{
OutboundCommands::Client => {
let mut message = String::from("");
message.push_str(&data.get("command").unwrap());
message.push_str(&" username:");
message.push_str(&data.get("username").unwrap());
message.push_str(&" host:");
message.push_str(&data.get("host").unwrap());
message.push_str(&" uuid:");
message.push_str(&data.get("uuid").unwrap());
utility::transmit_data(&stream, &message);
let in_success = InboundReturns::Success;
in_success.execute(&stream, buffer, &message);
},
OutboundCommands::ClientRemove => {
let mut message = String::from("");
message.push_str(&data.get("command").unwrap());
message.push_str(&" uuid:");
message.push_str(&data.get("uuid").unwrap());
utility::transmit_data(&stream, &message);
let in_success = InboundReturns::Success;
in_success.execute(&stream, buffer, &message);
},
OutboundCommands::Unknown => {
println!("Unknown Command!");
},
}
}
}
impl InboundReturns{
pub fn execute(&self, mut stream: &TcpStream, buffer: &mut [u8; 1024], data: &String){
stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
match *self{
InboundReturns::Success => {
let mut failing = true;
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("Blocking...");
utility::transmit_data(stream, data);
},
_ => panic!("Fatal Error {}", e),
if params.len() == other_params.len() {
for (key, value) in params.iter() {
if let Some(other_value) = other_params.get(key) {
if value != other_value {
result = false;
break;
} else {
result = true;
}
},
Ok(m) => {
println!("{:?}", m);
failing = false;
},
};
}
}
}
result
},
InboundReturns::Error => {},
}
}
}
impl OutboundReturns{
pub fn execute(&self, stream: &TcpStream, data: &String){
match *self{
OutboundReturns::Success => {
let mut message = "!success:".to_string();
if !data.is_empty(){
message.push_str(&" ".to_string());
message.push_str(&data.to_string());
}
utility::transmit_data(stream, &message);
},
OutboundReturns::Error => {},
impl PartialEq for Commands {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Commands::Request(params), Commands::Request(other_params)) => self.compare_params(&params, &other_params),
(Commands::Info(params), Commands::Info(other_params)) => self.compare_params(&params, &other_params),
(Commands::Connect(params), Commands::Connect(other_params)) => self.compare_params(&params, &other_params),
(Commands::Disconnect(params), Commands::Disconnect(other_params)) => self.compare_params(&params, &other_params),
(Commands::ClientUpdate(params), Commands::ClientUpdate(other_params)) => self.compare_params(&params, &other_params),
(Commands::ClientInfo(params), Commands::ClientInfo(other_params)) => self.compare_params(&params, &other_params),
(Commands::ClientRemove(params), Commands::ClientRemove(other_params)) => self.compare_params(&params, &other_params),
(Commands::Client(params), Commands::Client(other_params)) => self.compare_params(&params, &other_params),
(Commands::Success(params), Commands::Success(other_params)) => self.compare_params(&params, &other_params),
(Commands::Error(params), Commands::Error(other_params)) => self.compare_params(&params, &other_params),
_ => false,
}
}
}
impl ToString for Commands {
fn to_string(&self) -> std::string::String {
let mut out_string = String::new();
let (command, parameters) = match self {
Commands::Request(arguments) => { ("!request:", arguments) },
Commands::Info(arguments) => { ("!info:", arguments) },
Commands::Connect(arguments) => { ("!connect:", arguments) },
Commands::Disconnect(arguments) => { ("!disconnect:", arguments) },
Commands::ClientUpdate(arguments) => { ("!clientUpdate:", arguments) },
Commands::ClientInfo(arguments) => { ("!clientInfo:", arguments) },
Commands::Client(arguments) => { ("!client:", arguments) },
Commands::Success(arguments) => { ("!success:", arguments) },
Commands::Error(arguments) => { ("!error:", arguments) },
_ => { ("!error:", &None) }
};
out_string.push_str(command);
if parameters.is_some() {
let hash_map = parameters.borrow().as_ref().unwrap();
for (k, v) in hash_map.iter() {
out_string.push_str(" ");
out_string.push_str(k.as_str());
out_string.push_str(":");
out_string.push_str(v.as_str())
}
}
out_string
}
}
impl From<&str> for Commands {
fn from(data: &str) -> Self {
let regex = Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9\-\+\[\]{}_=/]+|("(.*?)")+)"###).unwrap();
let mut iter = regex.find_iter(data);
let command = iter.next().unwrap().as_str();
println!("command: {:?}", command);
let mut map: HashMap<String, String> = HashMap::new();
for i in iter {
let parameter = i.as_str().to_string();
let parts:Vec<&str> = parameter.split(":").collect();
map.insert(parts.index(0).to_string(), parts.index(1).to_string());
}
let params = if map.capacity() > 0 {Some(map)} else { None };
match command {
"!request:" => Commands::Request(params),
"!info:" => Commands::Info(params),
"!connect:" => Commands::Connect(params),
"!disconnect:" => Commands::Disconnect(params),
"!clientUpdate:" => Commands::ClientUpdate(params),
"!clientInfo:" => Commands::ClientInfo(params),
"!client:" => Commands::Client(params),
"!clientRemove:" => Commands::ClientRemove(params),
"!success:" => Commands::Success(params),
"!error:" => Commands::Error(params),
_ => Commands::Error(params),
}
}
}
impl From<String> for Commands {
fn from(data: String) -> Self {
Commands::from(data.as_str())
}
}
impl From<&[u8; 1024]> for Commands {
fn from(data: &[u8; 1024]) -> Self {
let incoming_message = String::from(String::from_utf8_lossy(data));
Commands::from(incoming_message.as_str())
}
}
#[cfg(test)]
mod test_commands_v2 {
use super::Commands;
use std::collections::HashMap;
#[test]
fn test_creation_from_string() {
let command_result = Commands::from("!connect: name:bop host:127.0.0.1 uuid:123456-1234-1234-123456");
()
}
#[test]
fn test_to_string() {
let mut a: HashMap<String, String> = HashMap::new();
a.insert("name".to_string(), "michael".to_string());
a.insert("host".to_string(), "127.0.0.1".to_string());
a.insert("uuid".to_string(), "123456-1234-1234-123456".to_string());
let command = Commands::Connect(Some(a));
println!("{:?}", command.to_string())
}
}

View File

@ -1,5 +1,3 @@
pub mod client;
pub mod commands;
pub mod server_profile;
pub mod utility;

View File

@ -1,124 +1,509 @@
extern crate regex;
use crate::server::client::client_profile::Client;
use crate::server::commands::Commands;
use crate::server::utility;
use crate::server::commands::{Commands};
use rust_chat_server::ThreadPool;
//use crate::client_management::client_profile::Client;
//use crate::server::commands::Commands;
//use crate::server::commands::network;
use std::collections::VecDeque;
use std::net::TcpListener;
use std::sync::{Arc, Barrier, Mutex};
use std::rc::Rc;
use crossbeam_channel::{unbounded, Sender, Receiver};
use std::net::{TcpStream, TcpListener};
use std::sync::{Arc, Mutex};
use crossbeam_channel::Sender;
use parking_lot::FairMutex;
use std::collections::HashMap;
use std::io::Error;
use dashmap::DashMap;
use std::io::prelude::*;
use std::thread;
use regex::Regex;
pub struct Server{
name: String,
address: String,
author: String,
connected_clients: Arc<Mutex<HashMap<String,Client>>>,
message_queue: Arc<FairMutex<VecDeque<String>>>,
pub struct Server<'z> {
name: &'z str,
address: &'z str,
author: &'z str,
connected_clients: Arc<Mutex<HashMap<String, Sender<Commands>>>>,
thread_pool: ThreadPool,
}
impl Server{
pub fn new(name: &String, address: &String, author: &String) -> Server{
let connected_clients: Arc<Mutex<HashMap<String,Client>>> = Arc::new(Mutex::new(HashMap::new()));
let message_queue: Arc<FairMutex<VecDeque<String>>> = Arc::new(FairMutex::new(VecDeque::new()));
Server{
name: name.to_string(),
address: address.to_string(),
author: author.to_string(),
connected_clients: connected_clients,
message_queue: message_queue,
// MARK: - server implemetation
impl<'z> Server<'z> {
pub fn new(name: &'z str, address: &'z str, author: &'z str) -> Self {
Self {
name: name,
address: address,
author: author,
connected_clients: Arc::new(Mutex::new(HashMap::new())),
thread_pool: ThreadPool::new(16),
}
}
pub fn start(&self){
let listener = TcpListener::bind(self.address.clone()).unwrap();
let pool = ThreadPool::new(10);
//let connected_clients = Arc::new(Mutex::new(self.connected_clients.clone()));
//let message_queue: Arc<FairMutex<VecDeque<String>>> = Arc::new(FairMutex::new(self.message_queue.clone()));
let (tx,rx): (Sender<Arc<Barrier>>, Receiver<Arc<Barrier>>) = unbounded();
let (clock_tx, _) = (tx.clone(), rx.clone());
pub fn get_name(&self) -> String{
self.name.to_string()
}
thread::spawn({
let connected_clients = Arc::clone(&self.connected_clients);
let message_queue = Arc::clone(&self.message_queue);
move || {
loop{
let online_clients = connected_clients.lock().unwrap().len();
if !message_queue.lock().is_empty(){
println!("message on queue detected");
let sync_group = Arc::new(Barrier::new(online_clients+1));
println!("sending to threads... {}",online_clients);
for _ in 0..online_clients{
println!("thread");
clock_tx.send(sync_group.clone()).unwrap();
pub fn get_address(&self) -> String{
self.address.to_string()
}
pub fn get_author(&self) -> String{
self.author.to_string()
}
pub fn start(&'static self) {
let listener = TcpListener::bind(self.get_address()).unwrap();
loop {
if let Ok((mut stream, addr)) = listener.accept() {
println!("Server: new connection, {}", addr);
let request = Commands::Request(None);
self.transmit_data(&stream, &request.to_string().as_str());
match self.read_data(&stream) {
Ok(command) => {
match command {
Commands::Connect(Some(data)) => {
let uuid = data.get("uuid").unwrap();
let username = data.get("name").unwrap();
let address = data.get("host").unwrap();
let stream = Arc::new(stream);
let mut client = Client::new(self, stream, &uuid, &username, &address);
let tx = client.get_transmitter();
let mut clients_hashmap = self.connected_clients.lock().unwrap();
clients_hashmap.insert(uuid.to_string(), tx.clone());
std::mem::drop(clients_hashmap);
let success = Commands::Success(None);
tx.send(success).unwrap();
self.thread_pool.execute(move || {
client.handle_connection();
});
let params: HashMap<String, String> = [(String::from("name"), username.clone()), (String::from("host"), address.clone()), (String::from("uuid"), uuid.clone())].iter().cloned().collect();
let new_client = Commands::Client(Some(params));
self.update_all_clients(uuid.as_str(), new_client);
},
Commands::Info(None) => {
let params: HashMap<String, String> = [(String::from("name"), self.name.to_string().clone()), (String::from("owner"), self.author.to_string().clone())].iter().cloned().collect();
let command = Commands::Success(Some(params));
self.transmit_data(&stream, command.to_string().as_str());
},
_ => {
println!("Invalid command!");
self.transmit_data(&stream, Commands::Error(None).to_string().as_str());
},
}
println!("all threads updated!");
sync_group.wait();
println!("data removed");
message_queue.lock().pop_front();
sync_group.wait();
println!("clock finished!");
}
},
Err(_) => println!("ERROR: stream closed"),
}
}
});
}
}
}
//stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
loop{
if let Ok((mut stream, addr)) = listener.accept(){
println!("Connected: {}", addr);
pub fn update_client(&self, uuid: &str, command: &Commands){
let clients = self.connected_clients.lock().unwrap();
let tx = clients.get(&uuid.to_string()).unwrap();
tx.send(command.clone()).unwrap();
}
let connected_clients_ref = Arc::clone(&self.connected_clients);
let message_queue_ref = Arc::clone(&self.message_queue);
let (_ , client_rx) = (tx.clone(), rx.clone());
pool.execute(move || {
let mut buffer = [0; 1024];
let request = String::from("?request:");
utility::transmit_data(&stream, &request);
stream.read(&mut buffer).unwrap();
let stream = Arc::new(stream);
let incoming_message = String::from_utf8_lossy(&buffer[..]);
let data: HashMap<String, String> = utility::tokenize(&incoming_message);
let command = utility::match_command(&data.get("command").unwrap());
if match command{ Commands::Connect => true, _ => false,}{
/*
* Change so that command is paassed in and then matches how to break the
* data up
*/
//let (uuid, username) = utility::extract_fields(&data);
let uuid = data.get("uuid").unwrap();
let username = data.get("username").unwrap();
let address = data.get("host").unwrap();
let mut client = Client::new(stream, &uuid, &username, &address);
command.execute(&mut client, &mut buffer, &data, &connected_clients_ref, &message_queue_ref);
client.handle_connection(&connected_clients_ref, &message_queue_ref, client_rx);
//process_connection(&stream, &clients_ref, &message_ref, &address.to_string(), client_rx);
}else{
//error
}
});
pub fn update_all_clients(&self, uuid: &str, command: Commands){
let clients = self.connected_clients.lock().unwrap();
for (client_uuid, tx) in clients.iter() {
if uuid != client_uuid.to_string() {
tx.send(command.clone()).unwrap();
}
}
}
pub fn remove_client(&self, uuid: &str){
let mut clients = self.connected_clients.lock().unwrap();
clients.remove(&uuid.to_string());
}
fn transmit_data(&self, mut stream: &TcpStream, data: &str){
println!("Transmitting...");
println!("data: {}",data);
/*
* This will throw an error and crash any thread, including the main thread, if
* the connection is lost before transmitting. Maybe change to handle any exceptions
* that may occur.
*/
stream.write(data.to_string().as_bytes()).unwrap();
stream.flush().unwrap();
}
fn read_data(&self, mut stream: &TcpStream) -> Result<Commands, Error> {
let mut buffer = [0; 1024];
stream.read(&mut buffer)?;
let command = Commands::from(&buffer);
Ok(command)
}
#[deprecated(since="24.7.20", note="will be removed in future, please do not use!")]
#[allow(dead_code)]
pub fn get_info(&self, tx: Sender<Commands>) {
let mut params: HashMap<String, String> = HashMap::new();
params.insert(String::from("name"), self.name.to_string().clone());
params.insert(String::from("owner"), self.author.to_string().clone());
let command = Commands::Info(Some(params));
tx.send(command).unwrap();
}
#[deprecated(since="24.7.20", note="will be removed in future, please do not use!")]
#[allow(dead_code)]
fn regex_data(&self, command_regex: &Regex, data: &str, command_addons: &mut HashMap<String, String>){
for figure in command_regex.find_iter(data){
let segment = figure.as_str().to_string();
let contents: Vec<&str> = segment.split(":").collect();
println!("key: {}, value: {}", contents[0].to_string(), contents[1].to_string());
command_addons.insert(contents[0].to_string(), contents[1].to_string());
}
}
}
#[cfg(test)]
mod tests{
use super::*;
use std::{thread, time};
use std::sync::Once;
use std::time::Duration;
lazy_static!{
static ref SERVER_NAME: &'static str = "test";
static ref SERVER_ADDRESS: &'static str = "0.0.0.0:6000";
static ref SERVER_AUTHOR: &'static str = "test";
static ref SERVER: Server<'static> = Server::new(&SERVER_NAME, &SERVER_ADDRESS, &SERVER_AUTHOR);
}
static START: Once = Once::new();
/*
* These tests must be executed individually to ensure that no errors
* occur, this is due to the fact that the server is created everytime.
* Setup a system for the server to close after every test.
*/
fn setup_server(){
unsafe{
START.call_once(|| {
thread::spawn(|| {
SERVER.start();
});
});
let millis = time::Duration::from_millis(1000);
thread::sleep(millis);
}
}
fn establish_client_connection(uuid: &str) -> TcpStream {
let mut stream = TcpStream::connect("0.0.0.0:6000").unwrap();
let mut command = read_data(&stream);
assert_eq!(command, Commands::Request(None));
let msg: String = format!("!connect: uuid:{uuid} name:\"{name}\" host:\"{host}\"", uuid=uuid, name="alice", host="127.0.0.1");
transmit_data(&stream, msg.as_str());
command = read_data(&stream);
assert_eq!(command, Commands::Success(None));
stream
}
fn transmit_data(mut stream: &TcpStream, data: &str){
stream.write(data.to_string().as_bytes()).unwrap();
stream.flush().unwrap();
}
fn read_data(mut stream: &TcpStream) -> Commands {
let mut buffer = [0; 1024];
match stream.read(&mut buffer) {
Ok(_) => Commands::from(&buffer),
Err(_) => Commands::Error(None),
}
}
fn force_disconnect(mut stream: &TcpStream){
let msg = "!disconnect:";
transmit_data(&stream, msg);
}
#[test]
fn test_server_connect(){
let mut buffer = [0; 1024];
setup_server();
let mut stream = TcpStream::connect("0.0.0.0:6000").unwrap();
stream.read(&mut buffer).unwrap();
let mut command = Commands::from(&buffer);
assert_eq!(command, Commands::Request(None));
let msg = b"!connect: uuid:123456-1234-1234-123456 name:\"alice\" host:\"127.0.0.1\"";
stream.write(msg).unwrap();
stream.read(&mut buffer).unwrap();
command = Commands::from(&buffer);
assert_eq!(command, Commands::Success(None));
let msg = b"!disconnect:";
stream.write(msg).unwrap();
let dur = time::Duration::from_millis(500);
thread::sleep(dur);
}
#[test]
fn test_server_info(){
let mut buffer = [0; 1024];
setup_server();
let mut stream = TcpStream::connect("0.0.0.0:6000").unwrap();
let command = read_data(&stream);
assert_eq!(command, Commands::Request(None));
let msg = "!info:";
transmit_data(&stream, msg);
let command = read_data(&stream);
let params: HashMap<String, String> = [(String::from("name"), String::from("test")), (String::from("owner"), String::from("test"))].iter().cloned().collect();
assert_eq!(command, Commands::Success(Some(params)));
}
#[test]
fn test_client_info(){
setup_server();
let mut stream = establish_client_connection("1234-5542-2124-155");
let msg = "!info:";
transmit_data(&stream, msg);
let command = read_data(&stream);
let params: HashMap<String, String> = [(String::from("name"), String::from("test")), (String::from("owner"), String::from("test"))].iter().cloned().collect();
assert_eq!(command, Commands::Success(Some(params)));
let msg = "!disconnect:";
transmit_data(&stream, msg);
let dur = time::Duration::from_millis(500);
thread::sleep(dur);
}
#[test]
fn test_clientUpdate_solo(){
setup_server();
let mut stream = establish_client_connection("1222-555-6-7");
let msg = "!clientUpdate:";
transmit_data(&stream, msg);
let command = read_data(&stream);
assert_eq!(command, Commands::Success(None));
let msg = "!disconnect:";
transmit_data(&stream, msg);
let dur = time::Duration::from_millis(500);
thread::sleep(dur);
}
#[test]
fn test_clientUpdate_multi(){
setup_server();
let mut stream_one = establish_client_connection("0001-776-6-5");
let mut stream_two = establish_client_connection("0010-776-6-5");
let mut stream_three = establish_client_connection("0011-776-6-5");
let mut stream_four = establish_client_connection("0100-776-6-5");
let client_uuids: [String; 3] = [String::from("0010-776-6-5"), String::from("0011-776-6-5"), String::from("0100-776-6-5")];
let mut user_1 = true;
let mut user_2 = true;
let mut user_3 = true;
for uuid in client_uuids.iter() {
let command = read_data(&stream_one);
if *uuid == String::from("0010-776-6-5") && user_1 {
let params: HashMap<String, String> = [(String::from("uuid"), String::from("0010-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
assert_eq!(command, Commands::Client(Some(params)));
user_1 = false;
} else if *uuid == String::from("0011-776-6-5") && user_2 {
let params: HashMap<String, String> = [(String::from("uuid"), String::from("0011-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
assert_eq!(command, Commands::Client(Some(params)));
user_2 = false;
} else if *uuid == String::from("0100-776-6-5") && user_3 {
let params: HashMap<String, String> = [(String::from("uuid"), String::from("0100-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
assert_eq!(command, Commands::Client(Some(params)));
user_3 = false;
} else {
assert!(false);
}
let msg = "!success:";
transmit_data(&stream_one, msg);
}
stream_one.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
let mut unsuccessful = true;
while unsuccessful {
let msg = "!clientUpdate:";
transmit_data(&stream_one, msg);
let command = read_data(&stream_one);
match command.clone() {
Commands::Error(None) => println!("resending..."),
_ => {
assert_eq!(command, Commands::Success(None));
unsuccessful = false;
},
}
}
stream_one.set_read_timeout(None).unwrap();
for x in 0..3 {
let command = read_data(&stream_one);
let command_clone = command.clone();
match command{
Commands::Client(Some(params)) => {
let uuid = params.get("uuid").unwrap();
if *uuid == String::from("0010-776-6-5") {
let params: HashMap<String, String> = [(String::from("uuid"), String::from("0010-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
assert_eq!(command_clone, Commands::Client(Some(params)));
} else if *uuid == String::from("0011-776-6-5") {
let params: HashMap<String, String> = [(String::from("uuid"), String::from("0011-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
assert_eq!(command_clone, Commands::Client(Some(params)));
} else if *uuid == String::from("0100-776-6-5") {
let params: HashMap<String, String> = [(String::from("uuid"), String::from("0100-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
assert_eq!(command_clone, Commands::Client(Some(params)));
} else {
assert!(false);
}
},
_ => assert!(false),
}
let msg = "!success:";
transmit_data(&stream_one, msg);
}
let dur = time::Duration::from_millis(500);
thread::sleep(dur);
let msg = "!disconnect:";
transmit_data(&stream_one, msg);
transmit_data(&stream_two, msg);
transmit_data(&stream_three, msg);
transmit_data(&stream_four, msg);
let dur = time::Duration::from_millis(500);
thread::sleep(dur);
}
#[test]
fn test_clientInfo(){
setup_server();
let mut stream_one = establish_client_connection("0001-776-6-5");
let mut stream_two = establish_client_connection("\"0010-776-6-5\"");
let command = read_data(&stream_one);
let params: HashMap<String, String> = [(String::from("uuid"), String::from("\"0010-776-6-5\"")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
assert_eq!(command, Commands::Client(Some(params)));
let msg = "!success:";
transmit_data(&stream_one, msg);
stream_one.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
let mut unsuccessful = true;
while unsuccessful {
let msg = "!clientInfo: uuid:\"0010-776-6-5\"";
transmit_data(&stream_one, msg);
let command = read_data(&stream_one);
match command.clone() {
Commands::Error(None) => println!("resending..."),
_ => {
let params: HashMap<String, String> = [(String::from("uuid"), String::from("\"0010-776-6-5\"")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
assert_eq!(command, Commands::Success(Some(params)));
unsuccessful = false;
},
}
}
stream_one.set_read_timeout(None).unwrap();
let msg = "!disconnect:";
transmit_data(&stream_one, msg);
transmit_data(&stream_two, msg);
let dur = time::Duration::from_millis(500);
thread::sleep(dur);
}
#[test]
fn test_client_disconnect(){
let mut tmp_buffer = [0; 1024];
setup_server();
let mut stream_one = establish_client_connection("0001-776-6-5");
let mut stream_two = establish_client_connection("0010-776-6-5");
let command = read_data(&stream_one);
let params: HashMap<String, String> = [(String::from("uuid"), String::from("0010-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
assert_eq!(command, Commands::Client(Some(params)));
let msg = "!success:";
transmit_data(&stream_one, msg);
let msg = "!disconnect:";
transmit_data(&stream_two, msg);
let command = read_data(&stream_one);
let params: HashMap<String, String> = [(String::from("uuid"), String::from("0010-776-6-5"))].iter().cloned().collect();
assert_eq!(command, Commands::Client(Some(params)));
let msg = "!success:";
transmit_data(&stream_one, msg);
stream_one.set_read_timeout(Some(Duration::from_millis(2000))).unwrap();
match stream_one.peek(&mut tmp_buffer) {
Ok(_) => assert!(false),
Err(_) => assert!(true),
}
stream_one.set_read_timeout(None).unwrap();
let msg = "!disconnect:";
transmit_data(&stream_one, msg);
let dur = time::Duration::from_millis(500);
thread::sleep(dur);
}
}

View File

@ -1,77 +0,0 @@
use std::net::TcpStream;
use std::io::Write;
use std::collections::HashMap;
use regex::Regex;
use crate::server::commands::{Commands, OutboundCommands};
use std::sync::Arc;
use parking_lot::FairMutex;
use std::collections::VecDeque;
pub fn transmit_data(mut stream: &TcpStream, data: &str){
println!("Transmitting...");
println!("data: {}",data);
stream.write(data.to_string().as_bytes()).unwrap();
stream.flush().unwrap();
}
pub fn tokenize(incoming_message: &str) -> HashMap<String, String>{
let mut data: HashMap<String, String> = HashMap::new();
for mat in Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9\-\+\[\]{}_=/]+|("(.*?)")+)"###).unwrap().find_iter(incoming_message){
if match match_command(&mat.as_str().to_string()) { Commands::Unknown => false, _ => true,} || match match_outbound_command(&mat.as_str().to_string()) { OutboundCommands::Unknown => false, _ => true,}{
data.insert("command".to_string(), mat.as_str().to_string());
}else{
let segment = mat.as_str().to_string();
let contents: Vec<&str> = segment.split(":").collect();
println!("key: {}, value: {}", contents[0].to_string(), contents[1].to_string());
data.insert(contents[0].to_string(), contents[1].to_string());
}
}
data
}
pub fn match_command(command: &String) -> Commands{
match command.as_str(){
"!info:" => Commands::Info,
"!connect:" => Commands::Connect,
"!disconnect:" => Commands::Disconnect,
"!clientUpdate:" => Commands::ClientUpdate,
"!clientInfo:" => Commands::ClientInfo,
_ => Commands::Unknown,
}
}
pub fn match_outbound_command(command: &String) -> OutboundCommands{
match command.as_str(){
"!client:" => OutboundCommands::Client,
"!clientRemove:" => OutboundCommands::ClientRemove,
_ => OutboundCommands::Unknown,
}
}
pub fn format_data(message_queue: &Arc<FairMutex<VecDeque<String>>>) -> HashMap<String, String>{
//copy data from queue
let locked_message_queue = message_queue.lock();
let message = locked_message_queue.get(0).unwrap();
println!("msg: {}", message);
tokenize(&message)
}
pub fn extract_fields(data: &Vec<String>) -> (String, String){
let mut uuid = String::from("");
let mut username = String::from("");
for field in data{
if field.contains("uuid:"){
let contents: Vec<&str> = field.split(":").collect();
uuid.push_str(contents[1]);
}else if field.contains("username:"){
let contents: Vec<&str> = field.split(":").collect();
username.push_str(contents[1]);
}
}
(uuid, username)
}

View File

@ -0,0 +1,61 @@
use std::string::ToString;
use std::sync::{Arc, Mutex, Weak};
use std::net::TcpStream;
use crate::server_v2::Serverv2;
use std::sync::mpsc::{Receiver, Sender, channel, TryRecvError};
use crate::server_v2::commands_v2::Commandsv2;
#[derive(Clone)]
pub struct ClientV2 {
pub uuid: String,
pub username: String,
pub address: String,
stream: Arc<Mutex<TcpStream>>,
server_reference: Weak<Serverv2>,
tx: Sender<Commandsv2>,
rx: Receiver<Commandsv2>,
}
impl ClientV2 {
pub fn new(stream: Arc<Mutex<TcpStream>>, server: Arc<Serverv2>, uuid: &String, username: &String, address: &String) -> ClientV2 {
let (tx, rx) = channel();
ClientV2 {
stream: stream,
server_reference: Arc::downgrade(&server),
tx,
rx,
uuid: uuid.to_string(),
username: username.to_string(),
address: address.to_string(),
}
}
pub fn run(&self) {
loop {
match self.rx.try_recv() {
Ok(Command) => {
}
Err(TryRecvError::Empty) => { }
Err(TryRecvError::Disconnected) => {
}
}
}
}
pub fn get_tx(&self) -> Sender<Commandsv2> {
self.tx.clone()
}
}

View File

@ -0,0 +1,107 @@
use std::collections::HashMap;
use std::borrow::Borrow;
use regex::Regex;
use std::ops::Index;
pub enum Commands {
Request(Option<HashMap<String, String>>),
Info(Option<HashMap<String, String>>),
Connect(Option<HashMap<String, String>>),
Disconnect(Option<HashMap<String, String>>),
ClientUpdate(Option<HashMap<String, String>>),
ClientInfo(Option<HashMap<String, String>>),
ClientRemove(Option<HashMap<String, String>>),
Client(Option<HashMap<String, String>>),
Success(Option<HashMap<String, String>>),
Error(Option<HashMap<String, String>>),
}
impl Commands {
pub fn to_String(&self) -> String {
let mut out_string = String::new();
let (command, parameters) = match self {
Commands::Info(arguments) => { ("!info:", arguments) },
Commands::Connect(arguments) => { ("!connect:", arguments) },
Commands::Disconnect(arguments) => { ("!disconnect:", arguments) },
Commands::ClientUpdate(arguments) => { ("!clientUpdate:", arguments) },
Commands::ClientInfo(arguments) => { ("!clientInfo:", arguments) },
Commands::Error(arguments) => { ("!error:", arguments) },
_ => { ("!error:", &None) }
};
out_string.push_str(command);
if parameters.is_some() {
let hash_map = parameters.borrow().as_ref().unwrap();
for (k, v) in hash_map.iter() {
out_string.push_str(" ");
out_string.push_str(k.as_str());
out_string.push_str(":");
out_string.push_str(v.as_str())
}
}
out_string
}
pub fn from_string(data: &str) -> Result<Commandsv2, &'static str> {
let regex = Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9\-\+\[\]{}_=/]+|("(.*?)")+)"###).unwrap();
let mut iter = regex.find_iter(data);
let command = iter.next().unwrap().as_str();
println!("command: {:?}", command);
let mut map: HashMap<String, String> = HashMap::new();
for i in iter {
let parameter = i.as_str().to_string();
let mut parts:Vec<&str> = parameter.split(":").collect();
map.insert(parts.index(0).to_string(), parts.index(1).to_string());
}
let params = if map.capacity() > 1 {Some(map)} else { None };
match command {
"!info:" => Ok(Commands::Info(params)),
"!connect:" => Ok(Commands::Connect(params)),
"!clientInfo:" => Ok(Commands::ClientInfo(params)),
"!clientUpdate:" => Ok(Commands::ClientUpdate(params)),
"!disconnect:" => Ok(Commands::Disconnect(params)),
"!error:" => Ok(Commands::Error(params)),
_ => { Err("NOT IMPLEMENTED") }
}
}
}
#[cfg(test)]
mod test_commands_v2 {
use crate::server_v2::commands_v2::Commandsv2;
use std::collections::HashMap;
#[test]
fn test_creation_from_string() {
let command_result = Commandsv2::from_string("!connect: name:bop host:127.0.0.1 uuid:123456-1234-1234-123456");
assert!(command_result.is_ok(), true);
let command = command_result.unwrap_or(Commandsv2::Error(None));
()
}
#[test]
fn test_to_string() {
let mut a: HashMap<String, String> = HashMap::new();
a.insert("name".to_string(), "michael".to_string());
a.insert("host".to_string(), "127.0.0.1".to_string());
a.insert("uuid".to_string(), "123456-1234-1234-123456".to_string());
let command = Commandsv2::Connect(Some(a));
println!("{:?}", command.to_String())
}
}

110
src/server_v2/mod.rs Normal file
View File

@ -0,0 +1,110 @@
pub mod client_v2;
pub mod commands_v2;
use client_v2::ClientV2;
use std::{
collections::{HashMap, VecDeque},
io,
thread,
sync::{
mpsc::{channel, Sender, Receiver},
Arc,
Mutex,
},
ops::Deref,
borrow::Borrow,
time::Duration,
net::TcpListener,
io::Read,
};
use crate::server_v2::commands_v2::Commandsv2;
use crate::lib::ThreadPool;
use std::sync::mpsc::TryRecvError;
use crate::server_v2::commands_v2::Commandsv2::Disconnect;
enum server_message {
start,
stop,
kick(String),
}
pub struct Serverv2 {
name: String,
host: String,
owner: String,
rx: Arc<Mutex<Receiver<server_message>>>,
tx: Arc<Mutex<Sender<server_message>>>,
connected_clients: Arc<Mutex<HashMap<String, ClientV2>>>,
thread_pool: ThreadPool,
}
impl Serverv2 {
pub fn new(name: String, host: String, owner: String) -> Serverv2 {
let (tx,rx) = channel();
Serverv2 {
name,
host,
owner,
rx: Arc::new(Mutex::new(rx)),
tx: Arc::new(Mutex::new(tx)),
connected_clients: Arc::new(Mutex::new(HashMap::new())),
thread_pool: ThreadPool::new(16)
}
}
pub fn start(&self) -> Result<(), io::Error> {
let listener = TcpListener::bind("0.0.0.0:6001")?;
// accepting clients
thread::spawn(move || {
match rx.lock().unwrap().try_recv() {
Ok(a) => {}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => {
self.connected_clients.lock()
.unwrap()
.iter()
.map(|(id, client)| {
let tx = client.get_tx();
tx.send(Disconnect(None));
});
}
}
});
Ok(())
}
pub fn stop(&self) {
}
pub fn add_client(&self, client: ClientV2) -> Result<(), &str> {
let mut client_map = self.connected_clients.lock().unwrap();
if client_map.contains_key(client.uuid.as_str()) {
return Err("!exists:");
}
client_map.insert(client.uuid.to_string(), client);
self.thread_pool.execute(|| {client.run()});
Ok(())
}
pub fn get_tx(&self, mesage: server_message) {
self.tx.clone();
}
fn log(mesaage: &str) {
println!("Server: {}", mesaage);
}
}