Adding client updates on addition and removal (#21)

* adding user update support

* Adding public key storage

* replaced duplicate fields

* changed messaegs to include a type property

* added encryption example

* Moved threads to tokio async

* Moved threads to tokio async

* Created global message support

* created basic ui that counts

* Created connection abstraction

This abstracts a TcpStream await to use any serialisable types.

* exposed server as lib

+created lib output for server
+ added extra dependencies
+ added functions to retrieve the port of a server

* updated connection

+ added manager trait
+ updated connection to use more idomatic rust code converted enums to Strigns rather that &str
+

* added network Manager

tl;dr, as the title says

* Update network_manager.rs

Converted over to new messaging system.

* meant to add this earlier

* Right bare with me...

This doesnt work for now but i have a system that works.

After this commit i will be resuming the smaller changes.

I needed to do this to find a new approach.

God save my mind.

* Impl IManager for ClientManager

* Added client manager tests

* Stripped server of functinoality

This is temporary whilst the client manager is being tested.

* Update client_manager.rs

swapped lock type for tokio lock

* Update client_manager.rs

- Added type arguments to Client manager
- deprecated start method

* Update client_manager.rs

- implemented handle channel method from old start method.
- added notes to possible issues.

* Update client_manager.rs

+ updated lifetime bounds.

* Update client_manager.rs

+ created function to get the number of connected clients

* Update client_manager.rs

+ created function to add/remove clients to the client manager.

* Update server.rs

- refactored server to simplify message passing

* Update client.rs

meant to add this earlier

* Update messages.rs

formatting

* moved some messages around

* Update server.rs

- removed redundant messages

* Update client_manager.rs

bunch more changes

yes i know i'm rubbish at this plz don't say anything :(

* Added Message Broadcasting functionality

* cleared up warnings

* Update client.rs

- removed redundant fields

* Update client.rs

added global broadcasting

* Added the ability to disconnect from the server

* Update network_manager.rs

this protects the network manager from crashing when a erroneous message is sent

* implemented message sending between users

* Update client.rs

added tokio, to protect client connections.

* Added Lua dependency

* Made server, Lua scriptable.

* added basic scripting abilities to the server

* Update client.rs

fixed stray connection

* made client Lua scriptable

* Made client manager Lua scriptable

* Adde fields to server to get client manager

* Updated testing script

* added client indexing to client manager

* added basic callback support to ClientManager

* moved Lua structs to separate module

* added arbitrary self types

* Created example plugin crate

* Added libloading dependency

* Created plugin trait and create function type

* created basic PluginManager

* Added plugin manager to server

* added modules

* Updated example plugin

* added plugin manager to server

* modified plugin module visibility

* updated example plugin

* updated plagin interface

* created plugin interface

* updated plugin init process

* updated example plugin

* cleaned up lib folder

* fixed cloning issues with plugin manager

* updated plugin trait implementation

* updated module definitions

* upadted rust fmt rules for imports

* fixoing formatting and ther errors. This is a pain to look through. i'm sorry :(

* minor fix

* Fixed plugin not functioning with tokio

* Added plugin lifecycle events

added lifecycle events to the plugin entry.

This allows plugins to be in a stopped paused or running state

* Adjusted visibility modifiers

* added basics of an event system

* updated imports.

* added function to get plugin entry as interface to plugin

* updated plugin example

* fixing linting errors

* updated event architecture

* renamed responder module

* added documention

* created a event result builder

* renamed responder

* updated example with interface setting

* moved event system to foundation

* added back tokio to foundation

* modified visibility

* added functions for IResponder

* more module mess

* made event generic

* made responder generic

* added basic plugin event propogation.

* updated plugin event handler.

* same because vscode?

* made plugin event generic

* RIGHT I'M MOVING TO ACTIX

* Started work on actor conversion

* implemented GetInfo for actix server

* implementing clients using actors

* added ability to add and remove clients

* updated foundation with comments and better messages

* Update Cargo.toml

+ added tokio stream

* added auto connection closing to Connection

* Update actix_server.rs

+ added ClientConnection handlers

* added ability to get server updates

* added basic messaging functonality

* deleted old server version

* renamed files to match std structure

* reformatted project

* renamed old files

* renamed serverActor to server

* added proper prelude

* Update client.rs

minor bug fixes

* updated clap

* made server configurable

* added port configurability

~ moved components to seperate modules
~ added builder and config to network manager

* way too much happened here

+ added scriptable version of the server, networkManager and clientManager

+ added lua engine creation
+also added unfinished rhai support
+ also did some restructuring

* made client manager and clients scriptable

* added more scriptable objects

* Created config manager with path read functionality

* fixed network manager message result types

* added bootstrapper actor

* moved arg matcher funtion.

* created singleton config manager using once pattern.

* removed bootstrapper and created config builder

* fixed panic on file not existing

* ignoring config_file

* added config manager to network manager

* updated some docs

* added config support to network manager, changed lifecycle a bit as well

* removed redundant handlers

* added configuration through args support to config manager and network manager.

* added config manager support to server and removed old serverConfig references

* performed cleanup of network manager

* performed cleanup of server

* updated config manager api to use optionals. This leads to pre-configuration and easier readability and understanding.

* updated server and network manager to new style for setting config values

* cleaned up prelude linting errors

* cleaned up network listener

* cleaned up connection initator

* cleaned up connection actor

* cleaned up lua manager

* cleaned up rhai manager

* cleaned up config manager

* cleaned up client manager

* cleaned up client actor

* cleaned up scriptable interfaces

* updated client uuid version

* fixed client cargo toml

* undone previous change

* moved arg parser to folder as file

* uhh... changes and formatting

* created chat manager and added it to the client manager

* added global message spport to client manager

* refixed network manager (again again)

* added debug messages

* refactord client code

* refactored messages and added new message types

* refactord network manager module name

* resolved warning in connection

* refactored message into foundation, added get message support for clients

* Fixed messages being sent to sender

* fixed not implemented panic, and field misnaming

* turned some references into weak variaties, to prevent memory leaks.

* turned ref into weak ref

* removing  strong references from scriping managers

* fixing lints

* making moe things weak repferences

* refactored connection initiator into spereate files

* accedentally ran cargo fmt instead of on one file

* removed the read loop, and replaced it with a recurrent messages.

* added docker files and automated build scripts

* Added client updates for addition and removal of clients
This commit is contained in:
michael bailey 2024-01-03 23:35:26 +00:00 committed by GitHub
parent ddb886df03
commit e6905cb6b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 67 additions and 42 deletions

View File

@ -83,7 +83,8 @@ impl Client {
}
}
pub(crate) fn error(&self, msg: String) {
pub(crate) fn _error(&self, msg: String) {
println!("[Client] sending error: {}", msg);
use serde_json::to_string;
use ConnectionMessage::SendData;
@ -219,29 +220,34 @@ impl Handler<ConnectionObservableOutput> for Client {
msg: ConnectionObservableOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use foundation::messages::client::ClientStreamIn::{
Disconnect,
GetClients,
GetMessages,
SendGlobalMessage,
SendMessage,
};
use serde_json::from_str;
use crate::network::ConnectionObservableOutput::RecvData;
if let RecvData(_sender, _addr, data) = msg {
use foundation::messages::client::ClientStreamIn::{
Disconnect,
GetClients,
GetMessages,
SendGlobalMessage,
SendMessage,
};
use serde_json::from_str;
if let Ok(msg) = from_str::<ClientStreamIn>(data.as_str()) {
match msg {
GetClients => self.get_clients(ctx),
GetMessages => self.get_messages(ctx),
SendMessage { to, content } => self.send_message(ctx, to, content),
SendGlobalMessage { content } => {
self.send_gloal_message(ctx, content)
match msg {
RecvData(_sender, data) => {
if let Ok(msg) = from_str::<ClientStreamIn>(data.as_str()) {
match msg {
GetClients => self.get_clients(ctx),
GetMessages => self.get_messages(ctx),
SendMessage { to, content } => self.send_message(ctx, to, content),
SendGlobalMessage { content } => {
self.send_gloal_message(ctx, content)
}
Disconnect => self.disconnect(ctx),
}
Disconnect => self.disconnect(ctx),
}
} else {
self.error(format!("Failed to parse Message: {}", data));
}
ConnectionObservableOutput::ConnectionClosed(_) => self
.broadcast(ClientObservableMessage::Disconnecting(self.details.uuid)),
}
}
}

View File

@ -58,7 +58,7 @@ impl ClientManager {
}
pub(crate) fn send_client_list(
&mut self,
&self,
ctx: &mut Context<Self>,
sender: WeakAddr<Client>,
) {
@ -66,7 +66,7 @@ impl ClientManager {
use crate::client_management::client::ClientMessage::ClientList;
if let Some(to_send) = sender.upgrade() {
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
self.clients.values().cloned().collect();
let collection = tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage::Details))
@ -115,7 +115,7 @@ impl ClientManager {
) {
println!("[ClientManager] sending message to client");
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
self.clients.values().cloned().collect();
let collection = tokio_stream::iter(client_addr.clone())
.then(|addr| addr.send(ClientDataMessage::Details))
@ -164,7 +164,7 @@ impl ClientManager {
use crate::client_management::client::ClientMessage::GloballySentMessage;
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
self.clients.values().cloned().collect();
if let Some(sender) = sender.upgrade() {
let cm = self.chat_manager.clone();
@ -227,16 +227,24 @@ impl ClientManager {
println!("[ClientManager] sending subscribe message to client");
addr.do_send(Subscribe(recp.downgrade()));
self.clients.insert(uuid, addr);
for (_k, v) in self.clients.clone() {
self.send_client_list(ctx, v.downgrade())
}
}
fn remove_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid) {
println!("[ClientManager] removing client");
use crate::prelude::messages::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) {
let addr = self.clients.remove(&uuid);
if let Some(addr) = addr {
println!("[ClientManager] sending unsubscribe message to client");
addr.do_send(Unsubscribe(recp.downgrade()));
}
println!("[ClientManager] sending client list to other clients");
for (_k, v) in self.clients.iter() {
self.send_client_list(ctx, v.downgrade())
}
}
fn disconnect_client(
@ -249,6 +257,7 @@ impl ClientManager {
let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) {
addr.do_send(Unsubscribe(recp.downgrade()));
self.remove_client(ctx, uuid);
}
}
}

View File

@ -5,15 +5,13 @@ use actix::{
fut::wrap_future,
Actor,
ActorContext,
ActorFutureExt,
Addr,
AsyncContext,
Context,
Handler,
SpawnHandle,
WeakRecipient,
};
use futures::{future::join_all, stream::Buffered, Future, FutureExt};
use futures::{future::join_all, Future, FutureExt};
use tokio::{
io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::TcpStream,
@ -37,7 +35,7 @@ use crate::{
/// - loop_future: the future holding the receiving loop.
pub struct Connection {
write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
address: SocketAddr,
_address: SocketAddr,
observers: Vec<WeakRecipient<ConnectionObservableOutput>>,
}
@ -49,7 +47,7 @@ impl Connection {
let (read_half, write_half) = split(stream);
let addr = Connection {
write_half: Arc::new(Mutex::new(write_half)),
address,
_address: address,
observers: Vec::new(),
}
.start();
@ -88,7 +86,6 @@ impl Connection {
ctx: &mut <Self as Actor>::Context,
mut buf_reader: BufReader<ReadHalf<TcpStream>>,
) {
let address = self.address;
let weak_addr = ctx.address().downgrade();
let read_fut = async move {
@ -97,7 +94,6 @@ impl Connection {
let read_fut = buf_reader.read_line(&mut buffer_string);
let Ok(Ok(len)) = timeout(dur, read_fut).await else {
println!("[Connection] timeout reached");
if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader));
}
@ -106,6 +102,9 @@ impl Connection {
if len == 0 {
println!("[Connection] readline returned 0");
if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::Close);
}
return;
}
@ -114,7 +113,6 @@ impl Connection {
.send(ConnectionPrivateMessage::Broadcast(
ConnectionObservableOutput::RecvData(
addr.downgrade(),
address,
buffer_string.clone(),
),
))
@ -127,6 +125,11 @@ impl Connection {
};
ctx.spawn(wrap_future(read_fut));
}
fn close_connection(&self, ctx: &mut <Self as Actor>::Context) {
use ConnectionObservableOutput::ConnectionClosed;
self.broadcast(ctx, ConnectionClosed(ctx.address().downgrade()))
}
}
impl Actor for Connection {
@ -135,7 +138,7 @@ impl Actor for Connection {
/// runs when the actor is started.
/// takes out eh read_half ad turns it into a buffered reader
/// then eneters loop readling lines from the tcp stream
fn started(&mut self, ctx: &mut Self::Context) {
fn started(&mut self, _ctx: &mut Self::Context) {
println!("[Connection] started");
}
@ -251,6 +254,7 @@ impl Handler<ConnectionPrivateMessage> for Connection {
ConnectionPrivateMessage::DoRead(buf_reader) => {
self.do_read(ctx, buf_reader)
}
ConnectionPrivateMessage::Close => self.close_connection(ctx),
};
}
}

View File

@ -1,5 +1,3 @@
use std::net::SocketAddr;
use actix::{Message, WeakAddr};
use tokio::{
io::{BufReader, ReadHalf},
@ -19,7 +17,7 @@ pub(crate) enum ConnectionMessage {
#[derive(Message, Clone)]
#[rtype(result = "()")]
pub(crate) enum ConnectionObservableOutput {
RecvData(WeakAddr<Connection>, SocketAddr, String),
RecvData(WeakAddr<Connection>, String),
ConnectionClosed(WeakAddr<Connection>),
}
@ -28,4 +26,5 @@ pub(crate) enum ConnectionObservableOutput {
pub(super) enum ConnectionPrivateMessage {
Broadcast(ConnectionObservableOutput),
DoRead(BufReader<ReadHalf<TcpStream>>),
Close,
}

View File

@ -58,7 +58,6 @@ impl ConnectionInitiator {
&mut self,
sender: WeakAddr<Connection>,
ctx: &mut <Self as Actor>::Context,
_address: SocketAddr,
data: String,
) {
use InitiatorOutput::{ClientRequest, InfoRequest};
@ -158,8 +157,8 @@ impl Handler<ConnectionObservableOutput> for ConnectionInitiator {
) -> Self::Result {
use ConnectionObservableOutput::RecvData;
if let RecvData(sender, addr, data) = msg {
self.handle_request(sender, ctx, addr, data)
if let RecvData(sender, data) = msg {
self.handle_request(sender, ctx, data)
}
}
}

View File

@ -57,6 +57,7 @@ impl NetworkListener {
let delegate = self.delegate.clone();
ctx.spawn(wrap_future(async move {
use ListenerOutput::NewConnection;
let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, addr)) = listener.accept().await {
println!("[NetworkListener] accepted socket");
@ -66,6 +67,7 @@ impl NetworkListener {
break;
};
println!("[NetworkListener] sending connection to delegate");
delegate.do_send(NewConnection(conn))
}
}));

View File

@ -170,9 +170,12 @@ impl Actor for NetworkManager {
},
);
ctx.spawn(fut);
println!("[NetworkManager] Finished Starting");
}
}
fn stopped(&mut self, ctx: &mut Self::Context) {
println!("[NetworkManager] network manager stopped");
}
}
impl Handler<NetworkMessage> for NetworkManager {
@ -215,7 +218,10 @@ impl Handler<ListenerOutput> for NetworkManager {
) -> Self::Result {
use ListenerOutput::NewConnection;
match msg {
NewConnection(connection) => self.new_connection(ctx, connection),
NewConnection(connection) => {
println!("new connection");
self.new_connection(ctx, connection)
}
};
}
}