update to clientUpdate command

Will not work currently as sync issues exist
This commit is contained in:
Mitchell 2020-06-16 17:37:11 +01:00
parent 830e5c9d64
commit 15edb7872f
4 changed files with 299 additions and 40 deletions

View File

@ -8,6 +8,12 @@ edition = "2018"
[dependencies]
regex = "1"
crossbeam = "0.7"
parking_lot = "0.10"
crossbeam-channel = "0.4"
crossbeam-utils = "0.7"
crossbeam-queue = "0.2"
dashmap = "3.11.4"
[profile.dev]
opt-level = 0

View File

@ -10,7 +10,6 @@
* Save messages to be sent and check every few mins to see if they are online.
*
*/
mod client_management;
mod protocols;
@ -18,45 +17,125 @@ extern crate regex;
use crate::client_management::client_profile::Client;
use crate::protocols::commands::Commands;
use crate::protocols::commands::OutboundCommands;
use crate::protocols::commands::network;
use rust_chat_server::ThreadPool;
use std::collections::VecDeque;
use std::net::TcpListener;
use std::net::TcpStream;
use std::sync::mpsc;
use std::sync::Mutex;
use std::sync::Arc;
use crossbeam_queue::{SegQueue, PushError, PopError};
use crossbeam_utils::sync::WaitGroup;
use parking_lot::FairMutex;
use std::collections::HashMap;
use dashmap::DashMap;
use std::io::prelude::*;
use std::time::Duration;
use regex::Regex;
use std::thread;
fn main(){
let listener = TcpListener::bind("127.0.0.1:6001").unwrap();
let pool = ThreadPool::new(10);
let connected_clients = Arc::new(Mutex::new(HashMap::new()));
// Using an ArrayQueue is much faster, but has limited capacity
//let message_queue = Arc::new(SegQueue::new());
let message_queue = Arc::new(FairMutex::new(VecDeque::new()));
let sync_wg = Arc::new(WaitGroup::new());
thread::spawn({
let message_ref = Arc::clone(&message_queue);
move || {
loop{
println!("Phase 1");
//sync_wg.clone().wait();
println!("Removing item...");
message_ref.lock().pop_front();
println!("Phase 2");
//sync_wg.clone().wait();
println!("Done");
}
}
});
loop{
if let Ok((stream, address)) = listener.accept(){
println!("Connected: {}", address);
let clients_ref = Arc::clone(&connected_clients);
let message_ref = Arc::clone(&message_queue);
//let wg = sync_wg.clone();
pool.execute(move || {
handle_connection(stream, &clients_ref, &address.to_string());
handle_connection(&stream, &clients_ref, &message_ref, &sync_wg, &address.to_string());
});
}
}
}
fn handle_connection(mut stream: TcpStream, clients_ref: &Arc<Mutex<HashMap<String,Client>>>, new_address: &String){//vec needs to be of type clients
fn handle_connection(mut stream: &TcpStream, clients_ref: &Arc<Mutex<HashMap<String,Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>, sync_wg: &WaitGroup, new_address: &String){
let wg = sync_wg.clone();
stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let request = String::from("?request:");
network::transmit_data(stream, &request);
let incoming_message = String::from_utf8_lossy(&buffer[..]);
let data: Vec<String> = tokenize(&incoming_message);
/*
* Share a message queue with all threads where operations that need to be performed are added
* to the queue, each thread will then increment a integer till its the same as the amount of
* clients online, then all threads will execute the command together.
*/
let mut connected = false;
// do while loop
while {
println!("loop start");
if connected == true && !message_queue.lock().is_empty() {
let (command, data) = format_data(message_queue);
let command = match_outbound_command(&command);
println!("Request: {}", incoming_message);
println!("Data: {:?}", data);
//BUG: gets stuck at waiting...
let command = match_command(&data);
command.execute(stream, &data, new_address, clients_ref);
//change to a wait group so all threads are in sync
println!("waiting...");
//sync_wg.clone().wait();
sync_wg.wait();
println!("done");
//execute copied command
command.execute(stream, &mut buffer, &data);
println!("waiting...");
//sync_wg.clone().wait();
println!("done");
//remove data from front of queue
}else{
stream.read(&mut buffer).unwrap();
// after timeout handle error and do not execute the code below if there is an error
let incoming_message = String::from_utf8_lossy(&buffer[..]);
let data: Vec<String> = tokenize(&incoming_message);
println!("Request: {}", incoming_message);
println!("Data: {:?}", data);
let command = match_command(&data);
if connected == false && match command{ Commands::Connect => true, _ => false,}{
command.execute(stream, &mut buffer, &data, new_address, clients_ref, message_queue);
connected = true;
}else if connected == true{
command.execute(stream, &mut buffer, &data, new_address, clients_ref, message_queue);
}else{
println!("Error!");
}
}
connected
}{}
println!("---thread exit---");
}
fn tokenize(incoming_message: &str) -> Vec<String>{
@ -83,3 +162,19 @@ fn match_command(data: &Vec<String>) -> Commands{
_ => Commands::Unknown,
}
}
fn match_outbound_command(data: &Vec<String>) -> OutboundCommands{
match data[0].as_str(){
"!clientUpdate:" => OutboundCommands::ClientUpdate,
_ => OutboundCommands::Unknown,
}
}
fn format_data(message_queue: &Arc<FairMutex<VecDeque<String>>>) -> (Vec<String>, String){
//copy data from queue
let locked_message_queue = message_queue.lock();
let data = locked_message_queue.get(0).unwrap();
//format the data into a command
(tokenize(&data), data.clone())
}

View File

@ -9,14 +9,19 @@ mod client_info;
mod client;
mod test;
mod message;
pub mod network;
use crate::client_management::client_profile::Client;
use crossbeam_queue::SegQueue;
use std::collections::VecDeque;
use parking_lot::FairMutex;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
use std::net::TcpStream;
use std::io::Write;
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;
pub enum Commands{
Request,
@ -33,66 +38,209 @@ pub enum Commands{
Unknown,
}
pub enum OutboundCommands{
Success,
Error,
ClientUpdate,
Unknown,
}
impl Commands{
pub fn execute(&self, stream: TcpStream, data: &Vec<String>, address: &String, clients_ref: &Arc<Mutex<HashMap<String,Client>>>){
pub fn execute(&self, mut stream: &TcpStream, buffer: &mut [u8; 1024], data: &Vec<String>, address: &String, clients_ref: &Arc<Mutex<HashMap<String,Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>){
match *self{
Commands::Request => {
}
},
Commands::Info => {
}
},
Commands::Success => {
}
},
Commands::Error => {
}
},
Commands::Connect => {
let message = String::from("!success:");
Commands::transmit_data(&stream, &message);
connect::add_client(clients_ref, &data[1], &data[2], address);
}
let mut message = "!client: ".to_string();
message.push_str(&data[2].to_string());
message.push_str(&" address:".to_string());
message.push_str(&address.to_string());
message.push_str(&" ".to_string());
message.push_str(&data[1].to_string());
println!("message: {}", message);
message_queue.lock().push_back(message);
},
Commands::Disconnect => {
let message = String::from("!success:");
Commands::transmit_data(&stream, &message);
network::transmit_data(stream, &message);
disconnect::remove_client(clients_ref, &data[1]);
}
/*
* repeat what connect does
*/
},
Commands::ClientUpdate => {
let message = String::from("!success:");
Commands::transmit_data(&stream, &message);
stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
//let mut buffer = [0; 1024];
let mut failing = true;
let clients_hashmap = clients_ref.lock().unwrap();
for (key, value) in clients_hashmap.iter(){
let formatted_data = client_update::format_client_data(&key, &value);
Commands::transmit_data(&stream, &formatted_data);
network::transmit_data(stream, &formatted_data);
failing = true;
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("Blocking...");
network::transmit_data(stream, &formatted_data);
},
_ => panic!("Fatal Error {}", e),
}
},
Ok(m) => {
println!("{:?}", m);
failing = false;
},
};
}
}
let final_message = String::from("!finished:");
Commands::transmit_data(&stream, &final_message);
}
let final_message = String::from("!success:");
network::transmit_data(stream, &final_message);
failing = true;
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("Blocking...");
network::transmit_data(stream, &final_message);
},
_ => panic!("Fatal Error {}", e),
}
},
Ok(m) => {
println!("{:?}", m);
failing = false;
},
};
}
},
Commands::ClientInfo => {
let message = String::from("!success:");
Commands::transmit_data(&stream, &message);
network::transmit_data(stream, &message);
let requested_address = client_info::get_client_address(clients_ref, &data[1]);
Commands::transmit_data(&stream, &requested_address);
}
network::transmit_data(stream, &requested_address);
},
Commands::Client => {
}
},
Commands::Test => {
}
},
Commands::Message => {
}
},
Commands::Unknown => {
println!("Uknown Command!");
}
},
}
}
}
fn transmit_data(mut stream: &TcpStream, data: &str){
impl OutboundCommands{
pub fn execute(&self, mut stream: &TcpStream, buffer: &mut [u8; 1024], data: &String){
match *self{
OutboundCommands::Success => {},
OutboundCommands::Error => {},
OutboundCommands::ClientUpdate => {
stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
let message = String::from("!clientUpdate:");
network::transmit_data(stream, &message);
let mut failing = true;
//let mut buffer = [0; 1024];
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("Blocking...");
network::transmit_data(stream, &message);
},
_ => panic!("Fatal Error {}", e),
}
},
Ok(m) => {
println!("{:?}", m);
failing = false;
},
};
}
network::transmit_data(stream, data);
failing = true;
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("Blocking...");
network::transmit_data(stream, data);
},
_ => panic!("Fatal Error {}", e),
}
},
Ok(m) => {
println!("{:?}", m);
failing = false;
},
};
}
let final_message = String::from("!success:");
network::transmit_data(stream, &final_message);
failing = true;
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("Blocking...");
network::transmit_data(stream, &final_message);
},
_ => panic!("Fatal Error {}", e),
}
},
Ok(m) => {
println!("{:?}", m);
failing = false;
},
};
}
},
OutboundCommands::Unknown => {
println!("Unknown Command!");
},
}
}
}
/*mod network{
use std::net::TcpStream;
use std::io::Write;
pub fn transmit_data(mut stream: &TcpStream, data: &str){
println!("Transmitting...");
println!("data: {}",data);
stream.write(data.to_string().as_bytes()).unwrap();
stream.flush().unwrap();
}
}
}*/

View File

@ -0,0 +1,10 @@
use std::net::TcpStream;
use std::io::Write;
pub fn transmit_data(mut stream: &TcpStream, data: &str){
println!("Transmitting...");
println!("data: {}",data);
stream.write(data.to_string().as_bytes()).unwrap();
stream.flush().unwrap();
}