merge develop into master #20

Merged
michael-bailey merged 181 commits from develop into master 2023-12-01 21:48:28 +00:00
25 changed files with 1601 additions and 524 deletions
Showing only changes of commit 77bf1e0e25 - Show all commits

View File

@ -7,3 +7,16 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
cursive = "0.17"
uuid = {version = "0.8", features = ["serde", "v4"]}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
crossbeam = "0.8.0"
crossbeam-channel = "0.5.0"
tokio = { version = "1.9.0", features = ["full"] }
futures = "0.3.16"
async-trait = "0.1.52"
server = {path = '../server'}
foundation = {path = '../foundation'}

View File

@ -1,3 +1,34 @@
mod worker;
mod managers;
mod worker_message;
use worker::Worker;
use cursive::{Cursive, CursiveExt};
use cursive::menu::{Item, Tree};
use cursive::traits::Nameable;
use cursive::views::{Dialog, TextView};
fn main() {
println!("Hello, world!");
let mut app = Cursive::default();
let worker_stream =
Worker::new(app.cb_sink().clone()).start();
app.set_user_data(worker_stream);
app.add_layer(Dialog::new()
.content(TextView::new("Hello world").with_name("TextView"))
.button("close", |s| s.quit()));
app.menubar().autohide = false;
app.menubar().add_subtree(
"Application",
Tree::new()
.item(
Item::leaf("About", |s| s.quit())
).delimiter().item(
Item::leaf("Quit",|s| s.quit())
)
);
app.set_fps(30);
app.run();
}

View File

@ -0,0 +1,213 @@
use std::io::{Error, ErrorKind};
use std::mem;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;
use async_trait::async_trait;
use tokio::net::ToSocketAddrs;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
use foundation::connection::Connection;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::prelude::IManager;
use crate::managers::NetworkManagerMessage;
pub struct NetworkManager<M>
where M: From<NetworkManagerMessage> {
#[allow(unused)]
server_connection: Mutex<Option<Connection>>,
#[allow(unused)]
cursive: Sender<M>,
is_logged_in: AtomicBool,
}
impl<M> NetworkManager<M>
where M: From<NetworkManagerMessage> {
pub fn new(sender: Sender<M>) -> Arc<Self> {
Arc::new(NetworkManager {
server_connection: Mutex::new(None),
cursive: sender,
is_logged_in: AtomicBool::new(false),
})
}
#[allow(unused)]
pub async fn info<T: ToSocketAddrs>(self: &Arc<Self>, host: T) -> Result<NetworkManagerMessage, Error> {
let connection= Connection::new();
println!("Created connection");
connection.connect(host).await?;
let req = connection.read().await?;
println!("request: {:?}", req);
if let NetworkSockOut::Request = req {
connection.write::<NetworkSockIn>(NetworkSockIn::Info)
.await?;
return Ok(connection.read::<NetworkSockOut>()
.await?.into());
} else {
Err(Error::new(ErrorKind::ConnectionAborted, "Request not received"))
}
}
#[allow(unused)]
pub async fn login(
self: &Arc<Self>,
host: String,
uuid: Uuid,
username: String,
address: String
) -> Result<(), Error> {
let connection= Connection::new();
let _ = connection.connect(host).await?;
println!("created connection");
let req = connection.read().await?;
println!("read request");
return if let NetworkSockOut::Request = req {
println!("got request");
connection.write(NetworkSockIn::Connect {username, uuid: uuid.to_string(), address}).await?;
let res = connection.read().await?;
// switch over to ClientStreamOut
if let ClientStreamOut::Connected = res {
let mut connection_lock = self.server_connection.lock().await;
let _ = mem::replace(&mut *connection_lock, Some(connection));
Ok(())
} else {
Err(Error::new(ErrorKind::ConnectionRefused, format!("expected connecting received: {:?}", res)))
}
} else {
println!("request not found");
Err(Error::new(ErrorKind::ConnectionAborted, "Server did not send request"))
}
}
#[allow(unused)]
pub async fn logout(self: &Arc<Self>) -> Result<(), Error> {
let mut connection_lock = self.server_connection.lock().await;
let connection = mem::replace(&mut *connection_lock, None).unwrap();
connection.write(ClientStreamIn::Disconnect).await?;
return if let ClientStreamOut::Disconnected = connection.read().await? {
Ok(())
} else {
Err(Error::new(ErrorKind::InvalidData, "disconnect failed, forcing disconnect"))
}
}
#[allow(unused)]
pub async fn update() {
}
#[allow(unused)]
async fn start(self: Arc<Self>) {
let network_manager = self.clone();
tokio::spawn(async {
});
}
}
#[async_trait]
impl<M: 'static> IManager for NetworkManager<M>
where M: From<NetworkManagerMessage> + Send {
async fn run(self: Arc<Self>) {
// let networkManager = self.clone();
loop {
sleep(Duration::new(1,0)).await;
println!("networkManager tick")
}
}
async fn start(self: &Arc<Self>) {
let network_manager = self.clone();
tokio::spawn(
network_manager.run()
);
}
}
#[cfg(test)]
mod test {
use std::future::Future;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use serverlib::Server;
use crate::managers::network::NetworkManagerMessage;
use crate::managers::NetworkManager;
async fn wrap_setup<T,F>(test: T)
where T: FnOnce(u16) -> F,
F: Future
{
let server = Server::new().await.unwrap();
let port = server.port();
tokio::spawn(
async move {
server.start().await;
}
);
test(port).await;
}
#[tokio::test]
async fn test_fetch_server_info() {
use NetworkManagerMessage::Info;
#[allow(unused)]
let (tx,rx) =
channel::<NetworkManagerMessage>(16);
wrap_setup(|port| {
async move {
let network = NetworkManager::new(tx);
let info = network.info(format!("localhost:{}", port)).await.expect("Failed to fetch info");
assert_eq!(info, Info {
server_name: "oof".to_string(),
server_owner: "michael".to_string()
});
}
}).await;
}
#[tokio::test]
async fn test_login_and_logout_to_server() {
#[allow(unused)]
let (tx,rx) =
channel::<NetworkManagerMessage>(16);
let network = NetworkManager::new(tx);
println!("created network manger");
wrap_setup(|port| {
async move {
network.login(
format!("localhost:{}", port),
Uuid::default(),
"user1".to_string(),
"localhost".to_string()
).await.expect("login failed");
network.logout().await.expect("logout failed");
}
}).await;
}
}

View File

@ -0,0 +1,39 @@
use foundation::ClientDetails;
use foundation::messages::network::NetworkSockOut;
#[derive(Debug)]
pub enum NetworkManagerMessage {
#[allow(unused)]
Users(Vec<ClientDetails>),
Info {
server_name: String,
server_owner: String,
},
Error(&'static str)
}
impl From<NetworkSockOut> for NetworkManagerMessage {
fn from(other: NetworkSockOut) -> Self {
use NetworkSockOut::{GotInfo as OldInfo};
use NetworkManagerMessage::{Info as NewInfo, Error};
match other {
OldInfo {server_name,server_owner} => NewInfo {server_name,server_owner},
_ => Error("Error occurred with conversion")
}
}
}
impl PartialEq for NetworkManagerMessage {
fn eq(&self, other: &Self) -> bool {
use NetworkManagerMessage::Info;
match self {
Info {server_owner, server_name} => {
if let Info {server_owner: other_owner,server_name: other_name} = other {
return server_owner == other_owner && server_name == other_name;
}
false
}
_ => {false}
}
}
}

View File

@ -0,0 +1,7 @@
mod network;
#[path = "message.rs"]
mod message;
pub use network::NetworkManager;
pub use message::NetworkManagerMessage;

View File

@ -0,0 +1,213 @@
use std::io::{Error, ErrorKind};
use std::mem;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;
use async_trait::async_trait;
use tokio::net::ToSocketAddrs;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
use foundation::connection::Connection;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::prelude::IManager;
use crate::managers::NetworkManagerMessage;
pub struct NetworkManager<M>
where M: From<NetworkManagerMessage> {
#[allow(unused)]
server_connection: Mutex<Option<Connection>>,
#[allow(unused)]
cursive: Sender<M>,
is_logged_in: AtomicBool,
}
impl<M> NetworkManager<M>
where M: From<NetworkManagerMessage> {
pub fn new(sender: Sender<M>) -> Arc<Self> {
Arc::new(NetworkManager {
server_connection: Mutex::new(None),
cursive: sender,
is_logged_in: AtomicBool::new(false),
})
}
#[allow(unused)]
pub async fn info<T: ToSocketAddrs>(self: &Arc<Self>, host: T) -> Result<NetworkManagerMessage, Error> {
let connection= Connection::new();
println!("Created connection");
connection.connect(host).await?;
let req = connection.read().await?;
println!("request: {:?}", req);
if let NetworkSockOut::Request = req {
connection.write::<NetworkSockIn>(NetworkSockIn::Info)
.await?;
return Ok(connection.read::<NetworkSockOut>()
.await?.into());
} else {
Err(Error::new(ErrorKind::ConnectionAborted, "Request not received"))
}
}
#[allow(unused)]
pub async fn login(
self: &Arc<Self>,
host: String,
uuid: Uuid,
username: String,
address: String
) -> Result<(), Error> {
let connection= Connection::new();
let _ = connection.connect(host).await?;
println!("created connection");
let req = connection.read().await?;
println!("read request");
return if let NetworkSockOut::Request = req {
println!("got request");
connection.write(NetworkSockIn::Connect {username, uuid: uuid.to_string(), address}).await?;
let res = connection.read().await?;
// switch over to ClientStreamOut
if let ClientStreamOut::Connected = res {
let mut connection_lock = self.server_connection.lock().await;
let _ = mem::replace(&mut *connection_lock, Some(connection));
Ok(())
} else {
Err(Error::new(ErrorKind::ConnectionRefused, format!("expected connecting received: {:?}", res)))
}
} else {
println!("request not found");
Err(Error::new(ErrorKind::ConnectionAborted, "Server did not send request"))
}
}
#[allow(unused)]
pub async fn logout(self: &Arc<Self>) -> Result<(), Error> {
let mut connection_lock = self.server_connection.lock().await;
let connection = mem::replace(&mut *connection_lock, None).unwrap();
connection.write(ClientStreamIn::Disconnect).await?;
return if let ClientStreamOut::Disconnected = connection.read().await? {
Ok(())
} else {
Err(Error::new(ErrorKind::InvalidData, "disconnect failed, forcing disconnect"))
}
}
#[allow(unused)]
pub async fn update() {
}
#[allow(unused)]
async fn start(self: Arc<Self>) {
let network_manager = self.clone();
tokio::spawn(async {
});
}
}
#[async_trait]
impl<M: 'static> IManager for NetworkManager<M>
where M: From<NetworkManagerMessage> + Send {
async fn run(self: Arc<Self>) {
// let networkManager = self.clone();
loop {
sleep(Duration::new(1,0)).await;
println!("networkManager tick")
}
}
async fn start(self: &Arc<Self>) {
let network_manager = self.clone();
tokio::spawn(
network_manager.run()
);
}
}
#[cfg(test)]
mod test {
use std::future::Future;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use serverlib::Server;
use crate::managers::network::NetworkManagerMessage;
use crate::managers::NetworkManager;
async fn wrap_setup<T,F>(test: T)
where T: FnOnce(u16) -> F,
F: Future
{
let server = Server::new().await.unwrap();
let port = server.port();
tokio::spawn(
async move {
server.start().await;
}
);
test(port).await;
}
#[tokio::test]
async fn test_fetch_server_info() {
use NetworkManagerMessage::Info;
#[allow(unused)]
let (tx,rx) =
channel::<NetworkManagerMessage>(16);
wrap_setup(|port| {
async move {
let network = NetworkManager::new(tx);
let info = network.info(format!("localhost:{}", port)).await.expect("Failed to fetch info");
assert_eq!(info, Info {
server_name: "oof".to_string(),
server_owner: "michael".to_string()
});
}
}).await;
}
#[tokio::test]
async fn test_login_and_logout_to_server() {
#[allow(unused)]
let (tx,rx) =
channel::<NetworkManagerMessage>(16);
let network = NetworkManager::new(tx);
println!("created network manger");
wrap_setup(|port| {
async move {
network.login(
format!("localhost:{}", port),
Uuid::default(),
"user1".to_string(),
"localhost".to_string()
).await.expect("login failed");
network.logout().await.expect("logout failed");
}
}).await;
}
}

72
client/src/worker.rs Normal file
View File

@ -0,0 +1,72 @@
use std::sync::Arc;
use std::thread::spawn;
use std::time::Duration;
use crossbeam_channel::Sender as CrossSender;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Sender as TokioSender};
use tokio::sync::Mutex;
use tokio::time::sleep;
use foundation::ClientDetails;
use crate::{Cursive, TextView};
use crate::managers::{NetworkManager};
use crate::worker_message::WorkerMessage;
pub type CursiveSender = CrossSender<Box<dyn FnOnce(&mut Cursive) + Send>>;
pub struct Worker
{
cursive_sender: CursiveSender,
network_manager: Arc<NetworkManager<WorkerMessage>>,
number: Arc<Mutex<usize>>,
#[allow(unused)]
user_details: Mutex<Option<ClientDetails>>,
}
impl Worker {
pub fn new(sender: CursiveSender) -> Worker {
#[allow(unused)]
let (tx,rx) = channel::<WorkerMessage>(16);
Worker {
network_manager: NetworkManager::new(tx.clone()),
number: Arc::new(Mutex::new(0)),
user_details: Mutex::new(None),
cursive_sender: sender
}
}
pub fn start(self) -> TokioSender<WorkerMessage> {
#[allow(unused)]
let (tx,rx) = channel::<WorkerMessage>(16);
spawn(move || {
let sender = self.cursive_sender.clone();
let rt = Runtime::new().unwrap();
let tmp_num = self.number.clone();
#[allow(unused)]
let network_manager = self.network_manager.clone();
rt.block_on(async move {
let a = &tmp_num;
loop {
let num = Arc::clone(&a);
sleep(Duration::new(1,0)).await;
let _ = sender.send(Box::new( move |s| {
let num = &num.clone();
let mut num_lock = num.blocking_lock();
*num_lock += 1;
let a = *num_lock;
s.find_name::<TextView>("TextView").unwrap().set_content(a.to_string());
}));
}
})
});
tx
}
}

View File

@ -0,0 +1,24 @@
use crate::managers::NetworkManagerMessage;
pub enum WorkerMessage {
Info {
server_name: String,
server_owner: String,
},
#[allow(unused)]
Error(String),
}
impl From<NetworkManagerMessage> for WorkerMessage {
fn from(other: NetworkManagerMessage) -> Self {
#[allow(unused)]
use WorkerMessage::{Info as NewInfo, Error as NewError};
#[allow(unused)]
use NetworkManagerMessage::{Info as OldInfo, Error};
match other {
OldInfo {server_name, server_owner}
=> NewInfo {server_owner,server_name},
_ => todo!()
}
}
}

View File

@ -8,6 +8,7 @@ edition = "2018"
[lib]
[dependencies]
async-trait = "0.1.52"
regex = "1"
crossbeam = "0.8.0"
crossbeam-channel = "0.5.0"
@ -21,5 +22,6 @@ log = "0.4"
url = "2.2.0"
uuid = {version = "0.8", features = ["serde", "v4"]}
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.9.0", features = ["full"] }
serde_json = "1.0"
openssl = "0.10"

View File

@ -0,0 +1,143 @@
use std::io::{Error, ErrorKind};
use std::io::Write;
use std::mem;
use std::sync::Arc;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io;
use tokio::io::{AsyncWriteExt, BufReader, AsyncBufReadExt, ReadHalf, WriteHalf};
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::sync::Mutex;
#[derive(Debug)]
pub struct Connection {
stream_rx: Mutex<Option<BufReader<ReadHalf<tokio::net::TcpStream>>>>,
stream_tx: Mutex<Option<WriteHalf<tokio::net::TcpStream>>>,
}
impl Connection {
pub fn new() -> Arc<Self> {
Arc::new(Connection {
stream_rx: Mutex::new(None),
stream_tx: Mutex::new(None),
})
}
pub async fn connect<T: ToSocketAddrs>(&self, host: T) -> Result<(), Error> {
let connection = TcpStream::connect(host).await?;
let (rd, wd) = io::split(connection);
let mut writer_lock = self.stream_tx.lock().await;
let mut reader_lock = self.stream_rx.lock().await;
let _ = mem::replace(&mut *writer_lock, Some(wd));
let _ = mem::replace(&mut *reader_lock, Some(BufReader::new(rd)));
Ok(())
}
pub async fn write<T>(&self, message: T) -> Result<(), Error>
where T: Serialize {
let mut out_buffer = Vec::new();
let out = serde_json::to_string(&message).unwrap();
writeln!(&mut out_buffer, "{}", out)?;
let mut writer_lock = self.stream_tx.lock().await;
let old = mem::replace(&mut *writer_lock, None);
return if let Some(mut writer) = old {
writer.write_all(&out_buffer).await?;
writer.flush().await?;
let _ = mem::replace(&mut *writer_lock, Some(writer));
Ok(())
} else {
Err(Error::new(ErrorKind::Interrupted, "Writer does not exist"))
}
}
pub async fn read<T>(&self) -> Result<T,Error>
where T: DeserializeOwned {
let mut buffer = String::new();
let mut reader_lock = self.stream_rx.lock().await;
let old = mem::replace(&mut *reader_lock, None);
if let Some(mut reader) = old {
let _ = reader.read_line(&mut buffer).await?;
let _ = mem::replace(&mut *reader_lock, Some(reader));
Ok(serde_json::from_str(&buffer).unwrap())
} else {
Err(Error::new(ErrorKind::Interrupted, "Reader does not exist"))
}
}
}
impl From<TcpStream> for Connection {
fn from(stream: TcpStream) -> Self {
let (rd, wd) = io::split(stream);
Connection {
stream_tx: Mutex::new(Some(wd)),
stream_rx: Mutex::new(Some(BufReader::new(rd))),
}
}
}
#[cfg(test)]
mod test {
use std::future::Future;
use std::io::Error;
use std::panic;
use tokio::net::TcpListener;
use serde::{Serialize,Deserialize};
use crate::connection::Connection;
#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum TestMessages {
Ping,
Pong
}
#[tokio::test]
async fn a() -> Result<(), Error> {
wrap_setup(|port| {
async move {
println!("{}", port);
let connection = Connection::new();
connection.connect(format!("localhost:{}", &port)).await.unwrap();
connection.write(&TestMessages::Ping).await.unwrap();
let res = connection.read::<TestMessages>().await.unwrap();
assert_eq!(res, TestMessages::Pong);
}
}).await
}
async fn wrap_setup<T,F>(test: T) -> Result<(), std::io::Error>
where T: FnOnce(u16) -> F + panic::UnwindSafe,
F: Future
{
let server = TcpListener::bind("localhost:0").await?;
let addr = server.local_addr()?;
// create tokio server execution
tokio::spawn(async move {
while let Ok((stream, addr)) = server.accept().await {
use TestMessages::{Ping,Pong};
println!("[server]: Connected {}", &addr);
let connection = Connection::from(stream);
if let Ok(Ping) = connection.read::<TestMessages>().await {
connection.write::<TestMessages>(Pong).await.unwrap()
}
}
});
test(addr.port()).await;
Ok(())
}
}

View File

@ -1,6 +1,10 @@
extern crate core;
pub mod encryption;
pub mod messages;
pub mod prelude;
pub mod connection;
pub mod test;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

View File

@ -19,7 +19,7 @@ pub enum ClientStreamIn {
Disconnect,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum ClientStreamOut {
Connected,
@ -33,3 +33,14 @@ pub enum ClientStreamOut {
Error,
}
impl PartialEq for ClientStreamOut {
fn eq(&self, other: &Self) -> bool {
use ClientStreamOut::{Connected, Disconnected};
match (self, other) {
(Connected, Connected) => true,
(Disconnected, Disconnected) => true,
_ => false
}
}
}

View File

@ -1,24 +1,40 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum NetworkSockIn {
Info,
Connect {
uuid: String,
uuid: Uuid,
username: String,
address: String,
},
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum NetworkSockOut<'a> {
pub enum NetworkSockOut {
Request,
GotInfo {
server_name: &'a str,
server_owner: &'a str,
server_name: String,
server_owner: String,
},
Connecting,
Error
}
impl PartialEq for NetworkSockOut {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(NetworkSockOut::Request, NetworkSockOut::Request) => true,
(NetworkSockOut::GotInfo {server_name,server_owner},
NetworkSockOut::GotInfo {server_owner: owner_other,server_name: name_other})
=> server_name == name_other && server_owner == owner_other,
(NetworkSockOut::Connecting, NetworkSockOut::Connecting) => true,
_ => false
}
}
}

View File

@ -1,15 +1,46 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::Duration;
use async_trait::async_trait;
use tokio::time::sleep;
pub trait IMessagable<TMessage, TSender> {
fn send_message(&self, msg: TMessage);
fn set_sender(&self, sender: TSender);
}
pub trait ICooperative {
fn tick(&self);
}
/// This is used with all managers to implement multitasking
#[async_trait]
pub trait IManager {
pub trait IPreemptive {
fn run(arc: &Arc<Self>);
fn start(arc: &Arc<Self>);
}
/// This defines some setup before the tokio loop is started
async fn init(self: &Arc<Self>)
where
Self: Send + Sync + 'static
{}
/// this is used to get a future that can be awaited
async fn run(self: &Arc<Self>);
/// This is used to start a future through tokio
fn start(self: &Arc<Self>)
where
Self: Send + Sync + 'static
{
let weak_self: Weak<Self> = Arc::downgrade(self);
// this looks horrid but works
tokio::spawn(async move {
let weak_self = weak_self.clone();
let a = weak_self.upgrade().unwrap();
a.init().await;
drop(a);
loop {
sleep(Duration::new(1,0)).await;
if let Some(manager) =
Weak::upgrade(&weak_self)
{
manager.run().await
} else { () }
}
});
}
}

View File

@ -0,0 +1,23 @@
use std::io::{Error};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::join;
use tokio::net::{TcpStream,TcpListener};
use crate::connection::Connection;
pub async fn create_connection_pair()
-> Result<(Arc<Connection>, (Arc<Connection>, SocketAddr )), Error> {
let listener: TcpListener = TcpListener::bind("localhost:0000").await?;
let port = listener.local_addr()?.port();
let (server_res,client_res) = join!(
async { TcpStream::connect(format!("localhost:{}", port)).await },
async { listener.accept().await }
);
let (client,addr) = client_res?;
let server = Arc::new(Connection::from(server_res?));
let client = Arc::new(Connection::from(client));
Ok((server,(client,addr)))
}

View File

@ -0,0 +1,3 @@
mod connection_pair;
pub use connection_pair::create_connection_pair;

View File

@ -6,6 +6,14 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "serverlib"
path = "src/lib.rs"
[[bin]]
name = "server"
path = "src/main.rs"
[dependencies]
clap = "2.33.3"
uuid = {version = "0.8", features = ["serde", "v4"]}
@ -17,5 +25,6 @@ zeroize = "1.1.0"
openssl = "0.10.33"
tokio = { version = "1.9.0", features = ["full"] }
futures = "0.3.16"
async-trait = "0.1.52"
foundation = {path = '../foundation'}

View File

@ -1,4 +1,3 @@
use std::ops::Index;
use crate::client::Client;
use crate::messages::ServerMessage;
use std::sync::{Arc, Weak};
@ -8,28 +7,31 @@ use tokio::sync::Mutex;
#[derive(Clone, Debug)]
pub struct Message {
content: String,
sender: Weak<Client>,
sender: Weak<Client<>>,
}
impl Message {
pub fn new(content: String, sender: Weak<Client>) -> Message {
#[allow(unused)]
pub fn new(content: String, sender: Weak<Client<>>) -> Message {
Message { content, sender }
}
}
enum ChatManagerMessage {
AddMessage {sender: Weak<Client>, content: String}
AddMessage {sender: Weak<Client<>>, content: String}
}
pub struct ChatManager {
messages: Mutex<Vec<Message>>,
server_channel: Sender<ServerMessage>,
#[allow(unused)]
tx: Sender<ChatManagerMessage>,
rx: Mutex<Receiver<ChatManagerMessage>>,
}
impl ChatManager {
#[allow(unused)]
pub fn new(server_channel: Sender<ServerMessage>) -> Arc<Self> {
let (tx, rx) = channel::<ChatManagerMessage>(1024);
@ -44,33 +46,34 @@ impl ChatManager {
manager
}
#[allow(unused)]
fn start(self: &Arc<ChatManager>) {
let manager = self.clone();
tokio::spawn(async move {
use ServerMessage::{BroadcastGlobalMessage};
use ChatManagerMessage::{AddMessage};
while let message = manager.rx.lock().await.recv().await {
while let Some(message) = manager.rx.lock().await.recv().await {
match message {
Some(AddMessage { content,sender }) => {
AddMessage { content,sender } => {
let sender = &sender.upgrade().unwrap().details.uuid;
manager.server_channel.send(
BroadcastGlobalMessage {sender: sender.clone(), content}
).await.unwrap();
}
None => {
println!("None found in message broadcast some how");
}
}
} });
}
});
}
#[allow(unused)]
pub async fn add_message(self: &Arc<Self>, sender: Weak<Client>, content: String) {
let mut a = self.messages.lock().await;
a.push(Message::new(content, sender))
}
#[allow(unused)]
pub async fn get_all_messages(self: &Arc<Self>) -> Vec<Message> {
self.messages.lock().await.clone()
}

View File

@ -1,22 +1,33 @@
use std::cmp::Ordering;
use std::fmt::Write;
use std::io::Error;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use zeroize::Zeroize;
use async_trait::async_trait;
use futures::lock::Mutex;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::io::{ReadHalf, WriteHalf};
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::messages::ClientMessage;
use crate::messages::ServerMessage;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::ClientDetails;
use foundation::connection::Connection;
use foundation::messages::client::ClientStreamOut::{Connected, Disconnected};
use foundation::prelude::IManager;
use crate::messages::{ClientMessage};
/// # ClientInMessage
///
/// Messages that are sent internally
/// when functions are called on the client
#[derive(Serialize, Deserialize)]
enum ClientInMessage {
MessageTo,
UpdateRequest,
}
/// # Client
/// This struct represents a connected user.
@ -29,258 +40,206 @@ use foundation::ClientDetails;
/// - stream_writer: the buffered writer used to send messages
/// - owner: An optional reference to the owning object.
#[derive(Debug)]
pub struct Client {
pub struct Client<Out: 'static>
where
Out: From<ClientMessage> + Send
{
pub details: ClientDetails,
// server send channel
server_channel: Mutex<Sender<ServerMessage>>,
// object channels
tx: Sender<ClientMessage>,
rx: Mutex<Receiver<ClientMessage>>,
stream_rx: Mutex<BufReader<ReadHalf<tokio::net::TcpStream>>>,
stream_tx: Mutex<WriteHalf<tokio::net::TcpStream>>,
out_channel: Sender<Out>,
connection: Arc<Connection>,
}
// client funciton implmentations
impl Client {
impl<Out> Client<Out>
where
Out: From<ClientMessage> + Send {
pub fn new(
uuid: String,
uuid: Uuid,
username: String,
address: String,
stream_rx: BufReader<ReadHalf<tokio::net::TcpStream>>,
stream_tx: WriteHalf<tokio::net::TcpStream>,
server_channel: Sender<ServerMessage>,
) -> Arc<Client> {
let (sender, receiver) = channel(1024);
out_channel: Sender<Out>,
connection: Arc<Connection>
) -> Arc<Client<Out>> {
Arc::new(Client {
details: ClientDetails {
uuid: Uuid::parse_str(&uuid).expect("invalid id"),
uuid,
username,
address,
address: address.to_string(),
public_key: None,
},
server_channel: Mutex::new(server_channel),
tx: sender,
rx: Mutex::new(receiver),
stream_rx: Mutex::new(stream_rx),
stream_tx: Mutex::new(stream_tx),
connection,
out_channel,
})
}
pub fn start(self: &Arc<Client>) {
let t1_client = self.clone();
let t2_client = self.clone();
// client stream read task
tokio::spawn(async move {
use ClientMessage::Disconnect;
let client = t1_client;
let mut lock = client.stream_tx.lock().await;
let mut buffer = String::new();
// tell client that is is now connected
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::Connected).unwrap()
);
let _ = lock.write_all(&buffer.as_bytes());
let _ = lock.flush().await;
drop(lock);
loop {
let mut stream_reader = client.stream_rx.lock().await;
let mut buffer = String::new();
if let Ok(_size) = stream_reader.read_line(&mut buffer).await {
let command = serde_json::from_str::<ClientStreamIn>(buffer.as_str());
println!("[Client {:?}]: recieved {}", client.details.uuid, &buffer);
match command {
Ok(ClientStreamIn::Disconnect) => {
println!(
"[Client {:?}]: Disconnect recieved",
&client.details.uuid
);
client.send_message(Disconnect).await;
return;
}
Ok(ClientStreamIn::SendMessage { to, content }) => {
println!(
"[Client {:?}]: send message to: {:?}",
&client.details.uuid, &to
);
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::ClientSendMessage {
from: client.details.uuid,
to,
content,
})
.await;
}
Ok(ClientStreamIn::Update) => {
println!(
"[Client {:?}]: update received",
&client.details.uuid
);
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::ClientUpdate {
to: client.details.uuid,
})
.await;
}
Ok(ClientStreamIn::SendGlobalMessage {content}) => {
println!(
"[Client {:?}]: send global message received",
&client.details.uuid
);
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::BroadcastGlobalMessage { content, sender: *&client.details.uuid.clone() })
.await;
}
_ => {
println!(
"[Client {:?}]: command not found",
&client.details.uuid
);
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::ClientError {
to: client.details.uuid,
})
.await;
}
}
buffer.zeroize();
}
async fn handle_connection(&self, value: Result<ClientStreamIn, Error>) {
match value {
Ok(ClientStreamIn::Disconnect) => {
println!(
"[Client {:?}]: Disconnect received",
self.details.uuid
);
self.disconnect().await;
return;
}
});
// client channel read thread
tokio::spawn(async move {
use ClientMessage::{Disconnect, Error, Message, SendClients};
let client = t2_client;
loop {
let mut channel = client.rx.lock().await;
let mut buffer = String::new();
let message = channel.recv().await.unwrap();
drop(channel);
println!("[Client {:?}]: {:?}", &client.details.uuid, message);
match message {
Disconnect => {
let lock = client.server_channel.lock().await;
let _ = lock
.send(ServerMessage::ClientDisconnected {
id: client.details.uuid,
})
.await;
return;
}
Message { from, content } => {
let msg = ClientStreamOut::UserMessage { from, content };
let _ =
writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
drop(stream);
}
SendClients { clients } => {
let client_details_vec: Vec<ClientDetails> = clients
.iter()
.map(|client| &client.details)
.cloned()
.collect();
let msg = ClientStreamOut::ConnectedClients {
clients: client_details_vec,
};
let _ =
writeln!(buffer, "{}", serde_json::to_string(&msg).unwrap());
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
Error => {
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::Error).unwrap()
);
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
ClientMessage::GlobalBroadcastMessage { from,content } => {
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::GlobalMessage {from, content}).unwrap()
);
let mut stream = client.stream_tx.lock().await;
let _ = stream.write_all(&buffer.as_bytes()).await;
let _ = stream.flush().await;
}
}
Ok(ClientStreamIn::SendMessage { to, content }) => {
let _ = self.out_channel.send(
ClientMessage::IncomingMessage {from: self.details.uuid, to, content}.into()
).await;
}
});
Ok(ClientStreamIn::SendGlobalMessage { content }) => {
let _ = self.out_channel.send(
ClientMessage::IncomingGlobalMessage {from: self.details.uuid, content}.into()
).await;
}
_ => {
self.error("Command not found").await;
}
}
}
pub async fn send_message(self: &Arc<Client>, msg: ClientMessage) {
let _ = self.tx.send(msg).await;
pub async fn broadcast_message(&self, from: Uuid, content: String) -> Result<(), Error> {
self.connection.write(ClientStreamOut::GlobalMessage { from, content }).await?;
Ok(())
}
pub async fn user_message(&self, from: Uuid, content: String) -> Result<(), Error> {
self.connection.write(ClientStreamOut::UserMessage { from, content }).await?;
Ok(())
}
async fn disconnect(&self) {
let _ = self.out_channel
.send(ClientMessage::Disconnect {
id: self.details.uuid,
}.into()).await;
}
async fn error(&self, msg: &str) {
let _ = self.connection.write(ClientStreamOut::Error).await;
}
}
#[async_trait]
impl<Out> IManager for Client<Out>
where
Out: From<ClientMessage> + Send
{
async fn init(self: &Arc<Self>)
where
Self: Send + Sync + 'static
{
let _ = self.connection.write(Connected).await;
}
async fn run(self: &Arc<Self>) {
let client = self.clone();
select! {
val = self.connection.read::<ClientStreamIn>() => {
tokio::spawn(async move {
client.handle_connection(val).await;
});
}
}
}
}
// MARK: - use to handle disconnecting
impl<Out> Drop for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn drop(&mut self) {
let connection = self.connection.clone();
let id = self.details.uuid.clone();
tokio::spawn(async move {
let _ = connection.write(Disconnected).await;
});
}
}
// MARK: - used for sorting.
impl PartialEq for Client {
impl<Out> PartialEq for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn eq(&self, other: &Self) -> bool {
self.details.uuid == other.details.uuid
}
}
impl Eq for Client {}
impl<Out> Eq for Client<Out>
where
Out: From<ClientMessage> + Send
{}
impl PartialOrd for Client {
impl<Out> PartialOrd for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Client {
impl<Out> Ord for Client<Out>
where
Out: From<ClientMessage> + Send
{
fn cmp(&self, other: &Self) -> Ordering {
self.details.uuid.cmp(&other.details.uuid)
}
}
impl Drop for Client {
fn drop(&mut self) {
println!("[Client] dropped!");
#[cfg(test)]
mod test {
use std::io::Error;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use foundation::connection::Connection;
use foundation::messages::client::ClientStreamOut;
use foundation::messages::client::ClientStreamOut::{Connected, Disconnected};
use foundation::prelude::IManager;
use foundation::test::create_connection_pair;
use crate::client::{Client};
use crate::messages::ClientMessage;
use crate::messages::ClientMessage::Disconnect;
#[tokio::test]
async fn create_client_and_drop() -> Result<(), Error> {
let (sender, mut receiver) =
channel::<ClientMessage>(1024);
let (server, (client_conn, addr)) =
create_connection_pair().await?;
// client details
let uuid = Uuid::new_v4();
let username = "TestUser".to_string();
let client = Client::new(
uuid,
username,
addr.to_string(),
sender.clone(),
server
);
client.start();
let res = client_conn.read::<ClientStreamOut>().await?;
assert_eq!(res, Connected);
drop(client);
let res = client_conn.read::<ClientStreamOut>().await?;
assert_eq!(res, Disconnected);
// fetch from out_channel
let disconnect_msg = receiver.recv().await.unwrap();
assert_eq!(disconnect_msg, Disconnect {id: uuid, connection: Connection::new()});
Ok(())
}
}
}

View File

@ -1,122 +1,229 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use futures::future::join_all;
use futures::lock::Mutex;
use tokio::join;
use futures::future::join_all;
use tokio::sync::Mutex;
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use uuid::Uuid;
use async_trait::async_trait;
use foundation::prelude::IManager;
use foundation::connection::Connection;
use crate::client::Client;
use crate::messages::ClientMessage;
use crate::messages::ClientMgrMessage;
use crate::messages::ServerMessage;
#[derive(Debug)]
pub enum ClientMgrMessage {
#[allow(dead_code)]
Remove {
id: Uuid
},
#[allow(dead_code)]
SendClients {
to: Uuid
},
SendMessage {
from: Uuid,
to: Uuid,
content: String,
},
BroadcastGlobalMessage {from: Uuid, content: String},
}
impl From<ClientMessage> for ClientMgrMessage {
fn from(msg: ClientMessage) -> Self {
use ClientMessage::{IncomingMessage,IncomingGlobalMessage,Disconnect};
match msg {
IncomingMessage {
from,
to,
content
} => ClientMgrMessage::SendMessage {
from,
to,
content
},
IncomingGlobalMessage{
from,
content
} => ClientMgrMessage::BroadcastGlobalMessage {
from,
content
},
Disconnect {id} => ClientMgrMessage::Remove {id},
_ => unimplemented!()
}
}
}
/// # ClientManager
/// This struct manages all connected users
#[derive(Debug)]
pub struct ClientManager {
clients: Mutex<HashMap<Uuid, Arc<Client>>>,
/// This struct manages all users connected to the server.
///
/// ## Attributes
/// - clients: a vector of all clients being managed.
/// - server_channel: a channel to the parent that manages this object.
/// - tx: the sender that clients will send their messages to.
/// - rx: the receiver where messages are sent to.
pub struct ClientManager<Out: 'static>
where
Out: From<ClientMgrMessage> + Send
{
clients: Mutex<HashMap<Uuid, Arc<Client<ClientMgrMessage>>>>,
server_channel: Mutex<Sender<ServerMessage>>,
#[allow(dead_code)]
server_channel: Mutex<Sender<Out>>,
tx: Sender<ClientMgrMessage>,
rx: Mutex<Receiver<ClientMgrMessage>>,
}
impl ClientManager {
pub fn new(server_channel: Sender<ServerMessage>) -> Arc<Self> {
impl<Out> ClientManager<Out>
where
Out: From<ClientMgrMessage> + Send
{
pub fn new(out_channel: Sender<Out>) -> Arc<Self> {
let (tx, rx) = channel(1024);
Arc::new(ClientManager {
clients: Mutex::default(),
server_channel: Mutex::new(server_channel),
server_channel: Mutex::new(out_channel),
tx,
rx: Mutex::new(rx),
})
}
pub fn start(self: &Arc<ClientManager>) {
let client_manager = self.clone();
tokio::spawn(async move {
use ClientMgrMessage::{Add, Remove, SendClients, SendError, SendMessage};
loop {
let mut receiver = client_manager.rx.lock().await;
let message = receiver.recv().await.unwrap();
println!("[Client manager]: recieved message: {:?}", message);
match message {
Add(client) => {
println!("[Client Manager]: adding new client");
client.start();
let mut lock = client_manager.clients.lock().await;
if lock.insert(client.details.uuid, client).is_none() {
println!("value is new");
}
}
Remove(uuid) => {
println!("[Client Manager]: removing client: {:?}", &uuid);
if let Some(client) =
client_manager.clients.lock().await.remove(&uuid)
{
client.send_message(ClientMessage::Disconnect).await;
}
}
SendMessage { to, from, content } => {
client_manager
.send_to_client(&to, ClientMessage::Message { from, content })
.await;
}
SendClients { to } => {
let lock = client_manager.clients.lock().await;
if let Some(client) = lock.get(&to) {
let clients_vec: Vec<Arc<Client>> =
lock.values().cloned().collect();
client
.send_message(ClientMessage::SendClients {
clients: clients_vec,
})
.await
}
}
ClientMgrMessage::BroadcastGlobalMessage {sender, content} => {
use futures::stream::TryStreamExt;
let lock = client_manager.clients.lock().await;
let futures = lock.iter()
.map(|i| i.1.send_message(
ClientMessage::GlobalBroadcastMessage {from: sender, content: content.clone()}
));
join_all(futures).await;
}
SendError { to } => {
let lock = client_manager.clients.lock().await;
if let Some(client) = lock.get(&to) {
client.send_message(ClientMessage::Error).await
}
}
#[allow(unreachable_patterns)]
_ => println!("[Client manager]: not implemented"),
}
}
});
#[allow(dead_code)]
pub async fn get_count(&self) -> usize {
self.clients.lock().await.len()
}
async fn send_to_client(self: &Arc<ClientManager>, id: &Uuid, msg: ClientMessage) {
let lock = self.clients.lock().await;
if let Some(client) = lock.get(&id) {
client.clone().send_message(msg).await;
pub async fn add_client(
&self,
id: Uuid,
username: String,
address: String,
connection: Arc<Connection>
) {
let client = Client::new(
id,
username,
address,
self.tx.clone(),
connection
);
client.start();
let mut lock = self.clients.lock().await;
lock.insert(client.details.uuid, client);
}
#[allow(dead_code)]
pub async fn remove_client(&self, id: Uuid) {
let mut lock = self.clients.lock().await;
lock.remove(&id);
}
pub async fn handle_channel(&self, message: Option<ClientMgrMessage>) {
use ClientMgrMessage::{Remove, SendClients, BroadcastGlobalMessage, SendMessage};
println!("Handling channel");
match message {
Some(Remove {id}) => {
println!("[Client Manager]: removing client: {:?}", &id);
let mut lock = self.clients.lock().await;
lock.remove(&id);
},
Some(SendClients {to: _ }) => {
let lock = self.clients.lock().await;
let futures = lock.iter().map(|(_,_)| async {
println!("Send message to Client")
});
join_all(futures).await;
}
Some(BroadcastGlobalMessage {from, content}) => {
let lock = self.clients.lock().await;
let futures = lock.iter()
.map(|(_,c)| (c.clone(),content.clone()))
.map(|(c,s)| async move {
c.broadcast_message(from, s).await.unwrap();
});
join_all(futures).await;
},
Some(SendMessage { from, to, content }) => {
let lock = self.clients.lock().await;
let client = lock.get(&to).unwrap();
let _ = client.user_message(from, content).await;
},
Some(Remove {id}) => {
self.clients.lock().await.remove(&id);
}
_ => {
unimplemented!()
}
}
}
}
pub async fn send_message(self: Arc<ClientManager>, message: ClientMgrMessage) {
let _ = self.tx.send(message).await;
#[async_trait]
impl<Out> IManager for ClientManager<Out>
where
Out: From<ClientMgrMessage> + Send
{
async fn run(self: &Arc<Self>) {
loop {
let mut receiver = self.rx.lock().await;
select! {
val = receiver.recv() => {
self.handle_channel(val).await;
}
}
}
}
}
#[cfg(test)]
mod test {
use std::io::Error;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use foundation::messages::client::ClientStreamOut;
use foundation::prelude::IManager;
use foundation::test::create_connection_pair;
use crate::client_manager::{ClientManager, ClientMgrMessage};
#[tokio::test]
async fn add_new_client_to_manager() -> Result<(), Error> {
let (sender, mut receiver) =
channel::<ClientMgrMessage>(1024);
let (server, (client, addr)) = create_connection_pair().await?;
let client_manager = ClientManager::new(sender);
client_manager.start();
let id = Uuid::new_v4();
let username = "TestUser".to_string();
client_manager.add_client(
id,
username.clone(),
addr.to_string(),
server
).await;
assert_eq!(client_manager.get_count().await, 1);
let msg = client.read::<ClientStreamOut>().await?;
assert_eq!(msg, ClientStreamOut::Connected);
Ok(())
}
}

8
server/src/lib.rs Normal file
View File

@ -0,0 +1,8 @@
// mod chat_manager;
mod client;
mod client_manager;
mod messages;
mod network_manager;
mod server;
pub use server::Server;

View File

@ -1,4 +1,4 @@
pub mod chat_manager;
// pub mod chat_manager;
pub mod client;
pub mod client_manager;
pub mod messages;
@ -29,7 +29,7 @@ async fn main() -> io::Result<()> {
)
.get_matches();
let server = Server::new().unwrap();
let server = Server::new().await.unwrap();
server.start().await;
Ok(())

View File

@ -1,58 +1,48 @@
use std::sync::{Arc, Weak};
use std::sync::{Arc};
use uuid::Uuid;
use crate::chat_manager::Message;
use crate::client::Client;
use foundation::connection::Connection;
/// # ClientMessage
///
/// These messages are send from the client to a receiver
/// when events from the client happen that need to be delegated
///
/// ## Variants
///
///
/// ## Methods
///
#[derive(Debug)]
pub enum ClientMessage {
Message { from: Uuid, content: String },
GlobalBroadcastMessage {from: Uuid, content:String},
SendClients { clients: Vec<Arc<Client>> },
#[allow(dead_code)]
Connected,
Disconnect,
#[allow(dead_code)]
IncomingMessage { from: Uuid, to: Uuid, content: String },
#[allow(dead_code)]
IncomingGlobalMessage { from: Uuid, content: String },
#[allow(dead_code)]
RequestedUpdate { from: Uuid },
Disconnect { id: Uuid },
Error,
}
#[derive(Debug)]
pub enum ClientMgrMessage {
Remove(Uuid),
Add(Arc<Client>),
SendClients {
to: Uuid,
},
SendMessage {
from: Uuid,
to: Uuid,
content: String,
},
BroadcastGlobalMessage {sender: Uuid, content: String},
SendError {
to: Uuid,
},
}
impl PartialEq for ClientMessage {
fn eq(&self, other: &Self) -> bool {
use ClientMessage::{Disconnect, Connected, Error};
#[derive(Debug)]
pub enum ServerMessage {
ClientConnected {
client: Arc<Client>,
},
ClientSendMessage {
from: Uuid,
to: Uuid,
content: String,
},
ClientDisconnected {
id: Uuid,
},
ClientUpdate {
to: Uuid,
},
ClientError {
to: Uuid,
},
BroadcastGlobalMessage {sender: Uuid, content: String}
match (self,other) {
(Connected, Connected) => true,
(Error, Error) => true,
(Disconnect {id, .. }, Disconnect {id: other_id, .. }) => id == other_id,
_ => {
false
}
}
}
}

View File

@ -1,104 +1,220 @@
use std::io::Write;
use std::io::{Error, ErrorKind};
use std::sync::Arc;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
use uuid::Uuid;
use async_trait::async_trait;
use tokio::net::TcpListener;
use tokio::sync::mpsc::Sender;
use tokio::task;
use tokio::{select};
use tokio::sync::Mutex;
use foundation::connection::Connection;
use crate::client::Client;
use crate::messages::ServerMessage;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::prelude::IManager;
pub struct NetworkManager {
address: String,
server_channel: Sender<ServerMessage>,
#[derive(Debug)]
pub enum NetworkManagerMessage {
ClientConnecting {
uuid: Uuid,
address: String,
username: String,
connection: Arc<Connection>
},
}
impl NetworkManager {
pub fn new(
_port: String,
server_channel: Sender<ServerMessage>,
) -> Arc<NetworkManager> {
Arc::new(NetworkManager {
address: "0.0.0.0:5600".to_string(),
server_channel,
})
impl PartialEq for NetworkManagerMessage {
fn eq(&self, other: &Self) -> bool {
use NetworkManagerMessage::ClientConnecting;
match (self, other) {
(ClientConnecting {uuid,address,username, .. },
ClientConnecting {
uuid: other_uuid,
address: other_address,
username: other_username, ..
}) => uuid == other_uuid && address == other_address && username == other_username,
#[allow(unreachable_patterns)]
_ => false
}
}
}
/// # NetworkManager
///
/// This handles all new incoming connections to the server, involved with the chat services.
///
/// ## Fields
/// - address: the socket address that the server is listening on.
/// - listener: the TcpListener that is receiving connections.
/// - out_channel: the channel that will be sent events from NetworkManager.
pub struct NetworkManager<Out>
where
Out: From<NetworkManagerMessage> + Send
{
listener: Mutex<TcpListener>,
out_channel: Sender<Out>,
}
impl<Out> NetworkManager<Out>
where
Out: From<NetworkManagerMessage> + Send
{
pub async fn new(
address: &str,
out_channel: Sender<Out>
) -> Result<Arc<NetworkManager<Out>>, Error> {
let listener = TcpListener::bind(address).await?;
Ok(Arc::new(NetworkManager {
listener: Mutex::new(listener),
out_channel,
}))
}
pub fn start(self: &Arc<NetworkManager>) {
let network_manager = self.clone();
/// This fetches the port from the NetworkManager
pub async fn port(&self) -> u16 {
self.listener.lock().await.local_addr().unwrap().port()
}
tokio::spawn(async move {
let listener = TcpListener::bind(network_manager.address.clone())
.await
.unwrap();
/// This fetches the IP address from the NetworkManager
#[allow(dead_code)]
pub async fn address(&self) -> String {
self.listener.lock().await.local_addr().unwrap().ip().to_string()
}
loop {
let (connection, _) = listener.accept().await.unwrap();
let (rd, mut wd) = io::split(connection);
async fn handle_connection(&self, connection: Arc<Connection>) -> Result<(), Error>{
use NetworkSockIn::{Info, Connect};
use NetworkSockOut::{GotInfo, Request, Connecting};
let mut reader = BufReader::new(rd);
let server_channel = network_manager.server_channel.clone();
connection.write(Request).await?;
task::spawn(async move {
let mut out_buffer: Vec<u8> = Vec::new();
let mut in_buffer: String = String::new();
match connection.read().await? {
Info => connection.write(GotInfo {
server_name: "TestServer".into(),
server_owner: "Michael".into()
}).await?,
Connect { uuid, address, username } => {
connection.write(Connecting).await?;
// write request
let a = serde_json::to_string(&NetworkSockOut::Request).unwrap();
println!("{:?}", &a);
let _ = writeln!(out_buffer, "{}", a);
let _ = self.out_channel.send(NetworkManagerMessage::ClientConnecting {
uuid,
address,
username,
let _ = wd.write_all(&out_buffer).await;
let _ = wd.flush().await;
// get response
let _ = reader.read_line(&mut in_buffer).await.unwrap();
//match the response
if let Ok(request) = serde_json::from_str::<NetworkSockIn>(&in_buffer)
{
match request {
NetworkSockIn::Info => {
// send back server info to the connection
let _ = wd
.write_all(
serde_json::to_string(&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael",
})
.unwrap()
.as_bytes(),
)
.await;
let _ = wd.write_all(b"\n").await;
let _ = wd.flush().await;
}
NetworkSockIn::Connect {
uuid,
username,
address,
} => {
// create client and send to server
let new_client = Client::new(
uuid,
username,
address,
reader,
wd,
server_channel.clone(),
);
let _ = server_channel
.send(ServerMessage::ClientConnected {
client: new_client,
})
.await;
}
}
}
});
connection,
}.into()).await;
}
});
#[allow(unreachable_patterns)]
_ => {
return Err(Error::new(ErrorKind::InvalidData, "Did not receive valid message"));
}
}
Ok(())
}
}
#[async_trait]
impl<Out: 'static> IManager for NetworkManager<Out>
where
Out: From<NetworkManagerMessage> + Send
{
async fn run(self: &Arc<Self>) {
let lock = self.listener.lock().await;
select! {
val = lock.accept() => {
if let Ok((stream, _addr)) = val {
let conn = self.clone();
tokio::spawn(async move {
let _ = conn.handle_connection(Arc::new(stream.into())).await;
});
}
}
}
}
}
#[cfg(test)]
mod test {
use std::io::Error;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use foundation::connection::Connection;
use foundation::messages::network::NetworkSockIn::{Connect, Info};
use foundation::messages::network::NetworkSockOut;
use foundation::messages::network::NetworkSockOut::{Connecting, GotInfo, Request};
use foundation::prelude::IManager;
use crate::network_manager::{NetworkManager, NetworkManagerMessage::{ClientConnecting}, NetworkManagerMessage};
#[tokio::test]
async fn test_network_fetch_info() -> Result<(), Error> {
let (tx,_rx) = channel::<NetworkManagerMessage>(16);
let network_manager =
NetworkManager::new("localhost:0",tx).await?;
network_manager.start();
let port = network_manager.port().await;
let client = Connection::new();
client.connect(format!("localhost:{}", port)).await?;
assert_eq!(client.read::<NetworkSockOut>().await?, Request);
client.write(Info).await?;
let out = client.read::<NetworkSockOut>().await?;
assert_eq!(
out,
GotInfo {server_owner: "Michael".into(), server_name: "TestServer".into()}
);
Ok(())
}
#[tokio::test]
async fn test_network_login() -> Result<(), Error> {
let (tx, mut rx) = channel::<NetworkManagerMessage>(16);
let network_manager =
NetworkManager::new("localhost:0",tx).await?;
network_manager.start();
let port = network_manager.port().await;
let client = Connection::new();
client.connect(format!("localhost:{}", port)).await?;
assert_eq!(client.read::<NetworkSockOut>().await?, Request);
// construct client data
let uuid = Uuid::new_v4();
let address = "localhost";
let username = "TestUser";
client.write(Connect {
uuid,
address: address.to_string(),
username: username.to_string()
}).await?;
let res: NetworkSockOut = client.read().await?;
assert_eq!(res, Connecting);
let network_out = rx.recv().await.unwrap();
assert_eq!(network_out, ClientConnecting {
uuid,
address: address.to_string(),
username: username.to_string(),
connection: client
});
Ok(())
}
}

View File

@ -1,14 +1,65 @@
use std::io::Error;
use std::sync::Arc;
// use crossbeam_channel::{unbounded, Receiver};
use futures::lock::Mutex;
use tokio::sync::mpsc::{channel, Receiver};
use uuid::Uuid;
use foundation::connection::Connection;
use foundation::prelude::IManager;
use crate::client_manager::{ClientManager, ClientMgrMessage};
use crate::network_manager::{NetworkManager, NetworkManagerMessage};
#[derive(Debug)]
pub enum ServerMessage {
ClientConnected {
uuid: Uuid,
address: String,
username: String,
connection: Arc<Connection>
},
BroadcastGlobalMessage {from: Uuid, content: String},
}
impl From<NetworkManagerMessage> for ServerMessage {
fn from(msg: NetworkManagerMessage) -> Self {
use NetworkManagerMessage::{ClientConnecting};
match msg {
ClientConnecting {
uuid,
address,
username,
connection
} => ServerMessage::ClientConnected {
uuid,
address,
username,
connection
},
#[allow(unreachable_patterns)]
_ => unimplemented!()
}
}
}
impl From<ClientMgrMessage> for ServerMessage {
fn from(msg: ClientMgrMessage) -> Self {
use ClientMgrMessage::{BroadcastGlobalMessage,};
match msg {
BroadcastGlobalMessage {
from,
content,
} => ServerMessage::BroadcastGlobalMessage {
from,
content
},
_ => unimplemented!()
}
}
}
use crate::client_manager::ClientManager;
use crate::messages::ClientMgrMessage;
use crate::messages::ServerMessage;
use crate::network_manager::NetworkManager;
/// # Server
/// authors: @michael-bailey, @Mitch161
@ -16,14 +67,14 @@ use crate::network_manager::NetworkManager;
/// it is componsed of a client manager and a network manager
///
pub struct Server {
client_manager: Arc<ClientManager>,
network_manager: Arc<NetworkManager>,
client_manager: Arc<ClientManager<ServerMessage>>,
network_manager: Arc<NetworkManager<ServerMessage>>,
receiver: Mutex<Receiver<ServerMessage>>,
}
impl Server {
/// Create a new server object
pub fn new() -> Result<Arc<Server>, Box<dyn std::error::Error>> {
pub async fn new() -> Result<Arc<Server>, Error> {
let (
sender,
receiver
@ -31,11 +82,15 @@ impl Server {
Ok(Arc::new(Server {
client_manager: ClientManager::new(sender.clone()),
network_manager: NetworkManager::new("5600".to_string(), sender),
network_manager: NetworkManager::new("0.0.0.0:5600", sender).await?,
receiver: Mutex::new(receiver),
}))
}
pub async fn port(self: &Arc<Server>) -> u16 {
self.network_manager.port().await
}
pub async fn start(self: &Arc<Server>) {
// start client manager and network manager
self.network_manager.clone().start();
@ -44,54 +99,39 @@ impl Server {
// clone block items
let server = self.clone();
use ClientMgrMessage::{Add, Remove, SendMessage};
loop {
let mut lock = server.receiver.lock().await;
if let Some(message) = lock.recv().await {
println!("[server]: received message {:?}", &message);
match message {
ServerMessage::ClientConnected { client } => {
server
.client_manager
.clone()
.send_message(Add(client))
.await
}
ServerMessage::ClientDisconnected { id } => {
println!("disconnecting client {:?}", id);
server.client_manager.clone().send_message(Remove(id)).await;
}
ServerMessage::ClientSendMessage { from, to, content } => {
server
.client_manager
.clone()
.send_message(SendMessage { from, to, content })
.await
}
ServerMessage::ClientUpdate { to } => {
server
.client_manager
.clone()
.send_message(ClientMgrMessage::SendClients { to })
.await
}
ServerMessage::ClientError { to } => {
server
.client_manager
.clone()
.send_message(ClientMgrMessage::SendError { to })
.await
}
ServerMessage::BroadcastGlobalMessage {sender,content} => {
server
.client_manager
.clone()
.send_message(
ClientMgrMessage::BroadcastGlobalMessage {sender, content}
ServerMessage::ClientConnected {
uuid,
address,
username,
connection
} => {
server.client_manager
.add_client(
uuid,
username,
address,
connection
).await
},
ServerMessage::BroadcastGlobalMessage {
from: _,
content: _,
} => {
// server
// .client_manager
// .clone()
// .send_message(
// ClientMgrMessage::BroadcastGlobalMessage {sender, content}
// ).await
}
#[allow(unreachable_patterns)]
_ => {unimplemented!()}
}
}
}