Made other changes to GRPC implementation to clean it up #23

Merged
michael-bailey merged 30 commits from grpc-manager into master 2024-09-09 16:48:10 +00:00
41 changed files with 1086 additions and 214 deletions
Showing only changes of commit 82f5ac30e9 - Show all commits

View File

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
uuid.workspace = true
tokio.workspace = true
cursive = "0.20.0"
rand.workspace = true

View File

@ -0,0 +1,3 @@
pub fn exit(s: &mut Cursive) {
s.quit();
}

View File

@ -1,38 +1,31 @@
mod info_dialogue;
mod select_operation;
pub mod network;
pub mod screens;
pub mod segues;
mod settings;
pub mod state;
use cursive::{
event::Event,
menu::Tree,
views::{Menubar, Panel, TextView},
Cursive,
};
use cursive::{event::Event, menu::Tree, views::Menubar, Cursive};
use crate::{
select_operation::methods_view,
screens::main_screen::select_operation::methods_view,
settings::settings_panel,
state::State,
};
enum MethodSelection {
GetInfo,
Connect,
}
fn menu_bar(menu_bar: &mut Menubar) {
menu_bar
.add_subtree(
"Chat Kit",
Tree::new()
.leaf("Settings", open_settings)
.delimiter()
.leaf("Quit", exit),
)
.add_subtree(
"File",
Tree::new().leaf("Main View", |s| s.add_layer(methods_view())),
);
menu_bar.add_subtree(
"Chat Kit",
Tree::new()
.leaf("Settings", open_settings)
.delimiter()
.leaf("Quit", exit),
);
}
fn main() {
@ -48,7 +41,7 @@ fn main() {
s.select_menubar()
});
scr.add_layer(methods_view());
scr.add_layer(methods_view("127.0.0.1:6500".into()));
scr.run()
}

10
client/src/network/mod.rs Normal file
View File

@ -0,0 +1,10 @@
use crate::network::network_connection::NetworkConnection;
pub mod network_connection;
pub mod server_reader_connection;
pub mod server_writer_connection;
pub enum NetworkState {
Disconnected,
ConnectedNetwork(NetworkConnection),
}

View File

@ -0,0 +1,130 @@
use std::{io, net::SocketAddr};
use foundation::{
networking::{read_message, write_message},
prelude::{
network_server_message,
Connect,
GetInfo,
Info,
NetworkClientMessage,
NetworkServerMessage,
Request,
},
};
use tokio::{io::split, net::TcpStream};
use uuid::Uuid;
use crate::network::{
server_reader_connection::ServerReaderConnection,
server_writer_connection::ServerWriterConnection,
};
/// # NetworkConnection
/// encapsulates the state of the network connection
/// will connect to a server and ensure it is usinghte protobuf protocol
///
/// you can then either get info or connect to the server
pub struct NetworkConnection {
pub(super) stream: TcpStream,
}
impl NetworkConnection {
pub async fn connect(address: SocketAddr) -> io::Result<Self> {
let mut stream = TcpStream::connect(address).await.unwrap();
let msg =
read_message::<NetworkServerMessage, TcpStream>(&mut stream).await?;
let NetworkServerMessage {
message: Some(network_server_message::Message::Request(Request {})),
} = msg
else {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Received invalid start message from server",
));
};
Ok(Self { stream })
}
/// Will consume the connection, and fetch the servers info.
pub async fn send_get_info(mut self) -> io::Result<Info> {
_ = write_message(
&mut self.stream,
NetworkClientMessage {
message: Some(
foundation::prelude::network_client_message::Message::GetInfo(
GetInfo {},
),
),
},
)
.await;
let message =
read_message::<NetworkServerMessage, TcpStream>(&mut self.stream).await?;
let NetworkServerMessage {
message: Some(network_server_message::Message::GotInfo(msg)),
} = message
else {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"sent for info got different message back",
));
};
Ok(msg)
}
/// consumes this struct and returns a tuple of the sernding and receiving ahlfs of teh connected conneciton
pub async fn send_connect(
mut self,
uuid: Uuid,
username: String,
) -> io::Result<(ServerWriterConnection, ServerReaderConnection)> {
_ = write_message(
&mut self.stream,
NetworkClientMessage {
message: Some(
foundation::prelude::network_client_message::Message::Connect(
Connect {
username,
uuid: uuid.to_string(),
},
),
),
},
)
.await;
let message =
read_message::<NetworkServerMessage, TcpStream>(&mut self.stream).await?;
let NetworkServerMessage {
message: Some(network_server_message::Message::Connected(_)),
} = message
else {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"sent connect got different message back or failed to connect",
));
};
Ok(self.into())
}
}
impl From<NetworkConnection>
for (ServerWriterConnection, ServerReaderConnection)
{
fn from(value: NetworkConnection) -> Self {
let (read_half, write_half) = split(value.stream);
(
ServerWriterConnection::new(write_half),
ServerReaderConnection::new(read_half),
)
}
}

View File

@ -0,0 +1,24 @@
use std::io;
use foundation::{networking::read_message, prelude::ConnectedServerMessage};
use tokio::{io::ReadHalf, net::TcpStream};
pub struct ServerReaderConnection {
reader: ReadHalf<TcpStream>,
}
impl ServerReaderConnection {
pub(crate) fn new(read_half: ReadHalf<TcpStream>) -> Self {
Self { reader: read_half }
}
// move to other one
pub async fn get_message(&mut self) -> io::Result<ConnectedServerMessage> {
let message = read_message::<ConnectedServerMessage, ReadHalf<TcpStream>>(
&mut self.reader,
)
.await
.unwrap();
Ok(message)
}
}

View File

@ -0,0 +1,13 @@
use tokio::{io::WriteHalf, net::TcpStream};
pub struct ServerWriterConnection {
writer: WriteHalf<TcpStream>,
}
impl ServerWriterConnection {
pub(crate) fn new(writer: WriteHalf<TcpStream>) -> Self {
todo!()
}
pub async fn request_clients(&mut self) {}
}

View File

@ -0,0 +1,2 @@
pub mod segues;
pub mod user_details_form;

View File

@ -0,0 +1,11 @@
use crate::{
screens::connect_setup::user_details_form::user_details_form,
segues::CursiveCB,
};
pub fn segue_to_user_details_form() -> CursiveCB {
Box::new(|s| {
s.pop_layer();
s.add_layer(user_details_form())
})
}

View File

@ -0,0 +1,43 @@
use cursive::{
view::Resizable,
views::{Dialog, EditView, LinearLayout, TextView},
Cursive,
View,
};
use crate::screens::main_screen::segues::segue_to_select_operation;
pub fn user_details_form() -> impl View {
Dialog::new()
.title("User Setup")
.content(
LinearLayout::vertical()
.child(
LinearLayout::horizontal()
.child(TextView::new("Username").min_width(9))
.child(EditView::new().full_width()),
)
.child(
LinearLayout::horizontal()
.child(TextView::new("UUID").min_width(9))
.child(EditView::new().full_width()),
),
)
.button("Cancel", on_cancel)
.button("Connect", on_connect)
.fixed_size((40, 10))
}
fn on_connect(s: &mut Cursive) {
println!("Attempting conneciton");
s.add_layer(
Dialog::new()
.title("LOL XD")
.content(TextView::new("Yeah this isnt iomplemented yet"))
.dismiss_button("Dismiss"),
)
}
fn on_cancel(s: &mut Cursive) {
_ = s.cb_sink().send(segue_to_select_operation());
}

View File

@ -0,0 +1,19 @@
use cursive::{
views::{Dialog, LinearLayout, PaddedView, TextView},
View,
};
pub fn info_error_dialogue(message: &str) -> impl View {
Dialog::new()
.title("Info Fetch Error")
.content(PaddedView::lrtb(
2,
2,
2,
2,
LinearLayout::vertical()
.child(TextView::new("Error fetching the Info from the server"))
.child(TextView::new(message)),
))
.dismiss_button("Big Oof")
}

View File

@ -0,0 +1,8 @@
use cursive::{
views::{Panel, TextView},
View,
};
pub fn info_loading_panel() -> impl View {
Panel::new(TextView::new("Loading"))
}

View File

@ -0,0 +1,59 @@
use cursive::Cursive;
use crate::{
network::network_connection::NetworkConnection,
screens::{
info_dialogue::segues::{
segue_to_info_dialgue,
segue_to_info_loading_panel,
segue_to_load_error_dialogue,
},
main_screen::segues::segue_open_invalid_address_dialogue,
},
state::StateObject,
};
pub mod info_dialogue;
pub mod info_error_dialogue;
pub mod info_loading_panel;
pub mod segues;
pub fn get_info(s: &mut Cursive) {
let sink = s.cb_sink().clone();
let state = s.state();
let address = state.get_host().parse();
let Ok(address) = address else {
_ = sink.send(segue_open_invalid_address_dialogue(state.get_host()));
return;
};
state.spawn(async move {
_ = sink.send(segue_to_info_loading_panel());
let conn = NetworkConnection::connect(address).await;
let Ok(conn) = conn else {
_ = sink.send(segue_to_load_error_dialogue(
"
failed to connect to the server
"
.into(),
));
return;
};
let res = conn.send_get_info().await;
let Ok(info) = res else {
_ = sink.send(segue_to_load_error_dialogue(
"
Failed to retrieve info
"
.into(),
));
return;
};
_ = sink.send(segue_to_info_dialgue(info.server_name, info.owner));
})
}

View File

@ -0,0 +1,33 @@
use cursive::Cursive;
use crate::{
screens::info_dialogue::{
info_dialogue::info_dialogue,
info_error_dialogue::info_error_dialogue,
info_loading_panel::info_loading_panel,
},
segues::CursiveCB,
};
pub fn segue_to_info_loading_panel() -> CursiveCB {
Box::new(|s: &mut Cursive| {
s.add_layer(info_loading_panel());
})
}
pub fn segue_to_load_error_dialogue(reason: String) -> CursiveCB {
Box::new(move |s| {
s.pop_layer();
s.add_layer(info_error_dialogue(&reason));
})
}
pub fn segue_to_info_dialgue(
name: String,
owner: String,
) -> Box<dyn FnOnce(&mut Cursive) + Send> {
Box::new(|s| {
s.pop_layer();
s.add_layer(info_dialogue(name, owner));
})
}

View File

@ -0,0 +1,16 @@
use cursive::{
views::{Dialog, TextView},
View,
};
pub fn invlaid_address_dialogue(address: String) -> impl View {
Dialog::new()
.title("error")
.content(TextView::new(format!(
"'{}' is an invalid address",
address
)))
.button("Dismiss", |s| {
s.pop_layer();
})
}

View File

@ -0,0 +1,4 @@
pub mod invalid_address_dialogue;
pub mod segues;
pub mod select_operation;
pub mod stream_failed;

View File

@ -0,0 +1,18 @@
use crate::{
screens::main_screen::{
invalid_address_dialogue::invlaid_address_dialogue,
select_operation::methods_view,
},
segues::CursiveCB,
};
pub fn segue_to_select_operation() -> CursiveCB {
Box::new(|s| {
s.pop_layer();
s.add_layer(methods_view("127.0.0.1:6500".into()))
})
}
pub fn segue_open_invalid_address_dialogue(address: String) -> CursiveCB {
Box::new(|s| s.add_layer(invlaid_address_dialogue(address)))
}

View File

@ -0,0 +1,69 @@
use cursive::{
view::{Nameable, Resizable},
views::{
Button,
EditView,
LinearLayout,
PaddedView,
Panel,
SelectView,
TextView,
},
Cursive,
View,
};
use crate::{
exit,
screens::{
connect_setup::segues::segue_to_user_details_form,
info_dialogue::get_info,
},
state::StateObject,
MethodSelection,
};
pub fn methods_view(host: String) -> impl View {
let horizontal = LinearLayout::horizontal();
Panel::new(PaddedView::lrtb(
2,
2,
2,
2,
LinearLayout::vertical()
.child(
EditView::new()
.content(host)
.on_edit(Cursive::set_host)
.with_name("host_input"),
)
.child(TextView::new("Select option"))
.child(
SelectView::new()
.item("Get Info", MethodSelection::GetInfo)
.item("Connect...", MethodSelection::Connect)
.on_submit(execute)
.with_name("method_selector"),
)
.child(horizontal.child(Button::new("Cancel", exit))),
))
.fixed_size((40, 10))
}
fn execute(s: &mut Cursive, item: &MethodSelection) {
println!("executing");
match item {
MethodSelection::GetInfo => run_get_info(s),
MethodSelection::Connect => {
_ = s.cb_sink().send(segue_to_user_details_form());
}
}
}
// mark: - this should be removed
fn run_get_info(s: &mut Cursive) {
let sink = s.cb_sink().clone();
_ = sink.send(Box::new(get_info));
}

View File

@ -0,0 +1,8 @@
use cursive::{
views::{Dialog, TextView},
View,
};
pub fn stream_failed_dialogue() -> impl View {
Dialog::new().content(TextView::new("stream failed to open?"))
}

View File

@ -0,0 +1,3 @@
pub mod connect_setup;
pub mod info_dialogue;
pub mod main_screen;

9
client/src/segues.rs Normal file
View File

@ -0,0 +1,9 @@
use cursive::Cursive;
pub type CursiveCB = Box<dyn FnOnce(&mut Cursive) + Send>;
pub fn segue_pop_layer() -> CursiveCB {
Box::new(|s| {
s.pop_layer();
})
}

View File

@ -1,120 +0,0 @@
use cursive::{
view::Nameable,
views::{Button, LinearLayout, PaddedView, Panel, SelectView, TextView},
CbSink,
Cursive,
View,
};
use foundation::{
networking::{read_message, write_message},
prelude::{
network_client_message,
network_server_message,
GetInfo,
Info,
NetworkClientMessage,
NetworkServerMessage,
},
};
use tokio::{net::TcpStream, process::Command};
use crate::{info_dialogue::info_dialogue, state::State, MethodSelection};
pub fn methods_view() -> impl View {
let horizontal = LinearLayout::horizontal();
Panel::new(PaddedView::lrtb(
2,
2,
2,
2,
LinearLayout::vertical()
.child(TextView::new("Select option"))
.child(
SelectView::new()
.item("Get Info", MethodSelection::GetInfo)
.on_submit(execute)
.with_name("method_selector"),
)
.child(horizontal.child(Button::new("Cancel", exit))),
))
.title("Select method")
}
fn exit(s: &mut Cursive) {
s.quit();
}
fn execute(s: &mut Cursive, item: &MethodSelection) {
let _sink = s.cb_sink().clone();
match item {
MethodSelection::GetInfo => run_get_info(s),
}
let rt = &s.user_data::<State>().unwrap().get_rt();
rt.spawn(async {});
}
fn run_get_info(s: &mut Cursive) {
let host = s.user_data::<State>().unwrap().get_host();
let sink = s.cb_sink().clone();
let rt = &s.user_data::<State>().unwrap().get_rt();
// _ = sink.send(Box::new(|s| s.add_layer(Dialog::new())));
rt.spawn(async move {
let stream_res = TcpStream::connect(host).await;
match stream_res {
Ok(stream) => {
get_request(stream, sink).await;
}
Err(_e) => {}
}
});
}
async fn get_request(mut stream: TcpStream, sink: CbSink) {
let message = read_message::<NetworkServerMessage>(&mut stream).await;
if let Ok(NetworkServerMessage {
message:
Some(network_server_message::Message::Request(
foundation::prelude::Request { a: true },
)),
}) = message
{
perform_get_info(stream, sink.clone()).await;
}
}
async fn perform_get_info(mut stream: TcpStream, sink: CbSink) {
let message = NetworkClientMessage {
message: Some(network_client_message::Message::GetInfo(GetInfo {})),
};
write_message(&mut stream, message).await.unwrap();
let message = read_message::<NetworkServerMessage>(&mut stream)
.await
.unwrap();
if let NetworkServerMessage {
message:
Some(network_server_message::Message::GotInfo(Info { owner, server_name })),
} = message
{
sink
.send(segue_to_info_dialgue(server_name, owner))
.unwrap();
}
}
fn segue_to_info_dialgue(
name: String,
owner: String,
) -> Box<dyn FnOnce(&mut Cursive) + Send> {
Box::new(|s| {
s.add_layer(info_dialogue(name, owner));
})
}

View File

@ -1,7 +1,13 @@
use std::future::Future;
use cursive::Cursive;
use tokio::runtime::Runtime;
use crate::network::NetworkState;
pub struct State {
runtime: Runtime,
connection_state: NetworkState,
host: String,
}
@ -9,7 +15,8 @@ impl State {
pub fn new() -> Self {
Self {
runtime: Runtime::new().unwrap(),
host: "localhost:6500".into(),
connection_state: NetworkState::Disconnected,
host: "127.0.0.1:6500".into(),
}
}
@ -24,4 +31,33 @@ impl State {
pub fn get_rt(&mut self) -> &mut Runtime {
&mut self.runtime
}
pub fn spawn<F>(&mut self, future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.runtime.spawn(future);
}
}
impl Default for State {
fn default() -> Self {
Self::new()
}
}
pub trait StateObject {
fn state(&mut self) -> &mut State;
fn set_host(&mut self, host: &str, _: usize);
}
impl StateObject for Cursive {
fn set_host(&mut self, host: &str, _: usize) {
self.user_data::<State>().unwrap().set_host(host);
}
fn state(&mut self) -> &mut State {
self.user_data::<State>().unwrap()
}
}

View File

@ -5,16 +5,14 @@ use prost::{
Message,
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::TcpStream,
};
pub async fn write_message<T>(
stream: &mut TcpStream,
message: T,
) -> io::Result<()>
pub async fn write_message<T, S>(stream: &mut S, message: T) -> io::Result<()>
where
T: Message + Default,
S: AsyncWrite + AsyncWriteExt + Unpin,
{
let message = encode_message::<T>(&message)?;
stream.write_all(&message).await?;
@ -39,9 +37,10 @@ where
Ok(buffer.into())
}
pub async fn read_message<T>(stream: &mut TcpStream) -> io::Result<T>
pub async fn read_message<T, S>(stream: &mut S) -> io::Result<T>
where
T: Message + Default,
S: AsyncRead + AsyncReadExt + Unpin,
{
let size = stream.read_u32().await?;

View File

@ -2,6 +2,9 @@ use std::io::Result;
// Use this in build.rs
fn main() -> Result<()> {
prost_build::compile_protos(&["src/proto/network.proto"], &["src/proto"])?;
prost_build::compile_protos(
&["src/proto/network.proto", "src/proto/connected.proto"],
&["src/proto"],
)?;
Ok(())
}

View File

@ -0,0 +1,70 @@
syntax = "proto3";
package chatkit.messages;
// messages from the client when connected.
message ConnectedClientMessage {
oneof message {
GetClients get_clients = 1;
GetGlobalMessages get_global_message = 2;
SendGlobalMessage send_global_message = 3;
SendPrivateMessage send_private_message = 4;
Disconnect disconnect = 5;
}
}
message GetClients {}
message GetGlobalMessages {}
message SendGlobalMessage {
string content = 1;
}
message SendPrivateMessage {
string uuid = 1;
string to = 2;
string content = 3;
}
message Disconnect {}
// messages from the Server when connected.
message ConnectedServerMessage {
oneof message {
ConnectedClients connected_clients = 1;
GlobalMessages global_messages = 2;
PrivateMessage private_message = 3;
Disconnected Disconnected = 4;
}
}
message ConnectedClients {
repeated ClientDetails clients = 1;
}
message ClientDetails {
string uuid = 1;
string name = 2;
string address = 3;
}
message GlobalMessages {
repeated GlobalMessage messages = 1;
}
message GlobalMessage {
string uuid = 1;
string from = 2;
string content = 3;
}
message PrivateMessage {
string uuid = 1;
string from = 2;
string content = 3;
}
message Disconnected {
string reason = 1;
}

View File

@ -1,7 +0,0 @@
syntax = "proto3";
message Person {
string name = 1;
int32 id = 2; // Unique ID number for this person.
string email = 3;
}

View File

@ -2,7 +2,7 @@ syntax = "proto3";
package chatkit.messages;
// Network messages from the client.
// Network messages sent from the client.
message NetworkClientMessage {
oneof message {
GetInfo get_info = 1;
@ -17,17 +17,17 @@ message Connect {
string uuid = 2;
}
// Network messages from the server.
// Network messages sent from the server.
message NetworkServerMessage {
oneof message {
Request request = 1;
Info got_info = 2;
Connected connected = 3;
}
}
message Request {
bool a = 1;
}
message Request {}
message Info {
string server_name = 1;

View File

@ -0,0 +1,32 @@
use std::net::SocketAddr;
use uuid::Uuid;
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ClientInfo {
uuid: Uuid,
username: String,
addr: SocketAddr,
}
impl ClientInfo {
pub fn new(uuid: Uuid, username: String, addr: SocketAddr) -> Self {
Self {
uuid,
username,
addr,
}
}
pub fn get_uuid(&self) -> Uuid {
self.uuid
}
pub fn get_username(&self) -> String {
self.username.clone()
}
pub fn get_addr(&self) -> SocketAddr {
self.addr
}
}

View File

@ -0,0 +1,135 @@
use foundation::prelude::{
connected_client_message,
ClientDetails,
ConnectedClientMessage,
Disconnect,
GetClients,
GetGlobalMessages,
SendGlobalMessage,
SendPrivateMessage,
};
use tokio::{
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
task::JoinHandle,
};
use uuid::Uuid;
use crate::{
connection::{
client_info::ClientInfo,
connection_manager::ConnectionManagerMessage,
},
network::{
client_reader_connection::ClientReaderConnection,
client_writer_connection::ClientWriterConnection,
network_connection::NetworkConnection,
},
};
pub struct ClientThread {
read_task: JoinHandle<()>,
write_task: JoinHandle<()>,
sender: UnboundedSender<ClientMessage>,
}
impl ClientThread {
pub async fn new_run(
uuid: Uuid,
conn: NetworkConnection,
connection_manager_sender: UnboundedSender<ConnectionManagerMessage>,
) -> Self {
let (writer, reader) = conn.send_connected().await;
let (tx, rx) = unbounded_channel();
Self {
read_task: tokio::spawn(Self::run_read(
uuid,
reader,
connection_manager_sender,
)),
write_task: tokio::spawn(Self::run_write(uuid, writer, rx)),
sender: tx,
}
}
async fn run_read(
uuid: Uuid,
mut reader: ClientReaderConnection,
channel: UnboundedSender<ConnectionManagerMessage>,
) {
use connected_client_message::Message;
loop {
println!("[ClientThread:run_read:{}]", uuid);
let msg = reader.get_message().await;
match msg {
Ok(ConnectedClientMessage {
message: Some(Message::GetClients(GetClients {})),
}) => channel.send(ConnectionManagerMessage::SendClientsTo { uuid }),
Ok(ConnectedClientMessage {
message: Some(Message::GetGlobalMessage(GetGlobalMessages {})),
}) => {
channel.send(ConnectionManagerMessage::SendGlobalMessagesTo { uuid })
}
Ok(ConnectedClientMessage {
message:
Some(Message::SendPrivateMessage(SendPrivateMessage {
uuid: message_uuid,
to,
content,
})),
}) => channel.send(ConnectionManagerMessage::SendPrivateMessage {
uuid: message_uuid,
from: uuid,
to: to.parse().unwrap(),
content,
}),
Ok(ConnectedClientMessage {
message:
Some(Message::SendGlobalMessage(SendGlobalMessage { content })),
}) => channel.send(ConnectionManagerMessage::BroadcastGlobalMessage {
from: uuid,
content,
}),
Ok(ConnectedClientMessage {
message: Some(Message::Disconnect(Disconnect {})),
}) => channel.send(ConnectionManagerMessage::Disconnect { uuid }),
Ok(ConnectedClientMessage { message: None }) => unimplemented!(),
Err(_) => todo!(),
};
break;
}
}
async fn run_write(
uuid: Uuid,
mut conn: ClientWriterConnection,
mut receiver: UnboundedReceiver<ClientMessage>,
) {
loop {
let msg = receiver.recv().await;
match msg {
Some(ClientMessage::SendClients(clients)) => {
let clients = clients
.into_iter()
.map(|c| ClientDetails {
uuid: c.get_uuid().to_string(),
name: c.get_username(),
address: c.get_addr().to_string(),
})
.collect();
conn.send_clients(clients).await
}
None => {}
};
}
}
}
pub enum ClientMessage {
SendClients(Vec<ClientInfo>),
}

View File

@ -0,0 +1,110 @@
use std::{collections::HashMap, net::SocketAddr};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex,
};
use uuid::Uuid;
use crate::{
connection::{client_info::ClientInfo, client_thread::ClientThread},
network::network_connection::NetworkConnection,
};
pub struct ConnectionManager {
receiver: Mutex<UnboundedReceiver<ConnectionManagerMessage>>,
sender: UnboundedSender<ConnectionManagerMessage>,
client_map: HashMap<Uuid, ClientInfo>,
client_tasks_map: HashMap<Uuid, ClientThread>,
}
impl ConnectionManager {
pub fn new() -> Self {
let (tx, rx) = unbounded_channel();
Self {
client_map: HashMap::new(),
client_tasks_map: HashMap::new(),
receiver: Mutex::new(rx),
sender: tx,
}
}
pub async fn run(&mut self) {
loop {
let mut lock = self.receiver.lock().await;
let msg = lock.recv().await;
drop(lock);
match msg {
Some(ConnectionManagerMessage::AddClient {
conn,
uuid,
username,
addr,
}) => self.add_client(conn, uuid, username, addr).await,
Some(_) => {}
None => todo!(),
}
}
}
async fn add_client(
&mut self,
conn: NetworkConnection,
uuid: Uuid,
username: String,
addr: SocketAddr,
) {
let store = ClientInfo::new(uuid, username, addr);
self.client_map.insert(uuid, store);
let thread = ClientThread::new_run(uuid, conn, self.sender.clone()).await;
self.client_tasks_map.insert(uuid, thread);
}
pub fn get_sender(&self) -> UnboundedSender<ConnectionManagerMessage> {
self.sender.clone()
}
}
impl Default for ConnectionManager {
fn default() -> Self {
Self::new()
}
}
pub enum ConnectionManagerMessage {
// server messages
AddClient {
conn: NetworkConnection,
uuid: Uuid,
username: String,
addr: SocketAddr,
},
// client thread messages
SendClientsTo {
uuid: Uuid,
},
SendGlobalMessagesTo {
uuid: Uuid,
},
BroadcastGlobalMessage {
from: Uuid,
content: String,
},
SendPrivateMessage {
uuid: String,
from: Uuid,
to: Uuid,
content: String,
},
Disconnect {
uuid: Uuid,
},
}

View File

@ -0,0 +1,3 @@
pub mod client_info;
pub mod client_thread;
pub mod connection_manager;

View File

@ -3,6 +3,7 @@
pub(crate) mod network;
pub mod connection;
pub mod os_signal_manager;
pub mod server_va;

View File

@ -1,20 +0,0 @@
use std::net::SocketAddr;
use tokio::net::TcpStream;
use crate::network_connection::NetworkConnection;
struct ClientConnection {
stream: TcpStream,
_addr: SocketAddr,
}
impl From<NetworkConnection> for ClientConnection {
fn from(value: NetworkConnection) -> Self {
Self {
stream: value.
}
}
}
impl ClientConnection {}

View File

@ -0,0 +1,28 @@
use std::{io, net::SocketAddr};
use foundation::{networking::read_message, prelude::ConnectedClientMessage};
use tokio::{io::ReadHalf, net::TcpStream};
pub struct ClientReaderConnection {
reader: ReadHalf<TcpStream>,
_addr: SocketAddr,
}
impl ClientReaderConnection {
pub fn new(reader: ReadHalf<TcpStream>, addr: SocketAddr) -> Self {
Self {
reader: todo!(),
_addr: todo!(),
}
}
// move to other one
pub async fn get_message(&mut self) -> io::Result<ConnectedClientMessage> {
let message = read_message::<ConnectedClientMessage, ReadHalf<TcpStream>>(
&mut self.reader,
)
.await
.unwrap();
Ok(message)
}
}

View File

@ -0,0 +1,72 @@
use std::{io, net::SocketAddr};
use foundation::{
networking::{read_message, write_message},
prelude::{
connected_server_message,
ClientDetails,
ConnectedClientMessage,
ConnectedClients,
ConnectedServerMessage,
Disconnected,
GlobalMessage,
GlobalMessages,
PrivateMessage,
},
};
use tokio::{
io::{split, WriteHalf},
net::TcpStream,
};
use crate::network::{
client_reader_connection::ClientReaderConnection,
network_connection::NetworkConnection,
};
pub struct ClientWriterConnection {
writer: WriteHalf<TcpStream>,
addr: SocketAddr,
}
impl ClientWriterConnection {
pub fn new(writer: WriteHalf<TcpStream>, addr: SocketAddr) -> Self {
Self { writer, addr }
}
pub async fn send_clients(&mut self, clients: Vec<ClientDetails>) {
let message = ConnectedServerMessage {
message: Some(connected_server_message::Message::ConnectedClients(
ConnectedClients { clients },
)),
};
write_message(&mut self.writer, message).await.unwrap();
}
pub async fn send_global_messages(&mut self, messages: Vec<GlobalMessage>) {
let message = ConnectedServerMessage {
message: Some(connected_server_message::Message::GlobalMessages(
GlobalMessages { messages },
)),
};
write_message(&mut self.writer, message).await.unwrap();
}
pub async fn send_private_message(&mut self, message: PrivateMessage) {
let message = ConnectedServerMessage {
message: Some(connected_server_message::Message::PrivateMessage(message)),
};
write_message(&mut self.writer, message).await.unwrap();
}
pub async fn send_disconnect(&mut self) {
let message = ConnectedServerMessage {
message: Some(connected_server_message::Message::Disconnected(
Disconnected {
reason: "shutting down".into(),
},
)),
};
write_message(&mut self.writer, message).await.unwrap();
}
}

View File

@ -1,11 +0,0 @@
use std::collections::HashMap;
use uuid::Uuid;
struct ClientStore {
conn:
}
struct conneciton_manager {
client_map: HashMap<Uuid, Client>,
}

View File

@ -1,4 +1,4 @@
pub mod client_connection;
pub mod connection_manager;
pub mod client_reader_connection;
pub mod client_writer_connection;
pub mod listener_manager;
pub mod network_connection;

View File

@ -6,6 +6,7 @@ use foundation::{
network_client_message,
network_server_message,
Connect,
Connected,
GetInfo,
Info,
NetworkClientMessage,
@ -13,35 +14,36 @@ use foundation::{
Request,
},
};
use tokio::net::TcpStream;
use tokio::{io::split, net::TcpStream};
use crate::network::{
client_reader_connection::ClientReaderConnection,
client_writer_connection::ClientWriterConnection,
};
pub struct NetworkConnection {
stream: TcpStream,
_addr: SocketAddr,
pub(super) stream: TcpStream,
pub(super) addr: SocketAddr,
}
impl NetworkConnection {
pub fn new(stream: TcpStream, addr: SocketAddr) -> Self {
Self {
stream,
_addr: addr,
}
Self { stream, addr }
}
pub async fn get_request(&mut self) -> io::Result<ServerRequest> {
let message = NetworkServerMessage {
message: Some(network_server_message::Message::Request(Request {
a: true,
})),
message: Some(network_server_message::Message::Request(Request {})),
};
println!("[NetworkConnection] sending request");
write_message(&mut self.stream, message).await.unwrap();
println!("[NetworkConnection] waiting for response");
let request = read_message::<NetworkClientMessage>(&mut self.stream)
.await
.unwrap();
let request =
read_message::<NetworkClientMessage, TcpStream>(&mut self.stream)
.await
.unwrap();
println!("[NetworkConnection] returning request");
match request {
@ -57,6 +59,7 @@ impl NetworkConnection {
} => Ok(ServerRequest::Connect {
username,
uuid: uuid.parse().unwrap(),
addr: self.addr,
}),
_ => Ok(ServerRequest::Ignore),
}
@ -73,10 +76,38 @@ impl NetworkConnection {
write_message(&mut self.stream, message).await.unwrap();
println!("[NetworkConnection] droping connection");
}
pub async fn send_connected(
mut self,
) -> (ClientWriterConnection, ClientReaderConnection) {
let message = NetworkServerMessage {
message: Some(network_server_message::Message::Connected(Connected {})),
};
write_message(&mut self.stream, message).await.unwrap();
self.into()
}
}
pub enum ServerRequest {
GetInfo,
Connect { username: String, uuid: uuid::Uuid },
Connect {
username: String,
uuid: uuid::Uuid,
addr: SocketAddr,
},
Ignore,
}
impl From<NetworkConnection>
for (ClientWriterConnection, ClientReaderConnection)
{
fn from(value: NetworkConnection) -> Self {
let (read, write) = split(value.stream);
let writer = ClientWriterConnection::new(write, value.addr.clone());
let reader = ClientReaderConnection::new(read, value.addr.clone());
(writer, reader)
}
}

View File

@ -1,12 +1,16 @@
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex,
},
task::JoinHandle,
};
use crate::{
connection::connection_manager::{
ConnectionManager,
ConnectionManagerMessage,
},
network::{
listener_manager::{ConnectionType, ListenerManager},
network_connection::{NetworkConnection, ServerRequest},
@ -14,13 +18,20 @@ use crate::{
os_signal_manager::OSSignalManager,
};
/// # Server
/// Manages communication between components in the server
/// Main functions being the handling of new connections, and setting them up.
pub struct Server {
os_event_manager_task: JoinHandle<()>,
connection_manager_sender: UnboundedSender<ConnectionManagerMessage>,
connection_manager_task: JoinHandle<()>,
listener_task: JoinHandle<()>,
os_event_manager_task: JoinHandle<()>,
receiver: Mutex<UnboundedReceiver<ServerMessages>>,
}
impl Server {
/// Loops the future, reading messages from the servers channel.
/// if exit is received, deconstructs all sub-tasks and exits the loop.
pub async fn run(&self) {
loop {
let mut lock = self.receiver.lock().await;
@ -45,6 +56,7 @@ impl Server {
}
async fn handle_protobuf_connection(&self, mut conn: NetworkConnection) {
println!("[Server] Getting request");
let req = conn.get_request().await.unwrap();
match req {
@ -54,15 +66,27 @@ impl Server {
.await
}
ServerRequest::Connect {
username: _,
uuid: _,
} => todo!(),
username,
uuid,
addr,
} => {
println!("[Server] sending connectionn and info to conneciton manager");
self.connection_manager_sender.send(
ConnectionManagerMessage::AddClient {
conn,
uuid,
username,
addr,
},
);
}
ServerRequest::Ignore => todo!(),
}
}
fn shutdown(&self) {
self.os_event_manager_task.abort();
self.connection_manager_task.abort();
self.listener_task.abort();
}
}
@ -81,14 +105,24 @@ impl Default for Server {
ListenerManager::new(tx2).await.run().await;
});
let mut connection_manager = ConnectionManager::new();
let connection_manager_sender = connection_manager.get_sender();
let connection_manager_task = tokio::spawn(async move {
connection_manager.run().await;
});
Self {
os_event_manager_task,
connection_manager_task,
connection_manager_sender,
receiver: Mutex::new(rx),
listener_task,
}
}
}
/// # ServerMessage
/// enum describing all messages that the server can handle
pub enum ServerMessages {
Exit,
NewConnection(ConnectionType),