refactored message into foundation, added get message support for clients

This commit is contained in:
michael-bailey 2022-09-28 08:50:53 +01:00
parent db4af038d7
commit 42677d71b5
9 changed files with 123 additions and 98 deletions

View File

@ -1,9 +1,11 @@
use actix::{Actor, Addr, Context, Handler};
use foundation::models::message::Message;
use uuid::Uuid;
use crate::client_management::chat_manager::{
message_type::Message,
messages::{ChatManagerDataMessage, ChatManagerDataResponse, ChatManagerMessage},
use crate::client_management::chat_manager::messages::{
ChatManagerDataMessage,
ChatManagerDataResponse,
ChatManagerMessage,
};
pub(crate) struct ChatManager {

View File

@ -1,21 +0,0 @@
use chrono::{DateTime, Local};
use uuid::Uuid;
#[derive(Clone)]
pub struct Message {
_id: Uuid,
_from: Uuid,
_content: String,
_time: DateTime<Local>,
}
impl Message {
pub fn new(from: Uuid, content: String) -> Self {
Self {
_id: Uuid::new_v4(),
_from: from,
_content: content,
_time: Local::now(),
}
}
}

View File

@ -1,8 +1,7 @@
use actix::{Message as ActixMessage, MessageResponse};
use foundation::models::message::Message;
use uuid::Uuid;
use super::Message;
#[derive(ActixMessage, Debug)]
#[rtype(result = "()")]
pub enum ChatManagerMessage {

View File

@ -5,11 +5,10 @@
//! - Mesage type
mod actor;
mod message_type;
mod messages;
pub(crate) use actor::ChatManager;
use message_type::Message;
pub(crate) use messages::{
ChatManagerDataMessage,
ChatManagerDataResponse,

View File

@ -1,5 +1,3 @@
use std::net::SocketAddr;
use actix::{Actor, Addr, AsyncContext, Context, Handler, Recipient};
use foundation::{messages::client::ClientStreamIn, ClientDetails};
use uuid::Uuid;
@ -10,18 +8,11 @@ use crate::{
ClientDataResponse,
ClientMessage,
ClientObservableMessage,
ClientObservableMessage::{GlobalMessage, Message, Update},
},
network::{Connection, ConnectionOuput},
prelude::messages::ObservableMessage,
};
/// messages the client will send to itself
#[allow(dead_code)]
enum SelfMessage {
ReceivedMessage(ClientStreamIn),
}
/// # Client
/// This represents a connected client.
/// it will handle received message from a connection.
@ -41,48 +32,33 @@ impl Client {
.start()
}
fn handle_request(
&mut self,
ctx: &mut Context<Client>,
_sender: Addr<Connection>,
_addr: SocketAddr,
data: String,
) {
use foundation::messages::client::ClientStreamIn::{
Disconnect,
GetClients,
SendGlobalMessage,
SendMessage,
};
use serde_json::from_str;
let msg = from_str::<ClientStreamIn>(data.as_str())
.expect("[Client] failed to decode incoming message");
match msg {
GetClients => self.handle_update(ctx),
SendMessage { to, content } => self.handle_send(ctx, to, content),
SendGlobalMessage { content } => self.handle_global_send(ctx, content),
Disconnect => self.handle_disconnect(ctx),
_ => todo!(),
}
#[inline]
fn get_clients(&self, ctx: &mut Context<Client>) {
use ClientObservableMessage::GetClients;
self.broadcast(GetClients(ctx.address().downgrade()));
}
#[inline]
fn handle_update(&self, ctx: &mut Context<Client>) {
self.broadcast(Update(ctx.address().downgrade()));
fn get_messages(&self, ctx: &mut Context<Client>) {
use ClientObservableMessage::GetGlobalMessages;
self.broadcast(GetGlobalMessages(ctx.address().downgrade()));
todo!()
}
#[inline]
fn handle_send(&self, ctx: &mut Context<Client>, to: Uuid, content: String) {
fn send_message(&self, ctx: &mut Context<Client>, to: Uuid, content: String) {
use ClientObservableMessage::Message;
self.broadcast(Message(ctx.address().downgrade(), to, content));
}
#[inline]
fn handle_global_send(&self, ctx: &mut Context<Client>, content: String) {
fn send_gloal_message(&self, ctx: &mut Context<Client>, content: String) {
use ClientObservableMessage::GlobalMessage;
self.broadcast(GlobalMessage(ctx.address().downgrade(), content));
}
#[inline]
fn handle_disconnect(&self, _ctx: &mut Context<Client>) {
fn disconnect(&self, _ctx: &mut Context<Client>) {
todo!()
}
@ -125,6 +101,9 @@ impl Actor for Client {
network::ConnectionMessage::SendData,
prelude::messages::ObservableMessage::Unsubscribe,
};
println!("[Client] stopped");
self
.connection
.do_send::<ObservableMessage<ConnectionOuput>>(Unsubscribe(
@ -152,29 +131,42 @@ impl Handler<ClientMessage> for Client {
fn handle(&mut self, msg: ClientMessage, _ctx: &mut Self::Context) -> Self::Result {
use foundation::messages::client::{
ClientStreamOut,
ClientStreamOut::{ConnectedClients, GlobalMessage, UserMessage},
ClientStreamOut::{
ConnectedClients,
GlobalChatMessages,
GlobalMessage,
UserMessage,
},
};
use serde_json::to_string;
use crate::{
client_management::client::messages::ClientMessage::{
GlobalMessage as ClientGlobalMessage,
Message,
Update,
ClientList,
ClientlySentMessage,
GloballySentMessage,
MessageList,
},
network::ConnectionMessage::SendData,
};
match msg {
Update(clients) => self.connection.do_send(SendData(
ClientList(clients) => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&ConnectedClients { clients })
.expect("[Client] Failed to encode string"),
)),
Message { content, from } => self.connection.do_send(SendData(
MessageList(messages) => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&GlobalChatMessages { messages })
.expect("[Client] Failed to encode string"),
)),
ClientlySentMessage { content, from } => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&UserMessage { from, content })
.expect("[Client] Failed to encode string"),
)),
ClientGlobalMessage { from, content } => self.connection.do_send(SendData(
GloballySentMessage { from, content } => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&GlobalMessage { from, content })
.expect("[Client] Failed to encode string"),
)),
@ -188,9 +180,24 @@ impl Handler<ConnectionOuput> for Client {
fn handle(&mut self, msg: ConnectionOuput, ctx: &mut Self::Context) -> Self::Result {
use crate::network::ConnectionOuput::RecvData;
match msg {
RecvData(sender, addr, data) => self.handle_request(ctx, sender, addr, data),
_ => todo!(),
if let RecvData(_sender, _addr, data) = msg {
use foundation::messages::client::ClientStreamIn::{
Disconnect,
GetClients,
GetMessages,
SendGlobalMessage,
SendMessage,
};
use serde_json::from_str;
let msg = from_str::<ClientStreamIn>(data.as_str())
.expect("[Client] failed to decode incoming message");
match msg {
GetClients => self.get_clients(ctx),
GetMessages => self.get_messages(ctx),
SendMessage { to, content } => self.send_message(ctx, to, content),
SendGlobalMessage { content } => self.send_gloal_message(ctx, content),
Disconnect => self.disconnect(ctx),
}
}
}
}

View File

@ -1,5 +1,5 @@
use actix::{Message, MessageResponse, WeakAddr};
use foundation::ClientDetails;
use foundation::{models::message::Message as StoredMessage, ClientDetails};
use uuid::Uuid;
use crate::client_management::client::Client;
@ -8,9 +8,11 @@ use crate::client_management::client::Client;
#[derive(Message)]
#[rtype(result = "()")]
pub enum ClientMessage {
Update(Vec<ClientDetails>),
Message { from: Uuid, content: String },
GlobalMessage { from: Uuid, content: String },
ClientList(Vec<ClientDetails>),
MessageList(Vec<StoredMessage>),
ClientlySentMessage { from: Uuid, content: String },
GloballySentMessage { from: Uuid, content: String },
}
#[derive(Message)]
@ -36,5 +38,6 @@ pub enum ClientDataResponse {
pub enum ClientObservableMessage {
Message(WeakAddr<Client>, Uuid, String),
GlobalMessage(WeakAddr<Client>, String),
Update(WeakAddr<Client>),
GetClients(WeakAddr<Client>),
GetGlobalMessages(WeakAddr<Client>),
}

View File

@ -3,6 +3,7 @@ use std::collections::HashMap;
use actix::{
fut::wrap_future,
Actor,
ActorFutureExt,
Addr,
AsyncContext,
Context,
@ -15,13 +16,18 @@ use tokio_stream::StreamExt;
use uuid::Uuid;
use crate::client_management::{
chat_manager::{ChatManager, ChatManagerMessage},
chat_manager::{
ChatManager,
ChatManagerDataMessage,
ChatManagerDataResponse,
ChatManagerMessage,
},
client::{
Client,
ClientDataMessage,
ClientDataResponse,
ClientDataResponse::Details,
ClientMessage::Message,
ClientMessage,
ClientObservableMessage,
},
messages::{
@ -49,9 +55,13 @@ impl ClientManager {
.start()
}
pub(crate) fn send_update(&mut self, ctx: &mut Context<Self>, addr: WeakAddr<Client>) {
pub(crate) fn send_client_list(
&mut self,
ctx: &mut Context<Self>,
addr: WeakAddr<Client>,
) {
println!("[ClientManager] sending update to client");
use crate::client_management::client::ClientMessage::Update;
use crate::client_management::client::ClientMessage::ClientList;
if let Some(to_send) = addr.upgrade() {
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
@ -69,13 +79,29 @@ impl ClientManager {
let fut = wrap_future(async move {
let a: Vec<_> = collection.await;
let _ = to_send.send(Update(a)).await;
let _ = to_send.send(ClientList(a)).await;
});
ctx.spawn(fut);
}
}
pub(crate) fn send_global_messages(
&self,
ctx: &mut Context<ClientManager>,
addr: WeakAddr<Client>,
) {
if let Some(to_send) = addr.upgrade() {
let fut = wrap_future(self.chat_manager.send(ChatManagerDataMessage::GetMessages))
.map(move |out, _a, _ctx| {
if let Ok(ChatManagerDataResponse::GotMessages(res)) = out {
to_send.do_send(ClientMessage::MessageList(res));
}
});
ctx.spawn(fut);
};
}
pub(crate) fn send_message_request(
&self,
ctx: &mut Context<ClientManager>,
@ -114,7 +140,7 @@ impl ClientManager {
let pos = client_details.iter().position(|i| i.uuid == from);
if pos.is_some() {
sender
.send(Message { content, from })
.send(ClientMessage::ClientlySentMessage { content, from })
.await
.expect("TODO: panic message");
}
@ -131,7 +157,7 @@ impl ClientManager {
content: String,
) {
println!("[ClientManager] sending message to client");
use crate::client_management::client::ClientMessage::GlobalMessage;
use crate::client_management::client::ClientMessage::GloballySentMessage;
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
@ -158,7 +184,7 @@ impl ClientManager {
let collection = tokio_stream::iter(client_addr)
.then(move |addr| {
addr.send(GlobalMessage {
addr.send(GloballySentMessage {
content: cont1.clone(),
from,
})
@ -242,16 +268,18 @@ impl Handler<ClientObservableMessage> for ClientManager {
ctx: &mut Self::Context,
) -> Self::Result {
use crate::client_management::client::ClientObservableMessage::{
GetClients,
GetGlobalMessages,
GlobalMessage,
Message,
Update,
};
match msg {
Message(addr, uuid, content) => self.send_message_request(ctx, addr, uuid, content),
GlobalMessage(addr, content) => {
self.send_global_message_request(ctx, addr, content)
}
Update(addr) => self.send_update(ctx, addr),
GetClients(addr) => self.send_client_list(ctx, addr),
GetGlobalMessages(addr) => self.send_global_messages(ctx, addr),
}
}
}

View File

@ -36,5 +36,9 @@ pub(crate) use connection::{Connection, ConnectionMessage, ConnectionOuput};
pub(crate) use connection_initiator::{ConnectionInitiator, InitiatorOutput};
// use listener::{ListenerMessage, ListenerOutput, NetworkListener};
pub(crate) use network_manager::{
NetworkDataMessage, NetworkDataOutput, NetworkManager, NetworkMessage, NetworkOutput,
NetworkDataMessage,
NetworkDataOutput,
NetworkManager,
NetworkMessage,
NetworkOutput,
};

View File

@ -1,9 +1,15 @@
use crate::client_management::ClientManagerDataResponse::Clients;
use crate::client_management::{ClientManager, ClientManagerDataMessage};
use crate::scripting::scriptable_client::ScriptableClient;
use actix::Addr;
use mlua::{Error, UserData, UserDataMethods};
use crate::{
client_management::{
ClientManager,
ClientManagerDataMessage,
ClientManagerDataResponse::Clients,
},
scripting::scriptable_client::ScriptableClient,
};
#[derive(Clone)]
pub(crate) struct ScriptableClientManager {
addr: Addr<ClientManager>,
@ -16,10 +22,8 @@ impl UserData for ScriptableClientManager {
if let Ok(Clients(clients)) = res {
let clients: Vec<ScriptableClient> = clients
.into_iter()
.map(|a| a.upgrade())
.filter(|o| o.is_some())
.map(|o| o.unwrap())
.map(|a| ScriptableClient::from(a))
.filter_map(|a| a.upgrade())
.map(ScriptableClient::from)
.collect();
Ok(clients)