merge develop into master #20

Merged
michael-bailey merged 181 commits from develop into master 2023-12-01 21:48:28 +00:00
5 changed files with 37 additions and 69 deletions
Showing only changes of commit 8f0b502487 - Show all commits

View File

@ -1,7 +1,6 @@
use std::cmp::Ordering;
use std::io::Error;
use std::sync::Arc;
use futures::executor::block_on;
use serde::{Deserialize, Serialize};
@ -9,7 +8,7 @@ use uuid::Uuid;
use async_trait::async_trait;
use tokio::{select, task};
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{Mutex};
@ -50,8 +49,11 @@ pub struct Client<Out: 'static>
// server send channel
out_channel: Sender<Out>,
// todo: - remove these
// object channels
#[allow(dead_code)]
tx: Sender<ClientMessage>,
#[allow(dead_code)]
rx: Mutex<Receiver<ClientMessage>>,
connection: Arc<Connection>,
@ -93,7 +95,7 @@ impl<Out> Client<Out>
"[Client {:?}]: Disconnect received",
self.details.uuid
);
self.disconnect();
self.disconnect().await;
return;
}
_ => {
@ -108,10 +110,6 @@ impl<Out> Client<Out>
}
}
async fn handle_channel(&self, value: Option<ClientMessage>) {
unimplemented!();
}
pub async fn broadcast_message(&self, from: Uuid, content: String) -> Result<(), Error> {
self.connection.write(ClientStreamOut::GlobalMessage { from, content }).await?;
Ok(())
@ -119,16 +117,11 @@ impl<Out> Client<Out>
async fn disconnect(&self) {
let _ = self.out_channel
.send(ClientMessage::NewDisconnect {
.send(ClientMessage::Disconnect {
id: self.details.uuid,
connection: self.connection.clone()}.into()
);
}
#[deprecated]
pub async fn send_message(self: &Arc<Client<Out>>, msg: ClientMessage) {
let _ = self.tx.send(msg).await;
}
}
#[async_trait]
@ -140,21 +133,14 @@ impl<Out> IManager for Client<Out>
where
Self: Send + Sync + 'static
{
self.connection.write(Connected).await;
let _ = self.connection.write(Connected).await;
}
async fn run(self: &Arc<Self>) {
let mut channel_lock = self.rx.lock().await;
select! {
val = self.connection.read::<ClientStreamIn>() => {
self.handle_connection(val).await;
}
val = channel_lock.recv() => {
self.handle_channel(val).await;
}
}
}
}
@ -172,7 +158,7 @@ impl<Out> Drop for Client<Out>
tokio::spawn(async move {
let _ = connection.write(Disconnected).await;
let _ = out.send(
ClientMessage::NewDisconnect {
ClientMessage::Disconnect {
id,
connection
}.into()).await;
@ -225,7 +211,7 @@ mod test {
use foundation::test::create_connection_pair;
use crate::client::{Client};
use crate::messages::ClientMessage;
use crate::messages::ClientMessage::NewDisconnect;
use crate::messages::ClientMessage::Disconnect;
#[tokio::test]
async fn create_client_and_drop() -> Result<(), Error> {
@ -258,7 +244,7 @@ mod test {
// fetch from out_channel
let disconnect_msg = receiver.recv().await.unwrap();
assert_eq!(disconnect_msg, NewDisconnect {id: uuid, connection: Connection::new()});
assert_eq!(disconnect_msg, Disconnect {id: uuid, connection: Connection::new()});
Ok(())
}

View File

@ -1,8 +1,7 @@
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use futures::future::{join_all, select};
use futures::future::join_all;
use tokio::sync::Mutex;
use tokio::select;
@ -13,7 +12,6 @@ use uuid::Uuid;
use async_trait::async_trait;
use foundation::prelude::IManager;
use foundation::ClientDetails;
use foundation::connection::Connection;
use crate::client::Client;
@ -21,9 +19,11 @@ use crate::messages::ClientMessage;
#[derive(Debug)]
pub enum ClientMgrMessage {
#[allow(dead_code)]
Remove {
id: Uuid
},
#[allow(dead_code)]
SendClients {
to: Uuid
},
@ -37,7 +37,7 @@ pub enum ClientMgrMessage {
impl From<ClientMessage> for ClientMgrMessage {
fn from(msg: ClientMessage) -> Self {
use ClientMessage::{IncomingMessage,IncomingGlobalMessage,NewDisconnect,RequestedUpdate};
use ClientMessage::{IncomingMessage,IncomingGlobalMessage};
match msg {
IncomingMessage {
@ -70,13 +70,13 @@ impl From<ClientMessage> for ClientMgrMessage {
/// - server_channel: a channel to the parent that manages this object.
/// - tx: the sender that clients will send their messages to.
/// - rx: the receiver where messages are sent to.
#[derive(Debug)]
pub struct ClientManager<Out: 'static>
where
Out: From<ClientMgrMessage> + Send
{
clients: Mutex<HashMap<Uuid, Arc<Client<ClientMgrMessage>>>>,
#[allow(dead_code)]
server_channel: Mutex<Sender<Out>>,
tx: Sender<ClientMgrMessage>,
@ -100,6 +100,7 @@ impl<Out> ClientManager<Out>
})
}
#[allow(dead_code)]
pub async fn get_count(&self) -> usize {
self.clients.lock().await.len()
}
@ -123,6 +124,7 @@ impl<Out> ClientManager<Out>
lock.insert(client.details.uuid, client);
}
#[allow(dead_code)]
pub async fn remove_client(&self, id: Uuid) {
let mut lock = self.clients.lock().await;
lock.remove(&id);
@ -137,7 +139,7 @@ impl<Out> ClientManager<Out>
let mut lock = self.clients.lock().await;
lock.remove(&id);
},
Some(SendClients {to }) => {
Some(SendClients {to: _ }) => {
let lock = self.clients.lock().await;
let futures = lock.iter().map(|(_,_)| async {
println!("Send message to Client")
@ -158,19 +160,6 @@ impl<Out> ClientManager<Out>
}
}
}
async fn send_to_client(self: &Arc<ClientManager<Out>>, id: &Uuid, msg: ClientMessage) {
let lock = self.clients.lock().await;
if let Some(client) = lock.get(&id) {
client.clone().send_message(msg).await;
}
}
pub async fn send_message(self: Arc<ClientManager<Out>>, message: ClientMgrMessage) {
let _ = self.tx.send(message).await;
}
}
#[async_trait]

View File

@ -1,9 +1,7 @@
use std::sync::{Arc};
use uuid::Uuid;
use foundation::ClientDetails;
use foundation::connection::Connection;
use crate::client::Client;
use foundation::connection::Connection;
/// # ClientMessage
///
@ -18,39 +16,30 @@ use crate::client::Client;
#[derive(Debug)]
pub enum ClientMessage {
#[allow(dead_code)]
Connected,
#[allow(dead_code)]
IncomingMessage { from: Uuid, to: Uuid, content: String },
#[allow(dead_code)]
IncomingGlobalMessage { from: Uuid, content: String },
#[allow(dead_code)]
RequestedUpdate { from: Uuid },
NewDisconnect { id: Uuid, connection: Arc<Connection> },
Disconnect { id: Uuid, connection: Arc<Connection> },
Error,
#[deprecated]
Message { from: Uuid, content: String },
#[deprecated]
GlobalBroadcastMessage {from: Uuid, content:String},
#[deprecated]
SendClients { clients: Vec<ClientDetails> },
#[deprecated]
Disconnect,
}
impl PartialEq for ClientMessage {
fn eq(&self, other: &Self) -> bool {
use ClientMessage::{NewDisconnect, Connected, Error};
use ClientMessage::{Disconnect, Connected, Error};
match (self,other) {
(Connected, Connected) => true,
(Error, Error) => true,
(NewDisconnect {id, .. }, NewDisconnect {id: other_id, .. }) => id == other_id,
(Disconnect {id, .. }, Disconnect {id: other_id, .. }) => id == other_id,
_ => {
false
}

View File

@ -1,12 +1,12 @@
use std::io::{Error, ErrorKind};
use std::sync::{Arc,Weak};
use std::sync::Arc;
use uuid::Uuid;
use async_trait::async_trait;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::mpsc::Sender;
use tokio::{select};
use tokio::sync::Mutex;
@ -37,6 +37,8 @@ impl PartialEq for NetworkManagerMessage {
address: other_address,
username: other_username, ..
}) => uuid == other_uuid && address == other_address && username == other_username,
#[allow(unreachable_patterns)]
_ => false
}
}
@ -81,6 +83,7 @@ impl<Out> NetworkManager<Out>
}
/// This fetches the IP address from the NetworkManager
#[allow(dead_code)]
pub async fn address(&self) -> String {
self.listener.lock().await.local_addr().unwrap().ip().to_string()
}
@ -107,6 +110,7 @@ impl<Out> NetworkManager<Out>
connection,
}.into()).await;
}
#[allow(unreachable_patterns)]
_ => {
return Err(Error::new(ErrorKind::InvalidData, "Did not receive valid message"));
}
@ -124,7 +128,7 @@ impl<Out: 'static> IManager for NetworkManager<Out>
let lock = self.listener.lock().await;
select! {
val = lock.accept() => {
if let Ok((stream, addr)) = val {
if let Ok((stream, _addr)) = val {
let _ = self.handle_connection(Arc::new(stream.into())).await;
}
}

View File

@ -1,7 +1,6 @@
use std::io::Error;
use std::sync::Arc;
// use crossbeam_channel::{unbounded, Receiver};
use futures::lock::Mutex;
use tokio::sync::mpsc::{channel, Receiver};
use uuid::Uuid;
@ -9,7 +8,6 @@ use foundation::connection::Connection;
use foundation::prelude::IManager;
use crate::client_manager::{ClientManager, ClientMgrMessage};
use crate::messages::{ClientMessage};
use crate::network_manager::{NetworkManager, NetworkManagerMessage};
#[derive(Debug)]
@ -39,6 +37,7 @@ impl From<NetworkManagerMessage> for ServerMessage {
username,
connection
},
#[allow(unreachable_patterns)]
_ => unimplemented!()
}
}
@ -121,8 +120,8 @@ impl Server {
).await
},
ServerMessage::BroadcastGlobalMessage {
from,
content
from: _,
content: _,
} => {
// server
// .client_manager
@ -131,6 +130,7 @@ impl Server {
// ClientMgrMessage::BroadcastGlobalMessage {sender, content}
// ).await
}
#[allow(unreachable_patterns)]
_ => {unimplemented!()}
}
}