connect and disconnect features

This commit is contained in:
Mitchell 2020-06-20 14:56:14 +01:00
parent a4dc149aee
commit 714b33464c
4 changed files with 81 additions and 133 deletions

View File

@ -1,15 +1,3 @@
/*
* Add execute method to Commands enum and implement a struct of CommandInfo into its constructor
* Change parameters passed into required funtions
*
*
* client sends message to other client, reciever sends confirm message back to sender so sender
* knows theyre online and accepting packets. If no comfirm comes back to sender, try 2 more times.
* If no success, request ip from server to double check if theyre online, if ip match, assume
* theyre offline. If no response from server assume some error has occured or sender is offline.
* Save messages to be sent and check every few mins to see if they are online.
*
*/
mod client_management;
mod protocols;
@ -24,11 +12,8 @@ use rust_chat_server::ThreadPool;
use std::collections::VecDeque;
use std::net::TcpListener;
use std::net::TcpStream;
use std::sync::mpsc;
use std::sync::Mutex;
use std::sync::Arc;
use crossbeam_queue::{SegQueue, PushError, PopError};
use crossbeam_utils::sync::WaitGroup;
use std::sync::{Arc, Barrier, Mutex};
use crossbeam_channel::{unbounded, Sender, Receiver, TryRecvError};
use parking_lot::FairMutex;
use std::collections::HashMap;
use dashmap::DashMap;
@ -41,23 +26,31 @@ fn main(){
let listener = TcpListener::bind("127.0.0.1:6001").unwrap();
let pool = ThreadPool::new(10);
let connected_clients = Arc::new(Mutex::new(HashMap::new()));
// Using an ArrayQueue is much faster, but has limited capacity
//let message_queue = Arc::new(SegQueue::new());
let message_queue = Arc::new(FairMutex::new(VecDeque::new()));
let sync_wg = Arc::new(WaitGroup::new());
let message_queue: Arc<FairMutex<VecDeque<String>>> = Arc::new(FairMutex::new(VecDeque::new()));
let (tx,rx): (Sender<Arc<Barrier>>, Receiver<Arc<Barrier>>) = unbounded();
let (clock_tx, _) = (tx.clone(), rx.clone());
thread::spawn({
let message_ref = Arc::clone(&message_queue);
let connected_clients = Arc::clone(&connected_clients);
let message_queue = Arc::clone(&message_queue);
move || {
loop{
println!("Phase 1");
//sync_wg.clone().wait();
println!("Removing item...");
message_ref.lock().pop_front();
println!("Phase 2");
//sync_wg.clone().wait();
println!("Done");
let online_clients = connected_clients.lock().unwrap().len();
if !message_queue.lock().is_empty(){
println!("message on queue detected");
let sync_group = Arc::new(Barrier::new(online_clients+1));
println!("sending to threads... {}",online_clients);
for _ in 0..online_clients{
println!("thread");
clock_tx.send(sync_group.clone()).unwrap();
}
println!("all threads updated!");
sync_group.wait();
println!("data removed");
message_queue.lock().pop_front();
sync_group.wait();
}
}
}
});
@ -67,51 +60,47 @@ fn main(){
println!("Connected: {}", address);
let clients_ref = Arc::clone(&connected_clients);
let message_ref = Arc::clone(&message_queue);
//let wg = sync_wg.clone();
let (_ , client_rx) = (tx.clone(), rx.clone());
pool.execute(move || {
handle_connection(&stream, &clients_ref, &message_ref, &sync_wg, &address.to_string());
handle_connection(&stream, &clients_ref, &message_ref, &address.to_string(), client_rx);
});
}
}
}
fn handle_connection(mut stream: &TcpStream, clients_ref: &Arc<Mutex<HashMap<String,Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>, sync_wg: &WaitGroup, new_address: &String){
let wg = sync_wg.clone();
fn handle_connection(mut stream: &TcpStream, clients_ref: &Arc<Mutex<HashMap<String,Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>, new_address: &String, client_rx: Receiver<Arc<Barrier>>){
//let wg = sync_wg.clone();
stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
let mut buffer = [0; 1024];
let request = String::from("?request:");
network::transmit_data(stream, &request);
/*
* Share a message queue with all threads where operations that need to be performed are added
* to the queue, each thread will then increment a integer till its the same as the amount of
* clients online, then all threads will execute the command together.
*/
let mut connected = false;
// do while loop
while {
println!("loop start");
if connected == true && !message_queue.lock().is_empty() {
let (command, data) = format_data(message_queue);
let command = match_outbound_command(&command);
//BUG: gets stuck at waiting...
//change to a wait group so all threads are in sync
println!("waiting...");
//sync_wg.clone().wait();
sync_wg.wait();
println!("done");
//execute copied command
command.execute(stream, &mut buffer, &data);
println!("waiting...");
//sync_wg.clone().wait();
println!("done");
//remove data from front of queue
match client_rx.try_recv(){
/*handle our data*/
Ok(sync_group) => {
println!("data present");
let (command, data) = format_data(message_queue);
let command = match_outbound_command(&command);
println!("waiting 1");
sync_group.wait();
println!("executing");
command.execute(stream, &mut buffer, &data);
println!("waiting 2");
sync_group.wait();
println!("client updated");
},
/*sender disconnected*/
Err(TryRecvError::Disconnected) => {},
/*no data available yet*/
Err(TryRecvError::Empty) => {},
}
}else{
stream.read(&mut buffer).unwrap();
// after timeout handle error and do not execute the code below if there is an error
@ -148,7 +137,6 @@ fn tokenize(incoming_message: &str) -> Vec<String>{
fn match_command(data: &Vec<String>) -> Commands{
match data[0].as_str(){
"?request:" => Commands::Request,
"?info!" => Commands::Info,
"!success:" => Commands::Success,
"!error:" => Commands::Error,
@ -156,16 +144,14 @@ fn match_command(data: &Vec<String>) -> Commands{
"!disconnect:" => Commands::Disconnect,
"!clientUpdate:" => Commands::ClientUpdate,
"!clientInfo:" => Commands::ClientInfo,
"!client:" => Commands::Client,
"!test:" => Commands::Test,
"!message:" => Commands::Message,
_ => Commands::Unknown,
}
}
fn match_outbound_command(data: &Vec<String>) -> OutboundCommands{
match data[0].as_str(){
"!clientUpdate:" => OutboundCommands::ClientUpdate,
"!client:" => OutboundCommands::Client,
"!clientRemove:" => OutboundCommands::ClientRemove,
_ => OutboundCommands::Unknown,
}
}

View File

@ -12,19 +12,16 @@ mod message;
pub mod network;
use crate::client_management::client_profile::Client;
use crossbeam_queue::SegQueue;
use std::collections::VecDeque;
use parking_lot::FairMutex;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::io::{self, Read};
use std::net::TcpStream;
use std::time::Duration;
pub enum Commands{
Request,
Info,
Success,
Error,
@ -32,25 +29,22 @@ pub enum Commands{
Disconnect,
ClientUpdate,
ClientInfo,
Client,
Test,
Message,
Unknown,
}
pub enum OutboundCommands{
Success,
Error,
ClientUpdate,
Client,
ClientRemove,
Unknown,
}
impl Commands{
pub fn execute(&self, mut stream: &TcpStream, buffer: &mut [u8; 1024], data: &Vec<String>, address: &String, clients_ref: &Arc<Mutex<HashMap<String,Client>>>, message_queue: &Arc<FairMutex<VecDeque<String>>>){
match *self{
Commands::Request => {
},
Commands::Info => {
},
Commands::Success => {
},
@ -66,17 +60,17 @@ impl Commands{
message.push_str(&" ".to_string());
message.push_str(&data[1].to_string());
println!("message: {}", message);
message_queue.lock().push_back(message);
network::transmit_data(stream, &String::from("!success:"));
},
Commands::Disconnect => {
let message = String::from("!success:");
network::transmit_data(stream, &message);
let client_profile = disconnect::remove_client(clients_ref, &data[1]);
disconnect::remove_client(clients_ref, &data[1]);
/*
* repeat what connect does
*/
let mut message = "!clientRemove: ".to_string();
message.push_str(&client_profile.get_uuid().to_string());
message_queue.lock().push_back(message);
network::transmit_data(stream, &String::from("!success:"));
},
Commands::ClientUpdate => {
stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
@ -88,7 +82,6 @@ impl Commands{
let formatted_data = client_update::format_client_data(&key, &value);
network::transmit_data(stream, &formatted_data);
failing = true;
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
@ -131,17 +124,8 @@ impl Commands{
}
},
Commands::ClientInfo => {
let message = String::from("!success:");
network::transmit_data(stream, &message);
let requested_address = client_info::get_client_address(clients_ref, &data[1]);
network::transmit_data(stream, &requested_address);
},
Commands::Client => {
},
Commands::Test => {
},
Commands::Message => {
let requested_data = client_info::get_client_data(clients_ref, &data[1]);
network::transmit_data(stream, &requested_data);
},
Commands::Unknown => {
println!("Uknown Command!");
@ -155,35 +139,11 @@ impl OutboundCommands{
match *self{
OutboundCommands::Success => {},
OutboundCommands::Error => {},
OutboundCommands::ClientUpdate => {
OutboundCommands::Client => {
stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
let message = String::from("!clientUpdate:");
network::transmit_data(stream, &message);
let mut failing = true;
//let mut buffer = [0; 1024];
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("Blocking...");
network::transmit_data(stream, &message);
},
_ => panic!("Fatal Error {}", e),
}
},
Ok(m) => {
println!("{:?}", m);
failing = false;
},
};
}
network::transmit_data(stream, data);
failing = true;
let mut failing = true;
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
@ -201,18 +161,19 @@ impl OutboundCommands{
},
};
}
},
OutboundCommands::ClientRemove => {
stream.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
network::transmit_data(stream, data);
let final_message = String::from("!success:");
network::transmit_data(stream, &final_message);
failing = true;
while failing{
let mut failing = true;
while failing{
let _ = match stream.read(&mut *buffer){
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("Blocking...");
network::transmit_data(stream, &final_message);
network::transmit_data(stream, data);
},
_ => panic!("Fatal Error {}", e),
}
@ -223,7 +184,6 @@ impl OutboundCommands{
},
};
}
},
OutboundCommands::Unknown => {
println!("Unknown Command!");

View File

@ -1,5 +0,0 @@
use crate::client_management::client_profile::Client;
use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;

View File

@ -4,12 +4,19 @@ use std::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
pub fn get_client_address(clients_ref: &Arc<Mutex<HashMap<String,Client>>>, uuid: &String) -> String{
// may not need to lock hashmap as it may cause difficulties later on
pub fn get_client_data(clients_ref: &Arc<Mutex<HashMap<String,Client>>>, uuid: &String) -> String{
let clients_hashmap = clients_ref.lock().unwrap();
let client = clients_hashmap.get(uuid);
match client{
Some(data) => data.get_address().to_string(),
Some(data) => {
let mut message = String::from("!success: ");
message.push_str(&data.get_uuid().to_string());
message.push_str(&" host:".to_string());
message.push_str(&data.get_address().to_string());
message.push_str(&" ".to_string());
message.push_str(&data.get_username().to_string());
message
},
None => String::from("client not online"),
}
}