Update network_manager.rs

Converted over to new messaging system.
This commit is contained in:
michael-bailey 2022-02-25 20:13:07 +00:00
parent bed642a31d
commit d904e83f14
1 changed files with 212 additions and 92 deletions

View File

@ -1,109 +1,229 @@
use std::io::Write;
use std::net::SocketAddr;
use std::sync::Arc;
use std::io::{Error, ErrorKind};
use std::sync::{Arc,Weak};
use uuid::Uuid;
use async_trait::async_trait;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::mpsc::Sender;
use tokio::task;
use tokio::sync::mpsc::{channel, Sender};
use tokio::{select};
use tokio::sync::Mutex;
use foundation::connection::Connection;
use crate::client::Client;
use crate::messages::ServerMessage;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::prelude::IManager;
pub struct NetworkManager {
address: SocketAddr,
server_channel: Sender<ServerMessage>,
#[derive(Debug)]
pub enum NetworkManagerMessage {
ClientConnecting {
uuid: Uuid,
address: String,
username: String,
connection: Connection
},
}
impl NetworkManager {
pub fn new(
port: u16,
server_channel: Sender<ServerMessage>,
) -> Arc<NetworkManager> {
Arc::new(NetworkManager {
address: format!("0.0.0.0:{}", port).parse().unwrap(),
server_channel,
})
impl PartialEq for NetworkManagerMessage {
fn eq(&self, other: &Self) -> bool {
use NetworkManagerMessage::ClientConnecting;
match (self, other) {
(ClientConnecting {uuid,address,username, .. },
ClientConnecting {
uuid: other_uuid,
address: other_address,
username: other_username, ..
}) => uuid == other_uuid && address == other_address && username == other_username,
_ => false
}
}
pub fn port(&self) -> u16 {
self.address.port()
}
/// # NetworkManager
///
/// This handles all new incoming connections to the server, involved with the chat services.
///
/// ## Fields
/// - address: the socket address that the server is listening on.
/// - listener: the TcpListener that is receiving connections.
/// - out_channel: the channel that will be sent events from NetworkManager.
pub struct NetworkManager<Out>
where
Out: From<NetworkManagerMessage> + Send
{
listener: Mutex<TcpListener>,
out_channel: Sender<Out>,
}
impl<Out> NetworkManager<Out>
where
Out: From<NetworkManagerMessage> + Send
{
pub async fn new(
address: &str,
out_channel: Sender<Out>
) -> Result<Arc<NetworkManager<Out>>, Error> {
let listener = TcpListener::bind(address).await?;
Ok(Arc::new(NetworkManager {
listener: Mutex::new(listener),
out_channel,
}))
}
pub fn start(self: &Arc<NetworkManager>) {
let network_manager = self.clone();
/// This fetches the port from the NetworkManager
pub async fn port(&self) -> u16 {
self.listener.lock().await.local_addr().unwrap().port()
}
/// This fetches the IP address from the NetworkManager
pub async fn address(&self) -> String {
self.listener.lock().await.local_addr().unwrap().ip().to_string()
}
async fn handle_connection(&self, connection: Connection) -> Result<(), Error>{
use NetworkSockIn::{Info, Connect};
use NetworkSockOut::{GotInfo, Request, Connecting};
connection.write(Request).await?;
match connection.read().await? {
Info => connection.write(GotInfo {
server_name: "TestServer".into(),
server_owner: "Michael".into()
}).await?,
Connect { uuid, address, username } => {
connection.write(Connecting).await?;
let _ = self.out_channel.send(NetworkManagerMessage::ClientConnecting {
uuid,
address,
username,
connection,
}.into()).await;
}
_ => {
return Err(Error::new(ErrorKind::InvalidData, "Did not receive valid message"));
}
}
Ok(())
}
}
#[async_trait]
impl<Out: 'static> IManager for NetworkManager<Out>
where
Out: From<NetworkManagerMessage> + Send
{
async fn run(self: &Arc<Self>) {
let lock = self.listener.lock().await;
select! {
val = lock.accept() => {
if let Ok((stream, addr)) = val {
let _ = self.handle_connection(stream.into()).await;
}
}
}
}
fn start(self: &Arc<Self>) {
let weak_self = Arc::downgrade(self);
let network = Mutex::new(weak_self.clone());
// this looks horrid but works
tokio::spawn(async move {
let listener = TcpListener::bind(network_manager.address.clone())
.await
.unwrap();
loop {
let (connection, _) = listener.accept().await.unwrap();
let (rd, mut wd) = io::split(connection);
let mut reader = BufReader::new(rd);
let server_channel = network_manager.server_channel.clone();
task::spawn(async move {
let mut out_buffer: Vec<u8> = Vec::new();
let mut in_buffer: String = String::new();
// write request
let a = serde_json::to_string(&NetworkSockOut::Request).unwrap();
println!("{:?}", &a);
let _ = writeln!(out_buffer, "{}", a);
let _ = wd.write_all(&out_buffer).await;
let _ = wd.flush().await;
// get response
let _ = reader.read_line(&mut in_buffer).await.unwrap();
//match the response
if let Ok(request) = serde_json::from_str::<NetworkSockIn>(&in_buffer)
{
match request {
NetworkSockIn::Info => {
// send back server info to the connection
let _ = wd
.write_all(
serde_json::to_string(&NetworkSockOut::GotInfo {
server_name: "oof".to_string(),
server_owner: "michael".to_string(),
})
.unwrap()
.as_bytes(),
)
.await;
let _ = wd.write_all(b"\n").await;
let _ = wd.flush().await;
}
NetworkSockIn::Connect {
uuid,
username,
address,
} => {
// create client and send to server
let new_client = Client::new(
uuid,
username,
address,
reader,
wd,
server_channel.clone(),
);
let _ = server_channel
.send(ServerMessage::ClientConnected {
client: new_client,
})
.await;
}
}
}
});
if let Some(network_manager) =
Weak::upgrade(&*network.lock().await)
{
network_manager.run().await
} else { () }
}
});
}
}
#[cfg(test)]
mod test {
use std::io::Error;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use foundation::connection::Connection;
use foundation::messages::network::NetworkSockIn::{Connect, Info};
use foundation::messages::network::NetworkSockOut;
use foundation::messages::network::NetworkSockOut::{Connecting, GotInfo, Request};
use foundation::prelude::IManager;
use crate::network_manager::{NetworkManager, NetworkManagerMessage::{ClientConnecting}, NetworkManagerMessage};
#[tokio::test]
async fn test_network_fetch_info() -> Result<(), Error> {
let (tx,_rx) = channel::<NetworkManagerMessage>(16);
let network_manager =
NetworkManager::new("localhost:0",tx).await?;
network_manager.start();
let port = network_manager.port().await;
let client = Connection::new();
client.connect(format!("localhost:{}", port)).await?;
assert_eq!(client.read::<NetworkSockOut>().await?, Request);
client.write(Info).await?;
let out = client.read::<NetworkSockOut>().await?;
assert_eq!(
out,
GotInfo {server_owner: "Michael".into(), server_name: "TestServer".into()}
);
Ok(())
}
#[tokio::test]
async fn test_network_login() -> Result<(), Error> {
let (tx, mut rx) = channel::<NetworkManagerMessage>(16);
let network_manager =
NetworkManager::new("localhost:0",tx).await?;
network_manager.start();
let port = network_manager.port().await;
let client = Connection::new();
client.connect(format!("localhost:{}", port)).await?;
assert_eq!(client.read::<NetworkSockOut>().await?, Request);
// construct client data
let uuid = Uuid::new_v4();
let address = "localhost";
let username = "TestUser";
client.write(Connect {
uuid,
address: address.to_string(),
username: username.to_string()
}).await?;
let res: NetworkSockOut = client.read().await?;
assert_eq!(res, Connecting);
let network_out = rx.recv().await.unwrap();
assert_eq!(network_out, ClientConnecting {
uuid,
address: address.to_string(),
username: username.to_string(),
connection: client
});
Ok(())
}
}