Grpc-manager #22

Merged
michael-bailey merged 28 commits from grpc-manager into master 2024-05-30 19:42:42 +00:00
44 changed files with 39 additions and 2653 deletions
Showing only changes of commit 9b5fa87952 - Show all commits

View File

@ -1,87 +0,0 @@
use actix::{Actor, Addr, Context, Handler};
use foundation::models::message::Message;
use uuid::Uuid;
use crate::client_management::chat_manager::messages::{
ChatManagerDataMessage,
ChatManagerDataResponse,
ChatManagerMessage,
};
pub(crate) struct ChatManager {
messages: Vec<Message>,
}
impl ChatManager {
pub fn new() -> Addr<Self> {
Self {
messages: Vec::new(),
}
.start()
}
// no need for a remove methods because this is a read only system
fn add_message(
&mut self,
_ctx: &mut Context<Self>,
id: Uuid,
content: String,
) {
println!(
"[ChatManager] add_message id: {:?} content: {:?}",
id, content
);
self.messages.push(Message::new(id, content))
}
fn get_messages(&self, _ctx: &mut Context<Self>) -> ChatManagerDataResponse {
println!("[ChatManager] getting messages");
ChatManagerDataResponse::GotMessages(self.messages.clone())
}
fn get_message(
&self,
_ctx: &mut Context<Self>,
index: usize,
) -> ChatManagerDataResponse {
println!("[ChatManager] getting message index: {:?}", index);
ChatManagerDataResponse::GotMessage(self.messages.get(index).cloned())
}
}
impl Actor for ChatManager {
type Context = Context<Self>;
}
impl Handler<ChatManagerMessage> for ChatManager {
type Result = ();
fn handle(
&mut self,
msg: ChatManagerMessage,
ctx: &mut Self::Context,
) -> Self::Result {
println!("[ChatManager] got message: {:?}", msg);
match msg {
ChatManagerMessage::AddMessage(id, content) => {
self.add_message(ctx, id, content)
}
}
}
}
impl Handler<ChatManagerDataMessage> for ChatManager {
type Result = ChatManagerDataResponse;
fn handle(
&mut self,
msg: ChatManagerDataMessage,
ctx: &mut Self::Context,
) -> Self::Result {
println!("[ChatManager] got message: {:?}", msg);
match msg {
ChatManagerDataMessage::GetMessages => self.get_messages(ctx),
ChatManagerDataMessage::GetMessage(index) => self.get_message(ctx, index),
}
}
}

View File

@ -1,23 +0,0 @@
use actix::{Message as ActixMessage, MessageResponse};
use foundation::models::message::Message;
use uuid::Uuid;
#[derive(ActixMessage, Debug)]
#[rtype(result = "()")]
pub enum ChatManagerMessage {
AddMessage(Uuid, String),
}
#[allow(dead_code)]
#[derive(ActixMessage, Debug)]
#[rtype(result = "ChatManagerDataResponse")]
pub enum ChatManagerDataMessage {
GetMessages,
GetMessage(usize),
}
#[derive(MessageResponse)]
pub enum ChatManagerDataResponse {
GotMessages(Vec<Message>),
GotMessage(Option<Message>),
}

View File

@ -1,16 +0,0 @@
//! Contains all the structures for managing chat storage.
//! it contains:
//! - ChatManager
//! - Messages
//! - Mesage type
mod actor;
mod messages;
pub(crate) use actor::ChatManager;
pub(crate) use messages::{
ChatManagerDataMessage,
ChatManagerDataResponse,
ChatManagerMessage,
};

View File

@ -1,287 +0,0 @@
use actix::{Actor, Addr, AsyncContext, Context, Handler, WeakRecipient};
use foundation::{
messages::client::{ClientStreamIn, ClientStreamOut},
ClientDetails,
};
use uuid::Uuid;
use crate::{
client_management::client::messages::{
ClientDataMessage,
ClientDataResponse,
ClientMessage,
ClientObservableMessage,
},
network::{Connection, ConnectionObservableOutput},
prelude::messages::{ConnectionMessage, ObservableMessage},
};
/// # Client
/// This represents a connected client.
/// it will handle received message from a connection.
pub struct Client {
connection: Addr<Connection>,
details: ClientDetails,
observers: Vec<WeakRecipient<ClientObservableMessage>>,
}
impl Client {
pub(crate) fn new(
connection: Addr<Connection>,
details: ClientDetails,
) -> Addr<Self> {
Client {
connection,
details,
observers: Vec::default(),
}
.start()
}
#[inline]
fn get_clients(&self, ctx: &mut Context<Client>) {
println!("[Client] getting clients");
use ClientObservableMessage::GetClients;
self.broadcast(GetClients(ctx.address().downgrade()));
}
#[inline]
fn get_messages(&self, ctx: &mut Context<Client>) {
println!("[Client] getting messages");
use ClientObservableMessage::GetGlobalMessages;
self.broadcast(GetGlobalMessages(ctx.address().downgrade()));
}
#[inline]
fn send_message(&self, ctx: &mut Context<Client>, to: Uuid, content: String) {
println!("[Client] sending message");
use ClientObservableMessage::Message;
self.broadcast(Message(ctx.address().downgrade(), to, content));
}
#[inline]
fn send_gloal_message(&self, ctx: &mut Context<Client>, content: String) {
println!("[Client] sending global message");
use ClientObservableMessage::GlobalMessage;
self.broadcast(GlobalMessage(ctx.address().downgrade(), content));
}
#[inline]
fn disconnect(&self, _ctx: &mut Context<Client>) {
println!("[Client] disconnecting");
use ClientObservableMessage::Disconnecting;
self.broadcast(Disconnecting(self.details.uuid));
}
#[inline]
fn broadcast(&self, message: ClientObservableMessage) {
println!("[Client] broadcasting message");
for recp in &self.observers {
if let Some(upgraded) = recp.upgrade() {
upgraded.do_send(message.clone());
}
}
}
pub(crate) fn _error(&self, msg: String) {
println!("[Client] sending error: {}", msg);
use serde_json::to_string;
use ConnectionMessage::SendData;
let msg = to_string::<ClientStreamOut>(&ClientStreamOut::Error { msg })
.expect("[Client] This should not fail");
self.connection.do_send(SendData(msg));
}
}
impl Actor for Client {
type Context = Context<Self>;
// tells the client that it has been connected.
fn started(&mut self, ctx: &mut Self::Context) {
use foundation::messages::client::ClientStreamOut::Connected;
use serde_json::to_string;
use crate::{
network::ConnectionMessage::SendData,
prelude::messages::ObservableMessage::Subscribe,
};
println!("[Client] started");
self
.connection
.do_send::<ObservableMessage<ConnectionObservableOutput>>(Subscribe(
ctx.address().recipient().downgrade(),
));
self
.connection
.do_send(SendData(to_string::<ClientStreamOut>(&Connected).unwrap()));
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use foundation::messages::client::ClientStreamOut::Disconnected;
use serde_json::to_string;
use crate::{
network::ConnectionMessage::SendData,
prelude::messages::ObservableMessage::Unsubscribe,
};
println!("[Client] stopped");
self
.connection
.do_send::<ObservableMessage<ConnectionObservableOutput>>(Unsubscribe(
ctx.address().recipient().downgrade(),
));
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&Disconnected).unwrap(),
));
}
}
impl Handler<ClientDataMessage> for Client {
type Result = ClientDataResponse;
fn handle(
&mut self,
msg: ClientDataMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
match msg {
ClientDataMessage::Details => {
ClientDataResponse::Details(self.details.clone())
}
_ => todo!(),
}
}
}
// Handles incoming messages to the client.
impl Handler<ClientMessage> for Client {
type Result = ();
fn handle(
&mut self,
msg: ClientMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use foundation::messages::client::ClientStreamOut::{
ConnectedClients,
GlobalChatMessages,
GlobalMessage,
UserMessage,
};
use serde_json::to_string;
use crate::{
client_management::client::messages::ClientMessage::{
ClientList,
ClientlySentMessage,
GloballySentMessage,
MessageList,
},
network::ConnectionMessage::SendData,
};
match msg {
ClientList(clients) => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&ConnectedClients { clients })
.expect("[Client] Failed to encode string"),
)),
MessageList(messages) => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&GlobalChatMessages { messages })
.expect("[Client] Failed to encode string"),
)),
ClientlySentMessage { content, from } => {
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&UserMessage { from, content })
.expect("[Client] Failed to encode string"),
))
}
GloballySentMessage { from, content } => {
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&GlobalMessage { from, content })
.expect("[Client] Failed to encode string"),
))
}
}
}
}
// Handles outputs from the connection.
impl Handler<ConnectionObservableOutput> for Client {
type Result = ();
fn handle(
&mut self,
msg: ConnectionObservableOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use foundation::messages::client::ClientStreamIn::{
Disconnect,
GetClients,
GetMessages,
SendGlobalMessage,
SendMessage,
};
use serde_json::from_str;
use crate::network::ConnectionObservableOutput::RecvData;
match msg {
RecvData(_sender, data) => {
if let Ok(msg) = from_str::<ClientStreamIn>(data.as_str()) {
match msg {
GetClients => self.get_clients(ctx),
GetMessages => self.get_messages(ctx),
SendMessage { to, content } => self.send_message(ctx, to, content),
SendGlobalMessage { content } => {
self.send_gloal_message(ctx, content)
}
Disconnect => self.disconnect(ctx),
}
}
}
ConnectionObservableOutput::ConnectionClosed(_) => self
.broadcast(ClientObservableMessage::Disconnecting(self.details.uuid)),
}
}
}
impl Handler<ObservableMessage<ClientObservableMessage>> for Client {
type Result = ();
fn handle(
&mut self,
msg: ObservableMessage<ClientObservableMessage>,
_ctx: &mut Self::Context,
) -> Self::Result {
use crate::prelude::messages::ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Client] adding subscriber");
self.observers.push(r);
}
Unsubscribe(r) => {
println!("[Client] removing subscriber");
let r = r.upgrade();
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a.upgrade() != r)
.collect();
}
}
}
}
impl Drop for Client {
fn drop(&mut self) {
println!("[Client] Dropping value")
}
}

View File

@ -1,44 +0,0 @@
use actix::{Message, MessageResponse, WeakAddr};
use foundation::{models::message::Message as StoredMessage, ClientDetails};
use uuid::Uuid;
use crate::client_management::client::Client;
/// Message sent ot the clients delegate
#[derive(Message)]
#[rtype(result = "()")]
pub enum ClientMessage {
ClientList(Vec<ClientDetails>),
MessageList(Vec<StoredMessage>),
ClientlySentMessage { from: Uuid, content: String },
GloballySentMessage { from: Uuid, content: String },
}
#[derive(Message)]
#[rtype(result = "ClientDataResponse")]
pub enum ClientDataMessage {
Details,
Uuid,
Username,
Address,
}
#[derive(MessageResponse)]
pub enum ClientDataResponse {
Details(ClientDetails),
Uuid(Uuid),
Username(String),
Address(String),
}
/// message that is sent to all observers of the current client.
#[derive(Message, Clone)]
#[rtype(result = "()")]
pub enum ClientObservableMessage {
Message(WeakAddr<Client>, Uuid, String),
GlobalMessage(WeakAddr<Client>, String),
GetClients(WeakAddr<Client>),
GetGlobalMessages(WeakAddr<Client>),
Disconnecting(Uuid),
}

View File

@ -1,5 +0,0 @@
mod actor;
mod messages;
pub use actor::Client;
pub use messages::*;

View File

@ -1,336 +0,0 @@
use std::collections::HashMap;
use actix::{
fut::wrap_future,
Actor,
ActorFutureExt,
Addr,
AsyncContext,
Context,
Handler,
WeakAddr,
WeakRecipient,
};
use foundation::ClientDetails;
use tokio_stream::StreamExt;
use uuid::Uuid;
use crate::client_management::{
chat_manager::{
ChatManager,
ChatManagerDataMessage,
ChatManagerDataResponse,
ChatManagerMessage,
},
client::{
Client,
ClientDataMessage,
ClientDataResponse,
ClientDataResponse::Details,
ClientMessage,
ClientObservableMessage,
},
messages::{
ClientManagerDataMessage,
ClientManagerDataResponse,
ClientManagerDataResponse::{ClientCount, Clients},
ClientManagerMessage,
ClientManagerOutput,
},
};
pub struct ClientManager {
clients: HashMap<Uuid, Addr<Client>>,
chat_manager: Addr<ChatManager>,
_delegate: WeakRecipient<ClientManagerOutput>,
}
impl ClientManager {
pub(crate) fn new(
delegate: WeakRecipient<ClientManagerOutput>,
) -> Addr<Self> {
ClientManager {
_delegate: delegate,
clients: HashMap::new(),
chat_manager: ChatManager::new(),
}
.start()
}
pub(crate) fn send_client_list(
&self,
ctx: &mut Context<Self>,
sender: WeakAddr<Client>,
) {
println!("[ClientManager] sending update to client");
use crate::client_management::client::ClientMessage::ClientList;
if let Some(to_send) = sender.upgrade() {
let client_addr: Vec<Addr<Client>> =
self.clients.values().cloned().collect();
let collection = tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage::Details))
.map(|val| {
if let Details(details) = val.unwrap() {
details
} else {
ClientDetails::default()
}
})
.collect();
let fut = wrap_future(async move {
let a: Vec<_> = collection.await;
let _ = to_send.send(ClientList(a)).await;
});
ctx.spawn(fut);
}
}
pub(crate) fn send_global_messages(
&self,
ctx: &mut Context<ClientManager>,
sender: WeakAddr<Client>,
) {
if let Some(to_send) = sender.upgrade() {
let fut = wrap_future(
self.chat_manager.send(ChatManagerDataMessage::GetMessages),
)
.map(move |out, _a, _ctx| {
if let Ok(ChatManagerDataResponse::GotMessages(res)) = out {
to_send.do_send(ClientMessage::MessageList(res));
}
});
ctx.spawn(fut);
};
}
pub(crate) fn send_message_request(
&self,
ctx: &mut Context<ClientManager>,
sender: WeakAddr<Client>,
to: Uuid,
content: String,
) {
println!("[ClientManager] sending message to client");
let client_addr: Vec<Addr<Client>> =
self.clients.values().cloned().collect();
let collection = tokio_stream::iter(client_addr.clone())
.then(|addr| addr.send(ClientDataMessage::Details))
.map(|val| val.unwrap())
.map(|val: ClientDataResponse| {
if let Details(details) = val {
details
} else {
ClientDetails::default()
}
})
.collect();
let fut = wrap_future(async move {
if let Some(sender) = sender.upgrade() {
let sender_details: ClientDataResponse =
sender.send(ClientDataMessage::Details).await.unwrap();
let from = if let Details(details) = sender_details {
details.uuid
} else {
ClientDetails::default().uuid
};
let client_details: Vec<ClientDetails> = collection.await;
let pos = client_details.iter().position(|i| i.uuid == to);
if let Some(pos) = pos {
client_addr[pos]
.send(ClientMessage::ClientlySentMessage { content, from })
.await
.expect("TODO: panic message");
}
}
});
ctx.spawn(fut);
}
pub(crate) fn send_global_message_request(
&self,
ctx: &mut Context<ClientManager>,
sender: WeakAddr<Client>,
content: String,
) {
println!("[ClientManager] sending message to client");
use crate::client_management::client::ClientMessage::GloballySentMessage;
let client_addr: Vec<Addr<Client>> =
self.clients.values().cloned().collect();
if let Some(sender) = sender.upgrade() {
let cm = self.chat_manager.clone();
let snd1 = sender.clone();
let snd2 = sender;
let cont1 = content.clone();
let cont2 = content;
let fut = wrap_future(async move {
println!("[ClientManager] sending to all clients");
let details: ClientDataResponse =
snd1.send(ClientDataMessage::Details).await.unwrap();
let from = if let Details(details) = details {
details.uuid
} else {
ClientDetails::default().uuid
};
let collection = tokio_stream::iter(client_addr)
.then(move |addr| {
addr.send(GloballySentMessage {
content: cont1.clone(),
from,
})
})
.collect();
let _: Vec<_> = collection.await;
});
let chat_manager_fut = wrap_future(async move {
println!("[ClientManager] storing in chat manager");
let details: ClientDataResponse =
snd2.send(ClientDataMessage::Details).await.unwrap();
let from = if let Details(details) = details {
details.uuid
} else {
ClientDetails::default().uuid
};
let _ = cm.send(ChatManagerMessage::AddMessage(from, cont2)).await;
});
ctx.spawn(fut);
ctx.spawn(chat_manager_fut);
}
}
fn add_client(
&mut self,
ctx: &mut Context<ClientManager>,
uuid: Uuid,
addr: Addr<Client>,
) {
println!("[ClientManager] adding client");
use crate::prelude::messages::ObservableMessage::Subscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
println!("[ClientManager] sending subscribe message to client");
addr.do_send(Subscribe(recp.downgrade()));
self.clients.insert(uuid, addr);
for (_k, v) in self.clients.clone() {
self.send_client_list(ctx, v.downgrade())
}
}
fn remove_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid) {
println!("[ClientManager] removing client");
use crate::prelude::messages::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
let addr = self.clients.remove(&uuid);
if let Some(addr) = addr {
println!("[ClientManager] sending unsubscribe message to client");
addr.do_send(Unsubscribe(recp.downgrade()));
}
println!("[ClientManager] sending client list to other clients");
for (_k, v) in self.clients.iter() {
self.send_client_list(ctx, v.downgrade())
}
}
fn disconnect_client(
&mut self,
ctx: &mut Context<ClientManager>,
uuid: Uuid,
) {
println!("[ClientManager] disconnecting client");
use crate::prelude::messages::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) {
addr.do_send(Unsubscribe(recp.downgrade()));
self.remove_client(ctx, uuid);
}
}
}
impl Actor for ClientManager {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("[ClientManager] started");
}
}
impl Handler<ClientManagerMessage> for ClientManager {
type Result = ();
fn handle(
&mut self,
msg: ClientManagerMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ClientManagerMessage::{AddClient, RemoveClient};
match msg {
// todo: Add subscription to the client.
AddClient(uuid, addr) => self.add_client(ctx, uuid, addr),
// todo: remove subscription to client.
RemoveClient(uuid) => self.remove_client(ctx, uuid),
}
}
}
impl Handler<ClientObservableMessage> for ClientManager {
type Result = ();
fn handle(
&mut self,
msg: ClientObservableMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use crate::client_management::client::ClientObservableMessage::{
Disconnecting,
GetClients,
GetGlobalMessages,
GlobalMessage,
Message,
};
match msg {
Message(sender, to, content) => {
self.send_message_request(ctx, sender, to, content)
}
GlobalMessage(sender, content) => {
self.send_global_message_request(ctx, sender, content)
}
GetClients(sender) => self.send_client_list(ctx, sender),
GetGlobalMessages(sender) => self.send_global_messages(ctx, sender),
Disconnecting(uuid) => self.disconnect_client(ctx, uuid),
}
}
}
impl Handler<ClientManagerDataMessage> for ClientManager {
type Result = ClientManagerDataResponse;
fn handle(
&mut self,
msg: ClientManagerDataMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
match msg {
ClientManagerDataMessage::ClientCount => {
ClientCount(self.clients.values().count())
}
ClientManagerDataMessage::Clients => {
Clients(self.clients.values().map(|a| a.downgrade()).collect())
}
}
}
}

View File

@ -1,32 +0,0 @@
use actix::{Addr, Message, MessageResponse, WeakAddr};
use uuid::Uuid;
use crate::client_management::{client::Client, ClientManager};
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientManagerMessage {
AddClient(Uuid, Addr<Client>),
#[allow(dead_code)]
RemoveClient(Uuid),
}
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientManagerOutput {
#[allow(dead_code)]
UpdateRequest(Addr<ClientManager>),
}
#[derive(Message)]
#[rtype(result = "ClientManagerDataResponse")]
pub enum ClientManagerDataMessage {
ClientCount,
Clients,
}
#[derive(MessageResponse)]
pub enum ClientManagerDataResponse {
ClientCount(usize),
Clients(Vec<WeakAddr<Client>>),
}

View File

@ -1,28 +0,0 @@
//! Contains code that handles the lifecycle of connected clients
//!
//! This collects all parts used by the client manager actor
//!
//! It's responsibility is:
//! - to handle client to client communication.
//! - to handle server to client communication.
//! - to handler client lifecycle events such as dicconection.
mod chat_manager;
pub mod client;
mod client_manager;
mod messages;
#[allow(unused_imports)]
use chat_manager::{
ChatManager,
ChatManagerDataMessage,
ChatManagerDataResponse,
ChatManagerMessage,
};
pub(crate) use client_manager::ClientManager;
pub(crate) use messages::{
ClientManagerDataMessage,
ClientManagerDataResponse,
ClientManagerMessage,
ClientManagerOutput,
};

View File

@ -1,14 +0,0 @@
use clap::Parser;
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct Arguments {
#[clap(short, long, value_parser = clap::value_parser!(u16).range(1..))]
pub port: Option<u16>,
#[clap(short, long, value_parser)]
pub name: Option<String>,
#[clap(short, long, value_parser)]
pub owner: Option<String>,
}

View File

@ -1,32 +0,0 @@
use actix::{Actor, Addr};
use crate::config_manager::{arg_parser::Arguments, ConfigManager};
pub(super) struct Builder {
pub(super) file_path: String,
pub(super) args: Option<Arguments>,
}
impl Builder {
pub(super) fn new() -> Self {
Self {
file_path: "./config_file.toml".to_owned(),
args: None,
}
}
#[allow(dead_code)]
pub fn config_path(mut self, path: impl Into<String>) -> Self {
self.file_path = path.into();
self
}
pub fn args(mut self, args: Arguments) -> Self {
self.args.replace(args);
self
}
pub(super) fn build(self) -> Addr<ConfigManager> {
ConfigManager::from(self).start()
}
}

View File

@ -1,175 +0,0 @@
use std::{
collections::BTreeMap,
fs::{File, OpenOptions},
io::Read,
sync::Once,
};
use actix::{Actor, Addr, Context, Handler, Recipient};
use clap::Parser;
use toml::Value;
use crate::{
config_manager::{
arg_parser::Arguments,
builder::Builder,
messages::{
ConfigManagerDataMessage,
ConfigManagerDataResponse,
ConfigManagerOutput,
},
types::ConfigValue::{Dict, Number, String as ConfigString},
ConfigValue,
},
prelude::messages::ObservableMessage,
};
static mut SHARED: Option<Addr<ConfigManager>> = None;
static INIT: Once = Once::new();
#[allow(dead_code)]
pub(crate) struct ConfigManager {
file: File,
stored: ConfigValue,
root: ConfigValue,
subscribers: Vec<Recipient<ObservableMessage<ConfigManagerOutput>>>,
}
// static methods
impl ConfigManager {
pub fn shared() -> Addr<Self> {
INIT.call_once(|| {
let builder = Self::create().args(Arguments::parse()).build();
unsafe { SHARED = Some(builder) }
});
unsafe { SHARED.clone().unwrap() }
}
pub(super) fn create() -> Builder {
Builder::new()
}
}
// instance methods
impl ConfigManager {
pub fn get_value(&self, key: String) -> Option<ConfigValue> {
if let Dict(dict) = &self.root {
dict.get(&key).cloned()
} else {
None
}
}
pub fn set_value(
&mut self,
key: String,
value: Option<ConfigValue>,
) -> Option<ConfigValue> {
value.and_then(|value| {
if let (Dict(stored), Dict(root)) = (&mut self.stored, &mut self.root) {
stored.insert(key.clone(), value.clone());
root.insert(key.clone(), value.clone());
Some(value)
} else {
None
}
})
}
// this doesn't work for now
pub fn soft_set_value(
&mut self,
key: String,
value: Option<ConfigValue>,
) -> Option<ConfigValue> {
value.and_then(|value| {
if let Dict(root) = &mut self.root {
root.insert(key, value.clone());
Some(value)
} else {
None
}
})
}
}
impl Actor for ConfigManager {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("[ConfigManager] starting");
println!("[ConfigManager] started");
}
}
impl Handler<ConfigManagerDataMessage> for ConfigManager {
type Result = ConfigManagerDataResponse;
fn handle(
&mut self,
msg: ConfigManagerDataMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use ConfigManagerDataResponse::{GotValue, SetValue, SoftSetValue};
match msg {
ConfigManagerDataMessage::GetValue(val) => GotValue(self.get_value(val)),
ConfigManagerDataMessage::SetValue(key, value) => {
SetValue(key.clone(), self.set_value(key, value))
}
ConfigManagerDataMessage::SoftSetValue(key, value) => {
SoftSetValue(key.clone(), self.soft_set_value(key, value))
}
}
}
}
impl From<Builder> for ConfigManager {
fn from(builder: Builder) -> Self {
println!("got args: {:#?}", builder.args);
let mut file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(builder.file_path)
.ok()
.unwrap();
let mut output = String::new();
file
.read_to_string(&mut output)
.expect("failed to read from file");
let stored = output
.parse::<Value>()
.map(|v| v.into())
.ok()
.unwrap_or_else(|| Dict(BTreeMap::new()));
println!("[ConfigManager] got stored: {:?}", stored);
let mut root = stored.clone();
if let Dict(root) = &mut root {
builder.args.map(|v| {
v.port
.map(|p| root.insert("Network.Port".to_owned(), Number(p.into())));
v.name.map(|n| {
root.insert("Server.Name".to_owned(), ConfigString(n.into()))
});
v.owner.map(|o| {
root.insert("Server.Owner".to_owned(), ConfigString(o.into()))
});
});
}
Self {
file,
root,
stored,
subscribers: Vec::default(),
}
}
}

View File

@ -1,27 +0,0 @@
use actix::{Message, MessageResponse};
use crate::config_manager::types::ConfigValue;
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub enum ConfigManagerOutput {
#[allow(dead_code)]
ConfigUpdated(String, ConfigValue),
}
#[derive(Message, Debug)]
#[rtype(result = "ConfigManagerDataResponse")]
pub enum ConfigManagerDataMessage {
GetValue(String),
#[allow(dead_code)]
SetValue(String, Option<ConfigValue>),
#[allow(dead_code)]
SoftSetValue(String, Option<ConfigValue>),
}
#[derive(MessageResponse, Debug)]
pub enum ConfigManagerDataResponse {
GotValue(Option<ConfigValue>),
SetValue(String, Option<ConfigValue>),
SoftSetValue(String, Option<ConfigValue>),
}

View File

@ -1,16 +0,0 @@
//! # config_manager
//! This module contains all the code that deals with server configuration.
//! It tries to implement a singleton actor, that will be fetchable globaly.
pub mod arg_parser;
mod builder;
mod config_manager;
mod messages;
mod types;
pub(crate) use config_manager::ConfigManager;
pub(crate) use messages::{
ConfigManagerDataMessage,
ConfigManagerDataResponse,
};
pub(crate) use types::ConfigValue;

View File

@ -1,51 +0,0 @@
use std::collections::BTreeMap;
use toml::value::Value;
/// # ConfigValue
/// Each value type that can be used within a config file.
/// gets used when reading and writing to a config file.
#[derive(Clone, Debug)]
pub enum ConfigValue {
Dict(BTreeMap<String, Self>),
Array(Vec<Self>),
String(String),
Number(i64),
Float(f64),
Bool(bool),
}
impl From<ConfigValue> for Value {
fn from(v: ConfigValue) -> Self {
match v {
ConfigValue::Dict(dict) => {
Value::Table(dict.into_iter().map(|(k, v)| (k, v.into())).collect())
}
ConfigValue::Array(arr) => {
Value::Array(arr.into_iter().map(|v| v.into()).collect())
}
ConfigValue::String(s) => Value::String(s),
ConfigValue::Number(n) => Value::Integer(n),
ConfigValue::Float(f) => Value::Float(f),
ConfigValue::Bool(b) => Value::Boolean(b),
}
}
}
impl From<Value> for ConfigValue {
fn from(v: Value) -> Self {
match v {
Value::Table(dict) => ConfigValue::Dict(
dict.into_iter().map(|(k, v)| (k, v.into())).collect(),
),
Value::Array(arr) => {
ConfigValue::Array(arr.into_iter().map(|v| v.into()).collect())
}
Value::String(s) => ConfigValue::String(s),
Value::Integer(n) => ConfigValue::Number(n),
Value::Float(f) => ConfigValue::Float(f),
Value::Boolean(b) => ConfigValue::Bool(b),
Value::Datetime(d) => ConfigValue::String(d.to_string()),
}
}
}

View File

@ -1,17 +1,8 @@
//! This is the main module of the actix server.
//! It starts the server and sleeps for the remainder of the program
pub(crate) mod client_management;
pub(crate) mod config_manager;
pub(crate) mod network;
pub(crate) mod prelude;
pub(crate) mod scripting;
pub(crate) mod server;
pub mod listener_manager;
pub mod network_connection;
pub mod os_signal_manager;
pub mod server_va;

View File

@ -0,0 +1,20 @@
use std::net::SocketAddr;
use tokio::net::TcpStream;
use crate::network_connection::NetworkConnection;
struct ClientConnection {
stream: TcpStream,
_addr: SocketAddr,
}
impl From<NetworkConnection> for ClientConnection {
fn from(value: NetworkConnection) -> Self {
Self {
stream: value.
}
}
}
impl ClientConnection {}

View File

@ -1,266 +0,0 @@
use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
use actix::{
clock::timeout,
fut::wrap_future,
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
WeakRecipient,
};
use futures::{future::join_all, Future, FutureExt};
use tokio::{
io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::TcpStream,
sync::Mutex,
};
use super::{ConnectionMessage, ConnectionObservableOutput};
use crate::{
network::connection::messages::ConnectionPrivateMessage,
prelude::messages::ObservableMessage,
};
/// # Connection
/// This manages a TcpStream for a given connection.
///
/// ## Fields
/// - read_half: A temporary store fr the read half of the connection.
/// - write_half: The write half of the connection.
/// - address: The socket address of the conneciton.
/// - observers: A list of observers to events created by the connection.
/// - loop_future: the future holding the receiving loop.
pub struct Connection {
write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
_address: SocketAddr,
observers: Vec<WeakRecipient<ConnectionObservableOutput>>,
}
impl Connection {
/// Creates a new Conneciton actor from a Tokio TcpStream,
/// and start's its execution.
/// returns: the Addr of the connection.
pub(crate) fn new(stream: TcpStream, address: SocketAddr) -> Addr<Self> {
let (read_half, write_half) = split(stream);
let addr = Connection {
write_half: Arc::new(Mutex::new(write_half)),
_address: address,
observers: Vec::new(),
}
.start();
addr.do_send(ConnectionPrivateMessage::DoRead(BufReader::new(read_half)));
addr
}
#[inline]
fn broadcast(
&self,
ctx: &mut <Self as Actor>::Context,
data: ConnectionObservableOutput,
) {
let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.observers
.iter()
.cloned()
.map(|r| {
let data = data.clone();
async move {
if let Some(r) = r.upgrade() {
let _ = r.send(data).await;
}
}
.boxed()
})
.collect();
let _ = ctx.spawn(wrap_future(async {
join_all(futs).await;
}));
}
#[inline]
fn do_read(
&mut self,
ctx: &mut <Self as Actor>::Context,
mut buf_reader: BufReader<ReadHalf<TcpStream>>,
) {
let weak_addr = ctx.address().downgrade();
let read_fut = async move {
let dur = Duration::from_millis(100);
let mut buffer_string: String = Default::default();
let read_fut = buf_reader.read_line(&mut buffer_string);
let Ok(Ok(len)) = timeout(dur, read_fut).await else {
if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader));
}
return;
};
if len == 0 {
println!("[Connection] readline returned 0");
if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::Close);
}
return;
}
if let Some(addr) = weak_addr.upgrade() {
let _ = addr
.send(ConnectionPrivateMessage::Broadcast(
ConnectionObservableOutput::RecvData(
addr.downgrade(),
buffer_string.clone(),
),
))
.await;
}
if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader));
}
};
ctx.spawn(wrap_future(read_fut));
}
fn close_connection(&self, ctx: &mut <Self as Actor>::Context) {
use ConnectionObservableOutput::ConnectionClosed;
self.broadcast(ctx, ConnectionClosed(ctx.address().downgrade()))
}
}
impl Actor for Connection {
type Context = Context<Self>;
/// runs when the actor is started.
/// takes out eh read_half ad turns it into a buffered reader
/// then eneters loop readling lines from the tcp stream
fn started(&mut self, _ctx: &mut Self::Context) {
println!("[Connection] started");
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use ConnectionObservableOutput::ConnectionClosed;
println!("[Connection] stopped");
for recp in self.observers.iter() {
if let Some(recp) = recp.upgrade() {
recp.do_send(ConnectionClosed(ctx.address().downgrade()))
}
}
}
}
impl Handler<ObservableMessage<ConnectionObservableOutput>> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ObservableMessage<ConnectionObservableOutput>,
_ctx: &mut Self::Context,
) -> <Self as actix::Handler<ObservableMessage<ConnectionObservableOutput>>>::Result{
use ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Connection] adding subscriber");
self.observers.push(r);
}
Unsubscribe(r) => {
println!("[Connection] removing subscriber");
let r = r.upgrade();
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a.upgrade() != r)
.collect();
}
};
}
}
impl Handler<ConnectionMessage> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ConnectionMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
let writer = Arc::downgrade(&self.write_half);
match msg {
SendData(d) => {
ctx.spawn(wrap_future(async move {
let Some(writer) = writer.upgrade() else {
return;
};
println!("[Connection] sending data");
let mut lock = writer.lock().await;
let mut buffer = Vec::new();
let _ = writeln!(&mut buffer, "{}", d.as_str());
let _ = lock.write_all(&buffer).await;
}));
}
CloseConnection => ctx.stop(),
};
}
}
// impl Handler<SelfMessage> for Connection {
// type Result = ();
// fn handle(&mut self, msg: SelfMessage, ctx: &mut Self::Context) -> Self::Result {
// use ConnectionObservableOutput::RecvData;
// use SelfMessage::UpdateObserversWithData;
// match msg {
// UpdateObserversWithData(data) => {
// let send = ctx.address();
// let addr = self.address;
// // this is a mess
// let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
// .observers
// .iter()
// .cloned()
// .map(|r| {
// let send = send.clone();
// let data = data.clone();
// async move {
// let _ = r.send(RecvData(send, addr, data)).await;
// }
// .boxed()
// })
// .collect();
// let _ = ctx.spawn(wrap_future(async {
// join_all(futs).await;
// }));
// }
// };
// }
// }
impl Handler<ConnectionPrivateMessage> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ConnectionPrivateMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionPrivateMessage::Broadcast;
match msg {
Broadcast(data) => self.broadcast(ctx, data),
ConnectionPrivateMessage::DoRead(buf_reader) => {
self.do_read(ctx, buf_reader)
}
ConnectionPrivateMessage::Close => self.close_connection(ctx),
};
}
}
impl Drop for Connection {
fn drop(&mut self) {
println!("[Connection] Dropping value")
}
}

View File

@ -1,30 +0,0 @@
use actix::{Message, WeakAddr};
use tokio::{
io::{BufReader, ReadHalf},
net::TcpStream,
};
use crate::prelude::actors::Connection;
/// This is a message that can be sent to the Connection.
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ConnectionMessage {
SendData(String),
CloseConnection,
}
#[derive(Message, Clone)]
#[rtype(result = "()")]
pub(crate) enum ConnectionObservableOutput {
RecvData(WeakAddr<Connection>, String),
ConnectionClosed(WeakAddr<Connection>),
}
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ConnectionPrivateMessage {
Broadcast(ConnectionObservableOutput),
DoRead(BufReader<ReadHalf<TcpStream>>),
Close,
}

View File

@ -1,5 +0,0 @@
mod actor;
mod messages;
pub(crate) use actor::Connection;
pub(crate) use messages::{ConnectionMessage, ConnectionObservableOutput};

View File

@ -1,170 +0,0 @@
use std::net::SocketAddr;
use actix::{
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
WeakAddr,
WeakRecipient,
};
use foundation::{
messages::{
client::{ClientStreamOut, ClientStreamOut::Error},
network::{NetworkSockIn, NetworkSockOut},
},
ClientDetails,
};
use serde_json::{from_str, to_string};
use crate::{
network::InitiatorOutput,
prelude::{
actors::Connection,
messages::{
ConnectionMessage,
ConnectionObservableOutput,
ObservableMessage,
},
},
};
/// # ConnectionInitiator
/// Handles the initiatin of a new connection.
///
/// This will do one of two things:
/// - Create a new client and send it to the network manager.
/// - Request the eserver info and send it to the connection.
pub struct ConnectionInitiator {
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
}
impl ConnectionInitiator {
pub(crate) fn new(
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
) -> Addr<Self> {
ConnectionInitiator {
connection,
delegate,
}
.start()
}
fn handle_request(
&mut self,
sender: WeakAddr<Connection>,
ctx: &mut <Self as Actor>::Context,
data: String,
) {
use InitiatorOutput::{ClientRequest, InfoRequest};
use NetworkSockIn::{Connect, Info};
let msg = from_str::<NetworkSockIn>(data.as_str());
if let Err(e) = msg.as_ref() {
println!("[ConnectionInitiator] error decoding message {}", e);
self.error(ctx, sender);
return;
}
let msg = msg.unwrap();
println!("[ConnectionInitiator] matching request");
if let (Some(delegate), Some(sender)) =
(self.delegate.upgrade(), sender.upgrade())
{
match msg {
Info => {
delegate.do_send(InfoRequest(ctx.address().downgrade(), sender))
}
Connect {
uuid,
username,
address,
} => delegate.do_send(ClientRequest(
ctx.address().downgrade(),
sender,
ClientDetails {
uuid,
username,
address,
public_key: None,
},
)),
};
ctx.stop();
}
}
fn error(
&mut self,
ctx: &mut <Self as Actor>::Context,
sender: WeakAddr<Connection>,
) {
use ConnectionMessage::{CloseConnection, SendData};
if let Some(sender) = sender.upgrade() {
sender.do_send(SendData(
to_string::<ClientStreamOut>(&Error {
msg: "Error in connection initiator?".to_owned(),
})
.unwrap(),
));
sender.do_send(CloseConnection);
}
ctx.stop()
}
}
impl Actor for ConnectionInitiator {
type Context = Context<Self>;
/// on start initiate the protocol.
/// also add self as a subscriber to the connection.
fn started(&mut self, ctx: &mut Self::Context) {
use ConnectionMessage::SendData;
use NetworkSockOut::Request;
use ObservableMessage::Subscribe;
println!("[ConnectionInitiator] started");
self
.connection
.do_send(Subscribe(ctx.address().recipient().downgrade()));
self
.connection
.do_send(SendData(to_string(&Request).unwrap()));
}
/// once stopped remove self from the connection subscribers
fn stopped(&mut self, ctx: &mut Self::Context) {
use ObservableMessage::Unsubscribe;
println!("[ConnectionInitiator] stopped");
self
.connection
.do_send(Unsubscribe(ctx.address().recipient().downgrade()));
}
}
impl Handler<ConnectionObservableOutput> for ConnectionInitiator {
type Result = ();
fn handle(
&mut self,
msg: ConnectionObservableOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionObservableOutput::RecvData;
if let RecvData(sender, data) = msg {
self.handle_request(sender, ctx, data)
}
}
}
impl Drop for ConnectionInitiator {
fn drop(&mut self) {
println!("[ConnectionInitiator] Dropping value")
}
}

View File

@ -1,15 +0,0 @@
use actix::{Addr, Message, WeakAddr};
use foundation::ClientDetails;
use crate::prelude::actors::{Connection, ConnectionInitiator};
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum InitiatorOutput {
InfoRequest(WeakAddr<ConnectionInitiator>, Addr<Connection>),
ClientRequest(
WeakAddr<ConnectionInitiator>,
Addr<Connection>,
ClientDetails,
),
}

View File

@ -1,5 +0,0 @@
mod actor;
mod messages;
pub(crate) use actor::ConnectionInitiator;
pub(crate) use messages::InitiatorOutput;

View File

@ -0,0 +1,11 @@
use std::collections::HashMap;
use uuid::Uuid;
struct ClientStore {
conn:
}
struct conneciton_manager {
client_map: HashMap<Uuid, Client>,
}

View File

@ -1,110 +0,0 @@
use std::net::{SocketAddr, ToSocketAddrs};
use actix::{
fut::wrap_future,
Actor,
Addr,
AsyncContext,
Context,
Handler,
Message,
SpawnHandle,
WeakRecipient,
};
use tokio::net::TcpListener;
use crate::network::connection::Connection;
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ListenerMessage {
StartListening,
StopListening,
}
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ListenerOutput {
NewConnection(Addr<Connection>),
}
pub(super) struct NetworkListener {
address: SocketAddr,
delegate: WeakRecipient<ListenerOutput>,
looper: Option<SpawnHandle>,
}
impl NetworkListener {
pub(crate) fn new<T: ToSocketAddrs>(
address: T,
delegate: WeakRecipient<ListenerOutput>,
) -> Addr<NetworkListener> {
NetworkListener {
address: address
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()[0],
delegate,
looper: None,
}
.start()
}
/// called when the actor is to start listening
fn start_listening(&mut self, ctx: &mut <Self as Actor>::Context) {
println!("[NetworkListener] started listening");
let addr = self.address;
let delegate = self.delegate.clone();
ctx.spawn(wrap_future(async move {
use ListenerOutput::NewConnection;
let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, addr)) = listener.accept().await {
println!("[NetworkListener] accepted socket");
let conn = Connection::new(stream, addr);
let Some(delegate) = delegate.upgrade() else {
break;
};
println!("[NetworkListener] sending connection to delegate");
delegate.do_send(NewConnection(conn))
}
}));
}
/// called when the actor is to stop listening
fn stop_listening(&mut self, ctx: &mut <Self as Actor>::Context) {
println!("[NetworkListener] stopped listening");
if let Some(fut) = self.looper.take() {
ctx.cancel_future(fut);
}
}
}
impl Actor for NetworkListener {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("[NetworkListener] started");
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("[NetworkListener] stopped");
}
}
impl Handler<ListenerMessage> for NetworkListener {
type Result = ();
fn handle(
&mut self,
msg: ListenerMessage,
ctx: &mut <Self as actix::Actor>::Context,
) -> Self::Result {
use ListenerMessage::{StartListening, StopListening};
match msg {
StartListening => self.start_listening(ctx),
StopListening => self.stop_listening(ctx),
}
}
}

View File

@ -1,48 +1,4 @@
#![doc = r"# Network
This module contains network code for the server.
This includes:
- The network manager: For that handles all server network connections.
- The network listener: For listening for connections on a port.
- The conneciton: An abstraction over sockets sockets, for actix.
- The connection initiator: For initiating new connections to the server
## Diagrams
```mermaid
sequenceDiagram
Server->>NetworkManager: creates
NetworkManager->>NetworkListener: create
NetworkManager->>+NetworkListener: start listening
loop async tcp listen
NetworkListener->>NetworkListener: check for new connections
end
NetworkListener->>Connection: create from socket
NetworkListener->>NetworkManager: new connection
NetworkManager->>Server: new connection
Server->>ConnectionInitiator: create with connection
```"]
mod connection;
mod connection_initiator;
mod listener;
mod network_manager;
pub(crate) use connection::{
Connection,
ConnectionMessage,
ConnectionObservableOutput,
};
pub(crate) use connection_initiator::{ConnectionInitiator, InitiatorOutput};
// use listener::{ListenerMessage, ListenerOutput, NetworkListener};
pub(crate) use network_manager::{
NetworkDataMessage,
NetworkDataOutput,
NetworkManager,
NetworkMessage,
NetworkOutput,
};
pub mod client_connection;
pub mod connection_manager;
pub mod listener_manager;
pub mod network_connection;

View File

@ -1,256 +0,0 @@
use actix::{
fut::wrap_future,
Actor,
ActorFutureExt,
Addr,
AsyncContext,
Context,
Handler,
WeakAddr,
WeakRecipient,
};
use foundation::ClientDetails;
use crate::{
config_manager::{ConfigManager, ConfigManagerDataMessage, ConfigValue},
network::{
listener::{ListenerMessage, ListenerOutput, NetworkListener},
network_manager::{
messages::{NetworkMessage, NetworkOutput},
Builder,
},
Connection,
ConnectionInitiator,
InitiatorOutput,
NetworkDataMessage,
NetworkDataOutput,
},
};
/// # NetworkManager
/// this struct will handle all networking functionality.
///
pub struct NetworkManager {
config_manager: WeakAddr<ConfigManager>,
listener_addr: Option<Addr<NetworkListener>>,
delegate: WeakRecipient<NetworkOutput>,
initiators: Vec<Addr<ConnectionInitiator>>,
}
impl NetworkManager {
pub fn new(delegate: WeakRecipient<NetworkOutput>) -> Addr<NetworkManager> {
NetworkManager {
listener_addr: None,
delegate,
initiators: Vec::new(),
config_manager: ConfigManager::shared().downgrade(),
}
.start()
}
pub fn create(delegate: WeakRecipient<NetworkOutput>) -> Builder {
Builder::new(delegate)
}
fn start_listener(&mut self, _ctx: &mut <Self as actix::Actor>::Context) {
use ListenerMessage::StartListening;
println!("[NetworkManager] got Listen message");
if let Some(addr) = self.listener_addr.as_ref() {
addr.do_send(StartListening);
}
}
fn stop_listener(&mut self, _ctx: &mut <Self as actix::Actor>::Context) {
use ListenerMessage::StopListening;
if let Some(addr) = self.listener_addr.as_ref() {
addr.do_send(StopListening);
}
}
/// Handles a new connection from the Listener.
/// This creates a new ConnectionInitaliser.
/// This completes the first part of the protocol.
#[inline]
fn new_connection(
&mut self,
ctx: &mut <Self as Actor>::Context,
connection: Addr<Connection>,
) {
println!("[NetworkManager] Got new connection");
let init = ConnectionInitiator::new(
ctx.address().recipient().downgrade(),
connection,
);
self.initiators.push(init);
}
#[inline]
fn remove_initiator(&mut self, sender: WeakAddr<ConnectionInitiator>) {
if let Some(sender) = sender.upgrade() {
let index = self.initiators.iter().position(|i| *i == sender).unwrap();
println!("[NetworkManager] removed initiator at:{}", index);
let _ = self.initiators.remove(index);
}
}
/// handles a initiator client request
/// this will, forward the conenction and client details
/// to the server actor to be dispatched to the appropriate
/// manager
#[inline]
fn client_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: WeakAddr<ConnectionInitiator>,
connection: Addr<Connection>,
client_details: ClientDetails,
) {
use NetworkOutput::NewClient;
println!("[NetworkManager] recieved client request");
if let Some(delegate) = self.delegate.upgrade() {
delegate.do_send(NewClient(connection, client_details));
}
self.remove_initiator(sender);
}
/// This sends the connection to the server
/// which will in turn take over the protocol by sending
/// the servers infomation.
#[inline]
fn info_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: WeakAddr<ConnectionInitiator>,
connection: Addr<Connection>,
) {
use NetworkOutput::InfoRequested;
println!("[NetworkManager] Got recieved info request");
if let Some(delegate) = self.delegate.upgrade() {
delegate.do_send(InfoRequested(connection));
}
self.remove_initiator(sender);
}
}
impl Actor for NetworkManager {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("[NetworkManager] Starting");
let config_mgr = self.config_manager.clone().upgrade();
if let Some(config_mgr) = config_mgr {
let fut = wrap_future(config_mgr.send(
ConfigManagerDataMessage::GetValue("Network.Port".to_owned()),
))
.map(
|out, actor: &mut NetworkManager, ctx: &mut Context<NetworkManager>| {
use crate::config_manager::ConfigManagerDataResponse::GotValue;
println!("[NetworkManager] got config manager value {:?}", out);
let recipient = ctx.address().recipient();
let port = if let Ok(GotValue(Some(ConfigValue::Number(port)))) = out
{
port
} else {
5600
};
println!("[NetworkManager] got port: {:?}", port);
let nl = NetworkListener::new(
format!("0.0.0.0:{}", port),
recipient.downgrade(),
);
nl.do_send(ListenerMessage::StartListening);
actor.listener_addr.replace(nl);
},
);
ctx.spawn(fut);
}
}
fn stopped(&mut self, ctx: &mut Self::Context) {
println!("[NetworkManager] network manager stopped");
}
}
impl Handler<NetworkMessage> for NetworkManager {
type Result = ();
fn handle(
&mut self,
msg: NetworkMessage,
ctx: &mut <Self as actix::Actor>::Context,
) -> <Self as Handler<NetworkMessage>>::Result {
use NetworkMessage::{StartListening, StopListening};
match msg {
StartListening => self.start_listener(ctx),
StopListening => self.stop_listener(ctx),
}
}
}
impl Handler<NetworkDataMessage> for NetworkManager {
type Result = NetworkDataOutput;
fn handle(
&mut self,
msg: NetworkDataMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
match msg {
NetworkDataMessage::IsListening => {
NetworkDataOutput::IsListening(self.listener_addr.is_some())
}
}
}
}
impl Handler<ListenerOutput> for NetworkManager {
type Result = ();
fn handle(
&mut self,
msg: ListenerOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ListenerOutput::NewConnection;
match msg {
NewConnection(connection) => {
println!("new connection");
self.new_connection(ctx, connection)
}
};
}
}
impl Handler<InitiatorOutput> for NetworkManager {
type Result = ();
fn handle(
&mut self,
msg: InitiatorOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use InitiatorOutput::{ClientRequest, InfoRequest};
match msg {
ClientRequest(sender, addr, client_details) => {
self.client_request(ctx, sender, addr, client_details)
}
InfoRequest(sender, addr) => self.info_request(ctx, sender, addr),
}
}
}
impl From<Builder> for NetworkManager {
fn from(builder: Builder) -> Self {
Self {
listener_addr: None,
delegate: builder.delegate,
initiators: Vec::default(),
config_manager: ConfigManager::shared().downgrade(),
}
}
}

View File

@ -1,20 +0,0 @@
use actix::{Actor, Addr, WeakRecipient};
use crate::network::{
network_manager::messages::NetworkOutput,
NetworkManager,
};
pub struct Builder {
pub(super) delegate: WeakRecipient<NetworkOutput>,
}
impl Builder {
pub(super) fn new(delegate: WeakRecipient<NetworkOutput>) -> Self {
Self { delegate }
}
pub fn build(self) -> Addr<NetworkManager> {
NetworkManager::from(self).start()
}
}

View File

@ -1,29 +0,0 @@
use actix::{Addr, Message, MessageResponse};
use foundation::ClientDetails;
use crate::network::Connection;
#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)]
#[rtype(result = "()")]
pub enum NetworkMessage {
StartListening,
StopListening,
}
#[derive(Message)]
#[rtype(result = "()")]
pub enum NetworkOutput {
NewClient(Addr<Connection>, ClientDetails),
InfoRequested(Addr<Connection>),
}
#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)]
#[rtype(result = "NetworkDataOutput")]
pub enum NetworkDataMessage {
IsListening,
}
#[derive(MessageResponse)]
pub enum NetworkDataOutput {
IsListening(bool),
}

View File

@ -1,16 +0,0 @@
//! # network_manager
//! This module contains the network manager actor
//! it's role involves handling new oncomming network connections
mod actor;
mod builder;
mod messages;
pub(crate) use actor::NetworkManager;
pub(crate) use builder::*;
pub(crate) use messages::{
NetworkDataMessage,
NetworkDataOutput,
NetworkMessage,
NetworkOutput,
};

View File

@ -1,29 +0,0 @@
//! # prelude
//! A module that coalesces different types into one module of defined structure
mod observer;
#[allow(unused_imports)]
pub mod actors {
//! exports all actors used in the program.
pub use crate::server::Server;
pub(crate) use crate::{
client_management::{client::Client, ClientManager},
network::{Connection, ConnectionInitiator, NetworkManager},
};
}
#[allow(unused_imports)]
pub mod messages {
//! exports all messages used in the program.
pub(crate) use super::observer::ObservableMessage;
pub(crate) use crate::{
client_management::{ClientManagerMessage, ClientManagerOutput},
network::{
ConnectionMessage,
ConnectionObservableOutput,
NetworkMessage,
NetworkOutput,
},
};
}

View File

@ -1,17 +0,0 @@
//! # observer.rs
//! crates a message type for the observer pattern.
use actix::{Message, WeakRecipient};
/// # ObservableMessage
/// represents common messages for observers
#[derive(Message)]
#[rtype(result = "()")]
pub enum ObservableMessage<M>
where
M: Message + Send,
M::Result: Send,
{
Subscribe(WeakRecipient<M>),
Unsubscribe(WeakRecipient<M>),
}

View File

@ -1,4 +0,0 @@
pub(crate) mod scriptable_client;
pub(crate) mod scriptable_client_manager;
pub(crate) mod scriptable_network_manager;
pub(crate) mod scriptable_server;

View File

@ -1,60 +0,0 @@
use actix::Addr;
use mlua::{Error, UserData, UserDataMethods};
use crate::client_management::client::{
Client,
ClientDataMessage,
ClientDataResponse,
ClientDataResponse::{Username, Uuid},
};
#[derive(Clone)]
pub(crate) struct ScriptableClient {
addr: Addr<Client>,
}
impl UserData for ScriptableClient {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("username", |_lua, obj, ()| async move {
let name: Option<ClientDataResponse> =
obj.addr.send(ClientDataMessage::Username).await.ok();
if let Some(Username(name)) = name {
Ok(name)
} else {
Err(Error::RuntimeError(
"Name returned null or other value".to_string(),
))
}
});
methods.add_async_method("uuid", |_lua, obj, ()| async move {
let uuid: Option<ClientDataResponse> =
obj.addr.send(ClientDataMessage::Uuid).await.ok();
if let Some(Uuid(uuid)) = uuid {
Ok(uuid.to_string())
} else {
Err(Error::RuntimeError(
"Uuid returned null or other value".to_string(),
))
}
});
methods.add_async_method("address", |_lua, obj, ()| async move {
let address: Option<ClientDataResponse> =
obj.addr.send(ClientDataMessage::Address).await.ok();
if let Some(Username(address)) = address {
Ok(address)
} else {
Err(Error::RuntimeError(
"address returned null or other value".to_string(),
))
}
});
}
}
impl From<Addr<Client>> for ScriptableClient {
fn from(addr: Addr<Client>) -> Self {
Self { addr }
}
}

View File

@ -1,43 +0,0 @@
use actix::Addr;
use mlua::{Error, UserData, UserDataMethods};
use crate::{
client_management::{
ClientManager,
ClientManagerDataMessage,
ClientManagerDataResponse::Clients,
},
scripting::scriptable_client::ScriptableClient,
};
#[derive(Clone)]
pub(crate) struct ScriptableClientManager {
addr: Addr<ClientManager>,
}
impl UserData for ScriptableClientManager {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("clients", |_lua, obj, ()| async move {
let res = obj.addr.send(ClientManagerDataMessage::Clients).await;
if let Ok(Clients(clients)) = res {
let clients: Vec<ScriptableClient> = clients
.into_iter()
.filter_map(|a| a.upgrade())
.map(ScriptableClient::from)
.collect();
Ok(clients)
} else {
Err(Error::RuntimeError(
"clients returned null or other value".to_string(),
))
}
})
}
}
impl From<Addr<ClientManager>> for ScriptableClientManager {
fn from(addr: Addr<ClientManager>) -> Self {
Self { addr }
}
}

View File

@ -1,35 +0,0 @@
use actix::Addr;
use mlua::{Error, UserData, UserDataMethods};
use crate::network::{
NetworkDataMessage,
NetworkDataOutput::IsListening,
NetworkManager,
};
#[derive(Clone)]
pub(crate) struct ScriptableNetworkManager {
addr: Addr<NetworkManager>,
}
impl UserData for ScriptableNetworkManager {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("Listening", |_lua, obj, ()| async move {
let is_listening =
obj.addr.send(NetworkDataMessage::IsListening).await.ok();
if let Some(IsListening(is_listening)) = is_listening {
Ok(is_listening)
} else {
Err(Error::RuntimeError(
"Uuid returned null or other value".to_string(),
))
}
});
}
}
impl From<Addr<NetworkManager>> for ScriptableNetworkManager {
fn from(addr: Addr<NetworkManager>) -> Self {
Self { addr }
}
}

View File

@ -1,55 +0,0 @@
use actix::WeakAddr;
use mlua::{Error, UserData, UserDataMethods};
use crate::server::{ServerDataResponse::Name, *};
#[derive(Clone)]
pub(crate) struct ScriptableServer {
pub(super) addr: WeakAddr<Server>,
}
impl UserData for ScriptableServer {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("name", |_lua, obj, ()| async move {
let Some(send_fut) = obj.addr.upgrade().map(|addr| addr.send(ServerDataMessage::Name)) else {
return Err(Error::RuntimeError(
"[ScriptableServer:name] Server doesn't exist. Dunno how you got here".to_string(),
))
};
let name: Option<ServerDataResponse> = send_fut.await.ok();
let Some(Name(name)) = name else {
return Err(Error::RuntimeError(
"[ScriptableServer:name] Name returned nil".to_string(),
))
};
Ok(name)
});
methods.add_async_method("owner", |_lua, obj, ()| async move {
let Some(send_fut) = obj.addr.upgrade().map(|addr| addr.send(ServerDataMessage::Owner)) else {
return Err(Error::RuntimeError(
"[ScriptableServer:owner] Server doesn't exist. Dunno how you got here".to_string(),
))
};
let owner: Option<ServerDataResponse> = send_fut.await.ok();
let Some(Name(owner)) = owner else {
return Err(Error::RuntimeError(
"[ScriptableServer:owner] Owner returned nil".to_string(),
))
};
Ok(owner)
});
}
}
impl From<WeakAddr<Server>> for ScriptableServer {
fn from(addr: WeakAddr<Server>) -> Self {
Self { addr }
}
}

View File

@ -1,31 +0,0 @@
use actix::{Actor, Addr};
use super::*;
pub struct ServerBuilder {
pub(super) name: String,
pub(super) owner: String,
}
impl<'rhai> ServerBuilder {
pub(super) fn new() -> Self {
Self {
name: "<UNKNOWN>".into(),
owner: "<UNKNOWN>".into(),
}
}
pub fn name(mut self, name: String) -> Self {
self.name = name;
self
}
pub fn owner(mut self, owner: String) -> Self {
self.owner = owner;
self
}
pub fn build(self) -> Addr<Server> {
Server::from(self).start()
}
}

View File

@ -1,21 +0,0 @@
use actix::{Addr, Message, MessageResponse};
use crate::{client_management::ClientManager, network::NetworkManager};
#[derive(Message, Clone)]
#[rtype(result = "ServerDataResponse")]
pub enum ServerDataMessage {
Name,
Owner,
ClientManager,
NetworkManager,
}
#[derive(MessageResponse, Clone)]
pub enum ServerDataResponse {
Name(String),
Port(u16),
Owner(String),
ClientManager(Option<Addr<ClientManager>>),
NetworkManager(Option<Addr<NetworkManager>>),
}

View File

@ -1,13 +0,0 @@
//! # actix_server
//! this holds the server actor
//! the server acts as teh main actor
//! and supervisor to the actor system.
mod server;
mod builder;
mod messages;
pub use builder::ServerBuilder;
pub use messages::*;
pub use server::Server;

View File

@ -1,191 +0,0 @@
//! This crate holds the implementations and functions for the server
//! including server boot procedures
use actix::{
fut::wrap_future,
Actor,
ActorFutureExt,
Addr,
AsyncContext,
Context,
Handler,
};
use foundation::{messages::network::NetworkSockOut::GotInfo, ClientDetails};
use crate::{
client_management::{
client::Client,
ClientManager,
ClientManagerMessage::AddClient,
ClientManagerOutput,
},
config_manager::{
ConfigManager,
ConfigManagerDataMessage,
ConfigManagerDataResponse,
ConfigValue,
},
network::{
Connection,
ConnectionMessage::{CloseConnection, SendData},
NetworkManager,
NetworkOutput,
NetworkOutput::{InfoRequested, NewClient},
},
prelude::messages::NetworkMessage,
server::{builder, ServerBuilder, ServerDataMessage, ServerDataResponse},
};
/// This struct is the main actor of the server.
/// all other actors are ran through here.
pub struct Server {
name: String,
owner: String,
network_manager: Option<Addr<NetworkManager>>,
client_manager: Option<Addr<ClientManager>>,
}
impl Server {
pub(crate) fn create() -> builder::ServerBuilder {
ServerBuilder::new()
}
pub(crate) fn client_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
addr: Addr<Connection>,
details: ClientDetails,
) {
if let Some(mgr) = self.client_manager.as_ref() {
let client = Client::new(addr, details.clone());
mgr.do_send(AddClient(details.uuid, client));
}
}
pub(crate) fn info_request(
&mut self,
ctx: &mut <Self as Actor>::Context,
sender: Addr<Connection>,
) {
let fut = wrap_future(
sender.send(SendData(
serde_json::to_string(&GotInfo {
server_name: self.name.clone(),
server_owner: self.owner.clone(),
})
.expect("Failed to serialise"),
)),
)
// equivalent to using .then() in js
.map(move |_out, _act: &mut Self, _ctx| {
sender.do_send(CloseConnection);
});
ctx.spawn(fut);
}
}
impl Actor for Server {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
use ConfigManagerDataMessage::GetValue;
use ConfigManagerDataResponse::GotValue;
let addr = ctx.address().downgrade();
let nm = NetworkManager::create(addr.clone().recipient()).build();
let cm = ClientManager::new(addr.recipient());
self.network_manager.replace(nm.clone());
self.client_manager.replace(cm.clone());
nm.do_send(NetworkMessage::StartListening);
let name_fut = wrap_future(
ConfigManager::shared().send(GetValue("Server.Name".to_owned())),
)
.map(|out, actor: &mut Server, _ctx| {
if let Ok(GotValue(Some(ConfigValue::String(val)))) = out {
actor.name = val
}
});
let owner_fut = wrap_future(
ConfigManager::shared().send(GetValue("Server.Owner".to_owned())),
)
.map(|out, actor: &mut Server, _ctx| {
if let Ok(GotValue(Some(ConfigValue::String(val)))) = out {
actor.owner = val
}
});
ctx.spawn(name_fut);
ctx.spawn(owner_fut);
}
}
impl Handler<ServerDataMessage> for Server {
type Result = ServerDataResponse;
fn handle(
&mut self,
msg: ServerDataMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
println!("[Server] got data message");
match msg {
ServerDataMessage::Name => ServerDataResponse::Name(self.name.clone()),
ServerDataMessage::Owner => ServerDataResponse::Owner(self.owner.clone()),
ServerDataMessage::ClientManager => {
ServerDataResponse::ClientManager(self.client_manager.clone())
}
ServerDataMessage::NetworkManager => {
ServerDataResponse::NetworkManager(self.network_manager.clone())
}
}
}
}
impl Handler<NetworkOutput> for Server {
type Result = ();
fn handle(
&mut self,
msg: NetworkOutput,
ctx: &mut Self::Context,
) -> Self::Result {
println!("[ServerActor] received message");
match msg {
// This uses promise like funcionality to queue
// a set of async operations,
// so they occur in the right order
InfoRequested(sender) => self.info_request(ctx, sender),
// A new client is to be added
NewClient(addr, details) => self.client_request(ctx, addr, details),
};
}
}
impl Handler<ClientManagerOutput> for Server {
type Result = ();
fn handle(
&mut self,
_msg: ClientManagerOutput,
_ctx: &mut Self::Context,
) -> Self::Result {
todo!()
}
}
impl From<ServerBuilder> for Server {
fn from(builder: ServerBuilder) -> Self {
Server {
name: builder.name,
owner: builder.owner,
network_manager: None,
client_manager: None,
}
}
}

View File

@ -7,8 +7,10 @@ use tokio::{
};
use crate::{
listener_manager::{ConnectionType, ListenerManager},
network_connection::{NetworkConnection, ServerRequest},
network::{
listener_manager::{ConnectionType, ListenerManager},
network_connection::{NetworkConnection, ServerRequest},
},
os_signal_manager::OSSignalManager,
};