adding threading support to prevent blocking calls

This commit is contained in:
michael-bailey 2021-03-23 10:04:58 +00:00
parent 2dcd6045ae
commit c9bb9d7567
5 changed files with 142 additions and 147 deletions

View File

@ -1,3 +1,5 @@
use std::sync::Arc;
pub trait IMessagable<TMessage, TSender> {
fn send_message(&self, msg: TMessage);
fn set_sender(&self, sender: TSender);
@ -5,4 +7,9 @@ pub trait IMessagable<TMessage, TSender> {
pub trait ICooperative {
fn tick(&self);
}
pub trait IPreemtive {
fn run(arc: &Arc<Self>) {}
fn start(arc: &Arc<Self>);
}

View File

@ -1,4 +1,5 @@
// use crate::lib::server::ServerMessages;
use foundation::prelude::IPreemtive;
use std::sync::Arc;
use std::sync::Mutex;
use std::collections::HashMap;
@ -52,35 +53,43 @@ impl IMessagable<ClientMgrMessage, Sender<ServerMessage>> for ClientManager {
}
}
impl ICooperative for ClientManager {
fn tick(&self) {
println!("[client manager]: Tick!");
impl IPreemtive for ClientManager {
if !self.receiver.is_empty() {
for message in self.receiver.iter() {
use ClientMgrMessage::{Add, Remove, SendMessage};
fn run(arc: &Arc<Self>) {
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
match message {
Add(client) => {
self.clients.lock().unwrap().insert(client.uuid, client).unwrap_or_default();
},
Remove(uuid) => {
let _ = self.clients.lock().unwrap().remove(&uuid);
},
SendMessage(to_uuid, from_uuid, content) => {
let lock = self.clients.lock().unwrap();
if let Some(client) = lock.get(&to_uuid) {
client.send_message(ClientMessage::Message(from_uuid, content))
}
},
#[allow(unreachable_patterns)]
_ => println!("[Client manager]: not implemented")
}
}
}
println!("[client manager]: Tick!");
// allocate time for clients.
let clients = self.clients.lock().unwrap();
let _ = clients.iter().map(|(_uuid, client)| client.tick());
if !arc.receiver.is_empty() {
for message in arc.receiver.iter() {
use ClientMgrMessage::{Add, Remove, SendMessage};
match message {
Add(client) => {
arc.clients.lock().unwrap().insert(client.uuid, client).unwrap_or_default();
},
Remove(uuid) => {
let _ = arc.clients.lock().unwrap().remove(&uuid);
},
SendMessage(to_uuid, from_uuid, content) => {
let lock = arc.clients.lock().unwrap();
if let Some(client) = lock.get(&to_uuid) {
client.send_message(ClientMessage::Message(from_uuid, content))
}
},
#[allow(unreachable_patterns)]
_ => println!("[Client manager]: not implemented")
}
}
}
}
}
fn start(arc: &Arc<Self>) {
let arc = arc.clone();
std::thread::spawn(move || {
ClientManager::run(&arc)
});
}
}

View File

@ -6,7 +6,7 @@ pub mod messages;
use clap::{App, Arg};
use foundation::prelude::ICooperative;
use foundation::prelude::IPreemtive;
use server::Server;
fn main() {
@ -25,7 +25,5 @@ fn main() {
let server = Server::new();
loop {
server.tick();
}
Server::run(&server);
}

View File

@ -1,25 +1,21 @@
use std::sync::Mutex;
use std::collections::VecDeque;
use std::net::TcpStream;
use crate::client::Client;
use crate::messages::ServerMessage;
use foundation::prelude::IPreemtive;
use std::io::BufWriter;
use std::io::BufReader;
use std::sync::Arc;
use std::net::TcpListener;
use std::io::Write;
use std::io::BufRead;
use std::io::Read;
use std::net::TcpListener;
use std::sync::Arc;
use std::thread;
use crossbeam_channel::{Sender};
use foundation::prelude::ICooperative;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use crate::client::Client;
use crate::messages::ServerMessage;
pub struct NetworkManager {
listener: TcpListener,
connection_queue: Mutex<VecDeque<TcpStream>>,
server_channel: Sender<ServerMessage>,
}
@ -33,122 +29,84 @@ impl NetworkManager {
let listener = TcpListener::bind(address)
.expect("Could not bind to address");
listener.set_nonblocking(true).unwrap();
Arc::new(NetworkManager {
listener,
connection_queue: Mutex::new(VecDeque::new()),
server_channel,
})
}
}
impl ICooperative for NetworkManager {
fn tick(&self) {
println!("[NetworkManager]: Tick!");
impl IPreemtive for NetworkManager {
fn start(arc: &Arc<Self>) {
let arc = arc.clone();
std::thread::spawn(move || {
// fetch new connections and add them to the client queue
for connection in arc.listener.incoming() {
println!("[NetworkManager]: New Connection!");
match connection {
Ok(stream) => {
let server_channel = arc.server_channel.clone();
// create readers
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
// fetch new connections and add them to the client queue
let mut lock = self.connection_queue.lock().unwrap();
for connection in self.listener.incoming() {
match connection {
Ok(stream) => {
&mut lock.push_back(stream);
},
Err(e) => {
println!("[Network manager]: error getting stream: {:?}", e);
},
}
}
drop(lock);
let _handle = thread::Builder::new()
.name("NetworkJoinThread".to_string())
.spawn(move || {
let mut buffer = String::new();
// attempt to handle current connections
let mut buffer = String::new();
let mut lock = self.connection_queue.lock().unwrap();
for i in 1..lock.len() {
let mut stream = lock.pop_front();
if let Some(stream) = stream {
// send request message to connection
writer.write_all(
serde_json::to_string(&NetworkSockOut::Request).unwrap().as_bytes()
).unwrap_or_default();
writer.write_all(b"\n").unwrap_or_default();
writer.flush().unwrap_or_default();
let mut writer = BufWriter::new(stream.try_clone().unwrap());
let mut reader = BufReader::new(stream.try_clone().unwrap());
// try get response
let res = reader.read_line(&mut buffer);
if res.is_err() {return;}
if let Ok(_) = reader.read_line(&mut buffer) {
println!("recieved");
} else {
continue;
}
}
}
drop(lock);
// get all new connections
// handle each request
println!("[NetworkManager]: handling new connections!");
for connection in self.listener.incoming() {
match connection {
Ok(stream) => {
stream.set_nonblocking(false).expect("[NetworkManager]: cant set non-blocking on connection");
// create buffered writers
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
let mut buffer = String::new();
// send request message to connection
writer.write_all(
serde_json::to_string(&NetworkSockOut::Request).unwrap().as_bytes()
).unwrap_or_default();
writer.write_all(b"\n").unwrap_or_default();
writer.flush().unwrap_or_default();
let res = reader.read_line(&mut buffer);
// if reading caused an error skip the connection
if res.is_err() {continue;}
// turn into enum and perform pattern matching
if let Ok(request) = serde_json::from_str::<NetworkSockIn>(&buffer) {
match request {
NetworkSockIn::Info => {
// send back server info to the connection
writer.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael"
}
).unwrap().as_bytes()
).unwrap();
writer.flush().unwrap();
}
NetworkSockIn::Connect { uuid, username, address } => {
// create client and send to server
let new_client = Client::new(
uuid,
username,
address,
stream.try_clone().unwrap(),
self.server_channel.clone()
);
self.server_channel.send(
ServerMessage::ClientConnected(new_client)
).unwrap_or_default();
}
}
}
}
Err(e) => {
println!("[NetworkManager]: got error {:?}", e);
break;
}
}
}
println!("[NetworkManager], ending tick!")
//match the response
if let Ok(request) = serde_json::from_str::<NetworkSockIn>(&buffer) {
match request {
NetworkSockIn::Info => {
// send back server info to the connection
writer.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael"
}
).unwrap().as_bytes()
).unwrap();
writer.write_all(b"\n").unwrap();
writer.flush().unwrap();
}
NetworkSockIn::Connect { uuid, username, address } => {
// create client and send to server
let new_client = Client::new(
uuid,
username,
address,
stream.try_clone().unwrap(),
server_channel.clone()
);
server_channel.send(
ServerMessage::ClientConnected(new_client)
).unwrap_or_default();
}
}
}
});
},
Err(e) => {
println!("[Network manager]: error getting stream: {:?}", e);
continue;
},
}
}
});
}
}

View File

@ -1,8 +1,10 @@
use std::sync::Arc;
use std::thread;
use uuid::Uuid;
use crossbeam_channel::{Receiver, unbounded};
use foundation::prelude::IPreemtive;
use foundation::prelude::ICooperative;
use foundation::prelude::IMessagable;
use crate::client_manager::ClientManager;
@ -67,7 +69,28 @@ impl ICooperative for Server{
// alocate time for other components
println!("[server]: allocating time for others");
self.network_manager.tick();
self.client_manager.tick();
//
}
}
impl IPreemtive for Server {
fn run(arc: &std::sync::Arc<Self>) {
// start services
NetworkManager::start(&arc.network_manager);
ClientManager::start(&arc.client_manager);
loop {
thread::sleep(std::time::Duration::from_secs(1));
arc.tick();
}
}
fn start(arc: &std::sync::Arc<Self>) {
let arc = arc.clone();
// start thread
std::thread::spawn(move || {
Server::run(&arc)
});
}
}