merge develop into master #20

Merged
michael-bailey merged 181 commits from develop into master 2023-12-01 21:48:28 +00:00
2 changed files with 95 additions and 14 deletions
Showing only changes of commit d0c50366aa - Show all commits

View File

@ -7,19 +7,23 @@ use foundation::ClientDetails;
use crate::network::ConnectionMessage;
use uuid::Uuid;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use crate::client_management::client::ClientObservableMessage::UpdateRequest;
use crate::client_management::client::ClientObservableMessage::{SendGlobalMessageRequest, SendMessageRequest, UpdateRequest};
use crate::network::ConnectionMessage::SendData;
use crate::prelude::ObservableMessage::Subscribe;
use crate::prelude::ObservableMessage::{Subscribe, Unsubscribe};
/// Message sent ot the clients delegate
#[derive(Message)]
#[rtype(result = "()")]
pub enum ClientMessage {
SendUpdate(Vec<ClientDetails>),
sendMessage {
SendMessage {
from: Uuid,
content: String,
},
SendGlobalMessage {
from: Uuid,
content: String,
}
}
#[derive(Message)]
@ -39,6 +43,7 @@ enum SelfMessage {
#[rtype(result = "()")]
pub enum ClientObservableMessage {
SendMessageRequest(WeakAddr<Client>, Uuid, String),
SendGlobalMessageRequest(WeakAddr<Client>, String),
UpdateRequest(WeakAddr<Client>),
}
@ -90,13 +95,17 @@ impl Client {
}
#[inline]
fn handle_send(&self, ctx: &mut Context<Client>, uuid: Uuid, content: String) {
todo!()
fn handle_send(&self, ctx: &mut Context<Client>, to: Uuid, content: String) {
self.broadcast(SendMessageRequest(
ctx.address().downgrade(),
to,
content
));
}
#[inline]
fn handle_global_send(&self, p0: &mut Context<Client>, p1: String) {
todo!()
fn handle_global_send(&self, ctx: &mut Context<Client>, content: String) {
self.broadcast(SendGlobalMessageRequest(ctx.address().downgrade(), content));
}
#[inline]
@ -123,6 +132,13 @@ impl Actor for Client {
self.connection.do_send(Subscribe(ctx.address().recipient()));
self.connection.do_send(SendData(to_string::<ClientStreamOut>(&Connected).unwrap()));
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use ClientStreamOut::Disconnected;
use ConnectionMessage::{SendData};
self.connection.do_send(Unsubscribe(ctx.address().recipient()));
self.connection.do_send(SendData(to_string::<ClientStreamOut>(&Disconnected).unwrap()));
}
}
impl Handler<ClientDataMessage> for Client {
@ -140,15 +156,22 @@ impl Handler<ClientMessage> for Client {
msg: ClientMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use ClientMessage::SendUpdate;
use ClientStreamOut::{ConnectedClients};
use ClientMessage::{SendUpdate, SendMessage, SendGlobalMessage};
use ClientStreamOut::{ConnectedClients, UserMessage, GlobalMessage};
match msg {
SendUpdate(clients) => self.connection.do_send(
SendData(to_string::<ClientStreamOut>(
&ConnectedClients { clients }
).expect("[Client] Failed to encode string"))),
SendMessage {content, from} => self.connection.do_send(
SendData(to_string::<ClientStreamOut>(
&UserMessage {from,content}
).expect("[Client] Failed to encode string"))),
SendGlobalMessage { from, content } => self.connection.do_send(
SendData(to_string::<ClientStreamOut>(
&GlobalMessage {from,content}
).expect("[Client] Failed to encode string"))),
_ => todo!(),
}
}

View File

@ -8,13 +8,15 @@ use actix::{Message, MessageResponse};
use actix::WeakRecipient;
use std::collections::HashMap;
use actix::fut::{wrap_future, wrap_stream};
use futures::TryStreamExt;
use futures::{SinkExt, TryStreamExt};
use uuid::Uuid;
use tokio_stream::StreamExt;
use foundation::ClientDetails;
use foundation::messages::client::ClientStreamIn;
use foundation::messages::client::ClientStreamIn::SendGlobalMessage;
use crate::client_management::client::ClientMessage;
use crate::client_management::client::{ClientDataMessage, ClientObservableMessage};
use crate::client_management::client::ClientMessage::SendMessage;
use crate::network::NetworkOutput;
use crate::prelude::ObservableMessage;
@ -38,6 +40,7 @@ pub struct ClientManager {
impl ClientManager {
pub(crate) fn send_update(&mut self, ctx: &mut Context<Self>, addr: WeakAddr<Client>) {
println!("[ClientManager] sending update to client");
use ClientMessage::SendUpdate;
let self_addr = ctx.address();
if let Some(to_send) = addr.upgrade() {
@ -51,6 +54,7 @@ impl ClientManager {
tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage))
.map(|val| val.unwrap().0)
// .filter(|val| )
.collect();
let fut = wrap_future(async move {
@ -65,11 +69,62 @@ impl ClientManager {
pub(crate) fn send_message_request(
&self,
ctx: &mut Context<ClientManager>,
addr: WeakAddr<Client>,
sender: WeakAddr<Client>,
uuid: Uuid,
content: String
) {
todo!()
println!("[ClientManager] sending message to client");
let client_addr: Vec<Addr<Client>> = self.clients
.iter()
.map(|(_, v)| v)
.cloned()
.collect();
let collection =
tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage))
.map(|val| val.unwrap().0)
.collect();
let fut = wrap_future(async move {
if let Some(sender)= sender.upgrade() {
let from: Uuid = sender.send(ClientDataMessage).await.unwrap().0.uuid;
let client_details: Vec<ClientDetails> = collection.await;
let pos = client_details.iter().position(|i| i.uuid == from);
if let Some(pos) = pos {
sender.send(SendMessage {content, from}).await;
}
}
});
ctx.spawn(fut);
}
pub(crate) fn send_global_message_request(
&self,
ctx: &mut Context<ClientManager>,
sender: WeakAddr<Client>,
content: String
) {
use ClientMessage::SendGlobalMessage;
let client_addr: Vec<Addr<Client>> = self.clients
.iter()
.map(|(_, v)| v)
.cloned()
.collect();
if let Some(sender)= sender.upgrade() {
let fut = wrap_future(async move {
let from: Uuid = sender.send(ClientDataMessage).await.unwrap().0.uuid;
let collection =
tokio_stream::iter(client_addr)
.then(move |addr| addr.send(SendGlobalMessage { content: content.clone(), from }))
.collect();
let a: Vec<_> = collection.await;
});
ctx.spawn(fut);
}
}
}
@ -85,6 +140,7 @@ impl ClientManager {
}
fn add_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid, addr: Addr<Client>) {
println!("[ClientManager] adding client");
use crate::prelude::ObservableMessage::Subscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
addr.do_send(Subscribe(recp));
@ -92,6 +148,7 @@ impl ClientManager {
}
fn remove_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid) {
println!("[ClientManager] removing client");
use crate::prelude::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) {
@ -129,9 +186,10 @@ impl Handler<ClientObservableMessage> for ClientManager {
type Result = ();
fn handle(&mut self, msg: ClientObservableMessage, ctx: &mut Self::Context) -> Self::Result {
use ClientObservableMessage::{SendMessageRequest, UpdateRequest};
use ClientObservableMessage::{SendMessageRequest, UpdateRequest, SendGlobalMessageRequest};
match msg {
SendMessageRequest(addr, uuid, content) => self.send_message_request(ctx, addr, uuid, content),
SendGlobalMessageRequest(addr,content) => self.send_global_message_request(ctx, addr, content),
UpdateRequest(addr) => self.send_update(ctx, addr),
_ => todo!()
}