Moved threads to tokio async
This commit is contained in:
parent
0ed2c5a290
commit
2f8677710a
|
|
@ -1,32 +1,37 @@
|
|||
use openssl::symm::{Cipher, Crypter, Mode};
|
||||
use openssl::sha::sha256;
|
||||
|
||||
// use openssl::sha::sha256;
|
||||
// use openssl::symm::{Cipher, Crypter, Mode};
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use openssl::symm::{Cipher, Crypter, Mode};
|
||||
use openssl::sha::sha256;
|
||||
use openssl::symm::{Cipher, Crypter, Mode};
|
||||
|
||||
#[test]
|
||||
fn testEncryption() {
|
||||
let plaintext = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.".as_bytes();
|
||||
let key = sha256(b"This is a key");
|
||||
let key = sha256(b"This is a key");
|
||||
let IV = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
|
||||
|
||||
let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV));
|
||||
let mut ciphertext = vec![0u8; 1024];
|
||||
let cipherlen = encrypter.unwrap().update(plaintext, ciphertext.as_mut_slice()).unwrap();
|
||||
let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV));
|
||||
let mut ciphertext = vec![0u8; 1024];
|
||||
let cipherlen = encrypter
|
||||
.unwrap()
|
||||
.update(plaintext, ciphertext.as_mut_slice())
|
||||
.unwrap();
|
||||
|
||||
let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV));
|
||||
let mut decrypted = vec![0u8; 1024];
|
||||
decrypter.unwrap().update(&ciphertext[..cipherlen], decrypted.as_mut_slice()).unwrap();
|
||||
let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV));
|
||||
let mut decrypted = vec![0u8; 1024];
|
||||
decrypter
|
||||
.unwrap()
|
||||
.update(&ciphertext[..cipherlen], decrypted.as_mut_slice())
|
||||
.unwrap();
|
||||
|
||||
println!("{:?}", plaintext);
|
||||
println!("{:?}", ciphertext.as_slice());
|
||||
println!("{:?}", decrypted.as_slice());
|
||||
println!("{:?}", plaintext);
|
||||
println!("{:?}", ciphertext.as_slice());
|
||||
println!("{:?}", decrypted.as_slice());
|
||||
|
||||
println!("{:?}", plaintext.len());
|
||||
println!("{:?}", ciphertext.len());
|
||||
println!("{:?}", decrypted.len());
|
||||
println!("{:?}", plaintext.len());
|
||||
println!("{:?}", ciphertext.len());
|
||||
println!("{:?}", decrypted.len());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,10 @@
|
|||
pub mod encryption;
|
||||
pub mod messages;
|
||||
pub mod prelude;
|
||||
pub mod encryption;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
|
||||
/**
|
||||
* #ClientDetails.
|
||||
* This defines the fileds a client would want to send when connecitng
|
||||
|
|
@ -21,5 +20,3 @@ pub struct ClientDetails {
|
|||
pub address: String,
|
||||
pub public_key: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -15,5 +15,7 @@ crossbeam = "0.8.0"
|
|||
crossbeam-channel = "0.5.0"
|
||||
zeroize = "1.1.0"
|
||||
openssl = "0.10.33"
|
||||
tokio = { version = "1.9.0", features = ["full"] }
|
||||
futures = "0.3.16"
|
||||
|
||||
foundation = {path = '../foundation'}
|
||||
|
|
@ -1,56 +1,47 @@
|
|||
use crate::messages::ClientMessage;
|
||||
use crate::messages::ServerMessage;
|
||||
use foundation::prelude::IPreemptive;
|
||||
use std::cmp::Ordering;
|
||||
use std::io::BufRead;
|
||||
use std::io::Write;
|
||||
use std::io::{BufReader, BufWriter};
|
||||
use std::mem::replace;
|
||||
use std::net::TcpStream;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::cmp::Ordering;
|
||||
use std::fmt::Write;
|
||||
|
||||
use crossbeam_channel::{unbounded, Receiver, Sender};
|
||||
use serde::Serialize;
|
||||
use uuid::Uuid;
|
||||
|
||||
use zeroize::Zeroize;
|
||||
|
||||
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
|
||||
use foundation::prelude::IMessagable;
|
||||
use futures::lock::Mutex;
|
||||
|
||||
use tokio::task;
|
||||
use tokio::io::{ReadHalf, WriteHalf};
|
||||
use tokio::sync::mpsc::{Sender, Receiver, channel};
|
||||
use tokio::io::{AsyncBufReadExt, BufReader, AsyncWriteExt};
|
||||
|
||||
use crate::messages::ClientMessage;
|
||||
use crate::messages::ServerMessage;
|
||||
|
||||
use foundation::ClientDetails;
|
||||
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
|
||||
|
||||
/// # Client
|
||||
/// This struct represents a connected user.
|
||||
///
|
||||
/// ## Attrubutes
|
||||
/// ## Attributes
|
||||
/// - details: store of the clients infomation.
|
||||
///
|
||||
/// - stream: The socket for the connected client.
|
||||
/// - stream_reader: the buffered reader used to receive messages
|
||||
/// - stream_writer: the buffered writer used to send messages
|
||||
/// - owner: An optional reference to the owning object.
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct Client {
|
||||
pub details: ClientDetails,
|
||||
|
||||
// non serializable
|
||||
#[serde(skip)]
|
||||
server_channel: Mutex<Option<Sender<ServerMessage>>>,
|
||||
// server send channel
|
||||
server_channel: Mutex<Sender<ServerMessage>>,
|
||||
|
||||
#[serde(skip)]
|
||||
input: Sender<ClientMessage>,
|
||||
// object channels
|
||||
tx: Sender<ClientMessage>,
|
||||
rx: Mutex<Receiver<ClientMessage>>,
|
||||
|
||||
#[serde(skip)]
|
||||
output: Receiver<ClientMessage>,
|
||||
|
||||
#[serde(skip)]
|
||||
stream: Mutex<Option<TcpStream>>,
|
||||
|
||||
#[serde(skip)]
|
||||
stream_reader: Mutex<Option<BufReader<TcpStream>>>,
|
||||
|
||||
#[serde(skip)]
|
||||
stream_writer: Mutex<Option<BufWriter<TcpStream>>>,
|
||||
stream_rx: Mutex<BufReader<ReadHalf<tokio::net::TcpStream>>>,
|
||||
stream_tx: Mutex<WriteHalf<tokio::net::TcpStream>>,
|
||||
}
|
||||
|
||||
// client funciton implmentations
|
||||
|
|
@ -59,13 +50,11 @@ impl Client {
|
|||
uuid: String,
|
||||
username: String,
|
||||
address: String,
|
||||
stream: TcpStream,
|
||||
stream_rx: BufReader<ReadHalf<tokio::net::TcpStream>>,
|
||||
stream_tx: WriteHalf<tokio::net::TcpStream>,
|
||||
server_channel: Sender<ServerMessage>,
|
||||
) -> Arc<Client> {
|
||||
let (sender, receiver) = unbounded();
|
||||
|
||||
let out_stream = stream.try_clone().unwrap();
|
||||
let in_stream = stream.try_clone().unwrap();
|
||||
let (sender, receiver) = channel(1024);
|
||||
|
||||
Arc::new(Client {
|
||||
details: ClientDetails {
|
||||
|
|
@ -75,172 +64,134 @@ impl Client {
|
|||
public_key: None
|
||||
},
|
||||
|
||||
server_channel: Mutex::new(Some(server_channel)),
|
||||
server_channel: Mutex::new(server_channel),
|
||||
|
||||
input: sender,
|
||||
output: receiver,
|
||||
tx: sender,
|
||||
rx: Mutex::new(receiver),
|
||||
|
||||
stream: Mutex::new(Some(stream)),
|
||||
|
||||
stream_reader: Mutex::new(Some(BufReader::new(in_stream))),
|
||||
stream_writer: Mutex::new(Some(BufWriter::new(out_stream))),
|
||||
stream_rx: Mutex::new(stream_rx),
|
||||
stream_tx: Mutex::new(stream_tx),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl IMessagable<ClientMessage, Sender<ServerMessage>> for Client {
|
||||
fn send_message(&self, msg: ClientMessage) {
|
||||
self.input
|
||||
.send(msg)
|
||||
.expect("failed to send message to client.");
|
||||
}
|
||||
fn set_sender(&self, sender: Sender<ServerMessage>) {
|
||||
let mut server_lock = self.server_channel.lock().unwrap();
|
||||
let _ = replace(&mut *server_lock, Some(sender));
|
||||
}
|
||||
}
|
||||
pub fn start(self: &Arc<Client>) {
|
||||
|
||||
// cooperative multitasking implementation
|
||||
impl IPreemptive for Client {
|
||||
fn run(arc: &Arc<Self>) {
|
||||
let arc1 = arc.clone();
|
||||
let arc2 = arc.clone();
|
||||
let t1_client = self.clone();
|
||||
let t2_client = self.clone();
|
||||
|
||||
// read thread
|
||||
let _ = std::thread::Builder::new()
|
||||
.name(format!("client thread recv [{:?}]", &arc.details.uuid))
|
||||
.spawn(move || {
|
||||
use ClientMessage::Disconnect;
|
||||
let arc = arc1;
|
||||
// client stream read task
|
||||
tokio::spawn(async move {
|
||||
|
||||
use ClientMessage::Disconnect;
|
||||
|
||||
let client = t1_client;
|
||||
|
||||
let mut lock = client.stream_tx.lock().await;
|
||||
let mut buffer = String::new();
|
||||
|
||||
// tell client that is is now connected
|
||||
let _ = writeln!(buffer, "{}",
|
||||
serde_json::to_string(&ClientStreamOut::Connected).unwrap()
|
||||
);
|
||||
|
||||
let _ = lock.write_all(&buffer.as_bytes());
|
||||
let _ = lock.flush().await;
|
||||
|
||||
drop(lock);
|
||||
|
||||
loop {
|
||||
let mut stream_reader = client.stream_rx.lock().await;
|
||||
let mut buffer = String::new();
|
||||
let mut reader_lock = arc.stream_reader.lock().unwrap();
|
||||
let reader = reader_lock.as_mut().unwrap();
|
||||
|
||||
'main: while let Ok(size) = reader.read_line(&mut buffer) {
|
||||
if size == 0 {
|
||||
arc.send_message(Disconnect);
|
||||
break 'main;
|
||||
}
|
||||
if let Ok(_size) = stream_reader.read_line(&mut buffer).await {
|
||||
|
||||
let command = serde_json::from_str::<ClientStreamIn>(buffer.as_str());
|
||||
println!("[Client {:?}]: recieved {}", arc.details.uuid, &buffer);
|
||||
println!("[Client {:?}]: recieved {}", client.details.uuid, &buffer);
|
||||
|
||||
match command {
|
||||
Ok(ClientStreamIn::Disconnect) => {
|
||||
println!("[Client {:?}]: Disconnect recieved", &arc.details.uuid);
|
||||
arc.send_message(Disconnect);
|
||||
break 'main;
|
||||
println!("[Client {:?}]: Disconnect recieved", &client.details.uuid);
|
||||
client.send_message(Disconnect).await;
|
||||
return;
|
||||
}
|
||||
Ok(ClientStreamIn::SendMessage { to, content }) => {
|
||||
println!("[Client {:?}]: send message to: {:?}", &arc.details.uuid, &to);
|
||||
let lock = arc.server_channel.lock().unwrap();
|
||||
let sender = lock.as_ref().unwrap();
|
||||
let _ = sender.send(ServerMessage::ClientSendMessage {
|
||||
from: arc.details.uuid,
|
||||
println!("[Client {:?}]: send message to: {:?}", &client.details.uuid, &to);
|
||||
let lock = client.server_channel.lock().await;
|
||||
let _ = lock.send(ServerMessage::ClientSendMessage {
|
||||
from: client.details.uuid,
|
||||
to,
|
||||
content,
|
||||
});
|
||||
}
|
||||
Ok(ClientStreamIn::Update) => {
|
||||
let lock = arc.server_channel.lock().unwrap();
|
||||
let sender = lock.as_ref().unwrap();
|
||||
let _ = sender.send(ServerMessage::ClientUpdate { to: arc.details.uuid });
|
||||
let lock = client.server_channel.lock().await;
|
||||
let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid });
|
||||
}
|
||||
_ => println!("[Client {:?}]: command not found", &arc.details.uuid),
|
||||
_ => println!("[Client {:?}]: command not found", &client.details.uuid),
|
||||
}
|
||||
buffer.zeroize();
|
||||
}
|
||||
println!("[Client {:?}] exited thread 1", &arc.details.uuid);
|
||||
});
|
||||
println!("[Client {:?}] exited thread 1", &client.details.uuid);
|
||||
}
|
||||
});
|
||||
|
||||
// write thread
|
||||
let _ = std::thread::Builder::new()
|
||||
.name(format!("client thread msg [{:?}]", &arc.details.uuid))
|
||||
.spawn(move || {
|
||||
let arc = arc2;
|
||||
let mut writer_lock = arc.stream_writer.lock().unwrap();
|
||||
let writer = writer_lock.as_mut().unwrap();
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
// client channel read thread
|
||||
tokio::spawn(async move {
|
||||
use ClientMessage::{Disconnect, Message, SendClients};
|
||||
|
||||
let _ = writeln!(
|
||||
buffer,
|
||||
"{}",
|
||||
serde_json::to_string(&ClientStreamOut::Connected).unwrap()
|
||||
);
|
||||
let _ = writer.write_all(&buffer);
|
||||
let _ = writer.flush();
|
||||
let client = t2_client;
|
||||
|
||||
'main: loop {
|
||||
for message in arc.output.iter() {
|
||||
use ClientMessage::{Disconnect, Message, SendClients};
|
||||
println!("[Client {:?}]: {:?}", &arc.details.uuid, message);
|
||||
match message {
|
||||
Disconnect => {
|
||||
arc.server_channel
|
||||
.lock()
|
||||
.unwrap()
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.send(ServerMessage::ClientDisconnected { id: arc.details.uuid })
|
||||
.unwrap();
|
||||
break 'main;
|
||||
}
|
||||
Message { from, content } => {
|
||||
let msg = &ClientStreamOut::UserMessage { from, content };
|
||||
let _ = writeln!(buffer, "{}", serde_json::to_string(msg).unwrap());
|
||||
let _ = writer.write_all(&buffer);
|
||||
let _ = writer.flush();
|
||||
}
|
||||
SendClients { clients } => {
|
||||
let client_details_vec: Vec<ClientDetails> = clients
|
||||
.iter()
|
||||
.map(|client| &client.details)
|
||||
.cloned()
|
||||
.collect();
|
||||
loop {
|
||||
let mut channel = client.rx.lock().await;
|
||||
let mut buffer = String::new();
|
||||
|
||||
let msg = &ClientStreamOut::ConnectedClients {
|
||||
clients: client_details_vec,
|
||||
};
|
||||
let message = channel.recv().await.unwrap();
|
||||
drop(channel);
|
||||
|
||||
let _ = writeln!(buffer, "{}", serde_json::to_string(msg).unwrap());
|
||||
let _ = writer.write_all(&buffer);
|
||||
let _ = writer.flush();
|
||||
}
|
||||
}
|
||||
buffer.zeroize();
|
||||
println!("[Client {:?}]: {:?}", &client.details.uuid, message);
|
||||
match message {
|
||||
Disconnect => {
|
||||
let lock = client.server_channel.lock().await;
|
||||
let _ = lock.send(ServerMessage::ClientDisconnected { id: client.details.uuid }).await;
|
||||
return
|
||||
}
|
||||
Message { from, content } => {
|
||||
let msg = ClientStreamOut::UserMessage { from, content };
|
||||
let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
|
||||
|
||||
let mut stream = client.stream_tx.lock().await;
|
||||
|
||||
let _ = stream.write_all(&buffer.as_bytes());
|
||||
let _ = stream.flush().await;
|
||||
|
||||
drop(stream);
|
||||
}
|
||||
SendClients { clients } => {
|
||||
let client_details_vec: Vec<ClientDetails> = clients
|
||||
.iter()
|
||||
.map(|client| &client.details)
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
let msg = ClientStreamOut::ConnectedClients {
|
||||
clients: client_details_vec,
|
||||
};
|
||||
|
||||
let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
|
||||
|
||||
let mut stream = client.stream_tx.lock().await;
|
||||
|
||||
|
||||
let _ = stream.write_all(&buffer.as_bytes());
|
||||
let _ = stream.flush().await;
|
||||
}
|
||||
}
|
||||
println!("[Client {:?}]: exited thread 2", &arc.details.uuid);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn start(arc: &Arc<Self>) {
|
||||
Client::run(arc)
|
||||
}
|
||||
}
|
||||
|
||||
// default value implementation
|
||||
impl Default for Client {
|
||||
fn default() -> Self {
|
||||
let (sender, reciever) = unbounded();
|
||||
Client {
|
||||
details: ClientDetails {
|
||||
uuid: Uuid::new_v4(),
|
||||
username: "generic_client".to_string(),
|
||||
address: "127.0.0.1".to_string(),
|
||||
public_key: None
|
||||
},
|
||||
|
||||
output: reciever,
|
||||
input: sender,
|
||||
|
||||
server_channel: Mutex::new(None),
|
||||
|
||||
stream: Mutex::new(None),
|
||||
|
||||
stream_reader: Mutex::new(None),
|
||||
stream_writer: Mutex::new(None),
|
||||
}
|
||||
pub async fn send_message(self: &Arc<Client>, msg: ClientMessage) {
|
||||
let _ = self.tx.send(msg).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,18 +1,15 @@
|
|||
// use crate::lib::server::ServerMessages;
|
||||
use foundation::prelude::IPreemptive;
|
||||
use std::collections::HashMap;
|
||||
use std::mem::replace;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use crossbeam_channel::{unbounded, Receiver, Sender};
|
||||
use uuid::Uuid;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::task;
|
||||
use futures::lock::Mutex;
|
||||
|
||||
use crate::client::Client;
|
||||
use crate::messages::ClientMessage;
|
||||
use crate::messages::ClientMgrMessage;
|
||||
use crate::messages::ServerMessage;
|
||||
use foundation::prelude::IMessagable;
|
||||
|
||||
/// # ClientManager
|
||||
/// This struct manages all connected users
|
||||
|
|
@ -22,92 +19,85 @@ pub struct ClientManager {
|
|||
|
||||
server_channel: Mutex<Sender<ServerMessage>>,
|
||||
|
||||
sender: Sender<ClientMgrMessage>,
|
||||
receiver: Receiver<ClientMgrMessage>,
|
||||
tx: Sender<ClientMgrMessage>,
|
||||
rx: Mutex<Receiver<ClientMgrMessage>>,
|
||||
}
|
||||
|
||||
impl ClientManager {
|
||||
pub fn new(server_channel: Sender<ServerMessage>) -> Arc<Self> {
|
||||
let (sender, receiver) = unbounded();
|
||||
let (tx, rx) = channel(1024);
|
||||
|
||||
Arc::new(ClientManager {
|
||||
clients: Mutex::default(),
|
||||
|
||||
server_channel: Mutex::new(server_channel),
|
||||
|
||||
sender,
|
||||
receiver,
|
||||
tx,
|
||||
rx: Mutex::new(rx),
|
||||
})
|
||||
}
|
||||
|
||||
fn send_to_client(&self, id: &Uuid, msg: ClientMessage) {
|
||||
let lock = self.clients.lock().unwrap();
|
||||
if let Some(client) = lock.get(id) {
|
||||
client.send_message(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn start(self: &Arc<ClientManager>) {
|
||||
|
||||
impl IMessagable<ClientMgrMessage, Sender<ServerMessage>> for ClientManager {
|
||||
fn send_message(&self, msg: ClientMgrMessage) {
|
||||
self.sender.send(msg).unwrap();
|
||||
}
|
||||
fn set_sender(&self, sender: Sender<ServerMessage>) {
|
||||
let mut server_lock = self.server_channel.lock().unwrap();
|
||||
let _ = replace(&mut *server_lock, sender);
|
||||
}
|
||||
}
|
||||
let client_manager = self.clone();
|
||||
|
||||
impl IPreemptive for ClientManager {
|
||||
fn run(arc: &Arc<Self>) {
|
||||
loop {
|
||||
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||
tokio::spawn(async move {
|
||||
|
||||
if !arc.receiver.is_empty() {
|
||||
for message in arc.receiver.try_iter() {
|
||||
println!("[Client manager]: recieved message: {:?}", message);
|
||||
use ClientMgrMessage::{Add, Remove, SendClients, SendMessage};
|
||||
use ClientMgrMessage::{Add, Remove, SendClients, SendMessage};
|
||||
|
||||
match message {
|
||||
Add(client) => {
|
||||
println!("[Client Manager]: adding new client");
|
||||
Client::start(&client);
|
||||
let mut lock = arc.clients.lock().unwrap();
|
||||
if lock.insert(client.details.uuid, client).is_none() {
|
||||
println!("value is new");
|
||||
}
|
||||
loop {
|
||||
let mut receiver = client_manager.rx.lock().await;
|
||||
let message = receiver.recv().await.unwrap();
|
||||
|
||||
println!("[Client manager]: recieved message: {:?}", message);
|
||||
|
||||
match message {
|
||||
Add(client) => {
|
||||
println!("[Client Manager]: adding new client");
|
||||
client.start();
|
||||
let mut lock = client_manager.clients.lock().await;
|
||||
if lock.insert(client.details.uuid, client).is_none() {
|
||||
println!("value is new");
|
||||
}
|
||||
Remove(uuid) => {
|
||||
println!("[Client Manager]: removing client: {:?}", &uuid);
|
||||
if let Some(client) = arc.clients.lock().unwrap().remove(&uuid) {
|
||||
client.send_message(ClientMessage::Disconnect);
|
||||
}
|
||||
}
|
||||
SendMessage { to, from, content } => {
|
||||
arc.send_to_client(&to, ClientMessage::Message { from, content })
|
||||
}
|
||||
SendClients { to } => {
|
||||
let lock = arc.clients.lock().unwrap();
|
||||
if let Some(client) = lock.get(&to) {
|
||||
let clients_vec: Vec<Arc<Client>> =
|
||||
lock.values().cloned().collect();
|
||||
|
||||
client.send_message(ClientMessage::SendClients {
|
||||
clients: clients_vec,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unreachable_patterns)]
|
||||
_ => println!("[Client manager]: not implemented"),
|
||||
}
|
||||
Remove(uuid) => {
|
||||
println!("[Client Manager]: removing client: {:?}", &uuid);
|
||||
if let Some(client) = client_manager.clients.lock().await.remove(&uuid) {
|
||||
client.send_message(ClientMessage::Disconnect).await;
|
||||
}
|
||||
}
|
||||
SendMessage { to, from, content } => {
|
||||
client_manager.send_to_client(&to, ClientMessage::Message { from, content }).await;
|
||||
}
|
||||
SendClients { to } => {
|
||||
let lock = client_manager.clients.lock().await;
|
||||
if let Some(client) = lock.get(&to) {
|
||||
let clients_vec: Vec<Arc<Client>> =
|
||||
lock.values().cloned().collect();
|
||||
|
||||
client.send_message(ClientMessage::SendClients {
|
||||
clients: clients_vec,
|
||||
}).await
|
||||
}
|
||||
}
|
||||
#[allow(unreachable_patterns)]
|
||||
_ => println!("[Client manager]: not implemented"),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn send_to_client(self: &Arc<ClientManager>, id: &Uuid, msg: ClientMessage) {
|
||||
let lock = self.clients.lock().await;
|
||||
if let Some(client) = lock.get(&id) {
|
||||
client.clone().send_message(msg).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn start(arc: &Arc<Self>) {
|
||||
let arc = arc.clone();
|
||||
std::thread::spawn(move || ClientManager::run(&arc));
|
||||
pub async fn send_message(
|
||||
self: Arc<ClientManager>,
|
||||
message: ClientMgrMessage)
|
||||
{
|
||||
let _ = self.tx.send(message).await;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,12 +4,14 @@ pub mod messages;
|
|||
pub mod network_manager;
|
||||
pub mod server;
|
||||
|
||||
use std::io;
|
||||
|
||||
use clap::{App, Arg};
|
||||
|
||||
use foundation::prelude::IPreemptive;
|
||||
use server::Server;
|
||||
|
||||
fn main() {
|
||||
#[tokio::main]
|
||||
async fn main() -> io::Result<()> {
|
||||
let _args = App::new("--rust chat server--")
|
||||
.version("0.1.5")
|
||||
.author("Mitchel Hardie <mitch161>, Michael Bailey <michael-bailey>")
|
||||
|
|
@ -26,7 +28,8 @@ fn main() {
|
|||
)
|
||||
.get_matches();
|
||||
|
||||
let server = Server::new();
|
||||
let server = Server::new().unwrap();
|
||||
|
||||
Server::run(&server);
|
||||
server.start().await;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,128 +1,104 @@
|
|||
use foundation::prelude::IPreemptive;
|
||||
use std::io::BufRead;
|
||||
use std::io::BufReader;
|
||||
use std::io::BufWriter;
|
||||
use std::io::Write;
|
||||
use std::net::TcpListener;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::io::Write;
|
||||
|
||||
use crossbeam_channel::Sender;
|
||||
use tokio::task;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
|
||||
use crate::client::Client;
|
||||
use crate::messages::ServerMessage;
|
||||
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
|
||||
|
||||
pub struct NetworkManager {
|
||||
listener: TcpListener,
|
||||
address: String,
|
||||
server_channel: Sender<ServerMessage>,
|
||||
}
|
||||
|
||||
impl NetworkManager {
|
||||
pub fn new(port: String, server_channel: Sender<ServerMessage>) -> Arc<NetworkManager> {
|
||||
let mut address = "0.0.0.0:".to_string();
|
||||
address.push_str(&port);
|
||||
|
||||
let listener = TcpListener::bind(address).expect("Could not bind to address");
|
||||
|
||||
pub fn new(_port: String, server_channel: Sender<ServerMessage>) -> Arc<NetworkManager> {
|
||||
Arc::new(NetworkManager {
|
||||
listener,
|
||||
address: "0.0.0.0:5600".to_string(),
|
||||
server_channel,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl IPreemptive for NetworkManager {
|
||||
fn run(_: &Arc<Self>) {}
|
||||
pub fn start(self: &Arc<NetworkManager>) {
|
||||
|
||||
fn start(arc: &Arc<Self>) {
|
||||
let arc = arc.clone();
|
||||
std::thread::spawn(move || {
|
||||
// fetch new connections and add them to the client queue
|
||||
for connection in arc.listener.incoming() {
|
||||
println!("[NetworkManager]: New Connection!");
|
||||
match connection {
|
||||
Ok(stream) => {
|
||||
let server_channel = arc.server_channel.clone();
|
||||
let network_manager = self.clone();
|
||||
|
||||
// create readers
|
||||
let mut reader = BufReader::new(stream.try_clone().unwrap());
|
||||
let mut writer = BufWriter::new(stream.try_clone().unwrap());
|
||||
tokio::spawn(async move {
|
||||
let listener = TcpListener::bind(network_manager.address.clone()).await.unwrap();
|
||||
|
||||
let _handle = thread::Builder::new()
|
||||
.name("NetworkJoinThread".to_string())
|
||||
.spawn(move || {
|
||||
let mut out_buffer: Vec<u8> = Vec::new();
|
||||
let mut in_buffer: String = String::new();
|
||||
loop {
|
||||
let (connection, _) = listener.accept().await.unwrap();
|
||||
let (rd, mut wd) = io::split(connection);
|
||||
|
||||
let mut reader = BufReader::new(rd);
|
||||
let server_channel = network_manager.server_channel.clone();
|
||||
|
||||
// send request message to connection
|
||||
task::spawn(async move {
|
||||
let mut out_buffer: Vec<u8> = Vec::new();
|
||||
let mut in_buffer: String = String::new();
|
||||
|
||||
let _ = writeln!(
|
||||
out_buffer,
|
||||
"{}",
|
||||
serde_json::to_string(&NetworkSockOut::Request).unwrap()
|
||||
// write request
|
||||
let a = serde_json::to_string(&NetworkSockOut::Request).unwrap();
|
||||
println!("{:?}", &a);
|
||||
let _ = writeln!(
|
||||
out_buffer,
|
||||
"{}",
|
||||
a
|
||||
);
|
||||
|
||||
let _ = wd.write_all(&out_buffer).await;
|
||||
let _ = wd.flush().await;
|
||||
|
||||
// get response
|
||||
let _ = reader.read_line(&mut in_buffer).await.unwrap();
|
||||
|
||||
//match the response
|
||||
if let Ok(request) =
|
||||
serde_json::from_str::<NetworkSockIn>(&in_buffer)
|
||||
{
|
||||
match request {
|
||||
NetworkSockIn::Info => {
|
||||
// send back server info to the connection
|
||||
let _ = wd.write_all(
|
||||
serde_json::to_string(
|
||||
&NetworkSockOut::GotInfo {
|
||||
server_name: "oof",
|
||||
server_owner: "michael",
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.as_bytes(),
|
||||
).await;
|
||||
let _ = wd.write_all(b"\n").await;
|
||||
let _ = wd.flush().await;
|
||||
}
|
||||
NetworkSockIn::Connect {
|
||||
uuid,
|
||||
username,
|
||||
address,
|
||||
} => {
|
||||
// create client and send to server
|
||||
let new_client = Client::new(
|
||||
uuid,
|
||||
username,
|
||||
address,
|
||||
reader,
|
||||
wd,
|
||||
server_channel.clone(),
|
||||
);
|
||||
|
||||
let _ = writer.write_all(&out_buffer);
|
||||
let _ = writer.flush();
|
||||
|
||||
// try get response
|
||||
let res = reader.read_line(&mut in_buffer);
|
||||
if res.is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
//match the response
|
||||
if let Ok(request) =
|
||||
serde_json::from_str::<NetworkSockIn>(&in_buffer)
|
||||
{
|
||||
match request {
|
||||
NetworkSockIn::Info => {
|
||||
// send back server info to the connection
|
||||
writer
|
||||
.write_all(
|
||||
serde_json::to_string(
|
||||
&NetworkSockOut::GotInfo {
|
||||
server_name: "oof",
|
||||
server_owner: "michael",
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.as_bytes(),
|
||||
)
|
||||
.unwrap();
|
||||
writer.write_all(b"\n").unwrap();
|
||||
writer.flush().unwrap();
|
||||
}
|
||||
NetworkSockIn::Connect {
|
||||
uuid,
|
||||
username,
|
||||
address,
|
||||
} => {
|
||||
// create client and send to server
|
||||
let new_client = Client::new(
|
||||
uuid,
|
||||
username,
|
||||
address,
|
||||
stream.try_clone().unwrap(),
|
||||
server_channel.clone(),
|
||||
);
|
||||
server_channel
|
||||
.send(ServerMessage::ClientConnected {
|
||||
client: new_client,
|
||||
})
|
||||
.unwrap_or_default();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
let _ = server_channel
|
||||
.send(ServerMessage::ClientConnected {
|
||||
client: new_client,
|
||||
}).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("[Network manager]: error getting stream: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,15 +1,15 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use crossbeam_channel::{unbounded, Receiver};
|
||||
// use crossbeam_channel::{unbounded, Receiver};
|
||||
use uuid::Uuid;
|
||||
use tokio::task;
|
||||
use tokio::sync::mpsc::{channel, Receiver};
|
||||
use futures::lock::Mutex;
|
||||
|
||||
use crate::client_manager::ClientManager;
|
||||
use crate::messages::ClientMgrMessage;
|
||||
use crate::messages::ServerMessage;
|
||||
use crate::network_manager::NetworkManager;
|
||||
use foundation::prelude::ICooperative;
|
||||
use foundation::prelude::IMessagable;
|
||||
use foundation::prelude::IPreemptive;
|
||||
|
||||
/// # ServerMessages
|
||||
/// This is used internally to send messages to the server to be dispatched
|
||||
|
|
@ -19,67 +19,68 @@ pub enum ServerMessages<TClient> {
|
|||
ClientDisconnected(Uuid),
|
||||
}
|
||||
|
||||
/// # Server
|
||||
/// authors: @michael-bailey, @Mitch161
|
||||
/// This Represents a server instance.
|
||||
/// it is componsed of a client manager and a network manager
|
||||
///
|
||||
pub struct Server {
|
||||
client_manager: Arc<ClientManager>,
|
||||
network_manager: Arc<NetworkManager>,
|
||||
|
||||
receiver: Receiver<ServerMessage>,
|
||||
receiver: Mutex<Receiver<ServerMessage>>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub fn new() -> Arc<Server> {
|
||||
let (sender, receiver) = unbounded();
|
||||
/// Create a new server object
|
||||
pub fn new() -> Result<Arc<Server>, Box<dyn std::error::Error>> {
|
||||
let (sender, receiver) = channel(1024);
|
||||
|
||||
Arc::new(Server {
|
||||
client_manager: ClientManager::new(sender.clone()),
|
||||
|
||||
network_manager: NetworkManager::new("5600".to_string(), sender),
|
||||
receiver,
|
||||
})
|
||||
Ok(
|
||||
Arc::new(
|
||||
Server {
|
||||
client_manager: ClientManager::new(sender.clone()),
|
||||
network_manager: NetworkManager::new("5600".to_string(), sender),
|
||||
receiver: Mutex::new(receiver),
|
||||
}
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl ICooperative for Server {
|
||||
fn tick(&self) {
|
||||
pub async fn start(self: &Arc<Server>) {
|
||||
|
||||
// start client manager and network manager
|
||||
self.network_manager.clone().start();
|
||||
self.client_manager.clone().start();
|
||||
|
||||
// clone block items
|
||||
let server = self.clone();
|
||||
|
||||
|
||||
use ClientMgrMessage::{Add, Remove, SendMessage};
|
||||
|
||||
// handle new messages loop
|
||||
if !self.receiver.is_empty() {
|
||||
for message in self.receiver.try_iter() {
|
||||
loop {
|
||||
let mut lock = server.receiver.lock().await;
|
||||
if let Some(message) = lock.recv().await {
|
||||
println!("[server]: received message {:?}", &message);
|
||||
|
||||
match message {
|
||||
ServerMessage::ClientConnected { client } => {
|
||||
self.client_manager.send_message(Add(client))
|
||||
server.client_manager.clone()
|
||||
.send_message(Add(client)).await
|
||||
}
|
||||
ServerMessage::ClientDisconnected { id } => {
|
||||
println!("disconnecting client {:?}", id);
|
||||
self.client_manager.send_message(Remove(id));
|
||||
server.client_manager.clone().send_message(Remove(id)).await;
|
||||
}
|
||||
ServerMessage::ClientSendMessage { from, to, content } => self
|
||||
.client_manager
|
||||
.send_message(SendMessage { from, to, content }),
|
||||
ServerMessage::ClientUpdate { to } => self
|
||||
.client_manager
|
||||
.send_message(ClientMgrMessage::SendClients { to }),
|
||||
ServerMessage::ClientSendMessage { from, to, content } => server
|
||||
.client_manager.clone()
|
||||
.send_message(SendMessage { from, to, content }).await,
|
||||
ServerMessage::ClientUpdate { to } => server
|
||||
.client_manager.clone()
|
||||
.send_message(ClientMgrMessage::SendClients { to }).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IPreemptive for Server {
|
||||
fn run(arc: &std::sync::Arc<Self>) {
|
||||
// start services
|
||||
NetworkManager::start(&arc.network_manager);
|
||||
ClientManager::start(&arc.client_manager);
|
||||
loop {
|
||||
arc.tick();
|
||||
}
|
||||
}
|
||||
|
||||
fn start(arc: &std::sync::Arc<Self>) {
|
||||
let arc = arc.clone();
|
||||
// start thread
|
||||
std::thread::spawn(move || Server::run(&arc));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue