Feature/client encryption #18

Closed
michael-bailey wants to merge 28 commits from feature/client_encryption into master
3 changed files with 65 additions and 137 deletions
Showing only changes of commit e6d087b4d8 - Show all commits

View File

@ -1,19 +1,14 @@
use std::sync::Arc;
use std::cmp::Ordering;
use std::fmt::Write;
use uuid::Uuid;
use zeroize::Zeroize;
use futures::lock::Mutex;
use tokio::io::{ReadHalf, WriteHalf};
use tokio::sync::mpsc::{Sender, Receiver, channel};
use tokio::io::{AsyncBufReadExt, BufReader, AsyncWriteExt};
use crate::network::SocketSender;
use crate::messages::ClientMessage;
use crate::messages::ServerMessage;
use crate::prelude::StreamMessageSender;
use foundation::ClientDetails;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
@ -39,8 +34,7 @@ pub struct Client {
tx: Sender<ClientMessage>,
rx: Mutex<Receiver<ClientMessage>>,
stream_rx: Mutex<BufReader<ReadHalf<tokio::net::TcpStream>>>,
stream_tx: Mutex<WriteHalf<tokio::net::TcpStream>>,
socket_sender: Arc<SocketSender>,
}
// client funciton implmentations
@ -49,8 +43,7 @@ impl Client {
uuid: String,
username: String,
address: String,
stream_rx: BufReader<ReadHalf<tokio::net::TcpStream>>,
stream_tx: WriteHalf<tokio::net::TcpStream>,
socket_sender: Arc<SocketSender>,
server_channel: Sender<ServerMessage>,
) -> Arc<Client> {
let (sender, receiver) = channel(1024);
@ -64,12 +57,11 @@ impl Client {
},
server_channel: Mutex::new(server_channel),
socket_sender,
tx: sender,
rx: Mutex::new(receiver),
stream_rx: Mutex::new(stream_rx),
stream_tx: Mutex::new(stream_tx),
})
}
@ -80,60 +72,39 @@ impl Client {
// client stream read task
tokio::spawn(async move {
use ClientMessage::Disconnect;
let client = t1_client;
let mut lock = client.stream_tx.lock().await;
let mut buffer = String::new();
// tell client that is is now connected
let _ = writeln!(buffer, "{}",
serde_json::to_string(&ClientStreamOut::Connected).unwrap()
);
let _ = lock.write_all(&buffer.as_bytes());
let _ = lock.flush().await;
drop(lock);
client.socket_sender.send::<ClientStreamOut>(ClientStreamOut::Connected).await.expect("error");
loop {
let mut stream_reader = client.stream_rx.lock().await;
let mut buffer = String::new();
if let Ok(_size) = stream_reader.read_line(&mut buffer).await {
let command = serde_json::from_str::<ClientStreamIn>(buffer.as_str());
println!("[Client {:?}]: recieved {}", client.details.uuid, &buffer);
match command {
Ok(ClientStreamIn::Disconnect) => {
println!("[Client {:?}]: Disconnect recieved", &client.details.uuid);
client.send_message(Disconnect).await;
return;
}
Ok(ClientStreamIn::SendMessage { to, content }) => {
println!("[Client {:?}]: send message to: {:?}", &client.details.uuid, &to);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientSendMessage {
from: client.details.uuid,
to,
content,
}).await;
}
Ok(ClientStreamIn::Update) => {
println!("[Client {:?}]: update received", &client.details.uuid);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid }).await;
}
_ => {
println!("[Client {:?}]: command not found", &client.details.uuid);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientError { to: client.details.uuid }).await;
}
let command = client.socket_sender.recv::<ClientStreamIn>().await;
match command {
Ok(ClientStreamIn::Disconnect) => {
println!("[Client {:?}]: Disconnect recieved", &client.details.uuid);
client.send_message(Disconnect).await;
return;
}
Ok(ClientStreamIn::SendMessage { to, content }) => {
println!("[Client {:?}]: send message to: {:?}", &client.details.uuid, &to);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientSendMessage {
from: client.details.uuid,
to,
content,
}).await;
}
Ok(ClientStreamIn::Update) => {
println!("[Client {:?}]: update received", &client.details.uuid);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid }).await;
}
_ => {
println!("[Client {:?}]: command not found", &client.details.uuid);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientError { to: client.details.uuid }).await;
}
buffer.zeroize();
}
}
});
@ -146,7 +117,6 @@ impl Client {
loop {
let mut channel = client.rx.lock().await;
let mut buffer = String::new();
let message = channel.recv().await.unwrap();
drop(channel);
@ -158,43 +128,26 @@ impl Client {
let _ = lock.send(ServerMessage::ClientDisconnected { id: client.details.uuid }).await;
return
}
Message { from, content } => {
let msg = ClientStreamOut::UserMessage { from, content };
let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
drop(stream);
}
Message { from, content } =>
client.socket_sender.send::<ClientStreamOut>(
ClientStreamOut::UserMessage { from, content }
).await.expect("error sending message"),
SendClients { clients } => {
let client_details_vec: Vec<ClientDetails> = clients
.iter()
.map(|client| &client.details)
.cloned()
.collect();
let client_details_vec: Vec<ClientDetails> =
clients.iter().map(|client| &client.details)
.cloned().collect();
let msg = ClientStreamOut::ConnectedClients {
clients: client_details_vec,
};
let _ = writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
client.socket_sender.send::<ClientStreamOut>(
ClientStreamOut::ConnectedClients {
clients: client_details_vec,
}
).await.expect("error sending message");
},
Error => {
let _ = writeln!(buffer, "{}", serde_json::to_string(&ClientStreamOut::Error).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
Error =>
client.socket_sender.send::<ClientStreamOut>(
ClientStreamOut::Error
).await.expect("error sending message"),
}
}
});

View File

@ -94,7 +94,7 @@ impl ClientManager {
async fn send_to_client(self: &Arc<ClientManager>, id: &Uuid, msg: ClientMessage) {
let lock = self.clients.lock().await;
if let Some(client) = lock.get(&id) {
if let Some(client) = lock.get(id) {
client.clone().send_message(msg).await;
}
}

View File

@ -1,13 +1,12 @@
use std::sync::Arc;
use std::io::Write;
use tokio::task;
use tokio::net::TcpListener;
use tokio::sync::mpsc::Sender;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
use crate::client::Client;
use crate::network::SocketSender;
use crate::messages::ServerMessage;
use crate::prelude::StreamMessageSender;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
pub struct NetworkManager {
@ -32,49 +31,26 @@ impl NetworkManager {
loop {
let (connection, _) = listener.accept().await.unwrap();
let (rd, mut wd) = io::split(connection);
let mut reader = BufReader::new(rd);
let stream_sender = SocketSender::new(connection);
let server_channel = network_manager.server_channel.clone();
task::spawn(async move {
let mut out_buffer: Vec<u8> = Vec::new();
let mut in_buffer: String = String::new();
tokio::spawn(async move {
// write request
let a = serde_json::to_string(&NetworkSockOut::Request).unwrap();
println!("{:?}", &a);
let _ = writeln!(
out_buffer,
"{}",
a
);
stream_sender.send::<NetworkSockOut>(NetworkSockOut::Request)
.await.expect("failed to send message");
let _ = wd.write_all(&out_buffer).await;
let _ = wd.flush().await;
// get response
let _ = reader.read_line(&mut in_buffer).await.unwrap();
//match the response
if let Ok(request) =
serde_json::from_str::<NetworkSockIn>(&in_buffer)
if let Ok(request) =
stream_sender.recv::<NetworkSockIn>().await
{
match request {
NetworkSockIn::Info => {
// send back server info to the connection
let _ = wd.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael",
},
)
.unwrap()
.as_bytes(),
).await;
let _ = wd.write_all(b"\n").await;
let _ = wd.flush().await;
stream_sender.send(
NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael",
}
).await.expect("failed to send got info");
}
NetworkSockIn::Connect {
uuid,
@ -86,8 +62,7 @@ impl NetworkManager {
uuid,
username,
address,
reader,
wd,
stream_sender,
server_channel.clone(),
);
let _ = server_channel