Created global message support

This commit is contained in:
michael-bailey 2022-02-01 20:51:04 +00:00
parent 14495e1b27
commit d320f345c8
10 changed files with 281 additions and 118 deletions

View File

@ -12,14 +12,16 @@ mod test {
let key = sha256(b"This is a key");
let IV = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV));
let encrypter =
Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV));
let mut ciphertext = vec![0u8; 1024];
let cipherlen = encrypter
.unwrap()
.update(plaintext, ciphertext.as_mut_slice())
.unwrap();
let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV));
let decrypter =
Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV));
let mut decrypted = vec![0u8; 1024];
decrypter
.unwrap()

View File

@ -25,7 +25,7 @@ pub enum ClientStreamOut {
Connected,
UserMessage { from: Uuid, content: String },
GlobalMessage { content: String },
GlobalMessage { from: Uuid, content: String },
ConnectedClients { clients: Vec<ClientDetails> },

View File

@ -1,2 +1,2 @@
hard_tabs = true
max_width = 100
max_width = 90

View File

@ -0,0 +1,77 @@
use std::ops::Index;
use crate::client::Client;
use crate::messages::ServerMessage;
use std::sync::{Arc, Weak};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
#[derive(Clone, Debug)]
pub struct Message {
content: String,
sender: Weak<Client>,
}
impl Message {
pub fn new(content: String, sender: Weak<Client>) -> Message {
Message { content, sender }
}
}
enum ChatManagerMessage {
AddMessage {sender: Weak<Client>, content: String}
}
pub struct ChatManager {
messages: Mutex<Vec<Message>>,
server_channel: Sender<ServerMessage>,
tx: Sender<ChatManagerMessage>,
rx: Mutex<Receiver<ChatManagerMessage>>,
}
impl ChatManager {
pub fn new(server_channel: Sender<ServerMessage>) -> Arc<Self> {
let (tx, rx) = channel::<ChatManagerMessage>(1024);
let manager = Arc::new(ChatManager {
messages: Mutex::new(Vec::new()),
server_channel,
tx,
rx: Mutex::new(rx),
});
manager.start();
manager
}
fn start(self: &Arc<ChatManager>) {
let manager = self.clone();
tokio::spawn(async move {
use ServerMessage::{BroadcastGlobalMessage};
use ChatManagerMessage::{AddMessage};
while let message = manager.rx.lock().await.recv().await {
match message {
Some(AddMessage { content,sender }) => {
let sender = &sender.upgrade().unwrap().details.uuid;
manager.server_channel.send(
BroadcastGlobalMessage {sender: sender.clone(), content}
).await.unwrap();
}
None => {
println!("None found in message broadcast some how");
}
}
} });
}
pub async fn add_message(self: &Arc<Self>, sender: Weak<Client>, content: String) {
let mut a = self.messages.lock().await;
a.push(Message::new(content, sender))
}
pub async fn get_all_messages(self: &Arc<Self>) -> Vec<Message> {
self.messages.lock().await.clone()
}
}

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use std::cmp::Ordering;
use std::fmt::Write;
use std::sync::Arc;
use uuid::Uuid;
@ -8,15 +8,15 @@ use zeroize::Zeroize;
use futures::lock::Mutex;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::io::{ReadHalf, WriteHalf};
use tokio::sync::mpsc::{Sender, Receiver, channel};
use tokio::io::{AsyncBufReadExt, BufReader, AsyncWriteExt};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::messages::ClientMessage;
use crate::messages::ServerMessage;
use foundation::ClientDetails;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::ClientDetails;
/// # Client
/// This struct represents a connected user.
@ -60,7 +60,7 @@ impl Client {
uuid: Uuid::parse_str(&uuid).expect("invalid id"),
username,
address,
public_key: None
public_key: None,
},
server_channel: Mutex::new(server_channel),
@ -74,13 +74,11 @@ impl Client {
}
pub fn start(self: &Arc<Client>) {
let t1_client = self.clone();
let t2_client = self.clone();
// client stream read task
tokio::spawn(async move {
use ClientMessage::Disconnect;
let client = t1_client;
@ -89,7 +87,9 @@ impl Client {
let mut buffer = String::new();
// tell client that is is now connected
let _ = writeln!(buffer, "{}",
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::Connected).unwrap()
);
@ -103,34 +103,65 @@ impl Client {
let mut buffer = String::new();
if let Ok(_size) = stream_reader.read_line(&mut buffer).await {
let command = serde_json::from_str::<ClientStreamIn>(buffer.as_str());
println!("[Client {:?}]: recieved {}", client.details.uuid, &buffer);
match command {
Ok(ClientStreamIn::Disconnect) => {
println!("[Client {:?}]: Disconnect recieved", &client.details.uuid);
println!(
"[Client {:?}]: Disconnect recieved",
&client.details.uuid
);
client.send_message(Disconnect).await;
return;
}
Ok(ClientStreamIn::SendMessage { to, content }) => {
println!("[Client {:?}]: send message to: {:?}", &client.details.uuid, &to);
println!(
"[Client {:?}]: send message to: {:?}",
&client.details.uuid, &to
);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientSendMessage {
from: client.details.uuid,
to,
content,
}).await;
let _ = lock
.send(ServerMessage::ClientSendMessage {
from: client.details.uuid,
to,
content,
})
.await;
}
Ok(ClientStreamIn::Update) => {
println!("[Client {:?}]: update received", &client.details.uuid);
println!(
"[Client {:?}]: update received",
&client.details.uuid
);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid }).await;
let _ = lock
.send(ServerMessage::ClientUpdate {
to: client.details.uuid,
})
.await;
}
Ok(ClientStreamIn::SendGlobalMessage {content}) => {
println!(
"[Client {:?}]: send global message received",
&client.details.uuid
);
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::BroadcastGlobalMessage { content, sender: *&client.details.uuid.clone() })
.await;
}
_ => {
println!("[Client {:?}]: command not found", &client.details.uuid);
println!(
"[Client {:?}]: command not found",
&client.details.uuid
);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientError { to: client.details.uuid }).await;
let _ = lock
.send(ServerMessage::ClientError {
to: client.details.uuid,
})
.await;
}
}
buffer.zeroize();
@ -140,7 +171,7 @@ impl Client {
// client channel read thread
tokio::spawn(async move {
use ClientMessage::{Disconnect, Message, SendClients, Error};
use ClientMessage::{Disconnect, Error, Message, SendClients};
let client = t2_client;
@ -155,12 +186,17 @@ impl Client {
match message {
Disconnect => {
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientDisconnected { id: client.details.uuid }).await;
return
let _ = lock
.send(ServerMessage::ClientDisconnected {
id: client.details.uuid,
})
.await;
return;
}
Message { from, content } => {
let msg = ClientStreamOut::UserMessage { from, content };
let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let _ =
writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
@ -180,24 +216,41 @@ impl Client {
clients: client_details_vec,
};
let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
},
Error => {
let _ = writeln!(buffer, "{}", serde_json::to_string(&ClientStreamOut::Error).unwrap());
let _ =
writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
Error => {
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::Error).unwrap()
);
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
ClientMessage::GlobalBroadcastMessage { from,content } => {
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::GlobalMessage {from, content}).unwrap()
);
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
}
}
});
});
}
pub async fn send_message(self: &Arc<Client>, msg: ClientMessage) {

View File

@ -1,9 +1,12 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use futures::future::join_all;
use uuid::Uuid;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use futures::lock::Mutex;
use tokio::join;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use uuid::Uuid;
use crate::client::Client;
use crate::messages::ClientMessage;
@ -37,19 +40,17 @@ impl ClientManager {
}
pub fn start(self: &Arc<ClientManager>) {
let client_manager = self.clone();
tokio::spawn(async move {
use ClientMgrMessage::{Add, Remove, SendClients, SendMessage, SendError};
use ClientMgrMessage::{Add, Remove, SendClients, SendError, SendMessage};
loop {
let mut receiver = client_manager.rx.lock().await;
let message = receiver.recv().await.unwrap();
println!("[Client manager]: recieved message: {:?}", message);
match message {
Add(client) => {
println!("[Client Manager]: adding new client");
@ -61,12 +62,16 @@ impl ClientManager {
}
Remove(uuid) => {
println!("[Client Manager]: removing client: {:?}", &uuid);
if let Some(client) = client_manager.clients.lock().await.remove(&uuid) {
if let Some(client) =
client_manager.clients.lock().await.remove(&uuid)
{
client.send_message(ClientMessage::Disconnect).await;
}
}
SendMessage { to, from, content } => {
client_manager.send_to_client(&to, ClientMessage::Message { from, content }).await;
client_manager
.send_to_client(&to, ClientMessage::Message { from, content })
.await;
}
SendClients { to } => {
let lock = client_manager.clients.lock().await;
@ -74,17 +79,29 @@ impl ClientManager {
let clients_vec: Vec<Arc<Client>> =
lock.values().cloned().collect();
client.send_message(ClientMessage::SendClients {
clients: clients_vec,
}).await
client
.send_message(ClientMessage::SendClients {
clients: clients_vec,
})
.await
}
},
}
ClientMgrMessage::BroadcastGlobalMessage {sender, content} => {
use futures::stream::TryStreamExt;
let lock = client_manager.clients.lock().await;
let futures = lock.iter()
.map(|i| i.1.send_message(
ClientMessage::GlobalBroadcastMessage {from: sender, content: content.clone()}
));
join_all(futures).await;
}
SendError { to } => {
let lock = client_manager.clients.lock().await;
if let Some(client) = lock.get(&to) {
client.send_message(ClientMessage::Error).await
}
},
}
#[allow(unreachable_patterns)]
_ => println!("[Client manager]: not implemented"),
}
@ -99,10 +116,7 @@ impl ClientManager {
}
}
pub async fn send_message(
self: Arc<ClientManager>,
message: ClientMgrMessage)
{
pub async fn send_message(self: Arc<ClientManager>, message: ClientMgrMessage) {
let _ = self.tx.send(message).await;
}
}

View File

@ -1,3 +1,4 @@
pub mod chat_manager;
pub mod client;
pub mod client_manager;
pub mod messages;
@ -31,5 +32,5 @@ async fn main() -> io::Result<()> {
let server = Server::new().unwrap();
server.start().await;
Ok(())
Ok(())
}

View File

@ -1,11 +1,13 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use uuid::Uuid;
use crate::chat_manager::Message;
use crate::client::Client;
#[derive(Debug)]
pub enum ClientMessage {
Message { from: Uuid, content: String },
GlobalBroadcastMessage {from: Uuid, content:String},
SendClients { clients: Vec<Arc<Client>> },
@ -26,9 +28,10 @@ pub enum ClientMgrMessage {
to: Uuid,
content: String,
},
BroadcastGlobalMessage {sender: Uuid, content: String},
SendError {
to: Uuid,
}
},
}
#[derive(Debug)]
@ -48,6 +51,8 @@ pub enum ServerMessage {
to: Uuid,
},
ClientError {
to: Uuid
}
to: Uuid,
},
BroadcastGlobalMessage {sender: Uuid, content: String}
}

View File

@ -1,10 +1,10 @@
use std::sync::Arc;
use std::io::Write;
use std::sync::Arc;
use tokio::task;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::mpsc::Sender;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::task;
use crate::client::Client;
use crate::messages::ServerMessage;
@ -16,7 +16,10 @@ pub struct NetworkManager {
}
impl NetworkManager {
pub fn new(_port: String, server_channel: Sender<ServerMessage>) -> Arc<NetworkManager> {
pub fn new(
_port: String,
server_channel: Sender<ServerMessage>,
) -> Arc<NetworkManager> {
Arc::new(NetworkManager {
address: "0.0.0.0:5600".to_string(),
server_channel,
@ -24,16 +27,17 @@ impl NetworkManager {
}
pub fn start(self: &Arc<NetworkManager>) {
let network_manager = self.clone();
tokio::spawn(async move {
let listener = TcpListener::bind(network_manager.address.clone()).await.unwrap();
let listener = TcpListener::bind(network_manager.address.clone())
.await
.unwrap();
loop {
let (connection, _) = listener.accept().await.unwrap();
let (rd, mut wd) = io::split(connection);
let mut reader = BufReader::new(rd);
let server_channel = network_manager.server_channel.clone();
@ -44,11 +48,7 @@ impl NetworkManager {
// write request
let a = serde_json::to_string(&NetworkSockOut::Request).unwrap();
println!("{:?}", &a);
let _ = writeln!(
out_buffer,
"{}",
a
);
let _ = writeln!(out_buffer, "{}", a);
let _ = wd.write_all(&out_buffer).await;
let _ = wd.flush().await;
@ -57,22 +57,21 @@ impl NetworkManager {
let _ = reader.read_line(&mut in_buffer).await.unwrap();
//match the response
if let Ok(request) =
serde_json::from_str::<NetworkSockIn>(&in_buffer)
if let Ok(request) = serde_json::from_str::<NetworkSockIn>(&in_buffer)
{
match request {
NetworkSockIn::Info => {
// send back server info to the connection
let _ = wd.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
let _ = wd
.write_all(
serde_json::to_string(&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael",
},
})
.unwrap()
.as_bytes(),
)
.unwrap()
.as_bytes(),
).await;
.await;
let _ = wd.write_all(b"\n").await;
let _ = wd.flush().await;
}
@ -93,12 +92,13 @@ impl NetworkManager {
let _ = server_channel
.send(ServerMessage::ClientConnected {
client: new_client,
}).await;
})
.await;
}
}
}
});
}
});
});
}
}

View File

@ -1,28 +1,20 @@
use std::sync::Arc;
// use crossbeam_channel::{unbounded, Receiver};
use uuid::Uuid;
use tokio::sync::mpsc::{channel, Receiver};
use futures::lock::Mutex;
use tokio::sync::mpsc::{channel, Receiver};
use uuid::Uuid;
use crate::client_manager::ClientManager;
use crate::messages::ClientMgrMessage;
use crate::messages::ServerMessage;
use crate::network_manager::NetworkManager;
/// # ServerMessages
/// This is used internally to send messages to the server to be dispatched
#[derive(Debug)]
pub enum ServerMessages<TClient> {
ClientConnected(Arc<TClient>),
ClientDisconnected(Uuid),
}
/// # Server
/// authors: @michael-bailey, @Mitch161
/// This Represents a server instance.
/// it is componsed of a client manager and a network manager
///
///
pub struct Server {
client_manager: Arc<ClientManager>,
network_manager: Arc<NetworkManager>,
@ -32,21 +24,19 @@ pub struct Server {
impl Server {
/// Create a new server object
pub fn new() -> Result<Arc<Server>, Box<dyn std::error::Error>> {
let (sender, receiver) = channel(1024);
let (
sender,
receiver
) = channel(1024);
Ok(
Arc::new(
Server {
client_manager: ClientManager::new(sender.clone()),
network_manager: NetworkManager::new("5600".to_string(), sender),
receiver: Mutex::new(receiver),
}
)
)
Ok(Arc::new(Server {
client_manager: ClientManager::new(sender.clone()),
network_manager: NetworkManager::new("5600".to_string(), sender),
receiver: Mutex::new(receiver),
}))
}
pub async fn start(self: &Arc<Server>) {
// start client manager and network manager
self.network_manager.clone().start();
self.client_manager.clone().start();
@ -54,7 +44,6 @@ impl Server {
// clone block items
let server = self.clone();
use ClientMgrMessage::{Add, Remove, SendMessage};
loop {
@ -64,25 +53,47 @@ impl Server {
match message {
ServerMessage::ClientConnected { client } => {
server.client_manager.clone()
.send_message(Add(client)).await
server
.client_manager
.clone()
.send_message(Add(client))
.await
}
ServerMessage::ClientDisconnected { id } => {
println!("disconnecting client {:?}", id);
server.client_manager.clone().send_message(Remove(id)).await;
}
ServerMessage::ClientSendMessage { from, to, content } => server
.client_manager.clone()
.send_message(SendMessage { from, to, content }).await,
ServerMessage::ClientUpdate { to } => server
.client_manager.clone()
.send_message(ClientMgrMessage::SendClients { to }).await,
ServerMessage::ClientError { to } => server
.client_manager.clone()
.send_message(ClientMgrMessage::SendError {to}).await,
ServerMessage::ClientSendMessage { from, to, content } => {
server
.client_manager
.clone()
.send_message(SendMessage { from, to, content })
.await
}
ServerMessage::ClientUpdate { to } => {
server
.client_manager
.clone()
.send_message(ClientMgrMessage::SendClients { to })
.await
}
ServerMessage::ClientError { to } => {
server
.client_manager
.clone()
.send_message(ClientMgrMessage::SendError { to })
.await
}
ServerMessage::BroadcastGlobalMessage {sender,content} => {
server
.client_manager
.clone()
.send_message(
ClientMgrMessage::BroadcastGlobalMessage {sender, content}
).await
}
}
}
}
}
}