Moved threads to tokio async

This commit is contained in:
michael-bailey 2021-07-30 11:50:08 +01:00 committed by Mitch161
parent 2f8677710a
commit 14495e1b27
5 changed files with 39 additions and 12 deletions

View File

@ -30,4 +30,6 @@ pub enum ClientStreamOut {
ConnectedClients { clients: Vec<ClientDetails> },
Disconnected,
Error,
}

View File

@ -8,7 +8,6 @@ use zeroize::Zeroize;
use futures::lock::Mutex;
use tokio::task;
use tokio::io::{ReadHalf, WriteHalf};
use tokio::sync::mpsc::{Sender, Receiver, channel};
use tokio::io::{AsyncBufReadExt, BufReader, AsyncWriteExt};
@ -121,23 +120,27 @@ impl Client {
from: client.details.uuid,
to,
content,
});
}).await;
}
Ok(ClientStreamIn::Update) => {
println!("[Client {:?}]: update received", &client.details.uuid);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid });
let _ = lock.send(ServerMessage::ClientUpdate { to: client.details.uuid }).await;
}
_ => {
println!("[Client {:?}]: command not found", &client.details.uuid);
let lock = client.server_channel.lock().await;
let _ = lock.send(ServerMessage::ClientError { to: client.details.uuid }).await;
}
_ => println!("[Client {:?}]: command not found", &client.details.uuid),
}
buffer.zeroize();
}
println!("[Client {:?}] exited thread 1", &client.details.uuid);
}
});
// client channel read thread
tokio::spawn(async move {
use ClientMessage::{Disconnect, Message, SendClients};
use ClientMessage::{Disconnect, Message, SendClients, Error};
let client = t2_client;
@ -161,7 +164,7 @@ impl Client {
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes());
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
drop(stream);
@ -181,8 +184,15 @@ impl Client {
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
},
Error => {
let _ = writeln!(buffer, "{}", serde_json::to_string(&ClientStreamOut::Error).unwrap());
let _ = stream.write_all(&buffer.as_bytes());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
}

View File

@ -3,7 +3,6 @@ use std::sync::Arc;
use uuid::Uuid;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task;
use futures::lock::Mutex;
use crate::client::Client;
@ -43,7 +42,7 @@ impl ClientManager {
tokio::spawn(async move {
use ClientMgrMessage::{Add, Remove, SendClients, SendMessage};
use ClientMgrMessage::{Add, Remove, SendClients, SendMessage, SendError};
loop {
let mut receiver = client_manager.rx.lock().await;
@ -79,7 +78,13 @@ impl ClientManager {
clients: clients_vec,
}).await
}
}
},
SendError { to } => {
let lock = client_manager.clients.lock().await;
if let Some(client) = lock.get(&to) {
client.send_message(ClientMessage::Error).await
}
},
#[allow(unreachable_patterns)]
_ => println!("[Client manager]: not implemented"),
}

View File

@ -10,6 +10,8 @@ pub enum ClientMessage {
SendClients { clients: Vec<Arc<Client>> },
Disconnect,
Error,
}
#[derive(Debug)]
@ -24,6 +26,9 @@ pub enum ClientMgrMessage {
to: Uuid,
content: String,
},
SendError {
to: Uuid,
}
}
#[derive(Debug)]
@ -42,4 +47,7 @@ pub enum ServerMessage {
ClientUpdate {
to: Uuid,
},
ClientError {
to: Uuid
}
}

View File

@ -2,7 +2,6 @@ use std::sync::Arc;
// use crossbeam_channel::{unbounded, Receiver};
use uuid::Uuid;
use tokio::task;
use tokio::sync::mpsc::{channel, Receiver};
use futures::lock::Mutex;
@ -78,6 +77,9 @@ impl Server {
ServerMessage::ClientUpdate { to } => server
.client_manager.clone()
.send_message(ClientMgrMessage::SendClients { to }).await,
ServerMessage::ClientError { to } => server
.client_manager.clone()
.send_message(ClientMgrMessage::SendError {to}).await,
}
}
}