merge develop into master #20

Merged
michael-bailey merged 181 commits from develop into master 2023-12-01 21:48:28 +00:00
2 changed files with 162 additions and 18 deletions
Showing only changes of commit d8b0884014 - Show all commits

View File

@ -1,30 +1,51 @@
use crate::network::Connection;
use std::net::SocketAddr;
use crate::network::{Connection, ConnectionOuput};
use crate::prelude::ObservableMessage;
use actix::{Actor, Addr, Context, Handler, Message, WeakAddr, Recipient, Running, ArbiterHandle};
use serde_json::to_string;
use actix::{Actor, Addr, Context, Handler, Message, MessageResponse, WeakAddr, Recipient, Running, ArbiterHandle, AsyncContext};
use serde_json::{from_str, to_string};
use foundation::ClientDetails;
use crate::network::ConnectionMessage;
use uuid::Uuid;
use foundation::messages::client::ClientStreamOut;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use crate::client_management::client::ClientObservableMessage::UpdateRequest;
use crate::network::ConnectionMessage::SendData;
use crate::prelude::ObservableMessage::Subscribe;
/// Message sent ot the clients delegate
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientMessage {
pub enum ClientMessage {
SendUpdate(Vec<ClientDetails>),
sendMessage {
from: Uuid,
content: String,
},
}
#[derive(Message)]
#[rtype(result = "ClientDetailsResponse")]
pub struct ClientDataMessage;
#[derive(MessageResponse)]
pub struct ClientDetailsResponse(pub ClientDetails);
/// messages the client will send to itself
enum SelfMessage {
ReceivedMessage(ClientStreamIn)
}
/// message that is sent to all observers of the current client.
#[derive(Message)]
#[derive(Message, Clone)]
#[rtype(result = "()")]
pub(crate) enum ClientObservableMessage {
pub enum ClientObservableMessage {
SendMessageRequest(WeakAddr<Client>, Uuid, String),
UpdateRequest(WeakAddr<Client>),
}
/// # Client
/// This represents a connected client.
/// it will handle received message from a connection.
pub(crate) struct Client {
pub struct Client {
connection: Addr<Connection>,
details: ClientDetails,
observers: Vec<Recipient<ClientObservableMessage>>
@ -42,20 +63,76 @@ impl Client {
}
.start()
}
fn handle_request(
&mut self,
ctx: &mut Context<Client>,
sender: Addr<Connection>,
addr: SocketAddr,
data: String
) {
use ClientStreamIn::{Update, SendMessage, SendGlobalMessage, Disconnect};
let msg = from_str::<ClientStreamIn>(data.as_str()).expect("[Client] failed to decode incoming message");
match msg {
Update => self.handle_update(ctx),
SendMessage { to, content } => self.handle_send(ctx, to, content),
SendGlobalMessage { content } => self.handle_global_send(ctx, content),
Disconnect => self.handle_disconnect(ctx),
_ => todo!()
}
}
#[inline]
fn handle_update(&self,
ctx: &mut Context<Client>,
) {
self.broadcast(UpdateRequest(ctx.address().downgrade()));
}
#[inline]
fn handle_send(&self, ctx: &mut Context<Client>, uuid: Uuid, content: String) {
todo!()
}
#[inline]
fn handle_global_send(&self, p0: &mut Context<Client>, p1: String) {
todo!()
}
#[inline]
fn handle_disconnect(&self, ctx: &mut Context<Client>) {
todo!()
}
#[inline]
fn broadcast(&self, message: ClientObservableMessage) {
for recp in &self.observers {
recp.do_send(message.clone());
}
}
}
impl Actor for Client {
type Context = Context<Self>;
// tells the client that it has been connected
// tells the client that it has been connected.
fn started(&mut self, ctx: &mut Self::Context) {
use ClientStreamOut::Connected;
use ConnectionMessage::{SendData};
println!("[Client] started");
self.connection.do_send(Subscribe(ctx.address().recipient()));
self.connection.do_send(SendData(to_string::<ClientStreamOut>(&Connected).unwrap()));
}
}
impl Handler<ClientDataMessage> for Client {
type Result = ClientDetailsResponse;
fn handle(&mut self, msg: ClientDataMessage, ctx: &mut Self::Context) -> Self::Result {
ClientDetailsResponse(self.details.clone())
}
}
// Handles incoming messages to the client.
impl Handler<ClientMessage> for Client {
type Result = ();
fn handle(
@ -63,12 +140,38 @@ impl Handler<ClientMessage> for Client {
msg: ClientMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use ClientMessage::SendUpdate;
use ClientStreamOut::{ConnectedClients};
match msg {
SendUpdate(clients) => self.connection.do_send(
SendData(to_string::<ClientStreamOut>(
&ConnectedClients { clients }
).expect("[Client] Failed to encode string"))),
_ => todo!(),
}
}
}
// Handles outputs from the connection.
impl Handler<ConnectionOuput> for Client {
type Result = ();
fn handle(
&mut self,
msg: ConnectionOuput,
ctx: &mut Self::Context
) -> Self::Result {
use ConnectionOuput::RecvData;
match msg {
RecvData(sender, addr, data) => self.handle_request(ctx, sender, addr, data),
_ => todo!()
}
}
}
impl Handler<ObservableMessage<ClientObservableMessage>> for Client {
type Result = ();

View File

@ -1,14 +1,21 @@
use crate::client_management::Client;
use actix::{Actor, ArbiterHandle, Recipient, Running, WeakAddr};
use actix::{Actor, ActorFutureExt, ActorStreamExt, ArbiterHandle, MailboxError, Recipient, Running, StreamHandler, WeakAddr};
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::{Message, MessageResponse};
use actix::WeakRecipient;
use std::collections::HashMap;
use actix::fut::{wrap_future, wrap_stream};
use futures::TryStreamExt;
use uuid::Uuid;
use crate::client_management::client::ClientObservableMessage;
use tokio_stream::StreamExt;
use foundation::ClientDetails;
use foundation::messages::client::ClientStreamIn;
use crate::client_management::client::ClientMessage;
use crate::client_management::client::{ClientDataMessage, ClientObservableMessage};
use crate::network::NetworkOutput;
use crate::prelude::ObservableMessage;
#[derive(Message)]
@ -20,15 +27,48 @@ pub(crate) enum ClientManagerMessage {
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientManagerOutput {}
pub enum ClientManagerOutput {
UpdateRequest(Addr<ClientManager>),
}
pub(crate) struct ClientManager {
pub struct ClientManager {
clients: HashMap<Uuid, Addr<Client>>,
delegate: WeakRecipient<ClientManagerOutput>,
}
impl ClientManager {
pub(crate) fn send_message_request(&self, ctx: &mut Context<ClientManager>, addr: WeakAddr<Client>, uuid: Uuid, content: String) {
pub(crate) fn send_update(&mut self, ctx: &mut Context<Self>, addr: WeakAddr<Client>) {
use ClientMessage::SendUpdate;
let self_addr = ctx.address();
if let Some(to_send) = addr.upgrade() {
let client_addr: Vec<Addr<Client>> = self.clients
.iter()
.map(|(_, v)| v)
.cloned()
.collect();
let collection =
tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage))
.map(|val| val.unwrap().0)
.collect();
let fut = wrap_future(async move {
let a: Vec<_> = collection.await;
to_send.send(SendUpdate(a)).await;
});
ctx.spawn(fut);
}
}
pub(crate) fn send_message_request(
&self,
ctx: &mut Context<ClientManager>,
addr: WeakAddr<Client>,
uuid: Uuid,
content: String
) {
todo!()
}
}
@ -65,7 +105,6 @@ impl Actor for ClientManager {
fn started(&mut self, ctx: &mut Self::Context) {
println!("[ClientManager] started");
}
}
@ -90,9 +129,11 @@ impl Handler<ClientObservableMessage> for ClientManager {
type Result = ();
fn handle(&mut self, msg: ClientObservableMessage, ctx: &mut Self::Context) -> Self::Result {
use ClientObservableMessage::SendMessageRequest;
use ClientObservableMessage::{SendMessageRequest, UpdateRequest};
match msg {
SendMessageRequest(addr, uuid, content) => self.send_message_request(ctx, addr, uuid, content),
UpdateRequest(addr) => self.send_update(ctx, addr),
_ => todo!()
}
}
}