merge develop into master #20

Merged
michael-bailey merged 181 commits from develop into master 2023-12-01 21:48:28 +00:00
22 changed files with 724 additions and 354 deletions
Showing only changes of commit 8197231401 - Show all commits

View File

@ -1,6 +1,6 @@
mod worker;
mod managers;
mod WorkerMessage;
mod worker_message;
use worker::Worker;
use cursive::{Cursive, CursiveExt};
@ -10,12 +10,12 @@ use cursive::views::{Dialog, TextView};
fn main() {
let mut app = Cursive::default();
let workerStream =
let worker_stream =
Worker::new(app.cb_sink().clone()).start();
app.set_user_data(workerStream);
app.set_user_data(worker_stream);
app.add_layer(Dialog::new()
.content(TextView::new("Hello world").with_name("TextView"))
.button("close", |s| s.quit()));

View File

@ -1,16 +1,15 @@
use std::io::{Error, ErrorKind};
use std::mem;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use cursive::views::{Dialog, TextView};
use tokio::sync::Mutex;
use tokio::time::sleep;
use async_trait::async_trait;
use tokio::net::ToSocketAddrs;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
use serverlib::Server;
use foundation::ClientDetails;
use foundation::connection::Connection;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
@ -19,19 +18,27 @@ use crate::managers::NetworkManagerMessage;
pub struct NetworkManager<M>
where M: From<NetworkManagerMessage> {
#[allow(unused)]
server_connection: Mutex<Option<Connection>>,
#[allow(unused)]
cursive: Sender<M>,
is_logged_in: AtomicBool,
}
impl<M> NetworkManager<M>
where M: From<NetworkManagerMessage> {
pub fn new(sender: Sender<M>) -> Arc<Self> {
Arc::new(NetworkManager {
server_connection: Mutex::new(None),
cursive: sender,
is_logged_in: AtomicBool::new(false),
})
}
#[allow(unused)]
pub async fn info<T: ToSocketAddrs>(self: &Arc<Self>, host: T) -> Result<NetworkManagerMessage, Error> {
let connection= Connection::new();
println!("Created connection");
@ -40,7 +47,7 @@ impl<M> NetworkManager<M>
println!("request: {:?}", req);
if let NetworkSockOut::Request = req {
if let NetworkSockOut::Request = req {
connection.write::<NetworkSockIn>(NetworkSockIn::Info)
.await?;
return Ok(connection.read::<NetworkSockOut>()
@ -49,25 +56,66 @@ impl<M> NetworkManager<M>
Err(Error::new(ErrorKind::ConnectionAborted, "Request not received"))
}
}
pub async fn login(self: &Arc<Self>, host: String, id: String, username: String) {
#[allow(unused)]
pub async fn login(
self: &Arc<Self>,
host: String,
uuid: Uuid,
username: String,
address: String
) -> Result<(), Error> {
let connection= Connection::new();
let _ = connection.connect(host);
// connection.write(NetworkSockIn::Connect {}).await;
let mut lock = self.server_connection.lock().await;
*lock = Some(connection);
let _ = connection.connect(host).await?;
println!("created connection");
let req = connection.read().await?;
println!("read request");
return if let NetworkSockOut::Request = req {
println!("got request");
connection.write(NetworkSockIn::Connect {username, uuid: uuid.to_string(), address}).await?;
let res = connection.read().await?;
// switch over to ClientStreamOut
if let ClientStreamOut::Connected = res {
let mut connection_lock = self.server_connection.lock().await;
let _ = mem::replace(&mut *connection_lock, Some(connection));
Ok(())
} else {
Err(Error::new(ErrorKind::ConnectionRefused, format!("expected connecting received: {:?}", res)))
}
} else {
println!("request not found");
Err(Error::new(ErrorKind::ConnectionAborted, "Server did not send request"))
}
}
pub async fn logout() {
#[allow(unused)]
pub async fn logout(self: &Arc<Self>) -> Result<(), Error> {
let mut connection_lock = self.server_connection.lock().await;
let connection = mem::replace(&mut *connection_lock, None).unwrap();
connection.write(ClientStreamIn::Disconnect).await?;
return if let ClientStreamOut::Disconnected = connection.read().await? {
Ok(())
} else {
Err(Error::new(ErrorKind::InvalidData, "disconnect failed, forcing disconnect"))
}
}
#[allow(unused)]
pub async fn update() {
}
#[allow(unused)]
async fn start(self: Arc<Self>) {
let network_manager = self.clone();
tokio::spawn(async {
@ -80,7 +128,7 @@ impl<M> NetworkManager<M>
impl<M: 'static> IManager for NetworkManager<M>
where M: From<NetworkManagerMessage> + Send {
async fn run(self: Arc<Self>) {
let networkManager = self.clone();
// let networkManager = self.clone();
loop {
sleep(Duration::new(1,0)).await;
println!("networkManager tick")
@ -98,18 +146,17 @@ impl<M: 'static> IManager for NetworkManager<M>
#[cfg(test)]
mod test {
use std::future::Future;
use std::panic;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use serverlib::Server;
use crate::managers::Network::NetworkManagerMessage;
use crate::managers::Network::NetworkManagerMessage::Info;
use crate::managers::network::NetworkManagerMessage;
use crate::managers::NetworkManager;
async fn wrap_setup<T,F>(test: T)
where T: FnOnce(u16) -> F,
F: Future
{
let server = Server::new().unwrap();
let server = Server::new().await.unwrap();
let port = server.port();
tokio::spawn(
async move {
@ -121,8 +168,9 @@ mod test {
}
#[tokio::test]
async fn create_network_manager() {
async fn test_fetch_server_info() {
use NetworkManagerMessage::Info;
#[allow(unused)]
let (tx,rx) =
channel::<NetworkManagerMessage>(16);
@ -138,12 +186,28 @@ mod test {
}).await;
}
// #[tokio::test]
// async fn fetch_server_info() {
// wrap_setup(|port| {
// async move {
//
// }
// })
// }
#[tokio::test]
async fn test_login_and_logout_to_server() {
#[allow(unused)]
let (tx,rx) =
channel::<NetworkManagerMessage>(16);
let network = NetworkManager::new(tx);
println!("created network manger");
wrap_setup(|port| {
async move {
network.login(
format!("localhost:{}", port),
Uuid::default(),
"user1".to_string(),
"localhost".to_string()
).await.expect("login failed");
network.logout().await.expect("logout failed");
}
}).await;
}
}

View File

@ -3,6 +3,7 @@ use foundation::messages::network::NetworkSockOut;
#[derive(Debug)]
pub enum NetworkManagerMessage {
#[allow(unused)]
Users(Vec<ClientDetails>),
Info {
server_name: String,

View File

@ -1,7 +1,7 @@
mod Network;
mod network;
#[path = "NetworkManagerMessage.rs"]
mod Message;
#[path = "message.rs"]
mod message;
pub use Network::NetworkManager;
pub use Message::NetworkManagerMessage;
pub use network::NetworkManager;
pub use message::NetworkManagerMessage;

View File

@ -0,0 +1,213 @@
use std::io::{Error, ErrorKind};
use std::mem;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;
use async_trait::async_trait;
use tokio::net::ToSocketAddrs;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
use foundation::connection::Connection;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::prelude::IManager;
use crate::managers::NetworkManagerMessage;
pub struct NetworkManager<M>
where M: From<NetworkManagerMessage> {
#[allow(unused)]
server_connection: Mutex<Option<Connection>>,
#[allow(unused)]
cursive: Sender<M>,
is_logged_in: AtomicBool,
}
impl<M> NetworkManager<M>
where M: From<NetworkManagerMessage> {
pub fn new(sender: Sender<M>) -> Arc<Self> {
Arc::new(NetworkManager {
server_connection: Mutex::new(None),
cursive: sender,
is_logged_in: AtomicBool::new(false),
})
}
#[allow(unused)]
pub async fn info<T: ToSocketAddrs>(self: &Arc<Self>, host: T) -> Result<NetworkManagerMessage, Error> {
let connection= Connection::new();
println!("Created connection");
connection.connect(host).await?;
let req = connection.read().await?;
println!("request: {:?}", req);
if let NetworkSockOut::Request = req {
connection.write::<NetworkSockIn>(NetworkSockIn::Info)
.await?;
return Ok(connection.read::<NetworkSockOut>()
.await?.into());
} else {
Err(Error::new(ErrorKind::ConnectionAborted, "Request not received"))
}
}
#[allow(unused)]
pub async fn login(
self: &Arc<Self>,
host: String,
uuid: Uuid,
username: String,
address: String
) -> Result<(), Error> {
let connection= Connection::new();
let _ = connection.connect(host).await?;
println!("created connection");
let req = connection.read().await?;
println!("read request");
return if let NetworkSockOut::Request = req {
println!("got request");
connection.write(NetworkSockIn::Connect {username, uuid: uuid.to_string(), address}).await?;
let res = connection.read().await?;
// switch over to ClientStreamOut
if let ClientStreamOut::Connected = res {
let mut connection_lock = self.server_connection.lock().await;
let _ = mem::replace(&mut *connection_lock, Some(connection));
Ok(())
} else {
Err(Error::new(ErrorKind::ConnectionRefused, format!("expected connecting received: {:?}", res)))
}
} else {
println!("request not found");
Err(Error::new(ErrorKind::ConnectionAborted, "Server did not send request"))
}
}
#[allow(unused)]
pub async fn logout(self: &Arc<Self>) -> Result<(), Error> {
let mut connection_lock = self.server_connection.lock().await;
let connection = mem::replace(&mut *connection_lock, None).unwrap();
connection.write(ClientStreamIn::Disconnect).await?;
return if let ClientStreamOut::Disconnected = connection.read().await? {
Ok(())
} else {
Err(Error::new(ErrorKind::InvalidData, "disconnect failed, forcing disconnect"))
}
}
#[allow(unused)]
pub async fn update() {
}
#[allow(unused)]
async fn start(self: Arc<Self>) {
let network_manager = self.clone();
tokio::spawn(async {
});
}
}
#[async_trait]
impl<M: 'static> IManager for NetworkManager<M>
where M: From<NetworkManagerMessage> + Send {
async fn run(self: Arc<Self>) {
// let networkManager = self.clone();
loop {
sleep(Duration::new(1,0)).await;
println!("networkManager tick")
}
}
async fn start(self: &Arc<Self>) {
let network_manager = self.clone();
tokio::spawn(
network_manager.run()
);
}
}
#[cfg(test)]
mod test {
use std::future::Future;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use serverlib::Server;
use crate::managers::network::NetworkManagerMessage;
use crate::managers::NetworkManager;
async fn wrap_setup<T,F>(test: T)
where T: FnOnce(u16) -> F,
F: Future
{
let server = Server::new().await.unwrap();
let port = server.port();
tokio::spawn(
async move {
server.start().await;
}
);
test(port).await;
}
#[tokio::test]
async fn test_fetch_server_info() {
use NetworkManagerMessage::Info;
#[allow(unused)]
let (tx,rx) =
channel::<NetworkManagerMessage>(16);
wrap_setup(|port| {
async move {
let network = NetworkManager::new(tx);
let info = network.info(format!("localhost:{}", port)).await.expect("Failed to fetch info");
assert_eq!(info, Info {
server_name: "oof".to_string(),
server_owner: "michael".to_string()
});
}
}).await;
}
#[tokio::test]
async fn test_login_and_logout_to_server() {
#[allow(unused)]
let (tx,rx) =
channel::<NetworkManagerMessage>(16);
let network = NetworkManager::new(tx);
println!("created network manger");
wrap_setup(|port| {
async move {
network.login(
format!("localhost:{}", port),
Uuid::default(),
"user1".to_string(),
"localhost".to_string()
).await.expect("login failed");
network.logout().await.expect("logout failed");
}
}).await;
}
}

View File

@ -1,21 +1,17 @@
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::thread::spawn;
use std::time::Duration;
use crossbeam_channel::Sender as CrossSender;
use cursive::backends::curses::n::ncurses::LcCategory::numeric;
use tokio::runtime::Runtime;
use tokio::select;
use tokio::sync::mpsc::{channel, Sender as TokioSender};
use tokio::sync::Mutex;
use tokio::time::sleep;
use foundation::ClientDetails;
use crate::{Cursive, TextView};
use crate::managers::{NetworkManager, NetworkManagerMessage};
use crate::WorkerMessage::WorkerMessage;
use crate::managers::{NetworkManager};
use crate::worker_message::WorkerMessage;
pub type CursiveSender = CrossSender<Box<dyn FnOnce(&mut Cursive) + Send>>;
@ -27,12 +23,14 @@ pub struct Worker
network_manager: Arc<NetworkManager<WorkerMessage>>,
number: Arc<Mutex<usize>>,
#[allow(unused)]
user_details: Mutex<Option<ClientDetails>>,
}
impl Worker {
pub fn new(sender: CursiveSender) -> Worker {
#[allow(unused)]
let (tx,rx) = channel::<WorkerMessage>(16);
@ -45,12 +43,14 @@ impl Worker {
}
pub fn start(self) -> TokioSender<WorkerMessage> {
#[allow(unused)]
let (tx,rx) = channel::<WorkerMessage>(16);
spawn(move || {
let sender = self.cursive_sender.clone();
let rt = Runtime::new().unwrap();
let tmp_num = self.number.clone();
#[allow(unused)]
let network_manager = self.network_manager.clone();
rt.block_on(async move {
let a = &tmp_num;
@ -69,9 +69,4 @@ impl Worker {
});
tx
}
pub async fn sendLoginInfo(&self) {
}
}

View File

@ -5,12 +5,15 @@ pub enum WorkerMessage {
server_name: String,
server_owner: String,
},
Error(&'static str),
#[allow(unused)]
Error(String),
}
impl From<NetworkManagerMessage> for WorkerMessage {
fn from(other: NetworkManagerMessage) -> Self {
#[allow(unused)]
use WorkerMessage::{Info as NewInfo, Error as NewError};
#[allow(unused)]
use NetworkManagerMessage::{Info as OldInfo, Error};
match other {
OldInfo {server_name, server_owner}

View File

@ -1,38 +1,37 @@
use std::borrow::BorrowMut;
use std::io::{Error, ErrorKind};
use std::io::Write;
use std::mem;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io;
use tokio::io::{AsyncWriteExt, BufReader, AsyncBufReadExt, ReadHalf, WriteHalf};
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::sync::Mutex;
use crate::messages::client::ClientStreamOut;
use crate::messages::network::NetworkSockIn;
#[derive(Debug)]
pub struct Connection {
stream_rx: Mutex<Option<BufReader<ReadHalf<tokio::net::TcpStream>>>>,
stream_tx: Mutex<Option<WriteHalf<tokio::net::TcpStream>>>,
}
impl Connection {
pub fn new() -> Self {
Connection {
pub fn new() -> Arc<Self> {
Arc::new(Connection {
stream_rx: Mutex::new(None),
stream_tx: Mutex::new(None),
}
})
}
pub async fn connect<T: ToSocketAddrs>(&self, host: T) -> Result<(), Error> {
let connection = TcpStream::connect(host).await?;
let (rd, mut wd) = io::split(connection);
let (rd, wd) = io::split(connection);
let mut writer_lock = self.stream_tx.lock().await;
let mut reader_lock = self.stream_rx.lock().await;
mem::replace(&mut *writer_lock, Some(wd));
mem::replace(&mut *reader_lock, Some(BufReader::new(rd)));
let _ = mem::replace(&mut *writer_lock, Some(wd));
let _ = mem::replace(&mut *reader_lock, Some(BufReader::new(rd)));
Ok(())
}
@ -78,7 +77,7 @@ impl Connection {
impl From<TcpStream> for Connection {
fn from(stream: TcpStream) -> Self {
let (rd, mut wd) = io::split(stream);
let (rd, wd) = io::split(stream);
Connection {
stream_tx: Mutex::new(Some(wd)),
stream_rx: Mutex::new(Some(BufReader::new(rd))),
@ -94,7 +93,7 @@ mod test {
use tokio::net::TcpListener;
use serde::{Serialize,Deserialize};
use crate::connection::Connection;
#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum TestMessages {
Ping,
@ -122,8 +121,8 @@ mod test {
where T: FnOnce(u16) -> F + panic::UnwindSafe,
F: Future
{
let mut server = TcpListener::bind("localhost:0").await?;
let mut addr = server.local_addr()?;
let server = TcpListener::bind("localhost:0").await?;
let addr = server.local_addr()?;
// create tokio server execution
tokio::spawn(async move {

View File

@ -4,6 +4,7 @@ pub mod encryption;
pub mod messages;
pub mod prelude;
pub mod connection;
pub mod test;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

View File

@ -19,7 +19,7 @@ pub enum ClientStreamIn {
Disconnect,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum ClientStreamOut {
Connected,
@ -33,3 +33,14 @@ pub enum ClientStreamOut {
Error,
}
impl PartialEq for ClientStreamOut {
fn eq(&self, other: &Self) -> bool {
use ClientStreamOut::{Connected, Disconnected};
match (self, other) {
(Connected, Connected) => true,
(Disconnected, Disconnected) => true,
_ => false
}
}
}

View File

@ -1,11 +1,12 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum NetworkSockIn {
Info,
Connect {
uuid: String,
uuid: Uuid,
username: String,
address: String,
},
@ -24,3 +25,16 @@ pub enum NetworkSockOut {
Error
}
impl PartialEq for NetworkSockOut {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(NetworkSockOut::Request, NetworkSockOut::Request) => true,
(NetworkSockOut::GotInfo {server_name,server_owner},
NetworkSockOut::GotInfo {server_owner: owner_other,server_name: name_other})
=> server_name == name_other && server_owner == owner_other,
(NetworkSockOut::Connecting, NetworkSockOut::Connecting) => true,
_ => false
}
}
}

View File

@ -1,13 +1,46 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::Duration;
use async_trait::async_trait;
use tokio::time::sleep;
/// This is used with all managers to implement multitasking
#[async_trait]
pub trait IManager {
/// This defines some setup before the tokio loop is started
async fn init(self: &Arc<Self>)
where
Self: Send + Sync + 'static
{}
/// this is used to get a future that can be awaited
async fn run(self: Arc<Self>);
async fn run(self: &Arc<Self>);
/// This is used to start a future through tokio
async fn start(self: &Arc<Self>);
fn start(self: &Arc<Self>)
where
Self: Send + Sync + 'static
{
let weak_self: Weak<Self> = Arc::downgrade(self);
// this looks horrid but works
tokio::spawn(async move {
let weak_self = weak_self.clone();
let a = weak_self.upgrade().unwrap();
a.init().await;
drop(a);
loop {
sleep(Duration::new(1,0)).await;
if let Some(manager) =
Weak::upgrade(&weak_self)
{
manager.run().await
} else { () }
}
});
}
}

View File

@ -1,11 +1,12 @@
use std::io::{Error};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::join;
use tokio::net::{TcpStream,TcpListener};
use crate::connection::Connection;
pub async fn create_connection_pair()
-> Result<(Connection, (Connection, SocketAddr )), Error> {
-> Result<(Arc<Connection>, (Arc<Connection>, SocketAddr )), Error> {
let listener: TcpListener = TcpListener::bind("localhost:0000").await?;
let port = listener.local_addr()?.port();
@ -16,7 +17,7 @@ pub async fn create_connection_pair()
);
let (client,addr) = client_res?;
let server = Connection::from(server_res?);
let client = Connection::from(client);
let server = Arc::new(Connection::from(server_res?));
let client = Arc::new(Connection::from(client));
Ok((server,(client,addr)))
}

View File

@ -25,5 +25,6 @@ zeroize = "1.1.0"
openssl = "0.10.33"
tokio = { version = "1.9.0", features = ["full"] }
futures = "0.3.16"
async-trait = "0.1.52"
foundation = {path = '../foundation'}

View File

@ -1,4 +1,3 @@
use std::ops::Index;
use crate::client::Client;
use crate::messages::ServerMessage;
use std::sync::{Arc, Weak};
@ -8,28 +7,31 @@ use tokio::sync::Mutex;
#[derive(Clone, Debug)]
pub struct Message {
content: String,
sender: Weak<Client>,
sender: Weak<Client<>>,
}
impl Message {
pub fn new(content: String, sender: Weak<Client>) -> Message {
#[allow(unused)]
pub fn new(content: String, sender: Weak<Client<>>) -> Message {
Message { content, sender }
}
}
enum ChatManagerMessage {
AddMessage {sender: Weak<Client>, content: String}
AddMessage {sender: Weak<Client<>>, content: String}
}
pub struct ChatManager {
messages: Mutex<Vec<Message>>,
server_channel: Sender<ServerMessage>,
#[allow(unused)]
tx: Sender<ChatManagerMessage>,
rx: Mutex<Receiver<ChatManagerMessage>>,
}
impl ChatManager {
#[allow(unused)]
pub fn new(server_channel: Sender<ServerMessage>) -> Arc<Self> {
let (tx, rx) = channel::<ChatManagerMessage>(1024);
@ -44,33 +46,34 @@ impl ChatManager {
manager
}
#[allow(unused)]
fn start(self: &Arc<ChatManager>) {
let manager = self.clone();
tokio::spawn(async move {
use ServerMessage::{BroadcastGlobalMessage};
use ChatManagerMessage::{AddMessage};
while let message = manager.rx.lock().await.recv().await {
while let Some(message) = manager.rx.lock().await.recv().await {
match message {
Some(AddMessage { content,sender }) => {
AddMessage { content,sender } => {
let sender = &sender.upgrade().unwrap().details.uuid;
manager.server_channel.send(
BroadcastGlobalMessage {sender: sender.clone(), content}
).await.unwrap();
}
None => {
println!("None found in message broadcast some how");
}
}
} });
}
});
}
#[allow(unused)]
pub async fn add_message(self: &Arc<Self>, sender: Weak<Client>, content: String) {
let mut a = self.messages.lock().await;
a.push(Message::new(content, sender))
}
#[allow(unused)]
pub async fn get_all_messages(self: &Arc<Self>) -> Vec<Message> {
self.messages.lock().await.clone()
}

View File

@ -1,22 +1,31 @@
use std::cmp::Ordering;
use std::fmt::Write;
use std::io::Error;
use std::sync::Arc;
use futures::executor::block_on;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use zeroize::Zeroize;
use async_trait::async_trait;
use futures::lock::Mutex;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::io::{ReadHalf, WriteHalf};
use tokio::{select, task};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::messages::ClientMessage;
use crate::messages::ServerMessage;
use tokio::sync::{Mutex};
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::ClientDetails;
use foundation::connection::Connection;
use foundation::messages::client::ClientStreamOut::{Connected, Disconnected};
use foundation::prelude::IManager;
use crate::messages::{ClientMessage};
#[derive(Serialize, Deserialize)]
enum ClientOutMessage {
MessageTo,
UpdateRequest,
}
/// # Client
/// This struct represents a connected user.
@ -29,258 +38,220 @@ use foundation::ClientDetails;
/// - stream_writer: the buffered writer used to send messages
/// - owner: An optional reference to the owning object.
#[derive(Debug)]
pub struct Client {
pub struct Client<Out: 'static>
where
Out: From<ClientMessage> + Send{
pub details: ClientDetails,
// server send channel
server_channel: Mutex<Sender<ServerMessage>>,
out_channel: Sender<Out>,
// object channels
tx: Sender<ClientMessage>,
rx: Mutex<Receiver<ClientMessage>>,
stream_rx: Mutex<BufReader<ReadHalf<tokio::net::TcpStream>>>,
stream_tx: Mutex<WriteHalf<tokio::net::TcpStream>>,
connection: Arc<Connection>,
}
// client funciton implmentations
impl Client {
// client function implementations
impl<Out> Client<Out>
where
Out: From<ClientMessage> + Send {
pub fn new(
uuid: String,
uuid: Uuid,
username: String,
address: String,
stream_rx: BufReader<ReadHalf<tokio::net::TcpStream>>,
stream_tx: WriteHalf<tokio::net::TcpStream>,
server_channel: Sender<ServerMessage>,
) -> Arc<Client> {
out_channel: Sender<Out>,
connection: Arc<Connection>
) -> Arc<Client<Out>> {
let (sender, receiver) = channel(1024);
Arc::new(Client {
details: ClientDetails {
uuid: Uuid::parse_str(&uuid).expect("invalid id"),
uuid,
username,
address,
address: address.to_string(),
public_key: None,
},
server_channel: Mutex::new(server_channel),
tx: sender,
rx: Mutex::new(receiver),
stream_rx: Mutex::new(stream_rx),
stream_tx: Mutex::new(stream_tx),
connection: connection,
out_channel,
})
}
pub fn start(self: &Arc<Client>) {
let t1_client = self.clone();
let t2_client = self.clone();
// client stream read task
tokio::spawn(async move {
use ClientMessage::Disconnect;
let client = t1_client;
let mut lock = client.stream_tx.lock().await;
let mut buffer = String::new();
// tell client that is is now connected
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::Connected).unwrap()
);
let _ = lock.write_all(&buffer.as_bytes());
let _ = lock.flush().await;
drop(lock);
loop {
let mut stream_reader = client.stream_rx.lock().await;
let mut buffer = String::new();
if let Ok(_size) = stream_reader.read_line(&mut buffer).await {
let command = serde_json::from_str::<ClientStreamIn>(buffer.as_str());
println!("[Client {:?}]: recieved {}", client.details.uuid, &buffer);
match command {
Ok(ClientStreamIn::Disconnect) => {
println!(
"[Client {:?}]: Disconnect recieved",
&client.details.uuid
);
client.send_message(Disconnect).await;
return;
}
Ok(ClientStreamIn::SendMessage { to, content }) => {
println!(
"[Client {:?}]: send message to: {:?}",
&client.details.uuid, &to
);
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::ClientSendMessage {
from: client.details.uuid,
to,
content,
})
.await;
}
Ok(ClientStreamIn::Update) => {
println!(
"[Client {:?}]: update received",
&client.details.uuid
);
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::ClientUpdate {
to: client.details.uuid,
})
.await;
}
Ok(ClientStreamIn::SendGlobalMessage {content}) => {
println!(
"[Client {:?}]: send global message received",
&client.details.uuid
);
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::BroadcastGlobalMessage { content, sender: *&client.details.uuid.clone() })
.await;
}
_ => {
println!(
"[Client {:?}]: command not found",
&client.details.uuid
);
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::ClientError {
to: client.details.uuid,
})
.await;
}
}
buffer.zeroize();
}
async fn handle_connection(&self, value: Result<ClientStreamIn, Error>) {
match value {
Ok(ClientStreamIn::Disconnect) => {
println!(
"[Client {:?}]: Disconnect received",
self.details.uuid
);
self.disconnect();
return;
}
});
// client channel read thread
tokio::spawn(async move {
use ClientMessage::{Disconnect, Error, Message, SendClients};
let client = t2_client;
loop {
let mut channel = client.rx.lock().await;
let mut buffer = String::new();
let message = channel.recv().await.unwrap();
drop(channel);
println!("[Client {:?}]: {:?}", &client.details.uuid, message);
match message {
Disconnect => {
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::ClientDisconnected {
id: client.details.uuid,
})
.await;
return;
}
Message { from, content } => {
let msg = ClientStreamOut::UserMessage { from, content };
let _ =
writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
drop(stream);
}
SendClients { clients } => {
let client_details_vec: Vec<ClientDetails> = clients
.iter()
.map(|client| &client.details)
.cloned()
.collect();
let msg = ClientStreamOut::ConnectedClients {
clients: client_details_vec,
};
let _ =
writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
Error => {
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::Error).unwrap()
);
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
ClientMessage::GlobalBroadcastMessage { from,content } => {
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::GlobalMessage {from, content}).unwrap()
);
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
}
_ => {
println!(
"[Client {:?}]: command not found",
self.details.uuid
);
let _ = self.out_channel
.send(ClientMessage::Error.into())
.await;
}
});
}
}
pub async fn send_message(self: &Arc<Client>, msg: ClientMessage) {
async fn handle_channel(&self, value: Option<ClientMessage>) {
unimplemented!();
}
async fn disconnect(&self) {
let _ = self.out_channel
.send(ClientMessage::NewDisconnect {
id: self.details.uuid,
connection: self.connection.clone()}.into()
);
}
#[deprecated]
pub async fn send_message(self: &Arc<Client<Out>>, msg: ClientMessage) {
let _ = self.tx.send(msg).await;
}
}
#[async_trait]
impl<Out> IManager for Client<Out>
where
Out: From<ClientMessage> + Send
{
async fn init(self: &Arc<Self>)
where
Self: Send + Sync + 'static
{
self.connection.write(Connected).await;
}
async fn run(self: &Arc<Self>) {
let mut channel_lock = self.rx.lock().await;
select! {
val = self.connection.read::<ClientStreamIn>() => {
self.handle_connection(val).await;
}
val = channel_lock.recv() => {
self.handle_channel(val).await;
}
}
}
}
// MARK: - use to handle disconnecting
impl<Out> Drop for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn drop(&mut self) {
let connection = self.connection.clone();
let out = self.out_channel.clone();
let id = self.details.uuid.clone();
tokio::spawn(async move {
let _ = connection.write(Disconnected).await;
let _ = out.send(
ClientMessage::NewDisconnect {
id,
connection
}.into()).await;
});
}
}
// MARK: - used for sorting.
impl PartialEq for Client {
impl<Out> PartialEq for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn eq(&self, other: &Self) -> bool {
self.details.uuid == other.details.uuid
}
}
impl Eq for Client {}
impl<Out> Eq for Client<Out>
where
Out: From<ClientMessage> + Send
{}
impl PartialOrd for Client {
impl<Out> PartialOrd for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Client {
impl<Out> Ord for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn cmp(&self, other: &Self) -> Ordering {
self.details.uuid.cmp(&other.details.uuid)
}
}
impl Drop for Client {
fn drop(&mut self) {
println!("[Client] dropped!");
#[cfg(test)]
mod test {
use std::io::Error;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use foundation::connection::Connection;
use foundation::messages::client::ClientStreamOut;
use foundation::messages::client::ClientStreamOut::{Connected, Disconnected};
use foundation::prelude::IManager;
use foundation::test::create_connection_pair;
use crate::client::{Client};
use crate::messages::ClientMessage;
use crate::messages::ClientMessage::NewDisconnect;
#[tokio::test]
async fn create_client_and_drop() -> Result<(), Error> {
let (sender, mut receiver) =
channel::<ClientMessage>(1024);
let (server, (client_conn, addr)) =
create_connection_pair().await?;
// client details
let uuid = Uuid::new_v4();
let username = "TestUser".to_string();
let client = Client::new(
uuid,
username,
addr.to_string(),
sender.clone(),
server
);
client.start();
let res = client_conn.read::<ClientStreamOut>().await?;
assert_eq!(res, Connected);
drop(client);
let res = client_conn.read::<ClientStreamOut>().await?;
assert_eq!(res, Disconnected);
// fetch from out_channel
let disconnect_msg = receiver.recv().await.unwrap();
assert_eq!(disconnect_msg, NewDisconnect {id: uuid, connection: Connection::new()});
Ok(())
}
}
}

View File

@ -1,13 +1,16 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use futures::future::join_all;
use futures::future::join_all;
use futures::lock::Mutex;
use tokio::join;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use uuid::Uuid;
use foundation::prelude::IManager;
use foundation::ClientDetails;
use crate::client::Client;
use crate::messages::ClientMessage;
use crate::messages::ClientMgrMessage;
@ -17,7 +20,7 @@ use crate::messages::ServerMessage;
/// This struct manages all connected users
#[derive(Debug)]
pub struct ClientManager {
clients: Mutex<HashMap<Uuid, Arc<Client>>>,
clients: Mutex<HashMap<Uuid, Arc<Client<ClientMgrMessage>>>>,
server_channel: Mutex<Sender<ServerMessage>>,
@ -39,6 +42,8 @@ impl ClientManager {
})
}
pub async fn add_client(self: &Arc<ClientManager>) {}
pub fn start(self: &Arc<ClientManager>) {
let client_manager = self.clone();
@ -49,15 +54,15 @@ impl ClientManager {
let mut receiver = client_manager.rx.lock().await;
let message = receiver.recv().await.unwrap();
println!("[Client manager]: recieved message: {:?}", message);
println!("[Client manager]: received message: {:?}", message);
match message {
Add(client) => {
println!("[Client Manager]: adding new client");
client.start();
let mut lock = client_manager.clients.lock().await;
client.start();
if lock.insert(client.details.uuid, client).is_none() {
println!("value is new");
println!("client added");
}
}
Remove(uuid) => {
@ -76,8 +81,8 @@ impl ClientManager {
SendClients { to } => {
let lock = client_manager.clients.lock().await;
if let Some(client) = lock.get(&to) {
let clients_vec: Vec<Arc<Client>> =
lock.values().cloned().collect();
let clients_vec: Vec<ClientDetails> =
lock.values().cloned().map(|i| i.details.clone()).collect();
client
.send_message(ClientMessage::SendClients {
@ -87,7 +92,6 @@ impl ClientManager {
}
}
ClientMgrMessage::BroadcastGlobalMessage {sender, content} => {
use futures::stream::TryStreamExt;
let lock = client_manager.clients.lock().await;
let futures = lock.iter()
.map(|i| i.1.send_message(

View File

@ -1,4 +1,4 @@
mod chat_manager;
// mod chat_manager;
mod client;
mod client_manager;
mod messages;

View File

@ -1,4 +1,4 @@
pub mod chat_manager;
// pub mod chat_manager;
pub mod client;
pub mod client_manager;
pub mod messages;
@ -29,7 +29,7 @@ async fn main() -> io::Result<()> {
)
.get_matches();
let server = Server::new().unwrap();
let server = Server::new().await.unwrap();
server.start().await;
Ok(())

View File

@ -1,25 +1,75 @@
use std::sync::{Arc, Weak};
use std::sync::{Arc};
use uuid::Uuid;
use foundation::ClientDetails;
use foundation::connection::Connection;
use crate::chat_manager::Message;
use crate::client::Client;
/// # ClientMessage
///
/// These messages are send from the client to a receiver
/// when events from the client happen that need to be delegated
///
/// ## Variants
///
///
/// ## Methods
///
#[derive(Debug)]
pub enum ClientMessage {
Message { from: Uuid, content: String },
GlobalBroadcastMessage {from: Uuid, content:String},
SendClients { clients: Vec<Arc<Client>> },
Connected,
Disconnect,
IncomingMessage { from: Uuid, to: Uuid, content: String },
IncomingGlobalMessage { from: Uuid, content: String },
RequestedUpdate { from: Uuid },
NewDisconnect { id: Uuid, connection: Arc<Connection> },
Error,
#[deprecated]
Message { from: Uuid, content: String },
#[deprecated]
GlobalBroadcastMessage {from: Uuid, content:String},
#[deprecated]
SendClients { clients: Vec<ClientDetails> },
#[deprecated]
Disconnect,
}
impl PartialEq for ClientMessage {
fn eq(&self, other: &Self) -> bool {
use ClientMessage::{NewDisconnect, Connected, Error};
match (self,other) {
(Connected, Connected) => true,
(Error, Error) => true,
(NewDisconnect {id, .. }, NewDisconnect {id: other_id, .. }) => id == other_id,
_ => {
false
}
}
}
}
#[derive(Debug)]
pub enum ClientMgrMessage {
Remove(Uuid),
Add(Arc<Client>),
Add(Arc<Client<Self>>),
SendClients {
to: Uuid,
},
@ -34,10 +84,16 @@ pub enum ClientMgrMessage {
},
}
impl From<ClientMessage> for ClientMgrMessage {
fn from(_: ClientMessage) -> Self {
todo!()
}
}
#[derive(Debug)]
pub enum ServerMessage {
ClientConnected {
client: Arc<Client>,
client: Arc<Client<Self>>,
},
ClientSendMessage {
from: Uuid,
@ -54,5 +110,13 @@ pub enum ServerMessage {
to: Uuid,
},
BroadcastGlobalMessage {sender: Uuid, content: String}
BroadcastGlobalMessage {sender: Uuid, content: String},
Some
}
impl From<ClientMessage> for ServerMessage {
fn from(_: ClientMessage) -> Self {
todo!()
}
}

View File

@ -22,7 +22,7 @@ pub enum NetworkManagerMessage {
address: String,
username: String,
connection: Connection
connection: Arc<Connection>
},
}
@ -85,7 +85,7 @@ impl<Out> NetworkManager<Out>
self.listener.lock().await.local_addr().unwrap().ip().to_string()
}
async fn handle_connection(&self, connection: Connection) -> Result<(), Error>{
async fn handle_connection(&self, connection: Arc<Connection>) -> Result<(), Error>{
use NetworkSockIn::{Info, Connect};
use NetworkSockOut::{GotInfo, Request, Connecting};
@ -125,28 +125,11 @@ impl<Out: 'static> IManager for NetworkManager<Out>
select! {
val = lock.accept() => {
if let Ok((stream, addr)) = val {
let _ = self.handle_connection(stream.into()).await;
let _ = self.handle_connection(Arc::new(stream.into())).await;
}
}
}
}
fn start(self: &Arc<Self>) {
let weak_self = Arc::downgrade(self);
let network = Mutex::new(weak_self.clone());
// this looks horrid but works
tokio::spawn(async move {
loop {
if let Some(network_manager) =
Weak::upgrade(&*network.lock().await)
{
network_manager.run().await
} else { () }
}
});
}
}
#[cfg(test)]

View File

@ -1,14 +1,22 @@
use std::io::Error;
use std::sync::Arc;
// use crossbeam_channel::{unbounded, Receiver};
use futures::lock::Mutex;
use tokio::sync::mpsc::{channel, Receiver};
use uuid::Uuid;
use foundation::prelude::IManager;
use crate::client_manager::ClientManager;
use crate::messages::ClientMgrMessage;
use crate::messages::{ClientMessage, ClientMgrMessage};
use crate::messages::ServerMessage;
use crate::network_manager::NetworkManager;
use crate::network_manager::{NetworkManager, NetworkManagerMessage};
impl From<NetworkManagerMessage> for ServerMessage {
fn from(_: NetworkManagerMessage) -> Self {
ServerMessage::Some
}
}
/// # Server
/// authors: @michael-bailey, @Mitch161
@ -17,13 +25,13 @@ use crate::network_manager::NetworkManager;
///
pub struct Server {
client_manager: Arc<ClientManager>,
network_manager: Arc<NetworkManager>,
network_manager: Arc<NetworkManager<ServerMessage>>,
receiver: Mutex<Receiver<ServerMessage>>,
}
impl Server {
/// Create a new server object
pub fn new() -> Result<Arc<Server>, Box<dyn std::error::Error>> {
pub async fn new() -> Result<Arc<Server>, Error> {
let (
sender,
receiver
@ -31,13 +39,13 @@ impl Server {
Ok(Arc::new(Server {
client_manager: ClientManager::new(sender.clone()),
network_manager: NetworkManager::new("5600".parse().unwrap(), sender),
network_manager: NetworkManager::new("0.0.0.0:5600", sender).await?,
receiver: Mutex::new(receiver),
}))
}
pub fn port(self: &Arc<Server>) -> u16 {
self.network_manager.port()
pub async fn port(self: &Arc<Server>) -> u16 {
self.network_manager.port().await
}
pub async fn start(self: &Arc<Server>) {
@ -58,11 +66,11 @@ impl Server {
match message {
ServerMessage::ClientConnected { client } => {
server
.client_manager
.clone()
.send_message(Add(client))
.client_manager.add_client()
// .send_message(Add(client))
.await
}
},
ServerMessage::ClientDisconnected { id } => {
println!("disconnecting client {:?}", id);
server.client_manager.clone().send_message(Remove(id)).await;
@ -96,6 +104,7 @@ impl Server {
ClientMgrMessage::BroadcastGlobalMessage {sender, content}
).await
}
_ => {unimplemented!()}
}
}
}