Moved threads to tokio async

This commit is contained in:
michael-bailey 2021-07-30 11:50:08 +01:00
parent 0ed2c5a290
commit 17f6d2c78a
8 changed files with 334 additions and 409 deletions

View File

@ -1,32 +1,37 @@
use openssl::symm::{Cipher, Crypter, Mode};
use openssl::sha::sha256;
// use openssl::sha::sha256;
// use openssl::symm::{Cipher, Crypter, Mode};
#[cfg(test)]
mod test {
use openssl::symm::{Cipher, Crypter, Mode};
use openssl::sha::sha256;
use openssl::symm::{Cipher, Crypter, Mode};
#[test]
fn testEncryption() {
let plaintext = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.".as_bytes();
let key = sha256(b"This is a key");
let key = sha256(b"This is a key");
let IV = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV));
let mut ciphertext = vec![0u8; 1024];
let cipherlen = encrypter.unwrap().update(plaintext, ciphertext.as_mut_slice()).unwrap();
let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV));
let mut ciphertext = vec![0u8; 1024];
let cipherlen = encrypter
.unwrap()
.update(plaintext, ciphertext.as_mut_slice())
.unwrap();
let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV));
let mut decrypted = vec![0u8; 1024];
decrypter.unwrap().update(&ciphertext[..cipherlen], decrypted.as_mut_slice()).unwrap();
let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV));
let mut decrypted = vec![0u8; 1024];
decrypter
.unwrap()
.update(&ciphertext[..cipherlen], decrypted.as_mut_slice())
.unwrap();
println!("{:?}", plaintext);
println!("{:?}", ciphertext.as_slice());
println!("{:?}", decrypted.as_slice());
println!("{:?}", plaintext);
println!("{:?}", ciphertext.as_slice());
println!("{:?}", decrypted.as_slice());
println!("{:?}", plaintext.len());
println!("{:?}", ciphertext.len());
println!("{:?}", decrypted.len());
println!("{:?}", plaintext.len());
println!("{:?}", ciphertext.len());
println!("{:?}", decrypted.len());
}
}
}

View File

@ -1,11 +1,10 @@
pub mod encryption;
pub mod messages;
pub mod prelude;
pub mod encryption;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/**
* #ClientDetails.
* This defines the fileds a client would want to send when connecitng
@ -21,5 +20,3 @@ pub struct ClientDetails {
pub address: String,
pub public_key: Option<Vec<u8>>,
}

View File

@ -15,5 +15,7 @@ crossbeam = "0.8.0"
crossbeam-channel = "0.5.0"
zeroize = "1.1.0"
openssl = "0.10.33"
tokio = { version = "1.9.0", features = ["full"] }
futures = "0.3.16"
foundation = {path = '../foundation'}

View File

@ -1,56 +1,47 @@
use crate::messages::ClientMessage;
use crate::messages::ServerMessage;
use foundation::prelude::IPreemptive;
use std::cmp::Ordering;
use std::io::BufRead;
use std::io::Write;
use std::io::{BufReader, BufWriter};
use std::mem::replace;
use std::net::TcpStream;
use std::sync::Arc;
use std::sync::Mutex;
use std::cmp::Ordering;
use std::fmt::Write;
use crossbeam_channel::{unbounded, Receiver, Sender};
use serde::Serialize;
use uuid::Uuid;
use zeroize::Zeroize;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::prelude::IMessagable;
use futures::lock::Mutex;
use tokio::task;
use tokio::io::{ReadHalf, WriteHalf};
use tokio::sync::mpsc::{Sender, Receiver, channel};
use tokio::io::{AsyncBufReadExt, BufReader, AsyncWriteExt};
use crate::messages::ClientMessage;
use crate::messages::ServerMessage;
use foundation::ClientDetails;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
/// # Client
/// This struct represents a connected user.
///
/// ## Attrubutes
/// ## Attributes
/// - details: store of the clients infomation.
///
/// - stream: The socket for the connected client.
/// - stream_reader: the buffered reader used to receive messages
/// - stream_writer: the buffered writer used to send messages
/// - owner: An optional reference to the owning object.
#[derive(Debug, Serialize)]
#[derive(Debug)]
pub struct Client {
pub details: ClientDetails,
// non serializable
#[serde(skip)]
server_channel: Mutex<Option<Sender<ServerMessage>>>,
// server send channel
server_channel: Mutex<Sender<ServerMessage>>,
#[serde(skip)]
input: Sender<ClientMessage>,
// object channels
tx: Sender<ClientMessage>,
rx: Mutex<Receiver<ClientMessage>>,
#[serde(skip)]
output: Receiver<ClientMessage>,
#[serde(skip)]
stream: Mutex<Option<TcpStream>>,
#[serde(skip)]
stream_reader: Mutex<Option<BufReader<TcpStream>>>,
#[serde(skip)]
stream_writer: Mutex<Option<BufWriter<TcpStream>>>,
stream_rx: Mutex<BufReader<ReadHalf<tokio::net::TcpStream>>>,
stream_tx: Mutex<WriteHalf<tokio::net::TcpStream>>,
}
// client funciton implmentations
@ -59,13 +50,11 @@ impl Client {
uuid: String,
username: String,
address: String,
stream: TcpStream,
stream_rx: BufReader<ReadHalf<tokio::net::TcpStream>>,
stream_tx: WriteHalf<tokio::net::TcpStream>,
server_channel: Sender<ServerMessage>,
) -> Arc<Client> {
let (sender, receiver) = unbounded();
let out_stream = stream.try_clone().unwrap();
let in_stream = stream.try_clone().unwrap();
let (sender, receiver) = channel(1024);
Arc::new(Client {
details: ClientDetails {
@ -75,172 +64,134 @@ impl Client {
public_key: None
},
server_channel: Mutex::new(Some(server_channel)),
server_channel: Mutex::new(server_channel),
input: sender,
output: receiver,
tx: sender,
rx: Mutex::new(receiver),
stream: Mutex::new(Some(stream)),
stream_reader: Mutex::new(Some(BufReader::new(in_stream))),
stream_writer: Mutex::new(Some(BufWriter::new(out_stream))),
stream_rx: Mutex::new(stream_rx),
stream_tx: Mutex::new(stream_tx),
})
}
}
impl IMessagable<ClientMessage, Sender<ServerMessage>> for Client {
fn send_message(&self, msg: ClientMessage) {
self.input
.send(msg)
.expect("failed to send message to client.");
}
fn set_sender(&self, sender: Sender<ServerMessage>) {
let mut server_lock = self.server_channel.lock().unwrap();
let _ = replace(&mut *server_lock, Some(sender));
}
}
pub fn start(self: &Arc<Client>) {
// cooperative multitasking implementation
impl IPreemptive for Client {
fn run(arc: &Arc<Self>) {
let arc1 = arc.clone();
let arc2 = arc.clone();
let t1_client = self.clone();
let t2_client = self.clone();
// read thread
let _ = std::thread::Builder::new()
.name(format!("client thread recv [{:?}]", &arc.details.uuid))
.spawn(move || {
use ClientMessage::Disconnect;
let arc = arc1;
// client stream read task
tokio::spawn(async move {
use ClientMessage::Disconnect;
let client = t1_client;
let mut lock = client.stream_tx.lock().await;
let mut buffer = String::new();
// tell client that is is now connected
let _ = writeln!(buffer, "{}",
serde_json::to_string(&ClientStreamOut::Connected).unwrap()
);
let _ = lock.write_all(&buffer.as_bytes());
let _ = lock.flush().await;
drop(lock);
loop {
let mut stream_reader = client.stream_rx.lock().await;
let mut buffer = String::new();
let mut reader_lock = arc.stream_reader.lock().unwrap();
let reader = reader_lock.as_mut().unwrap();
'main: while let Ok(size) = reader.read_line(&mut buffer) {
if size == 0 {
arc.send_message(Disconnect);
break 'main;
}
if let Ok(_size) = stream_reader.read_line(&mut buffer).await {
let command = serde_json::from_str::<ClientStreamIn>(buffer.as_str());
println!("[Client {:?}]: recieved {}", arc.details.uuid, &buffer);
println!("[Client {:?}]: recieved {}", client.details.uuid, &buffer);
match command {
Ok(ClientStreamIn::Disconnect) => {
println!("[Client {:?}]: Disconnect recieved", &arc.details.uuid);
arc.send_message(Disconnect);
break 'main;
println!("[Client {:?}]: Disconnect recieved", &client.details.uuid);
client.send_message(Disconnect).await;
return;
}
Ok(ClientStreamIn::SendMessage { to, content }) => {
println!("[Client {:?}]: send message to: {:?}", &arc.details.uuid, &to);
let lock = arc.server_channel.lock().unwrap();
let sender = lock.as_ref().unwrap();
let _ = sender.send(ServerMessage::ClientSendMessage {
from: arc.details.uuid,
println!("[Client {:?}]: send message to: {:?}", &client.details.uuid, &to);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientSendMessage {
from: client.details.uuid,
to,
content,
});
}
Ok(ClientStreamIn::Update) => {
let lock = arc.server_channel.lock().unwrap();
let sender = lock.as_ref().unwrap();
let _ = sender.send(ServerMessage::ClientUpdate { to: arc.details.uuid });
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid });
}
_ => println!("[Client {:?}]: command not found", &arc.details.uuid),
_ => println!("[Client {:?}]: command not found", &client.details.uuid),
}
buffer.zeroize();
}
println!("[Client {:?}] exited thread 1", &arc.details.uuid);
});
println!("[Client {:?}] exited thread 1", &client.details.uuid);
}
});
// write thread
let _ = std::thread::Builder::new()
.name(format!("client thread msg [{:?}]", &arc.details.uuid))
.spawn(move || {
let arc = arc2;
let mut writer_lock = arc.stream_writer.lock().unwrap();
let writer = writer_lock.as_mut().unwrap();
let mut buffer: Vec<u8> = Vec::new();
// client channel read thread
tokio::spawn(async move {
use ClientMessage::{Disconnect, Message, SendClients};
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::Connected).unwrap()
);
let _ = writer.write_all(&buffer);
let _ = writer.flush();
let client = t2_client;
'main: loop {
for message in arc.output.iter() {
use ClientMessage::{Disconnect, Message, SendClients};
println!("[Client {:?}]: {:?}", &arc.details.uuid, message);
match message {
Disconnect => {
arc.server_channel
.lock()
.unwrap()
.as_mut()
.unwrap()
.send(ServerMessage::ClientDisconnected { id: arc.details.uuid })
.unwrap();
break 'main;
}
Message { from, content } => {
let msg = &ClientStreamOut::UserMessage { from, content };
let _ = writeln!(buffer, "{}", serde_json::to_string(msg).unwrap());
let _ = writer.write_all(&buffer);
let _ = writer.flush();
}
SendClients { clients } => {
let client_details_vec: Vec<ClientDetails> = clients
.iter()
.map(|client| &client.details)
.cloned()
.collect();
loop {
let mut channel = client.rx.lock().await;
let mut buffer = String::new();
let msg = &ClientStreamOut::ConnectedClients {
clients: client_details_vec,
};
let message = channel.recv().await.unwrap();
drop(channel);
let _ = writeln!(buffer, "{}", serde_json::to_string(msg).unwrap());
let _ = writer.write_all(&buffer);
let _ = writer.flush();
}
}
buffer.zeroize();
println!("[Client {:?}]: {:?}", &client.details.uuid, message);
match message {
Disconnect => {
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientDisconnected { id: client.details.uuid }).await;
return
}
Message { from, content } => {
let msg = ClientStreamOut::UserMessage { from, content };
let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes());
let _ = stream.flush().await;
drop(stream);
}
SendClients { clients } => {
let client_details_vec: Vec<ClientDetails> = clients
.iter()
.map(|client| &client.details)
.cloned()
.collect();
let msg = ClientStreamOut::ConnectedClients {
clients: client_details_vec,
};
let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes());
let _ = stream.flush().await;
}
}
println!("[Client {:?}]: exited thread 2", &arc.details.uuid);
});
}
});
}
fn start(arc: &Arc<Self>) {
Client::run(arc)
}
}
// default value implementation
impl Default for Client {
fn default() -> Self {
let (sender, reciever) = unbounded();
Client {
details: ClientDetails {
uuid: Uuid::new_v4(),
username: "generic_client".to_string(),
address: "127.0.0.1".to_string(),
public_key: None
},
output: reciever,
input: sender,
server_channel: Mutex::new(None),
stream: Mutex::new(None),
stream_reader: Mutex::new(None),
stream_writer: Mutex::new(None),
}
pub async fn send_message(self: &Arc<Client>, msg: ClientMessage) {
let _ = self.tx.send(msg).await;
}
}

View File

@ -1,18 +1,15 @@
// use crate::lib::server::ServerMessages;
use foundation::prelude::IPreemptive;
use std::collections::HashMap;
use std::mem::replace;
use std::sync::Arc;
use std::sync::Mutex;
use crossbeam_channel::{unbounded, Receiver, Sender};
use uuid::Uuid;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task;
use futures::lock::Mutex;
use crate::client::Client;
use crate::messages::ClientMessage;
use crate::messages::ClientMgrMessage;
use crate::messages::ServerMessage;
use foundation::prelude::IMessagable;
/// # ClientManager
/// This struct manages all connected users
@ -22,92 +19,85 @@ pub struct ClientManager {
server_channel: Mutex<Sender<ServerMessage>>,
sender: Sender<ClientMgrMessage>,
receiver: Receiver<ClientMgrMessage>,
tx: Sender<ClientMgrMessage>,
rx: Mutex<Receiver<ClientMgrMessage>>,
}
impl ClientManager {
pub fn new(server_channel: Sender<ServerMessage>) -> Arc<Self> {
let (sender, receiver) = unbounded();
let (tx, rx) = channel(1024);
Arc::new(ClientManager {
clients: Mutex::default(),
server_channel: Mutex::new(server_channel),
sender,
receiver,
tx,
rx: Mutex::new(rx),
})
}
fn send_to_client(&self, id: &Uuid, msg: ClientMessage) {
let lock = self.clients.lock().unwrap();
if let Some(client) = lock.get(id) {
client.send_message(msg)
}
}
}
pub fn start(self: &Arc<ClientManager>) {
impl IMessagable<ClientMgrMessage, Sender<ServerMessage>> for ClientManager {
fn send_message(&self, msg: ClientMgrMessage) {
self.sender.send(msg).unwrap();
}
fn set_sender(&self, sender: Sender<ServerMessage>) {
let mut server_lock = self.server_channel.lock().unwrap();
let _ = replace(&mut *server_lock, sender);
}
}
let client_manager = self.clone();
impl IPreemptive for ClientManager {
fn run(arc: &Arc<Self>) {
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
tokio::spawn(async move {
if !arc.receiver.is_empty() {
for message in arc.receiver.try_iter() {
println!("[Client manager]: recieved message: {:?}", message);
use ClientMgrMessage::{Add, Remove, SendClients, SendMessage};
use ClientMgrMessage::{Add, Remove, SendClients, SendMessage};
match message {
Add(client) => {
println!("[Client Manager]: adding new client");
Client::start(&client);
let mut lock = arc.clients.lock().unwrap();
if lock.insert(client.details.uuid, client).is_none() {
println!("value is new");
}
loop {
let mut receiver = client_manager.rx.lock().await;
let message = receiver.recv().await.unwrap();
println!("[Client manager]: recieved message: {:?}", message);
match message {
Add(client) => {
println!("[Client Manager]: adding new client");
client.start();
let mut lock = client_manager.clients.lock().await;
if lock.insert(client.details.uuid, client).is_none() {
println!("value is new");
}
Remove(uuid) => {
println!("[Client Manager]: removing client: {:?}", &uuid);
if let Some(client) = arc.clients.lock().unwrap().remove(&uuid) {
client.send_message(ClientMessage::Disconnect);
}
}
SendMessage { to, from, content } => {
arc.send_to_client(&to, ClientMessage::Message { from, content })
}
SendClients { to } => {
let lock = arc.clients.lock().unwrap();
if let Some(client) = lock.get(&to) {
let clients_vec: Vec<Arc<Client>> =
lock.values().cloned().collect();
client.send_message(ClientMessage::SendClients {
clients: clients_vec,
})
}
}
#[allow(unreachable_patterns)]
_ => println!("[Client manager]: not implemented"),
}
Remove(uuid) => {
println!("[Client Manager]: removing client: {:?}", &uuid);
if let Some(client) = client_manager.clients.lock().await.remove(&uuid) {
client.send_message(ClientMessage::Disconnect).await;
}
}
SendMessage { to, from, content } => {
client_manager.send_to_client(&to, ClientMessage::Message { from, content }).await;
}
SendClients { to } => {
let lock = client_manager.clients.lock().await;
if let Some(client) = lock.get(&to) {
let clients_vec: Vec<Arc<Client>> =
lock.values().cloned().collect();
client.send_message(ClientMessage::SendClients {
clients: clients_vec,
}).await
}
}
#[allow(unreachable_patterns)]
_ => println!("[Client manager]: not implemented"),
}
}
});
}
async fn send_to_client(self: &Arc<ClientManager>, id: &Uuid, msg: ClientMessage) {
let lock = self.clients.lock().await;
if let Some(client) = lock.get(&id) {
client.clone().send_message(msg).await;
}
}
fn start(arc: &Arc<Self>) {
let arc = arc.clone();
std::thread::spawn(move || ClientManager::run(&arc));
pub async fn send_message(
self: Arc<ClientManager>,
message: ClientMgrMessage)
{
let _ = self.tx.send(message).await;
}
}

View File

@ -4,12 +4,14 @@ pub mod messages;
pub mod network_manager;
pub mod server;
use std::io;
use clap::{App, Arg};
use foundation::prelude::IPreemptive;
use server::Server;
fn main() {
#[tokio::main]
async fn main() -> io::Result<()> {
let _args = App::new("--rust chat server--")
.version("0.1.5")
.author("Mitchel Hardie <mitch161>, Michael Bailey <michael-bailey>")
@ -26,7 +28,8 @@ fn main() {
)
.get_matches();
let server = Server::new();
let server = Server::new().unwrap();
Server::run(&server);
server.start().await;
Ok(())
}

View File

@ -1,128 +1,104 @@
use foundation::prelude::IPreemptive;
use std::io::BufRead;
use std::io::BufReader;
use std::io::BufWriter;
use std::io::Write;
use std::net::TcpListener;
use std::sync::Arc;
use std::thread;
use std::io::Write;
use crossbeam_channel::Sender;
use tokio::task;
use tokio::net::TcpListener;
use tokio::sync::mpsc::Sender;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
use crate::client::Client;
use crate::messages::ServerMessage;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
pub struct NetworkManager {
listener: TcpListener,
address: String,
server_channel: Sender<ServerMessage>,
}
impl NetworkManager {
pub fn new(port: String, server_channel: Sender<ServerMessage>) -> Arc<NetworkManager> {
let mut address = "0.0.0.0:".to_string();
address.push_str(&port);
let listener = TcpListener::bind(address).expect("Could not bind to address");
pub fn new(_port: String, server_channel: Sender<ServerMessage>) -> Arc<NetworkManager> {
Arc::new(NetworkManager {
listener,
address: "0.0.0.0:5600".to_string(),
server_channel,
})
}
}
impl IPreemptive for NetworkManager {
fn run(_: &Arc<Self>) {}
pub fn start(self: &Arc<NetworkManager>) {
fn start(arc: &Arc<Self>) {
let arc = arc.clone();
std::thread::spawn(move || {
// fetch new connections and add them to the client queue
for connection in arc.listener.incoming() {
println!("[NetworkManager]: New Connection!");
match connection {
Ok(stream) => {
let server_channel = arc.server_channel.clone();
let network_manager = self.clone();
// create readers
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
tokio::spawn(async move {
let listener = TcpListener::bind(network_manager.address.clone()).await.unwrap();
let _handle = thread::Builder::new()
.name("NetworkJoinThread".to_string())
.spawn(move || {
let mut out_buffer: Vec<u8> = Vec::new();
let mut in_buffer: String = String::new();
loop {
let (connection, _) = listener.accept().await.unwrap();
let (rd, mut wd) = io::split(connection);
let mut reader = BufReader::new(rd);
let server_channel = network_manager.server_channel.clone();
// send request message to connection
task::spawn(async move {
let mut out_buffer: Vec<u8> = Vec::new();
let mut in_buffer: String = String::new();
let _ = writeln!(
out_buffer,
"{}",
serde_json::to_string(&NetworkSockOut::Request).unwrap()
// write request
let a = serde_json::to_string(&NetworkSockOut::Request).unwrap();
println!("{:?}", &a);
let _ = writeln!(
out_buffer,
"{}",
a
);
let _ = wd.write_all(&out_buffer).await;
let _ = wd.flush().await;
// get response
let _ = reader.read_line(&mut in_buffer).await.unwrap();
//match the response
if let Ok(request) =
serde_json::from_str::<NetworkSockIn>(&in_buffer)
{
match request {
NetworkSockIn::Info => {
// send back server info to the connection
let _ = wd.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael",
},
)
.unwrap()
.as_bytes(),
).await;
let _ = wd.write_all(b"\n").await;
let _ = wd.flush().await;
}
NetworkSockIn::Connect {
uuid,
username,
address,
} => {
// create client and send to server
let new_client = Client::new(
uuid,
username,
address,
reader,
wd,
server_channel.clone(),
);
let _ = writer.write_all(&out_buffer);
let _ = writer.flush();
// try get response
let res = reader.read_line(&mut in_buffer);
if res.is_err() {
return;
}
//match the response
if let Ok(request) =
serde_json::from_str::<NetworkSockIn>(&in_buffer)
{
match request {
NetworkSockIn::Info => {
// send back server info to the connection
writer
.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael",
},
)
.unwrap()
.as_bytes(),
)
.unwrap();
writer.write_all(b"\n").unwrap();
writer.flush().unwrap();
}
NetworkSockIn::Connect {
uuid,
username,
address,
} => {
// create client and send to server
let new_client = Client::new(
uuid,
username,
address,
stream.try_clone().unwrap(),
server_channel.clone(),
);
server_channel
.send(ServerMessage::ClientConnected {
client: new_client,
})
.unwrap_or_default();
}
}
}
});
let _ = server_channel
.send(ServerMessage::ClientConnected {
client: new_client,
}).await;
}
}
}
Err(e) => {
println!("[Network manager]: error getting stream: {:?}", e);
continue;
}
}
});
}
});
});
}
}

View File

@ -1,15 +1,15 @@
use std::sync::Arc;
use crossbeam_channel::{unbounded, Receiver};
// use crossbeam_channel::{unbounded, Receiver};
use uuid::Uuid;
use tokio::task;
use tokio::sync::mpsc::{channel, Receiver};
use futures::lock::Mutex;
use crate::client_manager::ClientManager;
use crate::messages::ClientMgrMessage;
use crate::messages::ServerMessage;
use crate::network_manager::NetworkManager;
use foundation::prelude::ICooperative;
use foundation::prelude::IMessagable;
use foundation::prelude::IPreemptive;
/// # ServerMessages
/// This is used internally to send messages to the server to be dispatched
@ -19,67 +19,68 @@ pub enum ServerMessages<TClient> {
ClientDisconnected(Uuid),
}
/// # Server
/// authors: @michael-bailey, @Mitch161
/// This Represents a server instance.
/// it is componsed of a client manager and a network manager
///
pub struct Server {
client_manager: Arc<ClientManager>,
network_manager: Arc<NetworkManager>,
receiver: Receiver<ServerMessage>,
receiver: Mutex<Receiver<ServerMessage>>,
}
impl Server {
pub fn new() -> Arc<Server> {
let (sender, receiver) = unbounded();
/// Create a new server object
pub fn new() -> Result<Arc<Server>, Box<dyn std::error::Error>> {
let (sender, receiver) = channel(1024);
Arc::new(Server {
client_manager: ClientManager::new(sender.clone()),
network_manager: NetworkManager::new("5600".to_string(), sender),
receiver,
})
Ok(
Arc::new(
Server {
client_manager: ClientManager::new(sender.clone()),
network_manager: NetworkManager::new("5600".to_string(), sender),
receiver: Mutex::new(receiver),
}
)
)
}
}
impl ICooperative for Server {
fn tick(&self) {
pub async fn start(self: &Arc<Server>) {
// start client manager and network manager
self.network_manager.clone().start();
self.client_manager.clone().start();
// clone block items
let server = self.clone();
use ClientMgrMessage::{Add, Remove, SendMessage};
// handle new messages loop
if !self.receiver.is_empty() {
for message in self.receiver.try_iter() {
loop {
let mut lock = server.receiver.lock().await;
if let Some(message) = lock.recv().await {
println!("[server]: received message {:?}", &message);
match message {
ServerMessage::ClientConnected { client } => {
self.client_manager.send_message(Add(client))
server.client_manager.clone()
.send_message(Add(client)).await
}
ServerMessage::ClientDisconnected { id } => {
println!("disconnecting client {:?}", id);
self.client_manager.send_message(Remove(id));
server.client_manager.clone().send_message(Remove(id)).await;
}
ServerMessage::ClientSendMessage { from, to, content } => self
.client_manager
.send_message(SendMessage { from, to, content }),
ServerMessage::ClientUpdate { to } => self
.client_manager
.send_message(ClientMgrMessage::SendClients { to }),
ServerMessage::ClientSendMessage { from, to, content } => server
.client_manager.clone()
.send_message(SendMessage { from, to, content }).await,
ServerMessage::ClientUpdate { to } => server
.client_manager.clone()
.send_message(ClientMgrMessage::SendClients { to }).await,
}
}
}
}
}
impl IPreemptive for Server {
fn run(arc: &std::sync::Arc<Self>) {
// start services
NetworkManager::start(&arc.network_manager);
ClientManager::start(&arc.client_manager);
loop {
arc.tick();
}
}
fn start(arc: &std::sync::Arc<Self>) {
let arc = arc.clone();
// start thread
std::thread::spawn(move || Server::run(&arc));
}
}