fixing blocking issues with componenets

This commit is contained in:
michael-bailey 2021-03-21 19:28:39 +00:00
parent 31e455407c
commit dfbb701dbf
5 changed files with 142 additions and 89 deletions

View File

@ -104,24 +104,43 @@ impl IMessagable<ClientMessage, Sender<ServerMessage>> for Client{
// cooperative multitasking implementation
impl ICooperative for Client {
fn tick(&self) {
// aquire locks (so value isn't dropped)
let mut reader_lock = self.stream_reader.lock().unwrap();
let mut writer_lock = self.stream_writer.lock().unwrap();
println!("[client]: Tick!");
{
// aquire locks (so value isn't dropped)
let mut reader_lock = self.stream_reader.lock().unwrap();
let mut writer_lock = self.stream_writer.lock().unwrap();
// aquiring mutable buffers
let reader = reader_lock.as_mut().unwrap();
let _writer = writer_lock.as_mut().unwrap();
// aquiring mutable buffers
let reader = reader_lock.as_mut().unwrap();
let _writer = writer_lock.as_mut().unwrap();
// create buffer
let mut buffer = String::new();
// create buffer
let mut buffer = String::new();
// loop over all lines that have been sent.
while let Ok(_size) = reader.read_line(&mut buffer) {
let command = serde_json::from_str::<ClientStreamIn>(buffer.as_str()).unwrap();
// loop over all lines that have been sent.
while let Ok(_size) = reader.read_line(&mut buffer) {
let command = serde_json::from_str::<ClientStreamIn>(buffer.as_str()).unwrap();
match command {
ClientStreamIn::Disconnect => println!("got Disconnect"),
_ => println!("New command found"),
match command {
ClientStreamIn::Disconnect => println!("got Disconnect"),
_ => println!("New command found"),
}
}
}
{
for message in self.output.iter() {
use ClientMessage::{Disconnect};
match message {
Disconnect => {
let lock = self.server_channel.lock().unwrap();
if let Some(sender) = lock.as_ref() {
sender.send(ServerMessage::ClientDisconnected(self.uuid)).unwrap();
}
},
_ => println!("command not implemneted yet"),
}
}
}

View File

@ -54,25 +54,28 @@ impl IMessagable<ClientMgrMessage, Sender<ServerMessage>> for ClientManager {
impl ICooperative for ClientManager {
fn tick(&self) {
println!("[client manager]: Tick!");
for message in self.receiver.iter() {
use ClientMgrMessage::{Add, Remove, SendMessage};
if !self.receiver.is_empty() {
for message in self.receiver.iter() {
use ClientMgrMessage::{Add, Remove, SendMessage};
match message {
Add(client) => {
self.clients.lock().unwrap().insert(client.uuid, client).unwrap();
},
Remove(uuid) => {
let _ = self.clients.lock().unwrap().remove(&uuid);
},
SendMessage(to_uuid, from_uuid, content) => {
let lock = self.clients.lock().unwrap();
if let Some(client) = lock.get(&to_uuid) {
client.send_message(ClientMessage::Message(from_uuid, content))
}
},
#[allow(unreachable_patterns)]
_ => println!("[Client manager]: not implemented")
match message {
Add(client) => {
self.clients.lock().unwrap().insert(client.uuid, client).unwrap_or_default();
},
Remove(uuid) => {
let _ = self.clients.lock().unwrap().remove(&uuid);
},
SendMessage(to_uuid, from_uuid, content) => {
let lock = self.clients.lock().unwrap();
if let Some(client) = lock.get(&to_uuid) {
client.send_message(ClientMessage::Message(from_uuid, content))
}
},
#[allow(unreachable_patterns)]
_ => println!("[Client manager]: not implemented")
}
}
}

View File

@ -3,18 +3,21 @@ use std::sync::Arc;
use crate::client::Client;
#[derive(Debug)]
pub enum ClientMessage {
Message(Uuid, String),
Disconnect,
}
#[derive(Debug)]
pub enum ClientMgrMessage {
Remove(Uuid),
Add(Arc<Client>),
SendMessage(Uuid, Uuid, String),
}
#[derive(Debug)]
pub enum ServerMessage {
ClientConnected(Arc<Client>),
ClientDisconnected(Uuid)

View File

@ -27,6 +27,7 @@ impl NetworkManager {
let listener = TcpListener::bind(address)
.expect("Could not bind to address");
listener.set_nonblocking(true).unwrap();
Arc::new(NetworkManager {
listener,
@ -37,61 +38,74 @@ impl NetworkManager {
impl ICooperative for NetworkManager {
fn tick(&self) {
println!("[NetworkManager]: Tick!");
// get all new connections
// handle each request
println!("[NetworkManager]: handling new connections!");
for connection in self.listener.incoming() {
if let Ok(stream) = connection {
match connection {
Ok(stream) => {
stream.set_nonblocking(false).expect("[NetworkManager]: cant set non-blocking on connection");
// create buffered writers
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
let mut buffer = String::new();
// create buffered writers
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
// send request message to connection
writer.write_all(
serde_json::to_string(&NetworkSockOut::Request).unwrap().as_bytes()
).unwrap_or_default();
writer.write_all(b"\n").unwrap_or_default();
writer.flush().unwrap_or_default();
let mut buffer = String::new();
// read the new request into a buffer
let res = reader.read_line(&mut buffer);
// send request message to connection
writer.write_all(
serde_json::to_string(&NetworkSockOut::Request).unwrap().as_bytes()
).unwrap_or_default();
writer.write_all(b"\n").unwrap_or_default();
writer.flush().unwrap_or_default();
// if reading caused an error skip the connection
if res.is_err() {continue;}
// read the new request into a buffer
let res = reader.read_line(&mut buffer);
// turn into enum and perform pattern matching
if let Ok(request) = serde_json::from_str::<NetworkSockIn>(&buffer) {
match request {
NetworkSockIn::Info => {
// send back server info to the connection
writer.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael"
}
).unwrap().as_bytes()
).unwrap();
writer.flush().unwrap();
}
NetworkSockIn::Connect { uuid, username, address } => {
// create client and send to server
let new_client = Client::new(
uuid,
username,
address,
stream.try_clone().unwrap(),
self.server_channel.clone()
);
self.server_channel.send(
ServerMessage::ClientConnected(new_client)
).unwrap_or_default();
}
}
}
}
// if reading caused an error skip the connection
if res.is_err() {continue;}
// turn into enum and perform pattern matching
if let Ok(request) = serde_json::from_str::<NetworkSockIn>(&buffer) {
match request {
NetworkSockIn::Info => {
// send back server info to the connection
writer.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael"
}
).unwrap().as_bytes()
).unwrap();
writer.flush().unwrap();
}
NetworkSockIn::Connect { uuid, username, address } => {
// create client and send to server
let new_client = Client::new(
uuid,
username,
address,
stream.try_clone().unwrap(),
self.server_channel.clone()
);
self.server_channel.send(
ServerMessage::ClientConnected(new_client)
).unwrap_or_default();
}
}
}
}
Err(e) => {
println!("[NetworkManager]: got error {:?}", e);
break;
}
}
}
println!("[NetworkManager], ending tick!")
}
}

View File

@ -1,12 +1,14 @@
use crate::messages::ServerMessage;
use uuid::Uuid;
use std::sync::Arc;
use uuid::Uuid;
use crossbeam_channel::{Receiver, unbounded};
use foundation::prelude::ICooperative;
use foundation::prelude::IMessagable;
use crate::client_manager::ClientManager;
use crate::network_manager::NetworkManager;
use crate::messages::ClientMgrMessage;
use crate::messages::ServerMessage;
/// # ServerMessages
/// This is used internally
@ -40,20 +42,32 @@ impl Server {
impl ICooperative for Server{
fn tick(&self) {
println!("[server]: Tick!");
use ClientMgrMessage::{Remove, Add};
// handle new messages loop
for message in self.receiver.try_iter() {
match message {
ServerMessage::ClientConnected(_client) => {
},
ServerMessage::ClientDisconnected(_uuid) => {
}
}
}
if !self.receiver.is_empty() {
println!("[server]: entering loop!");
for message in self.receiver.try_iter() {
println!("[server]: received message {:?}", &message);
match message {
ServerMessage::ClientConnected(client) => {
self.client_manager.send_message(Add(client))
},
ServerMessage::ClientDisconnected(uuid) => {
println!("disconnecting client {:?}", uuid);
self.client_manager.send_message(Remove(uuid));
}
}
}
}
// alocate time for other components
println!("[server]: allocating time for others");
self.network_manager.tick();
self.client_manager.tick();
}
}