Merge pull request #1 from Mitch161/struct-rework

Updated rework of files and folders
This commit is contained in:
Mitch161 2020-07-01 12:54:37 +01:00 committed by GitHub
commit c3c1dfb342
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 725 additions and 8 deletions

2
.gitignore vendored
View File

@ -8,3 +8,5 @@ Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
.DS_Store
.idea

View File

@ -1,5 +1,5 @@
[package]
name = "server-rust"
name = "rust-chat-server"
version = "0.1.0"
authors = ["Mitchell <mitchellhardie1@gmail.com>"]
edition = "2018"
@ -7,3 +7,17 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
regex = "1"
crossbeam = "0.7"
parking_lot = "0.10"
crossbeam-channel = "0.4"
crossbeam-utils = "0.7"
crossbeam-queue = "0.2"
dashmap = "3.11.4"
async-std = "1.6.2"
[profile.dev]
opt-level = 0
[profile.release]
opt-level = 3

102
src/lib.rs Normal file
View File

@ -0,0 +1,102 @@
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
enum Message {
NewJob(Job),
Terminate,
}
pub struct ThreadPool{
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool{
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
// create some threads and store them in the vector
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender,
}
}
pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static {
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || {
loop{
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
},
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
},
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}

View File

@ -1,11 +1,17 @@
use std::net::TcpListener;
//mod client_management;
mod server;
use crate::server::client::client_profile::Client;
use crate::server::server_profile::Server;
use std::collections::HashMap;
use std::collections::VecDeque;
fn main(){
let listener = TcpListener::bind("127.0.0.1:6001").unwrap();
let server_name = String::from("Server-01");
let server_address = String::from("127.0.0.1:6001");
let server_author = String::from("nope@live.co.uk");
for stream in listener.incoming(){
let stream = stream.unwrap();
println!("Connection Established!");
}
let server = Server::new(&server_name, &server_address, &server_author);
server.start();
}

View File

@ -0,0 +1,114 @@
extern crate regex;
use crate::server::commands::Commands;
use crate::server::utility;
use std::collections::VecDeque;
use std::net::{Shutdown, TcpStream};
use std::sync::{Arc, Barrier, Mutex};
use std::rc::Rc;
use crossbeam_channel::{Receiver, TryRecvError};
use parking_lot::FairMutex;
use std::collections::HashMap;
use dashmap::DashMap;
use std::io::prelude::*;
use std::time::Duration;
#[derive(Clone)]
pub struct Client{
connected: bool,
stream: Arc<TcpStream>,
uuid: String,
username: String,
address: String,
}
impl Client{
pub fn new(stream: Arc<TcpStream>, uuid: &String, username: &String, address: &String) -> Client{
Client{
connected: true,
stream: stream,
uuid: uuid.to_string(),
username: username.to_string(),
address: address.to_string(),
}
}
pub fn get_stream(&self) -> &TcpStream{
&self.stream
}
pub fn get_uuid(&self) -> &String{
&self.uuid
}
pub fn get_username(&self) -> &String{
&self.username
}
pub fn get_address(&self) -> &String{
&self.address
}
pub fn disconnect(&mut self){
self.stream.shutdown(Shutdown::Both).expect("shutdown call failed");
self.connected = false;
}
pub fn handle_connection(&mut self, clients_ref: &Arc<Mutex<HashMap<String, Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>, client_rx: Receiver<Arc<Barrier>>){
//self.stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
let mut buffer = [0; 1024];
while self.connected {
if !message_queue.lock().is_empty() {
match client_rx.try_recv(){
/*handle our data*/
Ok(sync_group) => {
println!("data present");
let data = utility::format_data(message_queue);
let command = utility::match_outbound_command(&data.get("command").unwrap());
println!("waiting 1");
sync_group.wait();
println!("executing");
command.execute(&self, &mut buffer, &data);
println!("waiting 2");
sync_group.wait();
println!("client updated");
},
/*sender disconnected*/
Err(TryRecvError::Disconnected) => {},
/*no data available yet*/
Err(TryRecvError::Empty) => {},
}
}
match self.stream.peek(&mut buffer){
Ok(_) => {
//self.stream.lock().unwrap().read(&mut buffer).unwrap();
self.get_stream().read(&mut buffer).unwrap();
let incoming_message = String::from_utf8_lossy(&buffer[..]);
//let data: Vec<String> = utility::tokenize(&incoming_message);
let data: HashMap<String, String> = utility::tokenize(&incoming_message);
println!("Request: {}", incoming_message);
//println!("Data: {:?}", data);
//let command = utility::match_command(&data[0]);
let command = utility::match_command(&data.get("command").unwrap());
if match command{ Commands::Connect => true, _ => false,}{
println!("Error!");
} else {
println!("command executing...");
command.execute(self, &mut buffer, &data, clients_ref, message_queue);
}
},
Err(_) => {
println!("no data peeked");
},
}
}
println!("---thread exit---");
}
}

1
src/server/client/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod client_profile;

View File

View File

@ -0,0 +1,30 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
pub fn get_client_data(clients_ref: &Arc<Mutex<HashMap<String, Client>>>, data: &HashMap<String, String>) -> String{
let clients_hashmap = clients_ref.lock().unwrap();
let uuid = data.get("uuid").unwrap();
println!("uuid: {}", uuid);
for (key, value) in clients_hashmap.iter(){
println!("{}",key);
}
let client = clients_hashmap.get(uuid);
match client{
Some(data) => {
let mut message = String::from("!success:");
message.push_str(&" uuid:".to_string());
message.push_str(&data.get_uuid().to_string());
message.push_str(&" host:".to_string());
message.push_str(&data.get_address().to_string());
message.push_str(&" username:".to_string());
message.push_str(&data.get_username().to_string());
message
},
None => String::from("client not online"),
}
}

View File

@ -0,0 +1,11 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
// send a list of all clients.
// client !clientupdate:
// server !client: name:"vobo" uuid:24653526-23464562-46346-3563563 host:"127.0.0.1"
// server !client: name:"bovo" uuid:24643526-23464562-46346-3563563 host:"127.0.0.1"
pub fn format_client_data(uuid: &String, client: &Client) -> String{
["!client: username:",client.get_username(), " uuid:", uuid, " host:\"", client.get_address(), "\""].concat()
}

View File

@ -0,0 +1,13 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
use dashmap::DashMap;
pub fn add_client(clients_ref: &Arc<Mutex<HashMap<String, Client>>>, client: &Client){
let mut clients_hashmap = clients_ref.lock().unwrap();
let uuid = client.get_uuid().to_string();
clients_hashmap.insert(uuid, client.clone());
}

View File

@ -0,0 +1,11 @@
use crate::server::client::client_profile::Client;
//use crate::client_management::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
pub fn remove_client(clients_ref: &Arc<Mutex<HashMap<String, Client>>>, client: &Client){
let mut clients_hashmap = clients_ref.lock().unwrap();
clients_hashmap.remove(client.get_uuid()).unwrap();
}

View File

View File

@ -0,0 +1,12 @@
pub fn get_server_info() -> String{
let mut server_details = "".to_string();
let server_name = String::from("Server-01");
let server_owner = String::from("mickyb18");
server_details.push_str(&"name:".to_string());
server_details.push_str(&server_name.to_string());
server_details.push_str(&" owner:".to_string());
server_details.push_str(&server_owner.to_string());
server_details
}

View File

195
src/server/commands/mod.rs Normal file
View File

@ -0,0 +1,195 @@
mod request;
mod info;
mod success;
mod error;
mod connect;
mod disconnect;
mod client_update;
mod client_info;
mod client;
mod test;
mod message;
use crate::server::client::client_profile::Client;
use crate::server::utility;
use std::collections::VecDeque;
use parking_lot::FairMutex;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
use std::io::{self, Read};
use std::net::TcpStream;
use std::time::Duration;
use dashmap::DashMap;
pub enum Commands{
Info,
Connect,
Disconnect,
ClientUpdate,
ClientInfo,
Unknown,
}
pub enum OutboundCommands{
Client,
ClientRemove,
Unknown,
}
enum InboundReturns{
Success,
Error,
}
enum OutboundReturns{
Success,
Error,
}
impl Commands{
pub fn execute(&self, client: &mut Client, buffer: &mut [u8; 1024], data: &HashMap<String, String>, clients_ref: &Arc<Mutex<HashMap<String, Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>){
let stream = client.get_stream();
match *self{
Commands::Info => {
let server_details = info::get_server_info();
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &server_details);
},
Commands::Connect => {
connect::add_client(clients_ref, client);
let mut message = "!client: username:".to_string();
message.push_str(&client.get_username().to_string());
message.push_str(&" host:".to_string());
message.push_str(&client.get_address().to_string());
message.push_str(&" uuid:".to_string());
message.push_str(&client.get_uuid().to_string());
message_queue.lock().push_back(message);
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &String::from(""));
},
Commands::Disconnect => {
disconnect::remove_client(clients_ref, client);
let mut message = "!clientRemove: uuid:".to_string();
message.push_str(&client.get_uuid().to_string());
message_queue.lock().push_back(message);
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &String::from(""));
client.disconnect();
println!("disconnected!");
},
Commands::ClientUpdate => {
let in_success = InboundReturns::Success;
let clients_hashmap = clients_ref.lock().unwrap();
for (key, value) in clients_hashmap.iter(){
let formatted_data = client_update::format_client_data(&key, &value);
utility::transmit_data(&stream, &formatted_data);
in_success.execute(&stream, buffer, &formatted_data);
}
let out_success = OutboundReturns::Success;
out_success.execute(&stream, &String::from(""));
in_success.execute(&stream, buffer, &String::from("!success:"));
},
Commands::ClientInfo => {
let requested_data = client_info::get_client_data(clients_ref, data);
utility::transmit_data(&stream, &requested_data);
},
Commands::Unknown => {
println!("Uknown Command!");
},
}
}
}
impl OutboundCommands{
pub fn execute(&self, client: &Client, buffer: &mut [u8; 1024], data: &HashMap<String, String>){
let stream = client.get_stream();
match *self{
OutboundCommands::Client => {
let mut message = String::from("");
message.push_str(&data.get("command").unwrap());
message.push_str(&" username:");
message.push_str(&data.get("username").unwrap());
message.push_str(&" host:");
message.push_str(&data.get("host").unwrap());
message.push_str(&" uuid:");
message.push_str(&data.get("uuid").unwrap());
utility::transmit_data(&stream, &message);
let in_success = InboundReturns::Success;
in_success.execute(&stream, buffer, &message);
},
OutboundCommands::ClientRemove => {
let mut message = String::from("");
message.push_str(&data.get("command").unwrap());
message.push_str(&" uuid:");
message.push_str(&data.get("uuid").unwrap());
utility::transmit_data(&stream, &message);
let in_success = InboundReturns::Success;
in_success.execute(&stream, buffer, &message);
},
OutboundCommands::Unknown => {
println!("Unknown Command!");
},
}
}
}
impl InboundReturns{
pub fn execute(&self, mut stream: &TcpStream, buffer: &mut [u8; 1024], data: &String){
stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
match *self{
InboundReturns::Success => {
let mut failing = true;
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("Blocking...");
utility::transmit_data(stream, data);
},
_ => panic!("Fatal Error {}", e),
}
},
Ok(m) => {
println!("{:?}", m);
failing = false;
},
};
}
},
InboundReturns::Error => {},
}
}
}
impl OutboundReturns{
pub fn execute(&self, stream: &TcpStream, data: &String){
match *self{
OutboundReturns::Success => {
let mut message = "!success:".to_string();
if !data.is_empty(){
message.push_str(&" ".to_string());
message.push_str(&data.to_string());
}
utility::transmit_data(stream, &message);
},
OutboundReturns::Error => {},
}
}
}

View File

View File

View File

5
src/server/mod.rs Normal file
View File

@ -0,0 +1,5 @@
pub mod client;
pub mod commands;
pub mod server_profile;
pub mod utility;

View File

@ -0,0 +1,124 @@
extern crate regex;
use crate::server::client::client_profile::Client;
use crate::server::commands::Commands;
use crate::server::utility;
use rust_chat_server::ThreadPool;
//use crate::client_management::client_profile::Client;
//use crate::server::commands::Commands;
//use crate::server::commands::network;
use std::collections::VecDeque;
use std::net::TcpListener;
use std::sync::{Arc, Barrier, Mutex};
use std::rc::Rc;
use crossbeam_channel::{unbounded, Sender, Receiver};
use parking_lot::FairMutex;
use std::collections::HashMap;
use dashmap::DashMap;
use std::io::prelude::*;
use std::thread;
pub struct Server{
name: String,
address: String,
author: String,
connected_clients: Arc<Mutex<HashMap<String,Client>>>,
message_queue: Arc<FairMutex<VecDeque<String>>>,
}
impl Server{
pub fn new(name: &String, address: &String, author: &String) -> Server{
let connected_clients: Arc<Mutex<HashMap<String,Client>>> = Arc::new(Mutex::new(HashMap::new()));
let message_queue: Arc<FairMutex<VecDeque<String>>> = Arc::new(FairMutex::new(VecDeque::new()));
Server{
name: name.to_string(),
address: address.to_string(),
author: author.to_string(),
connected_clients: connected_clients,
message_queue: message_queue,
}
}
pub fn start(&self){
let listener = TcpListener::bind(self.address.clone()).unwrap();
let pool = ThreadPool::new(10);
//let connected_clients = Arc::new(Mutex::new(self.connected_clients.clone()));
//let message_queue: Arc<FairMutex<VecDeque<String>>> = Arc::new(FairMutex::new(self.message_queue.clone()));
let (tx,rx): (Sender<Arc<Barrier>>, Receiver<Arc<Barrier>>) = unbounded();
let (clock_tx, _) = (tx.clone(), rx.clone());
thread::spawn({
let connected_clients = Arc::clone(&self.connected_clients);
let message_queue = Arc::clone(&self.message_queue);
move || {
loop{
let online_clients = connected_clients.lock().unwrap().len();
if !message_queue.lock().is_empty(){
println!("message on queue detected");
let sync_group = Arc::new(Barrier::new(online_clients+1));
println!("sending to threads... {}",online_clients);
for _ in 0..online_clients{
println!("thread");
clock_tx.send(sync_group.clone()).unwrap();
}
println!("all threads updated!");
sync_group.wait();
println!("data removed");
message_queue.lock().pop_front();
sync_group.wait();
println!("clock finished!");
}
}
}
});
//stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
loop{
if let Ok((mut stream, addr)) = listener.accept(){
println!("Connected: {}", addr);
let connected_clients_ref = Arc::clone(&self.connected_clients);
let message_queue_ref = Arc::clone(&self.message_queue);
let (_ , client_rx) = (tx.clone(), rx.clone());
pool.execute(move || {
let mut buffer = [0; 1024];
let request = String::from("?request:");
utility::transmit_data(&stream, &request);
stream.read(&mut buffer).unwrap();
let stream = Arc::new(stream);
let incoming_message = String::from_utf8_lossy(&buffer[..]);
let data: HashMap<String, String> = utility::tokenize(&incoming_message);
let command = utility::match_command(&data.get("command").unwrap());
if match command{ Commands::Connect => true, _ => false,}{
/*
* Change so that command is paassed in and then matches how to break the
* data up
*/
//let (uuid, username) = utility::extract_fields(&data);
let uuid = data.get("uuid").unwrap();
let username = data.get("username").unwrap();
let address = data.get("host").unwrap();
let mut client = Client::new(stream, &uuid, &username, &address);
command.execute(&mut client, &mut buffer, &data, &connected_clients_ref, &message_queue_ref);
client.handle_connection(&connected_clients_ref, &message_queue_ref, client_rx);
//process_connection(&stream, &clients_ref, &message_ref, &address.to_string(), client_rx);
}else{
//error
}
});
}
}
}
}

77
src/server/utility.rs Normal file
View File

@ -0,0 +1,77 @@
use std::net::TcpStream;
use std::io::Write;
use std::collections::HashMap;
use regex::Regex;
use crate::server::commands::{Commands, OutboundCommands};
use std::sync::Arc;
use parking_lot::FairMutex;
use std::collections::VecDeque;
pub fn transmit_data(mut stream: &TcpStream, data: &str){
println!("Transmitting...");
println!("data: {}",data);
stream.write(data.to_string().as_bytes()).unwrap();
stream.flush().unwrap();
}
pub fn tokenize(incoming_message: &str) -> HashMap<String, String>{
let mut data: HashMap<String, String> = HashMap::new();
for mat in Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9\-\+\[\]{}_=/]+|("(.*?)")+)"###).unwrap().find_iter(incoming_message){
if match match_command(&mat.as_str().to_string()) { Commands::Unknown => false, _ => true,} || match match_outbound_command(&mat.as_str().to_string()) { OutboundCommands::Unknown => false, _ => true,}{
data.insert("command".to_string(), mat.as_str().to_string());
}else{
let segment = mat.as_str().to_string();
let contents: Vec<&str> = segment.split(":").collect();
println!("key: {}, value: {}", contents[0].to_string(), contents[1].to_string());
data.insert(contents[0].to_string(), contents[1].to_string());
}
}
data
}
pub fn match_command(command: &String) -> Commands{
match command.as_str(){
"!info:" => Commands::Info,
"!connect:" => Commands::Connect,
"!disconnect:" => Commands::Disconnect,
"!clientUpdate:" => Commands::ClientUpdate,
"!clientInfo:" => Commands::ClientInfo,
_ => Commands::Unknown,
}
}
pub fn match_outbound_command(command: &String) -> OutboundCommands{
match command.as_str(){
"!client:" => OutboundCommands::Client,
"!clientRemove:" => OutboundCommands::ClientRemove,
_ => OutboundCommands::Unknown,
}
}
pub fn format_data(message_queue: &Arc<FairMutex<VecDeque<String>>>) -> HashMap<String, String>{
//copy data from queue
let locked_message_queue = message_queue.lock();
let message = locked_message_queue.get(0).unwrap();
println!("msg: {}", message);
tokenize(&message)
}
pub fn extract_fields(data: &Vec<String>) -> (String, String){
let mut uuid = String::from("");
let mut username = String::from("");
for field in data{
if field.contains("uuid:"){
let contents: Vec<&str> = field.split(":").collect();
uuid.push_str(contents[1]);
}else if field.contains("username:"){
let contents: Vec<&str> = field.split(":").collect();
username.push_str(contents[1]);
}
}
(uuid, username)
}