Made other changes to GRPC implementation to clean it up #23

Merged
michael-bailey merged 30 commits from grpc-manager into master 2024-09-09 16:48:10 +00:00
20 changed files with 516 additions and 477 deletions
Showing only changes of commit 010eabf6e3 - Show all commits

View File

@ -5,7 +5,7 @@ use crate::{models::message::Message, ClientDetails};
/// This enum defined the message that the server will receive from a client
/// This uses the serde library to transform to and from json.
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum ClientStreamIn {
GetClients,

View File

@ -23,7 +23,7 @@ pub enum NetworkSockOut {
server_name: String,
server_owner: String,
},
Connecting,
Connected,
Error,
}
@ -42,7 +42,7 @@ impl PartialEq for NetworkSockOut {
server_name: name_other,
},
) => server_name == name_other && server_owner == owner_other,
(NetworkSockOut::Connecting, NetworkSockOut::Connecting) => true,
(NetworkSockOut::Connected, NetworkSockOut::Connected) => true,
_ => false,
}
}

View File

@ -4,10 +4,10 @@ use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
id: Uuid,
from: Uuid,
content: String,
time: DateTime<Local>,
pub id: Uuid,
pub from: Uuid,
pub content: String,
pub time: DateTime<Local>,
}
impl Message {

View File

@ -0,0 +1,51 @@
use std::io;
use serde::{de::DeserializeOwned, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
pub async fn write_message<S, M>(stream: &mut S, message: M)
where
S: AsyncWrite + AsyncWriteExt + Unpin,
M: Serialize,
{
let mut message = serde_json::to_string(&message).unwrap();
message.push('\n');
_ = stream.write(message.as_bytes()).await;
}
// todo: Handle error properly
pub async fn read_message<S, M>(stream: &mut S) -> io::Result<M>
where
S: AsyncRead + AsyncReadExt + Unpin,
M: DeserializeOwned,
{
let string = read_line(stream).await?;
Ok(serde_json::from_str(&string).unwrap())
}
#[allow(clippy::redundant_guards, clippy::needless_range_loop)]
async fn read_line<S>(stream: &mut S) -> Result<String, std::io::Error>
where
S: AsyncRead + AsyncReadExt + Unpin,
{
let mut buf = vec![0; 1024];
let mut newline_found = false;
let mut result = Vec::new();
loop {
let n = match stream.read(&mut buf).await {
Ok(n) if n == 0 => return Ok(String::from_utf8(result).unwrap()),
Ok(n) => n,
Err(e) => return Err(e),
};
for i in 0..n {
if buf[i] == b'\n' {
newline_found = true;
break;
}
result.push(buf[i]);
}
if newline_found {
return Ok(String::from_utf8(result).unwrap());
}
}
}

View File

@ -1,69 +1,2 @@
use std::io::{self, ErrorKind};
use prost::{
bytes::{BufMut, Bytes, BytesMut},
Message,
};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::TcpStream,
};
pub async fn write_message<T, S>(stream: &mut S, message: T) -> io::Result<()>
where
T: Message + Default,
S: AsyncWrite + AsyncWriteExt + Unpin,
{
let message = encode_message::<T>(&message)?;
stream.write_all(&message).await?;
Ok(())
}
pub fn encode_message<T>(msg: &T) -> io::Result<Bytes>
where
T: Message,
{
let length = msg.encoded_len();
let mut buffer = BytesMut::with_capacity(4 + length);
buffer.put_u32(length as u32);
let encode_result = msg.encode(&mut buffer);
if let Err(err) = encode_result {
return Err(io::Error::new(
ErrorKind::InvalidInput,
format!("message encoding failed: {:?}", err),
));
}
Ok(buffer.into())
}
pub async fn read_message<T, S>(stream: &mut S) -> io::Result<T>
where
T: Message + Default,
S: AsyncRead + AsyncReadExt + Unpin,
{
let size = stream.read_u32().await?;
let mut buffer = BytesMut::with_capacity(size as usize);
unsafe { buffer.set_len(size as usize) };
stream.read_exact(&mut buffer).await?;
let message = decode_message::<T>(buffer.into())?;
Ok(message)
}
pub fn decode_message<T>(buffer: Bytes) -> io::Result<T>
where
T: Message + Default,
{
let msg_result = T::decode(buffer);
match msg_result {
Ok(msg) => Ok(msg),
Err(err) => Err(io::Error::new(
ErrorKind::InvalidInput,
format!("message decoding failed: {:?}", err),
)),
}
}
pub mod json;
pub mod protobuf;

View File

@ -0,0 +1,66 @@
use std::io::{self, ErrorKind};
use prost::{
bytes::{BufMut, Bytes, BytesMut},
Message,
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
pub async fn write_message<T, S>(stream: &mut S, message: T) -> io::Result<()>
where
T: Message + Default,
S: AsyncWrite + AsyncWriteExt + Unpin,
{
let message = encode_message::<T>(&message)?;
stream.write_all(&message).await?;
Ok(())
}
pub fn encode_message<T>(msg: &T) -> io::Result<Bytes>
where
T: Message,
{
let length = msg.encoded_len();
let mut buffer = BytesMut::with_capacity(4 + length);
buffer.put_u32(length as u32);
let encode_result = msg.encode(&mut buffer);
if let Err(err) = encode_result {
return Err(io::Error::new(
ErrorKind::InvalidInput,
format!("message encoding failed: {:?}", err),
));
}
Ok(buffer.into())
}
pub async fn read_message<T, S>(stream: &mut S) -> io::Result<T>
where
T: Message + Default,
S: AsyncRead + AsyncReadExt + Unpin,
{
let size = stream.read_u32().await?;
let mut buffer = BytesMut::with_capacity(size as usize);
unsafe { buffer.set_len(size as usize) };
stream.read_exact(&mut buffer).await?;
let message = decode_message::<T>(buffer.into())?;
Ok(message)
}
pub fn decode_message<T>(buffer: Bytes) -> io::Result<T>
where
T: Message + Default,
{
let msg_result = T::decode(buffer);
match msg_result {
Ok(msg) => Ok(msg),
Err(err) => Err(io::Error::new(
ErrorKind::InvalidInput,
format!("message decoding failed: {:?}", err),
)),
}
}

View File

@ -35,7 +35,10 @@ message ConnectedServerMessage {
ConnectedClients connected_clients = 1;
GlobalMessages global_messages = 2;
PrivateMessage private_message = 3;
Disconnected Disconnected = 4;
Disconnected disconnected = 4;
GlobalMessage global_message = 5;
ClientConnected client_connected = 6;
ClientDisconnected client_disconnected = 7;
}
}
@ -43,6 +46,14 @@ message ConnectedClients {
repeated ClientDetails clients = 1;
}
message ClientConnected {
ClientDetails details = 1;
}
message ClientDisconnected {
string uuid = 1;
}
message ClientDetails {
string uuid = 1;
string name = 2;

View File

@ -30,13 +30,13 @@ zeroize = "1.1.0"
openssl = "0.10.33"
tokio.workspace = true
futures = "0.3.16"
async-trait = "0.1.52"
async-trait = "0.1.80"
actix = "0.13"
rhai = {version = "1.7.0"}
mlua = { version = "0.9.2", features=["lua54", "async", "serde", "macros", "vendored"] }
libloading = "0.8.1"
toml = "0.8.8"
aquamarine = "0.3.2"
tokio-stream = "0.1.9"
# protobuf

29
server/src/chat/mod.rs Normal file
View File

@ -0,0 +1,29 @@
use foundation::prelude::GlobalMessage;
pub struct ChatManager {
messages: Vec<GlobalMessage>,
}
impl ChatManager {
pub fn new() -> Self {
Self {
messages: Vec::new(),
}
}
pub fn add_message(&mut self, message: GlobalMessage) {
println!("[ChatManager] added new global message {:?}", message);
self.messages.push(message);
}
pub fn get_messages(&mut self) -> Vec<GlobalMessage> {
println!("[ChatManager] got all messages");
self.messages.clone()
}
}
impl Default for ChatManager {
fn default() -> Self {
Self::new()
}
}

View File

@ -1,17 +1,5 @@
use foundation::prelude::{
connected_client_message,
ClientDetails,
ConnectedClientMessage,
Disconnect,
GetClients,
GetGlobalMessages,
SendGlobalMessage,
SendPrivateMessage,
};
use tokio::{
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
task::JoinHandle,
};
use foundation::prelude::{ClientDetails, GlobalMessage, PrivateMessage};
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
use uuid::Uuid;
use crate::{
@ -19,117 +7,81 @@ use crate::{
client_info::ClientInfo,
connection_manager::ConnectionManagerMessage,
},
network::{
client_reader_connection::ClientReaderConnection,
client_writer_connection::ClientWriterConnection,
network_connection::NetworkConnection,
},
network::{ClientWriter, NetworkConnection},
};
pub struct ClientThread {
read_task: JoinHandle<()>,
write_task: JoinHandle<()>,
sender: UnboundedSender<ClientMessage>,
writer: Box<dyn ClientWriter>,
}
impl ClientThread {
pub async fn new_run(
uuid: Uuid,
conn: NetworkConnection,
conn: Box<dyn NetworkConnection>,
connection_manager_sender: UnboundedSender<ConnectionManagerMessage>,
) -> Self {
let (writer, reader) = conn.send_connected().await;
let (tx, rx) = unbounded_channel();
println!("[ClientThread] creating thread");
let (writer, reader) = conn.send_connected(uuid).await;
Self {
read_task: tokio::spawn(Self::run_read(
uuid,
reader,
connection_manager_sender,
)),
write_task: tokio::spawn(Self::run_write(uuid, writer, rx)),
sender: tx,
println!("[ClientThread] creating tasks");
ClientThread {
read_task: reader.start_run(uuid, connection_manager_sender.clone()),
writer,
}
}
async fn run_read(
uuid: Uuid,
mut reader: ClientReaderConnection,
channel: UnboundedSender<ConnectionManagerMessage>,
) {
use connected_client_message::Message;
loop {
println!("[ClientThread:run_read:{}]", uuid);
let msg = reader.get_message().await;
match msg {
Ok(ConnectedClientMessage {
message: Some(Message::GetClients(GetClients {})),
}) => channel.send(ConnectionManagerMessage::SendClientsTo { uuid }),
Ok(ConnectedClientMessage {
message: Some(Message::GetGlobalMessage(GetGlobalMessages {})),
}) => {
channel.send(ConnectionManagerMessage::SendGlobalMessagesTo { uuid })
}
Ok(ConnectedClientMessage {
message:
Some(Message::SendPrivateMessage(SendPrivateMessage {
uuid: message_uuid,
to,
content,
})),
}) => channel.send(ConnectionManagerMessage::SendPrivateMessage {
uuid: message_uuid,
from: uuid,
to: to.parse().unwrap(),
content,
}),
Ok(ConnectedClientMessage {
message:
Some(Message::SendGlobalMessage(SendGlobalMessage { content })),
}) => channel.send(ConnectionManagerMessage::BroadcastGlobalMessage {
from: uuid,
content,
}),
Ok(ConnectedClientMessage {
message: Some(Message::Disconnect(Disconnect {})),
}) => channel.send(ConnectionManagerMessage::Disconnect { uuid }),
Ok(ConnectedClientMessage { message: None }) => unimplemented!(),
Err(_) => todo!(),
};
break;
}
pub async fn send_clients(&mut self, clients: Vec<ClientDetails>) {
self.writer.send_clients(clients).await
}
async fn run_write(
uuid: Uuid,
mut conn: ClientWriterConnection,
mut receiver: UnboundedReceiver<ClientMessage>,
) {
loop {
let msg = receiver.recv().await;
pub async fn send_client_joined(&mut self, details: ClientDetails) {
self.writer.send_client_joined(details).await;
}
pub async fn send_client_left(&mut self, uuid: Uuid) {
self.writer.send_client_left(uuid).await
}
match msg {
Some(ClientMessage::SendClients(clients)) => {
let clients = clients
.into_iter()
.map(|c| ClientDetails {
uuid: c.get_uuid().to_string(),
name: c.get_username(),
address: c.get_addr().to_string(),
})
.collect();
conn.send_clients(clients).await
}
None => {}
};
}
// todo: link this in with message storage
pub(crate) async fn send_global_message(&mut self, message: GlobalMessage) {
self.writer.send_global_message(message).await;
}
pub(crate) async fn send_global_messages(
&mut self,
messages: Vec<GlobalMessage>,
) {
self.writer.send_global_messages(messages).await;
}
pub(crate) async fn send_disconnected(&mut self) {
self.writer.send_disconnect().await
}
pub(crate) async fn send_private_message(
&mut self,
from: Uuid,
uuid: Uuid,
content: String,
) {
self
.writer
.send_private_message(PrivateMessage {
uuid: uuid.to_string(),
from: from.to_string(),
content,
})
.await;
}
}
impl Drop for ClientThread {
fn drop(&mut self) {
self.read_task.abort();
}
}
pub enum ClientMessage {
SendClients(Vec<ClientInfo>),
SendGlobalMessages(Vec<GlobalMessage>),
}

View File

@ -1,5 +1,6 @@
use std::{collections::HashMap, net::SocketAddr};
use foundation::prelude::{ClientDetails, GlobalMessage};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex,
@ -8,22 +9,25 @@ use uuid::Uuid;
use crate::{
connection::{client_info::ClientInfo, client_thread::ClientThread},
network::network_connection::NetworkConnection,
network::NetworkConnection,
server_va::ServerMessages,
};
pub struct ConnectionManager {
receiver: Mutex<UnboundedReceiver<ConnectionManagerMessage>>,
sender: UnboundedSender<ConnectionManagerMessage>,
server_sender: UnboundedSender<ServerMessages>,
client_map: HashMap<Uuid, ClientInfo>,
client_tasks_map: HashMap<Uuid, ClientThread>,
}
impl ConnectionManager {
pub fn new() -> Self {
pub fn new(server_sender: UnboundedSender<ServerMessages>) -> Self {
let (tx, rx) = unbounded_channel();
Self {
client_map: HashMap::new(),
client_tasks_map: HashMap::new(),
server_sender,
receiver: Mutex::new(rx),
sender: tx,
}
@ -42,7 +46,41 @@ impl ConnectionManager {
username,
addr,
}) => self.add_client(conn, uuid, username, addr).await,
Some(_) => {}
Some(ConnectionManagerMessage::Disconnected { uuid }) => {
self.remove_client(uuid).await
}
Some(ConnectionManagerMessage::BroadcastGlobalMessage {
from,
content,
}) => {
self.broadcast_global_message(from, content).await;
}
Some(ConnectionManagerMessage::SendClientsTo { uuid }) => {
self.send_clients_to(uuid).await;
}
Some(ConnectionManagerMessage::SendGlobalMessages { uuid }) => {
self.send_global_messages(uuid).await;
}
Some(ConnectionManagerMessage::SendGlobalMessagesTo {
uuid,
messages,
}) => {
self.send_global_messages_to(uuid, messages).await;
}
Some(ConnectionManagerMessage::SendPrivateMessage {
uuid,
from,
to,
content,
}) => {
self.send_private_message(to, from, uuid, content).await;
}
Some(ConnectionManagerMessage::Disconnect { uuid }) => {
self.disconnect(uuid).await
}
None => todo!(),
}
}
@ -50,17 +88,118 @@ impl ConnectionManager {
async fn add_client(
&mut self,
conn: NetworkConnection,
conn: Box<dyn NetworkConnection>,
uuid: Uuid,
username: String,
addr: SocketAddr,
) {
let store = ClientInfo::new(uuid, username, addr);
println!("[ConnectionManager] adding new client");
let store = ClientInfo::new(uuid, username.clone(), addr);
self.client_map.insert(uuid, store);
println!("[ConnectionManager] added client info to map");
let thread = ClientThread::new_run(uuid, conn, self.sender.clone()).await;
self.client_tasks_map.insert(uuid, thread);
println!("[ConnectionManager] created running thread for new clinet");
for c in self.client_tasks_map.iter_mut() {
c.1
.send_client_joined(ClientDetails {
uuid: uuid.to_string(),
name: username.clone(),
address: addr.to_string(),
})
.await;
}
}
async fn remove_client(&mut self, uuid: Uuid) {
println!("[ConnectionManager] removing {}", uuid);
self.client_map.remove(&uuid);
self.client_tasks_map.remove(&uuid);
for c in self.client_tasks_map.iter_mut() {
c.1.send_client_left(uuid).await;
}
}
async fn send_clients_to(&mut self, uuid: Uuid) {
let clients = self
.client_map
.values()
.cloned()
.map(|c| foundation::prelude::ClientDetails {
uuid: c.get_uuid().to_string(),
name: c.get_username(),
address: c.get_addr().to_string(),
})
.collect();
let t = self.client_tasks_map.get_mut(&uuid);
let Some(t) = t else {
return;
};
println!("[ConnectionManager] sending client list to {:?}", clients);
t.send_clients(clients).await;
}
async fn broadcast_global_message(&mut self, from: Uuid, content: String) {
let message = GlobalMessage {
uuid: Uuid::new_v4().to_string(),
from: from.to_string(),
content,
};
_ = self
.server_sender
.send(ServerMessages::AddGlobalMessage(message.clone()));
for c in self.client_tasks_map.iter_mut() {
c.1.send_global_message(message.clone()).await;
}
}
async fn send_global_messages(&mut self, uuid: Uuid) {
_ = self
.server_sender
.send(ServerMessages::SendGlobalMessages(uuid));
}
async fn send_global_messages_to(
&mut self,
uuid: Uuid,
messages: Vec<GlobalMessage>,
) {
let t = self.client_tasks_map.get_mut(&uuid);
let Some(t) = t else {
return;
};
t.send_global_messages(messages).await;
}
async fn send_private_message(
&mut self,
to: Uuid,
from: Uuid,
uuid: Uuid,
content: String,
) {
let t = self.client_tasks_map.get_mut(&to);
let Some(t) = t else {
return;
};
t.send_private_message(from, uuid, content).await
}
async fn disconnect(&mut self, uuid: Uuid) {
let t = self.client_tasks_map.get_mut(&uuid);
let Some(t) = t else {
return;
};
t.send_disconnected().await;
}
pub fn get_sender(&self) -> UnboundedSender<ConnectionManagerMessage> {
@ -68,16 +207,10 @@ impl ConnectionManager {
}
}
impl Default for ConnectionManager {
fn default() -> Self {
Self::new()
}
}
pub enum ConnectionManagerMessage {
// server messages
AddClient {
conn: NetworkConnection,
conn: Box<dyn NetworkConnection + 'static>,
uuid: Uuid,
username: String,
addr: SocketAddr,
@ -88,8 +221,13 @@ pub enum ConnectionManagerMessage {
uuid: Uuid,
},
SendGlobalMessages {
uuid: Uuid,
},
SendGlobalMessagesTo {
uuid: Uuid,
messages: Vec<GlobalMessage>,
},
BroadcastGlobalMessage {
@ -98,7 +236,7 @@ pub enum ConnectionManagerMessage {
},
SendPrivateMessage {
uuid: String,
uuid: Uuid,
from: Uuid,
to: Uuid,
content: String,
@ -107,4 +245,8 @@ pub enum ConnectionManagerMessage {
Disconnect {
uuid: Uuid,
},
Disconnected {
uuid: Uuid,
},
}

View File

@ -1,8 +1,9 @@
//! This is the main module of the actix server.
//! It starts the server and sleeps for the remainder of the program
pub(crate) mod network;
pub mod network;
pub mod chat;
pub mod connection;
pub mod os_signal_manager;
pub mod server_va;

View File

@ -1,28 +0,0 @@
use std::{io, net::SocketAddr};
use foundation::{networking::read_message, prelude::ConnectedClientMessage};
use tokio::{io::ReadHalf, net::TcpStream};
pub struct ClientReaderConnection {
reader: ReadHalf<TcpStream>,
_addr: SocketAddr,
}
impl ClientReaderConnection {
pub fn new(reader: ReadHalf<TcpStream>, addr: SocketAddr) -> Self {
Self {
reader: todo!(),
_addr: todo!(),
}
}
// move to other one
pub async fn get_message(&mut self) -> io::Result<ConnectedClientMessage> {
let message = read_message::<ConnectedClientMessage, ReadHalf<TcpStream>>(
&mut self.reader,
)
.await
.unwrap();
Ok(message)
}
}

View File

@ -1,72 +0,0 @@
use std::{io, net::SocketAddr};
use foundation::{
networking::{read_message, write_message},
prelude::{
connected_server_message,
ClientDetails,
ConnectedClientMessage,
ConnectedClients,
ConnectedServerMessage,
Disconnected,
GlobalMessage,
GlobalMessages,
PrivateMessage,
},
};
use tokio::{
io::{split, WriteHalf},
net::TcpStream,
};
use crate::network::{
client_reader_connection::ClientReaderConnection,
network_connection::NetworkConnection,
};
pub struct ClientWriterConnection {
writer: WriteHalf<TcpStream>,
addr: SocketAddr,
}
impl ClientWriterConnection {
pub fn new(writer: WriteHalf<TcpStream>, addr: SocketAddr) -> Self {
Self { writer, addr }
}
pub async fn send_clients(&mut self, clients: Vec<ClientDetails>) {
let message = ConnectedServerMessage {
message: Some(connected_server_message::Message::ConnectedClients(
ConnectedClients { clients },
)),
};
write_message(&mut self.writer, message).await.unwrap();
}
pub async fn send_global_messages(&mut self, messages: Vec<GlobalMessage>) {
let message = ConnectedServerMessage {
message: Some(connected_server_message::Message::GlobalMessages(
GlobalMessages { messages },
)),
};
write_message(&mut self.writer, message).await.unwrap();
}
pub async fn send_private_message(&mut self, message: PrivateMessage) {
let message = ConnectedServerMessage {
message: Some(connected_server_message::Message::PrivateMessage(message)),
};
write_message(&mut self.writer, message).await.unwrap();
}
pub async fn send_disconnect(&mut self) {
let message = ConnectedServerMessage {
message: Some(connected_server_message::Message::Disconnected(
Disconnected {
reason: "shutting down".into(),
},
)),
};
write_message(&mut self.writer, message).await.unwrap();
}
}

View File

@ -0,0 +1,4 @@
pub mod json_client_reader;
pub mod json_client_writer;
pub mod json_listener;
pub mod json_network_connection;

View File

@ -1,52 +0,0 @@
use std::net::SocketAddr;
use tokio::{
net::{TcpListener, TcpStream},
select,
sync::mpsc::UnboundedSender,
};
use crate::server_va::ServerMessages;
/// # Listener Manager
/// This stores and awaits for connections from listeners.
/// When a connection is received, it is passed to the server
pub struct ListenerManager {
protobuf_listener: TcpListener,
sender: UnboundedSender<ServerMessages>,
}
impl ListenerManager {
/// Binds listeners and stores them in the ListenerManager
pub async fn new(channel: UnboundedSender<ServerMessages>) -> Self {
println!("[ListenerManager] setting up listeners");
let protobuf_listener = TcpListener::bind("0.0.0.0:6500")
.await
.expect("[ListenerManager] failed to bind to 0.0.0.0:6500");
Self {
protobuf_listener,
sender: channel,
}
}
pub async fn run(&self) {
loop {
println!("[ListenerManager] waiting for connection");
let accept_protobuf = self.protobuf_listener.accept();
let msg = select! {
Ok((stream, addr)) = accept_protobuf => {
println!("[ListenerManager] accepted connection");
ServerMessages::NewConnection(ConnectionType::ProtobufConnection(stream, addr))
}
};
println!("[ListenerManager] passing message to server");
self.sender.send(msg).unwrap();
}
}
}
pub enum ConnectionType {
ProtobufConnection(TcpStream, SocketAddr),
}

View File

@ -1,4 +1,66 @@
pub mod client_reader_connection;
pub mod client_writer_connection;
pub mod listener_manager;
pub mod network_connection;
use std::{io, net::SocketAddr};
use async_trait::async_trait;
use foundation::prelude::{ClientDetails, GlobalMessage, PrivateMessage};
use tokio::{net::TcpStream, sync::mpsc::UnboundedSender, task::JoinHandle};
use uuid::Uuid;
use crate::{
connection::connection_manager::ConnectionManagerMessage,
server_va::ServerMessages,
};
pub mod json;
pub mod protobuf;
pub enum ConnectionType {
ProtobufConnection(TcpStream, SocketAddr),
JsonConnection(TcpStream, SocketAddr),
}
#[async_trait]
pub trait NetworkListener {
async fn new(channel: UnboundedSender<ServerMessages>) -> Self;
async fn run(&self);
fn start_run(sender: UnboundedSender<ServerMessages>) -> JoinHandle<()>;
}
#[async_trait::async_trait]
pub trait NetworkConnection: Send {
async fn get_request(&mut self) -> io::Result<ServerRequest>;
async fn send_info(self: Box<Self>, name: String, owner: String);
async fn send_connected(
self: Box<Self>,
uuid: Uuid,
) -> (Box<dyn ClientWriter>, Box<dyn ClientReader>);
}
#[async_trait::async_trait]
pub trait ClientReader: Send {
fn start_run(
self: Box<Self>,
uuid: Uuid,
channel: UnboundedSender<ConnectionManagerMessage>,
) -> JoinHandle<()>;
}
#[async_trait::async_trait]
pub trait ClientWriter: Send {
async fn send_clients(&mut self, clients: Vec<ClientDetails>);
async fn send_global_messages(&mut self, messages: Vec<GlobalMessage>);
async fn send_global_message(&mut self, message: GlobalMessage);
async fn send_private_message(&mut self, message: PrivateMessage);
async fn send_disconnect(&mut self);
async fn send_client_joined(&mut self, details: ClientDetails);
async fn send_client_left(&mut self, uuid: Uuid);
}
pub enum ServerRequest {
GetInfo,
Connect {
username: String,
uuid: uuid::Uuid,
addr: SocketAddr,
},
Ignore,
}

View File

@ -1,113 +0,0 @@
use std::{io, net::SocketAddr};
use foundation::{
networking::{read_message, write_message},
prelude::{
network_client_message,
network_server_message,
Connect,
Connected,
GetInfo,
Info,
NetworkClientMessage,
NetworkServerMessage,
Request,
},
};
use tokio::{io::split, net::TcpStream};
use crate::network::{
client_reader_connection::ClientReaderConnection,
client_writer_connection::ClientWriterConnection,
};
pub struct NetworkConnection {
pub(super) stream: TcpStream,
pub(super) addr: SocketAddr,
}
impl NetworkConnection {
pub fn new(stream: TcpStream, addr: SocketAddr) -> Self {
Self { stream, addr }
}
pub async fn get_request(&mut self) -> io::Result<ServerRequest> {
let message = NetworkServerMessage {
message: Some(network_server_message::Message::Request(Request {})),
};
println!("[NetworkConnection] sending request");
write_message(&mut self.stream, message).await.unwrap();
println!("[NetworkConnection] waiting for response");
let request =
read_message::<NetworkClientMessage, TcpStream>(&mut self.stream)
.await
.unwrap();
println!("[NetworkConnection] returning request");
match request {
NetworkClientMessage {
message: Some(network_client_message::Message::GetInfo(GetInfo {})),
} => Ok(ServerRequest::GetInfo),
NetworkClientMessage {
message:
Some(network_client_message::Message::Connect(Connect {
username,
uuid,
})),
} => Ok(ServerRequest::Connect {
username,
uuid: uuid.parse().unwrap(),
addr: self.addr,
}),
_ => Ok(ServerRequest::Ignore),
}
}
pub async fn send_info(mut self, name: String, owner: String) {
let message = NetworkServerMessage {
message: Some(network_server_message::Message::GotInfo(Info {
server_name: name,
owner,
})),
};
println!("[NetworkConnection] Sending info to client");
write_message(&mut self.stream, message).await.unwrap();
println!("[NetworkConnection] droping connection");
}
pub async fn send_connected(
mut self,
) -> (ClientWriterConnection, ClientReaderConnection) {
let message = NetworkServerMessage {
message: Some(network_server_message::Message::Connected(Connected {})),
};
write_message(&mut self.stream, message).await.unwrap();
self.into()
}
}
pub enum ServerRequest {
GetInfo,
Connect {
username: String,
uuid: uuid::Uuid,
addr: SocketAddr,
},
Ignore,
}
impl From<NetworkConnection>
for (ClientWriterConnection, ClientReaderConnection)
{
fn from(value: NetworkConnection) -> Self {
let (read, write) = split(value.stream);
let writer = ClientWriterConnection::new(write, value.addr.clone());
let reader = ClientReaderConnection::new(read, value.addr.clone());
(writer, reader)
}
}

View File

@ -0,0 +1,4 @@
pub mod protobuf_client_reader;
pub mod protobuf_client_writer;
pub mod protobuf_listener;
pub mod protobuf_network_connection;

View File

@ -1,3 +1,4 @@
use foundation::prelude::GlobalMessage;
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
@ -5,15 +6,27 @@ use tokio::{
},
task::JoinHandle,
};
use uuid::Uuid;
use crate::{
chat::ChatManager,
connection::connection_manager::{
ConnectionManager,
ConnectionManagerMessage,
},
network::{
listener_manager::{ConnectionType, ListenerManager},
network_connection::{NetworkConnection, ServerRequest},
json::{
json_listener::JSONListener,
json_network_connection::JSONNetworkConnection,
},
protobuf::{
protobuf_listener::ProtobufListener,
protobuf_network_connection::ProtobufNetworkConnection,
},
ConnectionType,
NetworkConnection,
NetworkListener,
ServerRequest,
},
os_signal_manager::OSSignalManager,
};
@ -23,16 +36,22 @@ use crate::{
/// Main functions being the handling of new connections, and setting them up.
pub struct Server {
connection_manager_sender: UnboundedSender<ConnectionManagerMessage>,
chat_manager: ChatManager,
connection_manager_task: JoinHandle<()>,
listener_task: JoinHandle<()>,
json_listener_task: JoinHandle<()>,
os_event_manager_task: JoinHandle<()>,
receiver: Mutex<UnboundedReceiver<ServerMessages>>,
}
impl Server {
/// Loops the future, reading messages from the servers channel.
/// if exit is received, deconstructs all sub-tasks and exits the loop.
pub async fn run(&self) {
pub async fn run(&mut self) {
loop {
let mut lock = self.receiver.lock().await;
let msg = lock.recv().await;
@ -47,15 +66,35 @@ impl Server {
Some(ServerMessages::NewConnection(
ConnectionType::ProtobufConnection(stream, addr),
)) => {
let conn = NetworkConnection::new(stream, addr);
let conn = Box::new(ProtobufNetworkConnection::new(stream, addr));
println!("[Server] New protobuf connection");
self.handle_protobuf_connection(conn).await;
}
Some(ServerMessages::NewConnection(
ConnectionType::JsonConnection(stream, addr),
)) => {
let conn = Box::new(JSONNetworkConnection::new(stream, addr));
println!("[Server] New protobuf connection");
self.handle_protobuf_connection(conn).await;
}
Some(ServerMessages::SendGlobalMessages(uuid)) => {
let messages = self.chat_manager.get_messages();
println!("[Server] Sending Global Messages");
_ = self.connection_manager_sender.send(
ConnectionManagerMessage::SendGlobalMessagesTo { uuid, messages },
);
}
Some(ServerMessages::AddGlobalMessage(message)) => {
self.chat_manager.add_message(message);
}
};
}
}
async fn handle_protobuf_connection(&self, mut conn: NetworkConnection) {
async fn handle_protobuf_connection(
&self,
mut conn: Box<dyn NetworkConnection>,
) {
println!("[Server] Getting request");
let req = conn.get_request().await.unwrap();
@ -71,7 +110,7 @@ impl Server {
addr,
} => {
println!("[Server] sending connectionn and info to conneciton manager");
self.connection_manager_sender.send(
_ = self.connection_manager_sender.send(
ConnectionManagerMessage::AddClient {
conn,
uuid,
@ -87,6 +126,7 @@ impl Server {
fn shutdown(&self) {
self.os_event_manager_task.abort();
self.connection_manager_task.abort();
self.json_listener_task.abort();
self.listener_task.abort();
}
}
@ -96,25 +136,32 @@ impl Default for Server {
let (tx, rx) = unbounded_channel();
let tx1 = tx.clone();
let tx2 = tx.clone();
let tx3 = tx.clone();
let tx4 = tx.clone();
let os_event_manager_task = tokio::spawn(async move {
OSSignalManager::new(tx1).run().await;
});
let listener_task = tokio::spawn(async move {
ListenerManager::new(tx2).await.run().await;
});
let listener_task = ProtobufListener::start_run(tx2);
let json_listener_task = JSONListener::start_run(tx3);
let mut connection_manager = ConnectionManager::new();
let mut connection_manager = ConnectionManager::new(tx4);
let connection_manager_sender = connection_manager.get_sender();
let connection_manager_task = tokio::spawn(async move {
connection_manager.run().await;
});
let chat_manager = ChatManager::new();
Self {
chat_manager,
os_event_manager_task,
connection_manager_task,
connection_manager_sender,
json_listener_task,
receiver: Mutex::new(rx),
listener_task,
}
@ -125,5 +172,7 @@ impl Default for Server {
/// enum describing all messages that the server can handle
pub enum ServerMessages {
Exit,
AddGlobalMessage(GlobalMessage),
SendGlobalMessages(Uuid),
NewConnection(ConnectionType),
}