Merge pull request #4 from Mitch161/cmd--optimization

ref-method and command merge
This commit is contained in:
Mitch161 2020-08-19 22:29:43 +01:00 committed by GitHub
commit dc54a5d952
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 556 additions and 484 deletions

3
.gitignore vendored
View File

@ -11,4 +11,5 @@ Cargo.lock
.DS_Store
.idea
*.properties
*.properties
.vscode/launch.json

View File

@ -1,6 +1,6 @@
[package]
name = "rust-chat-server"
version = "0.1.0"
version = "0.1.5"
authors = ["Mitchell <mitchellhardie1@gmail.com>"]
edition = "2018"
@ -17,14 +17,13 @@ dashmap = "3.11.4"
async-std = "1.6.2"
lazy_static = "1.4.0"
rayon = "1.3.1"
diesel = { version = "1.4.5", features = ["sqlite"] }
zeroize = "1.1.0"
cursive = { version = "0.15.0", default-features = false, features = ["crossterm-backend"]}
crossterm = "0.17.7"
clap = "3.0.0-beta.1"
log = "0.4"
[profile.dev]
opt-level = 0

View File

@ -1,17 +1,10 @@
use std::{
net::TcpStream,
io::{Write, Read}
};
use std::{net::TcpStream, io::{Write, Read}, io};
use crate::{
server::client::client_profile::Client,
commands::Commands,
};
use zeroize::Zeroize;
use std::time::Duration;
use async_std::net::SocketAddrV4;
use std::str::FromStr;
use std::net::SocketAddr;
use zeroize::Zeroize;
pub struct ClientApi {
socket: TcpStream,
@ -22,44 +15,52 @@ pub struct ClientApi {
}
impl ClientApi {
pub fn new(addr: &str) -> Self {
let socket = TcpStream::connect(addr).expect("connection failed");
pub fn new(addr: &str) -> Result<Self, io::Error> {
let socket = TcpStream::connect(addr)?;
let on_add = |_client: Client| {println!("Client_api: Client added {:?}", _client)};
let on_remove = |_uuid: String| {println!("Client_api: Client removed {}", _uuid)};
Self {
let a = Self {
socket,
addr: addr.to_string(),
on_client_add_handle: on_add,
on_client_remove_handle: on_remove,
}
};
Ok(a)
}
pub fn set_on_client_add(&mut self, Fn: fn(Client) -> ()) {
self.on_client_add_handle = Fn;
pub fn set_on_client_add(&mut self, func: fn(Client) -> ()) {
self.on_client_add_handle = func;
}
pub fn set_on_client_removed(&mut self, Fn: fn(String) -> ()) {
self.on_client_remove_handle = Fn;
pub fn set_on_client_removed(&mut self, func: fn(String) -> ()) {
self.on_client_remove_handle = func;
}
pub fn get_info(host: &str) -> Option<Commands> {
pub fn get_info(host: &str) -> Result<Commands, io::Error> {
let mut buffer: [u8; 1024] = [0; 1024];
let addr = SocketAddr::from_str(host).ok()?;
let mut stream = TcpStream::connect_timeout(&addr, Duration::from_millis(500)).ok()?;
stream.read(&mut buffer).ok()?;
let addr = host.parse().unwrap();
let mut stream = TcpStream::connect_timeout(&addr, Duration::from_millis(1000))?;
let _ = stream.read(&mut buffer)?;
println!("data recieved: {:?}", &buffer[0..20]);
match Commands::from(&mut buffer) {
Commands::Request(None) => {
stream.write_all(Commands::Info(None).to_string().as_bytes()).unwrap();
stream.read(&mut buffer).ok()?;
Some(Commands::from(String::from(String::from_utf8_lossy(&buffer))))
println!("zeroing");
buffer.zeroize();
println!("writing");
let sending_command = Commands::Info(None).to_string();
println!("sending string: {:?} as_bytes: {:?}", &sending_command, &sending_command.as_bytes());
stream.write_all(sending_command.as_bytes())?;
stream.flush()?;
println!("reading");
let bytes = stream.read(&mut buffer)?;
println!("new buffer size: {:?} contents: {:?}", bytes, &buffer[0..20]);
println!("commanding");
Ok(Commands::from(String::from(String::from_utf8_lossy(&buffer))))
},
_ => {
None
Err(io::Error::new(io::ErrorKind::InvalidData, "the data was not expected"))
}
}
}

85
src/commands/behaviors.rs Normal file
View File

@ -0,0 +1,85 @@
struct Request {}
struct Info {}
struct Connect {}
struct Disconnect {}
struct ClientUpdate {}
struct ClientInfo {}
struct ClientRemove {}
struct Client {}
struct Success {}
struct Error {}
trait ClientRunnables {
fn client_execution(client: &Client);
}
impl Runnables for Request {
fn run() {
}
}
impl ClientRunnables for Info {
fn client_execution(client: &Client) {
let params = client.get_server_info();
let command = Commands::Success(Some(params));
client.transmit_data(command.to_string().as_str());
}
}
impl Runnables for Connect {
fn run() {
}
}
impl Runnables for Disconnect {
fn run() {
}
}
impl ClientRunnables for ClientUpdate {
fn client_execution(client: &Client) {
let mut command = Commands::Success(None);
client.transmit_data(command.to_string().as_str());
let data: HashMap<String, String> = [(String::from("uuid"), client.get_uuid())].iter().cloned().collect();
let command = Commands::ClientUpdate(Some(data));
self.server.update_all_clients(self.uuid.as_str(), command);
}
}
impl Runnables for ClientInfo {
fn run() {
}
}
impl Runnables for ClientRemove {
fn run() {
}
}
impl Runnables for Client {
fn run() {
}
}
impl Runnables for Success {
fn run() {
}
}
impl Runnables for Error {
fn run() {
}
}

View File

@ -1,16 +1,45 @@
use std::string::ToString;
use std::collections::HashMap;
use dashmap::DashMap;
use std::str::FromStr;
use std::borrow::Borrow;
use regex::Regex;
use std::ops::Index;
use log::info;
use zeroize::Zeroize;
//use dashmap::DashMap;
#[derive(Clone, Debug)]
pub enum Commands {
/* TODO: this is the new commands system but still needs work.
* Will be fixed soon, but continue with old version at the
* moment.
*
// Common fields:
executable: T,
params: Option<HashMap<String, String>>,
// Variants:
Request {},
Info {},
Connect {},
Disconnect {},
ClientUpdate {},
ClientInfo {},
ClientRemove {},
Client {},
Success {},
Error {},
*/
Request(Option<HashMap<String, String>>),
Info(Option<HashMap<String, String>>),
HeartBeat(Option<HashMap<String, String>>),
Connect(Option<HashMap<String, String>>),
Disconnect(Option<HashMap<String, String>>),
@ -23,7 +52,25 @@ pub enum Commands {
Error(Option<HashMap<String, String>>),
}
#[derive(Debug)]
pub enum CommandParseError {
UnknownCommand,
NoString,
}
/*trait Operations {
fn execute(&self);
}*/
impl Commands {
/*fn get_executable(&self) -> &T {
self.executable
}
fn get_params(&self) -> &Option<HashMap<String,String>> {
self.params
}*/
fn compare_params(&self, params: &Option<HashMap<String, String>>, other_params: &Option<HashMap<String, String>>) -> bool {
match (params, other_params) {
(None, Some(_other_params)) => false,
@ -51,6 +98,12 @@ impl Commands {
}
}
/*impl<T> Operations for Commands<T> {
fn execute(&self) {
self.executable.run();
}
}*/
impl PartialEq for Commands {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
@ -67,9 +120,9 @@ impl PartialEq for Commands {
_ => false,
}
}
}
impl ToString for Commands {
fn to_string(&self) -> std::string::String {
@ -78,14 +131,15 @@ impl ToString for Commands {
let (command, parameters) = match self {
Commands::Request(arguments) => { ("!request:", arguments) },
Commands::Info(arguments) => { ("!info:", arguments) },
Commands::HeartBeat(arguments) => {("!heartbeat:", arguments)},
Commands::Connect(arguments) => { ("!connect:", arguments) },
Commands::Disconnect(arguments) => { ("!disconnect:", arguments) },
Commands::ClientUpdate(arguments) => { ("!clientUpdate:", arguments) },
Commands::ClientInfo(arguments) => { ("!clientInfo:", arguments) },
Commands::ClientRemove(arguments) => { ("!clientRemove", arguments) }
Commands::Client(arguments) => { ("!client:", arguments) },
Commands::Success(arguments) => { ("!success:", arguments) },
Commands::Error(arguments) => { ("!error:", arguments) },
_ => { ("!error:", &None) }
};
out_string.push_str(command);
@ -96,21 +150,33 @@ impl ToString for Commands {
out_string.push_str(" ");
out_string.push_str(k.as_str());
out_string.push_str(":");
out_string.push_str(v.as_str())
if v.contains(":") {
out_string.push_str(format!("\"{}\"",v.as_str()).as_str())
} else {
out_string.push_str(v.as_str());
}
}
}
out_string
}
}
impl From<&str> for Commands {
fn from(data: &str) -> Self {
let regex = Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9\-\+\[\]{}_=/]+|("(.*?)")+)"###).unwrap();
let mut iter = regex.find_iter(data);
let command = iter.next().unwrap().as_str();
impl FromStr for Commands {
type Err = CommandParseError;
println!("command: {:?}", command);
fn from_str(data: &str) -> std::result::Result<Self, Self::Err> {
let regex = Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9@\-\+\[\]{}_=/.]+|("(.*?)")+)"###).unwrap();
let mut iter = regex.find_iter(data);
let command_opt = iter.next();
if command_opt.is_none() {
return Err(CommandParseError::NoString);
}
let command = command_opt.unwrap().as_str();
println!("command parsed to: {:?}", command);
let mut map: HashMap<String, String> = HashMap::new();
@ -123,10 +189,12 @@ impl From<&str> for Commands {
let params = if map.capacity() > 0 {Some(map)} else { None };
match command {
Ok(match command {
"!request:" => Commands::Request(params),
"!info:" => Commands::Info(params),
"!heartbeat:" => Commands::HeartBeat(params),
"!connect:" => Commands::Connect(params),
"!disconnect:" => Commands::Disconnect(params),
@ -138,45 +206,42 @@ impl From<&str> for Commands {
"!success:" => Commands::Success(params),
"!error:" => Commands::Error(params),
_ => Commands::Error(params),
}
_ => Commands::Error(None),
})
}
}
impl From<String> for Commands {
fn from(data: String) -> Self {
Commands::from(data.as_str())
if let Ok(data) = data.as_str().parse() {
data
} else {
info!("Command: failed to parse with");
Commands::Error(None)
}
}
}
/*impl From<&[u8; 1024]> for Commands {
fn from(data: &[u8; 1024]) -> Self {
let incoming_message = String::from(String::from_utf8_lossy(data));
data.zeroize();
Commands::from(incoming_message.as_str())
}
}*/
impl From<&mut [u8; 1024]> for Commands {
fn from(data: &mut [u8; 1024]) -> Self {
let incoming_message = String::from(String::from_utf8_lossy(data));
data.zeroize();
Commands::from(incoming_message.as_str())
Commands::from(incoming_message)
}
}
// TODO: check if unit tests still work
/*#[cfg(test)]
mod test_commands_v2 {
#![feature(test)]
//extern crate test;
use super::Commands;
use std::collections::HashMap;
use test::Bencher;
use std::str::FromStr;
use super::CommandParseError;
#[test]
fn test_creation_from_string() {
let command_result = Commands::from("!connect: name:bop host:127.0.0.1 uuid:123456-1234-1234-123456");
()
let command_result = Commands::from_str("!connect: name:bop host:127.0.0.1 uuid:123456-1234-1234-123456");
}
#[test]
@ -191,9 +256,4 @@ mod test_commands_v2 {
println!("{:?}", command.to_string())
}
#[bench]
fn benchmark(b: &mut Bencher) {
b.iter(|| Commands::from("!connect: host:192.168.0.1 name:\"michael-bailey\" uuid:123456-1234-1234-123456"))
}
}*/

View File

@ -4,23 +4,24 @@ extern crate lazy_static;
mod client_api;
mod commands;
mod server;
mod lib;
use crate::server::server_profile::Server;
use client_api::ClientApi;
use crossterm::ErrorKind;
use cursive::{
Cursive,
menu::*,
event::Key,
views::{ Dialog, TextView, LinearLayout, ListView, ResizedView, Panel },
Rect,
CursiveExt,
align::{Align, HAlign},
align::Align,
view::SizeConstraint,
};
use std::sync::Arc;
//use std::sync::Arc;
use std::time::Duration;
use crossterm::ErrorKind;
use log::info;
use clap::{App, Arg};
use crate::server::server_profile::Server;
fn main() -> Result<(), ErrorKind> {
lazy_static!{
@ -29,52 +30,65 @@ fn main() -> Result<(), ErrorKind> {
static ref SERVER_AUTHOR: &'static str = "noreply@email.com";
static ref SERVER: Server<'static> = Server::new(&SERVER_NAME, &SERVER_ADDRESS, &SERVER_AUTHOR);
}
//let server = Server::new("Server-01", "0.0.0.0:6000", "noreply@email.com");
/*server_name: &'static str = "Server-01";
server_address: &'static str = "0.0.0.0:6000";
server_author: &'static str = "noreply@email.com";
let server = Server::new(&server_name, &server_address, &server_author);*/
/*let server_arc = Arc::new(SERVER);
let s1 = server_arc.clone();
let s2 = s1.clone();*/
cursive::logger::init();
let args = App::new("--rust chat server--")
.version("0.1.5")
.author("Mitchel Hardie <mitch161>, Michael Bailey <michael-bailey>")
.about("this is a chat server developed in rust, depending on the version one of two implementations will be used")
.arg(Arg::with_name("graphical")
.short('g')
.takes_value(false)
.about("Enables graphical mode"))
.get_matches();
info!("Main: init Display");
let mut Display = Cursive::default();
if args.is_present("graphical") {
//let server = Server::new("Server-01", "0.0.0.0:6000", "noreply@email.com");
//let server_arc = Arc::new(server);
//let s1 = server_arc.clone();
//let s2 = s1.clone();
info!("Main: setting up callbacks");
Display.add_global_callback(Key::Backspace, |s| s.quit());
Display.add_global_callback(Key::Tab, |s| s.toggle_debug_console());
Display.add_global_callback(Key::Esc, |s| s.select_menubar());
cursive::logger::init();
info!("Main: setting up menu bar");
let _ = Display.menubar()
.add_subtree("Server",
MenuTree::new()
.leaf("About",
|s| s.add_layer(About()))
.delimiter()
.leaf("quit", |s| s.quit()))
.add_subtree("File",
MenuTree::new()
.leaf("Start", move |s| {SERVER.start();})
.leaf("Stop", move |s| {SERVER.stop();})
.delimiter()
.leaf("Debug", |s| {s.toggle_debug_console();}));
info!("Main: entering loop");
Display.add_layer(Control_Panel());
Display.run();
Ok(())
info!("Main: init display");
let mut display = Cursive::default();
info!("Main: setting up callbacks");
display.add_global_callback(Key::Backspace, |s| s.quit());
display.add_global_callback(Key::Tab, |s| s.toggle_debug_console());
display.add_global_callback(Key::Esc, |s| s.select_menubar());
info!("Main: setting up menu bar");
let _ = display.menubar()
.add_subtree("Server",
MenuTree::new()
.leaf("about",
|s| s.add_layer(about()))
.delimiter()
.leaf("quit", |s| s.quit()))
.add_subtree("File",
MenuTree::new()
.leaf("Start", move |_s| {let _ = SERVER.start();})
.leaf("Stop", move |_s| {let _ = SERVER.stop();})
.delimiter()
.leaf("Debug", |s| {s.toggle_debug_console();}));
info!("Main: entering loop");
display.add_layer(control_panel());
display.run();
Ok(())
} else {
//let server = Server::new("Server-01", "0.0.0.0:6000", "noreply@email.com");
SERVER.start()?;
loop {std::thread::sleep(Duration::from_secs(1));}
}
}
fn About() -> Dialog {
fn about() -> Dialog {
Dialog::new()
.content(TextView::new("Rust-Chat-Server\nmade by\n Mitchell Hardie\nMichael Bailey\nMit Licence")
).button("Close", |s| {let _ = s.pop_layer(); s.add_layer(Control_Panel())} )
).button("Close", |s| {let _ = s.pop_layer(); s.add_layer(control_panel())} )
}
fn Launch_screen() -> Dialog {
fn launch_screen() -> Dialog {
Dialog::new()
.content(TextView::new("\
Server.
@ -85,7 +99,7 @@ fn Launch_screen() -> Dialog {
.button("ok", |s| {s.pop_layer();})
}
fn Control_Panel() -> ResizedView<Panel<LinearLayout>> {
fn control_panel() -> ResizedView<Panel<LinearLayout>> {
let mut root = LinearLayout::horizontal();
let mut left = LinearLayout::vertical();
@ -104,14 +118,14 @@ fn Control_Panel() -> ResizedView<Panel<LinearLayout>> {
}
// MARK: - general testing zone
/*#[cfg(test)]
#[cfg(test)]
mod tests {
#![feature(test)]
use super::Server;
use crate::server::server_profile::Server;
use crate::client_api::ClientApi;
use std::thread::spawn;
use std::collections::HashMap;
use crate::commands::Commands;
use std::{thread, time};
use std::time::Duration;
#[test]
fn test_server_info() {
@ -121,22 +135,40 @@ mod tests {
let owner = "noreply@email.com";
let server = Server::new(name, address, owner);
let _ = server.start().unwrap();
let result = server.start();
assert_eq!(result.is_ok(), true);
let dur = time::Duration::from_millis(1000);
thread::sleep(dur);
let api = ClientApi::get_info("127.0.0.1:6000");
if api.is_some() {
assert_eq!(api.is_ok(), true);
if let Ok(api) = api {
println!("received: {:?}", api);
let mut map = HashMap::new();
map.insert("name".to_string(), name.to_string());
map.insert("owner".to_string(), owner.to_string());
let expected = Commands::Info(Some(map));
let api = api.unwrap();
println!("expected: {:?}", expected);
assert_eq!(api, expected);
} else {
return
}
}
}*/
#[test]
fn test_server_connect() {
let name = "Server-01";
let address = "0.0.0.0:6001";
let owner = "noreply@email.com";
let server = Server::new(name, address, owner);
let _ = server.start().unwrap();
let api_result = ClientApi::new(address);
assert_eq!(api_result.is_ok(), true);
if let Ok(api) = api_result {
std::thread::sleep(std::time::Duration::from_secs(2));
}
}
}

View File

@ -5,48 +5,57 @@ use std::{
sync::Mutex,
net::{Shutdown, TcpStream},
io::prelude::*,
time::Duration,
io::Error,
collections::HashMap,
//collections::HashMap,
time::{Instant, Duration},
io,
};
use crossbeam::{Sender, Receiver, TryRecvError, unbounded};
use zeroize::Zeroize;
use crossbeam::{
Sender,
Receiver,
TryRecvError,
unbounded
};
//use zeroize::Zeroize;
use log::info;
use crate::{
server::{
server_profile::Server,
//server_profile::Server,
server_profile::ServerMessages,
},
commands::Commands
};
use parking_lot::FairMutex;
use dashmap::DashMap;
//use parking_lot::FairMutex;
//use dashmap::DashMap;
#[derive(Debug)]
pub struct Client<'a> {
connected: bool,
stream: TcpStream,
pub uuid: String,
pub username: String,
pub address: String,
pub struct Client {
uuid: String,
username: String,
address: String,
last_heartbeat: Arc<Mutex<Instant>>,
stream_arc: Arc<Mutex<TcpStream>>,
pub sender: Sender<Commands>,
receiver: Receiver<Commands>,
server: &'a Server<'a>,
server_sender: Sender<ServerMessages>,
}
impl<'a> Client<'a> {
pub fn new(server: &'a Server<'static>, stream: TcpStream, uuid: &str, username: &str, address: &str) -> Self {
impl Client {
pub fn new(stream: TcpStream, server_sender: Sender<ServerMessages>, uuid: &str, username: &str, address: &str) -> Self {
let (sender, receiver): (Sender<Commands>, Receiver<Commands>) = unbounded();
stream.set_read_timeout(Some(Duration::from_secs(1))).unwrap();
Client {
connected: true,
stream: stream,
stream_arc: Arc::new(Mutex::new(stream)),
uuid: uuid.to_string(),
username: username.to_string(),
address: address.to_string(),
@ -54,220 +63,159 @@ impl<'a> Client<'a> {
sender,
receiver,
server,
server_sender,
last_heartbeat: Arc::new(Mutex::new(Instant::now())),
}
}
#[allow(dead_code)]
fn get_stream(&self) -> &TcpStream{
&self.stream
}
#[allow(dead_code)]
pub fn get_transmitter(&self) -> &Sender<Commands>{
pub fn get_sender(&self) -> &Sender<Commands> {
&self.sender
}
#[allow(dead_code)]
pub fn get_uuid(&self) -> &String{
&self.uuid
pub fn get_uuid(&self) -> String {
self.uuid.clone()
}
#[allow(dead_code)]
pub fn get_username(&self) -> &String{
&self.username
pub fn get_username(&self) -> String {
self.username.clone()
}
#[allow(dead_code)]
pub fn get_address(&self) -> &String{
&self.address
pub fn get_address(&self) -> String {
self.address.clone()
}
pub fn handle_connection(&mut self){
//self.stream.set_nonblocking(true).expect("set_nonblocking call failed");
// TODO: - add heartbeat timer.
pub fn handle_connection(&mut self) {
let mut buffer = [0; 1024];
// TODO: - Check heartbeat
{
info!("heartbeat")
}
while self.connected {
println!("{}: channel checks", self.uuid);
match self.receiver.try_recv() {
/*command is on the channel*/
Ok(command) => {
let a = command.clone();
match command {
/*this might be useless*/
Commands::Info(Some(_params)) => {
self.transmit_data(a.to_string().as_str());
},
Commands::Disconnect(None) => {
},
Commands::ClientUpdate(Some(params)) => {
let uuid = params.get("uuid").unwrap();
let data: HashMap<String, String> = [(String::from("uuid"), self.uuid.clone()), (String::from("name"), self.username.clone()), (String::from("host"), self.address.clone())].iter().cloned().collect();
let command = Commands::Client(Some(data));
info!("{}: handling connection", self.uuid);
match self.read_data(&mut buffer) {
Ok(command) => {
// match incomming commands
println!("command");
match command {
Commands::Disconnect(None) => {
self.server_sender.send(ServerMessages::Disconnect(self.uuid.clone())).expect("sending message to server failed");
self.stream_arc.lock().unwrap().shutdown(Shutdown::Both).expect("shutdown call failed");
},
Commands::HeartBeat(None) => {
*self.last_heartbeat.lock().unwrap() = Instant::now();
self.transmit_data(Commands::Success(None).to_string().as_str());
},
Commands::ClientUpdate(None) => {
self.transmit_data(Commands::Success(None).to_string().as_str());
let _ = self.server_sender.send(ServerMessages::RequestUpdate(self.stream_arc.clone()));
},
Commands::ClientInfo(Some(params)) => {
let uuid = params.get("uuid").unwrap();
let _ = self.server_sender.send(ServerMessages::RequestInfo(uuid.clone(), self.stream_arc.clone()));
},
// TODO: may or may not be needed?
Commands::Error(None) => {
},
_ => {
self.transmit_data(Commands::Error(None).to_string().as_str());
},
}
},
Err(_) => {
// no data was read
},
}
self.server.update_client(uuid.as_str(), &command);
},
Commands::ClientInfo(Some(params)) => {
let uuid = params.get("uuid").unwrap();
let data: HashMap<String, String> = [(String::from("uuid"), self.uuid.clone()), (String::from("name"), self.username.clone()), (String::from("host"), self.address.clone())].iter().cloned().collect();
let command = Commands::Success(Some(data));
self.server.update_client(uuid.as_str(), &command);
},
Commands::ClientRemove(Some(params)) => {
let command = Commands::Client(Some(params));
self.transmit_data(command.to_string().as_str());
self.confirm_success(&mut buffer);
},
Commands::Client(Some(_params)) => {
self.transmit_data(a.to_string().as_str());
self.confirm_success(&mut buffer);
},
Commands::Success(_params) => {
self.transmit_data(a.to_string().as_str());
},
_ => {
println!("---Invalid Channel Command---");
},
println!("buffer");
// test to see if there is anything for the client to receive from its channel
match self.receiver.try_recv() {
/*command is on the channel*/
Ok(Commands::ClientRemove(Some(params))) => {
let mut retry: u8 = 3;
'retry_loop1: loop {
if retry < 1 {
self.transmit_data(Commands::Error(None).to_string().as_str());
break 'retry_loop1
} else {
self.transmit_data(Commands::ClientRemove(Some(params.clone())).to_string().as_str());
if self.read_data(&mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) {
break 'retry_loop1;
} else {
retry -= 1;
}
}
},
/*sender disconnected*/
Err(TryRecvError::Disconnected) => {},
/*no data available yet*/
Err(TryRecvError::Empty) => {},
}
/*
* if multiple commands are written to the stream before it reads, all the commands
* could be read at once, causing the program to ignore all commands after the firet
* one. Ethier make sure commands sent require a response before sending the next one
* or make a system to check for these issues.
*/
match self.read_data(&mut buffer) {
Ok(command) => {
match command {
Commands::Info(None) => {
let params: HashMap<String, String> = [(String::from("name"), self.server.get_name()), (String::from("owner"), self.server.get_author())].iter().cloned().collect();
let command = Commands::Success(Some(params));
self.transmit_data(command.to_string().as_str());
},
Commands::Disconnect(None) => {
self.connected = false;
self.server.remove_client(self.uuid.as_str());
self.stream.shutdown(Shutdown::Both).expect("shutdown call failed");
let params: HashMap<String, String> = [(String::from("uuid"), self.uuid.clone())].iter().cloned().collect();
let command = Commands::ClientRemove(Some(params));
self.server.update_all_clients(self.uuid.as_str(), command);
},
Commands::ClientUpdate(None) => {
let mut command = Commands::Success(None);
self.transmit_data(command.to_string().as_str());
let data: HashMap<String, String> = [(String::from("uuid"), self.uuid.clone())].iter().cloned().collect();
let command = Commands::ClientUpdate(Some(data));
self.server.update_all_clients(self.uuid.as_str(), command);
},
Commands::ClientInfo(Some(params)) => {
let uuid = params.get("uuid").unwrap();
let data: HashMap<String, String> = [(String::from("uuid"), self.uuid.clone())].iter().cloned().collect();
let command = Commands::ClientInfo(Some(data));
self.server.update_client(uuid.as_str(), &command);
},
Commands::Error(None) => {
},
_ => {
println!("---Invalid Command---");
},
}
},
Ok(Commands::Client(Some(params))) => {
let mut retry: u8 = 3;
'retry_loop2: loop {
if retry < 1 {
self.transmit_data(Commands::Error(None).to_string().as_str());
break 'retry_loop2;
} else {
self.transmit_data(Commands::Client(Some(params.clone())).to_string().as_str());
if self.read_data(&mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) {
break 'retry_loop2;
} else {
retry -= 1;
}
}
}
},
/*no data available yet*/
Err(TryRecvError::Empty) => {},
_ => {},
}
println!("---Client Thread Exit---");
}
// move into a drop perhaps
#[allow(dead_code)]
pub fn disconnect(&mut self){
self.stream_arc.lock().unwrap().shutdown(Shutdown::Both).expect("shutdown call failed");
}
pub fn transmit_data(&self, data: &str) {
println!("Transmitting data: {}", data);
let error_result = self.stream_arc.lock().unwrap().write_all(data.to_string().as_bytes());
if let Some(error) = error_result.err(){
match error.kind() {
// handle disconnections
io::ErrorKind::NotConnected => {
let _ = self.server_sender.send(ServerMessages::Disconnect(self.uuid.clone()));
},
Err(_) => {
//println!("no data read");
},
_ => { },
}
}
println!("---Thread Exit---");
}
pub fn transmit_data(&mut self, data: &str){
println!("Transmitting...");
println!("{} data: {}", self.uuid, data);
self.stream.write(data.to_string().as_bytes()).unwrap();
self.stream.flush().unwrap();
}
fn read_data(&mut self, buffer: &mut [u8; 1024]) -> Result<Commands, Error> {
self.stream.read(buffer)?;
self.stream_arc.lock().unwrap().read(buffer)?;
let command = Commands::from(buffer);
Ok(command)
}
fn confirm_success(&mut self, buffer: &mut [u8; 1024]){
//self.stream.set_nonblocking(false).expect("set_nonblocking call failed");
//self.stream.set_read_timeout(Some(Duration::from_millis(3000))).expect("set_read_timeout call failed");
}
match self.read_data(buffer) {
Ok(command) => {
match command {
Commands::Success(_params) => {
println!("Success Confirmed");
},
_ => {
let error = Commands::Error(None);
self.transmit_data(error.to_string().as_str());
},
}
},
Err(_) => {
println!("no success read");
let error = Commands::Error(None);
self.transmit_data(error.to_string().as_str());
},
}
impl ToString for Client {
fn to_string(&self) -> std::string::String { todo!() }
}
//self.stream.set_read_timeout(None).expect("set_read_timeout call failed");
//self.stream.set_nonblocking(true).expect("set_nonblocking call failed");
}
#[deprecated(since="24.7.20", note="will be removed in future, please do not use!")]
#[allow(dead_code)]
pub fn disconnect(&mut self){
self.stream.shutdown(Shutdown::Both).expect("shutdown call failed");
self.connected = false;
}
#[deprecated(since="24.7.20", note="will be removed in future, please do not use!")]
#[allow(dead_code)]
pub fn transmit_success(&self, data: &String){
let mut success_message = "!success:".to_string();
if !data.is_empty(){
success_message.push_str(&" ".to_string());
success_message.push_str(&data.to_string());
}
}
#[deprecated(since="24.7.20", note="will be removed in future, please do not use!")]
#[allow(dead_code)]
fn transmit_error(&mut self, data: &String){
let mut error_message = "!error:".to_string();
if !data.is_empty(){
error_message.push_str(&" ".to_string());
error_message.push_str(&data.to_string());
}
self.transmit_data(&error_message);
impl Drop for Client {
fn drop(&mut self) {
let _ = self.stream_arc.lock().unwrap().write_all(Commands::Disconnect(None).to_string().as_bytes());
let _ = self.stream_arc.lock().unwrap().shutdown(Shutdown::Both);
}
}

View File

@ -1,29 +0,0 @@
/*use crate::server::client::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
pub fn get_client_data(clients_ref: &Arc<Mutex<HashMap<String, Client>>>, data: &HashMap<String, String>) -> String{
let clients_hashmap = clients_ref.lock().unwrap();
let uuid = data.get("uuid").unwrap();
println!("uuid: {}", uuid);
for (key, value) in clients_hashmap.iter(){
println!("{}",key);
}
let client = clients_hashmap.get(uuid);
match client{
Some(data) => {
let mut message = String::from("!success:");
message.push_str(&" uuid:".to_string());
message.push_str(&data.get_uuid().to_string());
message.push_str(&" host:".to_string());
message.push_str(&data.get_address().to_string());
message.push_str(&" username:".to_string());
message.push_str(&data.get_username().to_string());
message
},
None => String::from("client not online"),
}
}*/

View File

@ -1,5 +0,0 @@
/*use crate::server::client::client_profile::Client;
pub fn format_client_data(uuid: &String, client: &Client) -> String{
["!client: username:",client.get_username(), " uuid:", uuid, " host:\"", client.get_address(), "\""].concat()
}*/

View File

@ -1,13 +0,0 @@
/*use crate::server::client::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
use dashmap::DashMap;
pub fn add_client(clients_ref: &Arc<Mutex<HashMap<String, Client>>>, client: &Client){
let mut clients_hashmap = clients_ref.lock().unwrap();
let uuid = client.get_uuid().to_string();
//clients_hashmap.insert(uuid, client.clone());
}
*/

View File

@ -1,10 +0,0 @@
/*use crate::server::client::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
pub fn remove_client(clients_ref: &Arc<Mutex<HashMap<String, Client>>>, client: &Client){
let mut clients_hashmap = clients_ref.lock().unwrap();
clients_hashmap.remove(client.get_uuid()).unwrap();
}*/

View File

@ -8,6 +8,7 @@ use crate::{
},
commands::Commands
};
use std::{
sync::{Arc, Mutex},
net::{TcpStream, TcpListener},
@ -15,7 +16,6 @@ use std::{
io::prelude::*,
time::Duration,
io::Error,
io::prelude::*,
thread,
io
};
@ -24,19 +24,16 @@ use log::info;
use crossbeam_channel::{Sender, Receiver, unbounded};
use rust_chat_server::ThreadPool;
use zeroize::Zeroize;
use parking_lot::FairMutex;
use dashmap::DashMap;
use regex::Regex;
//use zeroize::Zeroize;
//use parking_lot::FairMutex;
//use dashmap::DashMap;
//use regex::Regex;
#[derive(Debug)]
pub enum ServerMessages {
RequestUpdate(String),
#[allow(dead_code)]
RequestInfo(String, String),
#[allow(dead_code)]
RequestDisconnect(String),
#[allow(dead_code)]
RequestUpdate(Arc<Mutex<TcpStream>>),
RequestInfo(String, Arc<Mutex<TcpStream>>),
Disconnect(String),
Shutdown,
}
@ -46,9 +43,9 @@ pub struct Server<'z> {
name: &'z str,
address: &'z str,
author: &'z str,
connected_clients: Arc<Mutex<HashMap<String, Sender<Commands>>>>,
connected_clients: Arc<Mutex<HashMap<String, Client>>>,
thread_pool: ThreadPool,
sender: Sender<ServerMessages>,
@ -64,103 +61,127 @@ impl<'z> Server<'z> {
name: name,
address: address,
author: author,
connected_clients: Arc::new(Mutex::new(HashMap::new())),
thread_pool: ThreadPool::new(16),
thread_pool: ThreadPool::new(16),
sender,
receiver,
}
}
pub fn get_name(&self) -> String{
pub fn get_name(&self) -> String {
self.name.to_string()
}
pub fn get_address(&self) -> String{
pub fn get_address(&self) -> String {
self.address.to_string()
}
pub fn get_author(&self) -> String{
pub fn get_author(&self) -> String {
self.author.to_string()
}
pub fn start(&'static self) -> Result<(), io::Error> {
info!("server: starting server...");
pub fn set_port(&mut self) {
}
pub fn start(&'static self) -> Result<(), io::Error>{
println!("server: starting server...");
// clone elements for thread
let client_map = self.connected_clients.clone();
let receiver = self.receiver.clone();
// set up listener and buffer
let listener = TcpListener::bind(self.get_address())?;
listener.set_nonblocking(true);
info!("server: spawning threads");
thread::Builder::new().name("Server Thread".to_string()).spawn(move || {
listener.set_nonblocking(true)?;
println!("server: spawning threads");
let _ = thread::Builder::new().name("Server Thread".to_string()).spawn(move || {
let mut buffer = [0; 1024];
'outer: loop {
std::thread::sleep(Duration::from_millis(100));
// get messages from the servers channel.
info!("server: getting messages");
println!("server: getting messages");
for i in receiver.try_iter() {
match i {
ServerMessages::Shutdown => {
// TODO: implement disconnecting all clients and shutting down the server
info!("server: shutting down...");
println!("server: shutting down...");
break 'outer;
},
_ => {},
ServerMessages::RequestUpdate(stream_arc) => {
for (_k, v) in self.connected_clients.lock().unwrap().iter() {
let stream = stream_arc.lock().unwrap();
self.transmit_data(&stream, v.to_string().as_str());
if self.read_data(&stream, &mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) {
println!("Success Confirmed");
} else {
println!("no success read");
let error = Commands::Error(None);
self.transmit_data(&stream, error.to_string().as_str());
}
}
},
ServerMessages::RequestInfo(uuid, stream_arc) => {
let stream = stream_arc.lock().unwrap();
if let Some(client) = self.connected_clients.lock().unwrap().get(&uuid) {
let params: HashMap<String, String> = [(String::from("uuid"), client.get_uuid()), (String::from("name"), client.get_username()), (String::from("host"), client.get_address())].iter().cloned().collect();
let command = Commands::Success(Some(params));
self.transmit_data(&stream, command.to_string().as_str());
} else {
let command = Commands::Success(None);
self.transmit_data(&stream, command.to_string().as_str());
}
},
ServerMessages::Disconnect(uuid) => {
self.remove_client(uuid.as_str());
let params: HashMap<String, String> = [(String::from("uuid"), uuid)].iter().cloned().collect();
self.update_all_clients(Commands::ClientRemove(Some(params)));
},
}
}
info!("server: checking for new connections");
if let Ok((mut stream, addr)) = listener.accept() {
stream.set_read_timeout(Some(Duration::from_millis(10000))).unwrap();
println!("server: checking for new connections");
if let Ok((stream, _addr)) = listener.accept() {
stream.set_read_timeout(Some(Duration::from_millis(1000))).unwrap();
let _ = stream.set_nonblocking(false);
let request = Commands::Request(None);
self.transmit_data(&stream, &request.to_string().as_str());
match self.read_data(&stream, &mut buffer) {
Ok(command) => {
println!("Server: new connection sent - {:?}", command);
match command {
Commands::Connect(Some(data)) => {
let uuid = data.get("uuid").unwrap();
let username = data.get("name").unwrap();
let address = data.get("host").unwrap();
println!("{}", format!("Server: new Client connection: _addr = {}", address ));
let client = Client::new(stream, self.sender.clone(), &uuid, &username, &address);
info!("{}", format!("Server: new Client connection: addr = {}", address ));
let mut client = Client::new(self, stream, &uuid, &username, &address);
let tx = client.get_transmitter();
let mut clients_hashmap = self.connected_clients.lock().unwrap();
clients_hashmap.insert(uuid.to_string(), tx.clone());
std::mem::drop(clients_hashmap);
let success = Commands::Success(None);
tx.send(success).unwrap();
self.thread_pool.execute(move || {
client.handle_connection();
});
self.connected_clients.lock().unwrap().insert(uuid.to_string(), client);
let params: HashMap<String, String> = [(String::from("name"), username.clone()), (String::from("host"), address.clone()), (String::from("uuid"), uuid.clone())].iter().cloned().collect();
let new_client = Commands::Client(Some(params));
self.update_all_clients(uuid.as_str(), new_client);
},
let _ = self.connected_clients.lock().unwrap().iter().map(|(_k, v)| v.sender.send(new_client.clone()));
},
// TODO: - correct connection reset error when getting info.
Commands::Info(None) => {
info!("Server: info requested");
println!("Server: info requested");
let params: HashMap<String, String> = [(String::from("name"), self.name.to_string().clone()), (String::from("owner"), self.author.to_string().clone())].iter().cloned().collect();
let command = Commands::Success(Some(params));
let command = Commands::Info(Some(params));
self.transmit_data(&stream, command.to_string().as_str());
},
_ => {
info!("Server: Invalid command sent");
println!("Server: Invalid command sent");
self.transmit_data(&stream, Commands::Error(None).to_string().as_str());
},
}
@ -168,28 +189,35 @@ impl<'z> Server<'z> {
Err(_) => println!("ERROR: stream closed"),
}
}
// TODO: end -
// handle each client for messages
println!("server: handing control to clients");
for (_k, client) in self.connected_clients.lock().unwrap().iter_mut() {
client.handle_connection();
}
}
info!("server: stopped")
println!("server: stopped");
});
info!("server: started");
println!("server: started");
Ok(())
}
pub fn stop(&self) {
info!("server: sending stop message");
self.sender.send(ServerMessages::Shutdown);
let _ = self.sender.send(ServerMessages::Shutdown);
}
fn transmit_data(&self, mut stream: &TcpStream, data: &str){
println!("Transmitting...");
println!("data: {}",data);
println!("data: {}", data);
/*
* This will throw an error and crash any thread, including the main thread, if
* the connection is lost before transmitting. Maybe change to handle any exceptions
* that may occur.
*/
stream.write(data.to_string().as_bytes()).unwrap();
let _ = stream.write(data.to_string().as_bytes()).unwrap();
stream.flush().unwrap();
}
@ -200,20 +228,11 @@ impl<'z> Server<'z> {
Ok(command)
}
pub fn update_client(&self, uuid: &str, command: &Commands){
let clients = self.connected_clients.lock().unwrap();
fn update_all_clients(&self, command: Commands){
let client_map = self.connected_clients.lock().unwrap();
let sender = clients.get(&uuid.to_string()).unwrap();
sender.send(command.clone()).unwrap();
}
pub fn update_all_clients(&self, uuid: &str, command: Commands){
let clients = self.connected_clients.lock().unwrap();
for (client_uuid, sender) in clients.iter() {
if uuid != client_uuid.to_string() {
sender.send(command.clone()).unwrap();
}
for client in client_map.values() {
client.get_sender().send(command.clone()).unwrap();
}
}
@ -221,28 +240,10 @@ impl<'z> Server<'z> {
let mut clients = self.connected_clients.lock().unwrap();
clients.remove(&uuid.to_string());
}
}
#[deprecated(since="24.7.20", note="will be removed in future, please do not use!")]
#[allow(dead_code)]
pub fn get_info(&self, tx: Sender<Commands>) {
let mut params: HashMap<String, String> = HashMap::new();
params.insert(String::from("name"), self.name.to_string().clone());
params.insert(String::from("owner"), self.author.to_string().clone());
let command = Commands::Info(Some(params));
tx.send(command).unwrap();
}
#[deprecated(since="24.7.20", note="will be removed in future, please do not use!")]
#[allow(dead_code)]
fn regex_data(&self, command_regex: &Regex, data: &str, command_addons: &mut HashMap<String, String>){
for figure in command_regex.find_iter(data){
let segment = figure.as_str().to_string();
let contents: Vec<&str> = segment.split(":").collect();
println!("key: {}, value: {}", contents[0].to_string(), contents[1].to_string());
command_addons.insert(contents[0].to_string(), contents[1].to_string());
}
}
impl<'z> ToString for Server<'z> {
fn to_string(&self) -> std::string::String { todo!() }
}
impl<'z> Drop for Server<'z> {
@ -252,14 +253,16 @@ impl<'z> Drop for Server<'z> {
}
}
struct ServerDelegate {
}
/* The new version of the server no long works with these unit
* tests.
* They will be fixed soon!
* TODO: fix unit tests
*/
#[cfg(test)]
/*#[cfg(test)]
mod tests{
use super::*;
use std::{thread, time};
@ -605,4 +608,4 @@ mod tests{
let dur = time::Duration::from_millis(500);
thread::sleep(dur);
}
}
}*/