Grpc-manager #22

Merged
michael-bailey merged 28 commits from grpc-manager into master 2024-05-30 19:42:42 +00:00
4 changed files with 359 additions and 0 deletions
Showing only changes of commit 74ca0a1a80 - Show all commits

View File

@ -0,0 +1,98 @@
use std::{io, net::SocketAddr};
use foundation::{
messages::client::ClientStreamIn,
networking::json::read_message,
};
use tokio::{io::ReadHalf, net::TcpStream, sync::mpsc::UnboundedSender};
use uuid::Uuid;
use crate::{
connection::connection_manager::ConnectionManagerMessage,
network::ClientReader,
};
pub struct JSONClientReader {
reader: ReadHalf<TcpStream>,
addr: SocketAddr,
uuid: Uuid,
}
impl JSONClientReader {
pub fn new(
reader: ReadHalf<TcpStream>,
addr: SocketAddr,
uuid: Uuid,
) -> Self {
Self { reader, addr, uuid }
}
// move to other one
pub async fn get_message(&mut self) -> io::Result<ClientStreamIn> {
read_message::<ReadHalf<TcpStream>, ClientStreamIn>(&mut self.reader).await
}
pub fn handle_message(
&self,
msg: ClientStreamIn,
channel: &UnboundedSender<ConnectionManagerMessage>,
) {
println!("[JSONClientReader:{}] got message", self.addr);
let uuid = self.uuid;
_ = match msg {
ClientStreamIn::GetClients => {
channel.send(ConnectionManagerMessage::SendClientsTo { uuid })
}
ClientStreamIn::GetMessages => {
channel.send(ConnectionManagerMessage::SendGlobalMessages { uuid })
}
ClientStreamIn::SendMessage { to, content } => {
channel.send(ConnectionManagerMessage::SendPrivateMessage {
uuid: Uuid::new_v4(),
from: uuid,
to,
content,
})
}
ClientStreamIn::SendGlobalMessage { content } => {
channel.send(ConnectionManagerMessage::BroadcastGlobalMessage {
from: uuid,
content,
})
}
ClientStreamIn::Disconnect => {
channel.send(ConnectionManagerMessage::Disconnect { uuid })
}
};
}
}
impl ClientReader for JSONClientReader {
fn start_run(
mut self: Box<Self>,
uuid: Uuid,
channel: UnboundedSender<ConnectionManagerMessage>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
loop {
let msg = self.get_message().await;
let Ok(msg) = msg else {
let error = msg.unwrap_err();
println!(
"[JSONClientReader:{}] errored with '{}' disconnecting",
self.addr, error
);
_ = channel.send(ConnectionManagerMessage::Disconnected { uuid });
return;
};
self.handle_message(msg, &channel);
}
})
}
}

View File

@ -0,0 +1,117 @@
use std::net::SocketAddr;
use async_trait::async_trait;
use chrono::Local;
use foundation::{
messages::client::ClientStreamOut,
models::message::Message,
networking::json::write_message,
prelude::{GlobalMessage, PrivateMessage},
ClientDetails,
};
use tokio::{io::WriteHalf, net::TcpStream};
use uuid::Uuid;
use crate::network::ClientWriter;
#[allow(dead_code)]
pub struct JSONClientWriter {
writer: WriteHalf<TcpStream>,
addr: SocketAddr,
uuid: Uuid,
}
impl JSONClientWriter {
pub fn new(
writer: WriteHalf<TcpStream>,
addr: SocketAddr,
uuid: Uuid,
) -> Self {
Self { writer, addr, uuid }
}
}
#[async_trait]
impl ClientWriter for JSONClientWriter {
async fn send_clients(
&mut self,
clients: Vec<foundation::prelude::ClientDetails>,
) {
let message = ClientStreamOut::ConnectedClients {
clients: clients
.into_iter()
.map(|c| ClientDetails {
uuid: c.uuid.parse().unwrap(),
username: c.name,
address: c.address,
public_key: None,
})
.collect(),
};
println!("[JSONClientWriter:{}] sending clients", self.addr);
write_message(&mut self.writer, message).await;
}
async fn send_client_joined(
&mut self,
details: foundation::prelude::ClientDetails,
) {
let message = ClientStreamOut::ClientConnected {
id: details.uuid.parse().unwrap(),
username: details.name,
};
println!(
"[ProtobufClientWriter:{}] sending client connected message",
self.addr
);
write_message(&mut self.writer, message).await;
}
async fn send_client_left(&mut self, uuid: Uuid) {
let message = ClientStreamOut::ClientRemoved { id: uuid };
println!(
"[ProtobufClientWriter:{}] sending client connected message",
self.addr
);
write_message(&mut self.writer, message).await;
}
async fn send_global_messages(&mut self, messages: Vec<GlobalMessage>) {
let message = ClientStreamOut::GlobalChatMessages {
messages: messages
.into_iter()
.map(|m| Message {
id: m.uuid.parse().unwrap(),
from: m.from.parse().unwrap(),
content: m.content,
time: Local::now(),
})
.collect(),
};
println!("[JSONClientWriter:{}] sending global messages", self.addr);
write_message(&mut self.writer, message).await;
}
async fn send_private_message(&mut self, message: PrivateMessage) {
let message = ClientStreamOut::UserMessage {
from: message.from.parse().unwrap(),
content: message.content,
};
println!("[JSONClientWriter:{}] sending private message", self.addr);
write_message(&mut self.writer, message).await;
}
async fn send_global_message(&mut self, message: GlobalMessage) {
let message = ClientStreamOut::GlobalMessage {
from: message.from.parse().unwrap(),
content: message.content,
};
write_message(&mut self.writer, message).await;
}
async fn send_disconnect(&mut self) {
let message = ClientStreamOut::Disconnected;
println!("[JSONClientWriter:{}] sending disconnect", self.addr);
write_message(&mut self.writer, message).await;
}
}

View File

@ -0,0 +1,57 @@
use async_trait::async_trait;
use tokio::{
net::TcpListener,
select,
sync::mpsc::UnboundedSender,
task::JoinHandle,
};
use crate::{
network::{ConnectionType, NetworkListener},
server_va::ServerMessages,
};
/// # Listener Manager
/// This stores and awaits for connections from listeners.
/// When a connection is received, it is passed to the server
pub struct JSONListener {
listener: TcpListener,
sender: UnboundedSender<ServerMessages>,
}
#[async_trait]
impl NetworkListener for JSONListener {
/// Binds listeners and stores them in the ListenerManager
async fn new(sender: UnboundedSender<ServerMessages>) -> Self {
let address = "0.0.0.0:5600";
println!("[JSONListener] setting up listeners");
let listener = TcpListener::bind(address)
.await
.expect("[JSONListener] failed to bind to 0.0.0.0:5600");
Self { listener, sender }
}
async fn run(&self) {
loop {
println!("[JSONListener] waiting for connection");
let accept_protobuf = self.listener.accept();
let msg = select! {
Ok((stream, addr)) = accept_protobuf => {
println!("[JSONListener] accepted connection");
ServerMessages::NewConnection(ConnectionType::JsonConnection(stream, addr))
}
};
println!("[JSONListener] passing message to server");
self.sender.send(msg).unwrap();
}
}
fn start_run(sender: UnboundedSender<ServerMessages>) -> JoinHandle<()> {
tokio::spawn(async move {
JSONListener::new(sender).await.run().await;
})
}
}

View File

@ -0,0 +1,87 @@
use std::{io, net::SocketAddr};
use foundation::{
messages::network::{NetworkSockIn, NetworkSockOut},
networking::json::{read_message, write_message},
};
use tokio::{io::split, net::TcpStream};
use uuid::Uuid;
use crate::network::{
json::{
json_client_reader::JSONClientReader,
json_client_writer::JSONClientWriter,
},
ClientReader,
ClientWriter,
NetworkConnection,
ServerRequest,
};
pub struct JSONNetworkConnection {
pub(super) stream: TcpStream,
pub(super) addr: SocketAddr,
}
impl JSONNetworkConnection {
pub fn new(stream: TcpStream, addr: SocketAddr) -> Self {
Self { stream, addr }
}
}
#[async_trait::async_trait]
impl NetworkConnection for JSONNetworkConnection {
async fn get_request(&mut self) -> io::Result<ServerRequest> {
println!("[JSONNetworkConnection] sending request");
write_message(&mut self.stream, NetworkSockOut::Request).await;
println!("[JSONNetworkConnection] waiting for response");
let request =
read_message::<TcpStream, NetworkSockIn>(&mut self.stream).await?;
println!("[JSONNetworkConnection] returning request");
match request {
NetworkSockIn::Info => Ok(ServerRequest::GetInfo),
NetworkSockIn::Connect {
uuid,
username,
address: _,
} => Ok(ServerRequest::Connect {
username,
uuid,
addr: self.addr,
}),
// _ => Ok(ServerRequest::Ignore),
}
}
async fn send_info(mut self: Box<Self>, name: String, owner: String) {
println!("[JSONNetworkConnection] Sending info to client");
write_message(
&mut self.stream,
NetworkSockOut::GotInfo {
server_name: name,
server_owner: owner,
},
)
.await;
println!("[JSONNetworkConnection] droping connection");
}
async fn send_connected(
mut self: Box<Self>,
uuid: Uuid,
) -> (Box<dyn ClientWriter>, Box<dyn ClientReader>) {
write_message(&mut self.stream, NetworkSockOut::Connected).await;
let (read, write) = split(self.stream);
let writer = Box::new(JSONClientWriter::new(write, self.addr, uuid));
let reader = Box::new(JSONClientReader::new(read, self.addr, uuid));
(writer, reader)
}
}