Merge branch 'feature/disconnection-fix' into develop

This commit is contained in:
michael-bailey 2023-01-25 23:31:33 +00:00
commit 20b1398d13
43 changed files with 933 additions and 610 deletions

11
Dockerfile Normal file
View File

@ -0,0 +1,11 @@
# First stage: build the server file.
FROM rust:alpine3.16 AS build
WORKDIR /app # avoid the root directory
COPY ./ ./
RUN cargo build --release --bin server
# Second stage: actually run the server file.
FROM alpine:latest
WORKDIR /app
COPY --from=build /app/target/release/server ./server
CMD server

View File

@ -6,7 +6,8 @@ use cursive::{
menu::{Item, Tree},
traits::Nameable,
views::{Dialog, TextView},
Cursive, CursiveExt,
Cursive,
CursiveExt,
};
use worker::Worker;

View File

@ -11,7 +11,12 @@ use tokio::{
time::sleep,
};
use crate::{managers::NetworkManager, worker_message::WorkerMessage, Cursive, TextView};
use crate::{
managers::NetworkManager,
worker_message::WorkerMessage,
Cursive,
TextView,
};
pub type CursiveSender = CrossSender<Box<dyn FnOnce(&mut Cursive) + Send>>;

View File

@ -44,23 +44,19 @@ impl Connection {
T: Serialize,
{
let mut out_buffer = Vec::new();
let out = serde_json::to_string(&message).unwrap();
let mut writer_lock = self.stream_tx.lock().await;
let old = mem::replace(&mut *writer_lock, None);
writeln!(&mut out_buffer, "{}", out)?;
let mut writer_lock = self.stream_tx.lock().await;
let old = mem::replace(&mut *writer_lock, None);
return if let Some(mut writer) = old {
writer.write_all(&out_buffer).await?;
writer.flush().await?;
let _ = mem::replace(&mut *writer_lock, Some(writer));
Ok(())
} else {
Err(Error::new(ErrorKind::Interrupted, "Writer does not exist"))
let Some(mut writer) = old else {
return Err(Error::new(ErrorKind::Interrupted, "Writer does not exist"));
};
writer.write_all(&out_buffer).await?;
writer.flush().await?;
let _ = mem::replace(&mut *writer_lock, Some(writer));
Ok(())
}
pub async fn read<T>(&self) -> Result<T, Error>

View File

@ -14,14 +14,16 @@ mod test {
let key = sha256(b"This is a key");
let IV = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
let encrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV));
let encrypter =
Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV));
let mut ciphertext = vec![0u8; 1024];
let cipherlen = encrypter
.unwrap()
.update(plaintext, ciphertext.as_mut_slice())
.unwrap();
let decrypter = Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV));
let decrypter =
Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV));
let mut decrypted = vec![0u8; 1024];
decrypter
.unwrap()

View File

@ -2,7 +2,11 @@ use std::collections::HashMap;
use futures::channel::oneshot::{channel, Receiver, Sender};
use crate::event::{event_result::EventResultBuilder, EventResult, EventResultType};
use crate::event::{
event_result::EventResultBuilder,
EventResult,
EventResultType,
};
/// # Eventw
/// Object that holds details about an event being passed through the application.
@ -69,7 +73,11 @@ impl<T> EventBuilder<T> {
}
}
pub fn add_arg<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
pub fn add_arg<K: Into<String>, V: Into<String>>(
mut self,
key: K,
value: V,
) -> Self {
self.args.insert(key.into(), value.into());
self
}

View File

@ -33,7 +33,10 @@ pub struct EventResultBuilder {
}
impl EventResultBuilder {
pub(self) fn new(result_type: EventResultType, sender: Sender<EventResult>) -> Self {
pub(self) fn new(
result_type: EventResultType,
sender: Sender<EventResult>,
) -> Self {
Self {
code: result_type,
args: HashMap::default(),

View File

@ -38,5 +38,5 @@ pub enum ClientStreamOut {
Disconnected,
// error cases
Error,
Error { msg: String },
}

View File

@ -45,7 +45,6 @@ pub trait IManager {
if let Some(manager) = Weak::upgrade(&weak_self) {
manager.run().await
}
()
}
});
}

View File

@ -1,4 +1,4 @@
max_width = 90
max_width = 80
hard_tabs = true
tab_spaces = 2
imports_layout = "HorizontalVertical"

View File

@ -21,7 +21,12 @@ impl ChatManager {
}
// no need for a remove methods because this is a read only system
fn add_message(&mut self, _ctx: &mut Context<Self>, id: Uuid, content: String) {
fn add_message(
&mut self,
_ctx: &mut Context<Self>,
id: Uuid,
content: String,
) {
println!(
"[ChatManager] add_message id: {:?} content: {:?}",
id, content
@ -51,10 +56,16 @@ impl Actor for ChatManager {
impl Handler<ChatManagerMessage> for ChatManager {
type Result = ();
fn handle(&mut self, msg: ChatManagerMessage, ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: ChatManagerMessage,
ctx: &mut Self::Context,
) -> Self::Result {
println!("[ChatManager] got message: {:?}", msg);
match msg {
ChatManagerMessage::AddMessage(id, content) => self.add_message(ctx, id, content),
ChatManagerMessage::AddMessage(id, content) => {
self.add_message(ctx, id, content)
}
}
}
}

View File

@ -1,5 +1,8 @@
use actix::{Actor, Addr, AsyncContext, Context, Handler, Recipient};
use foundation::{messages::client::ClientStreamIn, ClientDetails};
use actix::{Actor, Addr, AsyncContext, Context, Handler, WeakRecipient};
use foundation::{
messages::client::{ClientStreamIn, ClientStreamOut},
ClientDetails,
};
use uuid::Uuid;
use crate::{
@ -9,8 +12,8 @@ use crate::{
ClientMessage,
ClientObservableMessage,
},
network::{Connection, ConnectionOuput},
prelude::messages::ObservableMessage,
network::{Connection, ConnectionObservableOutput},
prelude::messages::{ConnectionMessage, ObservableMessage},
};
/// # Client
@ -19,11 +22,14 @@ use crate::{
pub struct Client {
connection: Addr<Connection>,
details: ClientDetails,
observers: Vec<Recipient<ClientObservableMessage>>,
observers: Vec<WeakRecipient<ClientObservableMessage>>,
}
impl Client {
pub(crate) fn new(connection: Addr<Connection>, details: ClientDetails) -> Addr<Self> {
pub(crate) fn new(
connection: Addr<Connection>,
details: ClientDetails,
) -> Addr<Self> {
Client {
connection,
details,
@ -34,39 +40,59 @@ impl Client {
#[inline]
fn get_clients(&self, ctx: &mut Context<Client>) {
println!("[Client] getting clients");
use ClientObservableMessage::GetClients;
self.broadcast(GetClients(ctx.address().downgrade()));
}
#[inline]
fn get_messages(&self, ctx: &mut Context<Client>) {
println!("[Client] getting messages");
use ClientObservableMessage::GetGlobalMessages;
self.broadcast(GetGlobalMessages(ctx.address().downgrade()));
}
#[inline]
fn send_message(&self, ctx: &mut Context<Client>, to: Uuid, content: String) {
println!("[Client] sending message");
use ClientObservableMessage::Message;
self.broadcast(Message(ctx.address().downgrade(), to, content));
}
#[inline]
fn send_gloal_message(&self, ctx: &mut Context<Client>, content: String) {
println!("[Client] sending global message");
use ClientObservableMessage::GlobalMessage;
self.broadcast(GlobalMessage(ctx.address().downgrade(), content));
}
#[inline]
fn disconnect(&self, _ctx: &mut Context<Client>) {
todo!()
println!("[Client] disconnecting");
use ClientObservableMessage::Disconnecting;
self.broadcast(Disconnecting(self.details.uuid));
}
#[inline]
fn broadcast(&self, message: ClientObservableMessage) {
println!("[Client] broadcasting message");
for recp in &self.observers {
recp.do_send(message.clone());
if let Some(upgraded) = recp.upgrade() {
upgraded.do_send(message.clone());
}
}
}
pub(crate) fn error(&self, msg: String) {
println!("[Client] sending error: {}", msg);
use serde_json::to_string;
use ConnectionMessage::SendData;
let msg = to_string::<ClientStreamOut>(&ClientStreamOut::Error { msg })
.expect("[Client] This should not fail");
self.connection.do_send(SendData(msg));
}
}
impl Actor for Client {
@ -74,7 +100,7 @@ impl Actor for Client {
// tells the client that it has been connected.
fn started(&mut self, ctx: &mut Self::Context) {
use foundation::messages::client::{ClientStreamOut, ClientStreamOut::Connected};
use foundation::messages::client::ClientStreamOut::Connected;
use serde_json::to_string;
use crate::{
@ -84,8 +110,8 @@ impl Actor for Client {
println!("[Client] started");
self
.connection
.do_send::<ObservableMessage<ConnectionOuput>>(Subscribe(
ctx.address().recipient(),
.do_send::<ObservableMessage<ConnectionObservableOutput>>(Subscribe(
ctx.address().recipient().downgrade(),
));
self
.connection
@ -93,7 +119,7 @@ impl Actor for Client {
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use foundation::messages::client::{ClientStreamOut, ClientStreamOut::Disconnected};
use foundation::messages::client::ClientStreamOut::Disconnected;
use serde_json::to_string;
use crate::{
@ -105,8 +131,8 @@ impl Actor for Client {
self
.connection
.do_send::<ObservableMessage<ConnectionOuput>>(Unsubscribe(
ctx.address().recipient(),
.do_send::<ObservableMessage<ConnectionObservableOutput>>(Unsubscribe(
ctx.address().recipient().downgrade(),
));
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&Disconnected).unwrap(),
@ -116,9 +142,15 @@ impl Actor for Client {
impl Handler<ClientDataMessage> for Client {
type Result = ClientDataResponse;
fn handle(&mut self, msg: ClientDataMessage, _ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: ClientDataMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
match msg {
ClientDataMessage::Details => ClientDataResponse::Details(self.details.clone()),
ClientDataMessage::Details => {
ClientDataResponse::Details(self.details.clone())
}
_ => todo!(),
}
}
@ -127,15 +159,16 @@ impl Handler<ClientDataMessage> for Client {
// Handles incoming messages to the client.
impl Handler<ClientMessage> for Client {
type Result = ();
fn handle(&mut self, msg: ClientMessage, _ctx: &mut Self::Context) -> Self::Result {
use foundation::messages::client::{
ClientStreamOut,
ClientStreamOut::{
ConnectedClients,
GlobalChatMessages,
GlobalMessage,
UserMessage,
},
fn handle(
&mut self,
msg: ClientMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use foundation::messages::client::ClientStreamOut::{
ConnectedClients,
GlobalChatMessages,
GlobalMessage,
UserMessage,
};
use serde_json::to_string;
@ -160,25 +193,33 @@ impl Handler<ClientMessage> for Client {
.expect("[Client] Failed to encode string"),
)),
ClientlySentMessage { content, from } => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&UserMessage { from, content })
.expect("[Client] Failed to encode string"),
)),
ClientlySentMessage { content, from } => {
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&UserMessage { from, content })
.expect("[Client] Failed to encode string"),
))
}
GloballySentMessage { from, content } => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&GlobalMessage { from, content })
.expect("[Client] Failed to encode string"),
)),
GloballySentMessage { from, content } => {
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&GlobalMessage { from, content })
.expect("[Client] Failed to encode string"),
))
}
}
}
}
// Handles outputs from the connection.
impl Handler<ConnectionOuput> for Client {
impl Handler<ConnectionObservableOutput> for Client {
type Result = ();
fn handle(&mut self, msg: ConnectionOuput, ctx: &mut Self::Context) -> Self::Result {
use crate::network::ConnectionOuput::RecvData;
fn handle(
&mut self,
msg: ConnectionObservableOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use crate::network::ConnectionObservableOutput::RecvData;
if let RecvData(_sender, _addr, data) = msg {
use foundation::messages::client::ClientStreamIn::{
Disconnect,
@ -188,14 +229,18 @@ impl Handler<ConnectionOuput> for Client {
SendMessage,
};
use serde_json::from_str;
let msg = from_str::<ClientStreamIn>(data.as_str())
.expect("[Client] failed to decode incoming message");
match msg {
GetClients => self.get_clients(ctx),
GetMessages => self.get_messages(ctx),
SendMessage { to, content } => self.send_message(ctx, to, content),
SendGlobalMessage { content } => self.send_gloal_message(ctx, content),
Disconnect => self.disconnect(ctx),
if let Ok(msg) = from_str::<ClientStreamIn>(data.as_str()) {
match msg {
GetClients => self.get_clients(ctx),
GetMessages => self.get_messages(ctx),
SendMessage { to, content } => self.send_message(ctx, to, content),
SendGlobalMessage { content } => {
self.send_gloal_message(ctx, content)
}
Disconnect => self.disconnect(ctx),
}
} else {
self.error(format!("Failed to parse Message: {}", data));
}
}
}
@ -217,13 +262,20 @@ impl Handler<ObservableMessage<ClientObservableMessage>> for Client {
}
Unsubscribe(r) => {
println!("[Client] removing subscriber");
let r = r.upgrade();
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a != &r)
.filter(|a| a.upgrade() != r)
.collect();
}
}
}
}
impl Drop for Client {
fn drop(&mut self) {
println!("[Client] Dropping value")
}
}

View File

@ -40,4 +40,5 @@ pub enum ClientObservableMessage {
GlobalMessage(WeakAddr<Client>, String),
GetClients(WeakAddr<Client>),
GetGlobalMessages(WeakAddr<Client>),
Disconnecting(Uuid),
}

View File

@ -46,7 +46,9 @@ pub struct ClientManager {
}
impl ClientManager {
pub(crate) fn new(delegate: WeakRecipient<ClientManagerOutput>) -> Addr<Self> {
pub(crate) fn new(
delegate: WeakRecipient<ClientManagerOutput>,
) -> Addr<Self> {
ClientManager {
_delegate: delegate,
clients: HashMap::new(),
@ -92,12 +94,14 @@ impl ClientManager {
sender: WeakAddr<Client>,
) {
if let Some(to_send) = sender.upgrade() {
let fut = wrap_future(self.chat_manager.send(ChatManagerDataMessage::GetMessages))
.map(move |out, _a, _ctx| {
if let Ok(ChatManagerDataResponse::GotMessages(res)) = out {
to_send.do_send(ClientMessage::MessageList(res));
}
});
let fut = wrap_future(
self.chat_manager.send(ChatManagerDataMessage::GetMessages),
)
.map(move |out, _a, _ctx| {
if let Ok(ChatManagerDataResponse::GotMessages(res)) = out {
to_send.do_send(ClientMessage::MessageList(res));
}
});
ctx.spawn(fut);
};
}
@ -220,7 +224,8 @@ impl ClientManager {
println!("[ClientManager] adding client");
use crate::prelude::messages::ObservableMessage::Subscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
addr.do_send(Subscribe(recp));
println!("[ClientManager] sending subscribe message to client");
addr.do_send(Subscribe(recp.downgrade()));
self.clients.insert(uuid, addr);
}
@ -229,7 +234,21 @@ impl ClientManager {
use crate::prelude::messages::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) {
addr.do_send(Unsubscribe(recp));
println!("[ClientManager] sending unsubscribe message to client");
addr.do_send(Unsubscribe(recp.downgrade()));
}
}
fn disconnect_client(
&mut self,
ctx: &mut Context<ClientManager>,
uuid: Uuid,
) {
println!("[ClientManager] disconnecting client");
use crate::prelude::messages::ObservableMessage::Unsubscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
if let Some(addr) = self.clients.remove(&uuid) {
addr.do_send(Unsubscribe(recp.downgrade()));
}
}
}
@ -268,18 +287,22 @@ impl Handler<ClientObservableMessage> for ClientManager {
ctx: &mut Self::Context,
) -> Self::Result {
use crate::client_management::client::ClientObservableMessage::{
Disconnecting,
GetClients,
GetGlobalMessages,
GlobalMessage,
Message,
};
match msg {
Message(sender, to, content) => self.send_message_request(ctx, sender, to, content),
Message(sender, to, content) => {
self.send_message_request(ctx, sender, to, content)
}
GlobalMessage(sender, content) => {
self.send_global_message_request(ctx, sender, content)
}
GetClients(sender) => self.send_client_list(ctx, sender),
GetGlobalMessages(sender) => self.send_global_messages(ctx, sender),
Disconnecting(uuid) => self.disconnect_client(ctx, uuid),
}
}
}
@ -293,7 +316,9 @@ impl Handler<ClientManagerDataMessage> for ClientManager {
_ctx: &mut Self::Context,
) -> Self::Result {
match msg {
ClientManagerDataMessage::ClientCount => ClientCount(self.clients.values().count()),
ClientManagerDataMessage::ClientCount => {
ClientCount(self.clients.values().count())
}
ClientManagerDataMessage::Clients => {
Clients(self.clients.values().map(|a| a.downgrade()).collect())
}

View File

@ -1,8 +1,8 @@
use crate::client_management::client::Client;
use crate::client_management::ClientManager;
use actix::{Addr, Message, MessageResponse, WeakAddr};
use uuid::Uuid;
use crate::client_management::{client::Client, ClientManager};
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ClientManagerMessage {

View File

@ -14,7 +14,9 @@ use crate::{
arg_parser::Arguments,
builder::Builder,
messages::{
ConfigManagerDataMessage, ConfigManagerDataResponse, ConfigManagerOutput,
ConfigManagerDataMessage,
ConfigManagerDataResponse,
ConfigManagerOutput,
},
types::ConfigValue::{Dict, Number, String as ConfigString},
ConfigValue,
@ -145,17 +147,21 @@ impl From<Builder> for ConfigManager {
.ok()
.unwrap_or_else(|| Dict(BTreeMap::new()));
println!("[ConfigManager] got stored: {:?}", stored);
let mut root = stored.clone();
if let Dict(root) = &mut root {
builder.args.map(|v| {
v.port
.map(|p| root.insert("Network.Port".to_owned(), Number(p.into())));
v.name
.map(|n| root.insert("Server.Name".to_owned(), ConfigString(n.into())));
v.name.map(|n| {
root.insert("Server.Name".to_owned(), ConfigString(n.into()))
});
v.owner
.map(|o| root.insert("Server.Owner".to_owned(), ConfigString(o.into())));
v.owner.map(|o| {
root.insert("Server.Owner".to_owned(), ConfigString(o.into()))
});
});
}

View File

@ -1,6 +1,7 @@
use crate::config_manager::types::ConfigValue;
use actix::{Message, MessageResponse};
use crate::config_manager::types::ConfigValue;
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub enum ConfigManagerOutput {

View File

@ -9,5 +9,8 @@ mod messages;
mod types;
pub(crate) use config_manager::ConfigManager;
pub(crate) use messages::{ConfigManagerDataMessage, ConfigManagerDataResponse};
pub(crate) use messages::{
ConfigManagerDataMessage,
ConfigManagerDataResponse,
};
pub(crate) use types::ConfigValue;

View File

@ -35,9 +35,9 @@ impl From<ConfigValue> for Value {
impl From<Value> for ConfigValue {
fn from(v: Value) -> Self {
match v {
Value::Table(dict) => {
ConfigValue::Dict(dict.into_iter().map(|(k, v)| (k, v.into())).collect())
}
Value::Table(dict) => ConfigValue::Dict(
dict.into_iter().map(|(k, v)| (k, v.into())).collect(),
),
Value::Array(arr) => {
ConfigValue::Array(arr.into_iter().map(|v| v.into()).collect())
}

View File

@ -1,20 +1,23 @@
use crate::client_management::ClientManager;
use crate::lua::lua_manager::LuaManager;
use crate::network::NetworkManager;
use crate::Server;
use actix::Addr;
use actix::{Addr, WeakAddr};
use crate::{
client_management::ClientManager,
lua::lua_manager::LuaManager,
network::NetworkManager,
Server,
};
pub struct Builder {
pub(super) server: Addr<Server>,
pub(super) network_manager: Addr<NetworkManager>,
pub(super) client_manager: Addr<ClientManager>,
pub(super) server: WeakAddr<Server>,
pub(super) network_manager: WeakAddr<NetworkManager>,
pub(super) client_manager: WeakAddr<ClientManager>,
}
impl Builder {
pub(super) fn new(
server: Addr<Server>,
network_manager: Addr<NetworkManager>,
client_manager: Addr<ClientManager>,
server: WeakAddr<Server>,
network_manager: WeakAddr<NetworkManager>,
client_manager: WeakAddr<ClientManager>,
) -> Self {
Builder {
server,

View File

@ -2,29 +2,31 @@
//!
//! Holds the LuaManger struct and implements it's methods
use crate::client_management::ClientManager;
use crate::lua::builder::Builder;
use crate::network::NetworkManager;
use crate::scripting::scriptable_server::ScriptableServer;
use crate::Server;
use actix::fut::wrap_future;
use actix::{Actor, Addr, AsyncContext, Context};
use actix::{fut::wrap_future, Actor, Addr, AsyncContext, Context, WeakAddr};
use mlua::{Lua, Thread};
use crate::{
client_management::ClientManager,
lua::builder::Builder,
network::NetworkManager,
scripting::scriptable_server::ScriptableServer,
Server,
};
/// # LuaManager
/// Holds common server objects
/// todo: change to weak references
pub struct LuaManager {
pub(super) server: Addr<Server>,
pub(super) _network_manager: Addr<NetworkManager>,
pub(super) _client_manager: Addr<ClientManager>,
pub(super) server: WeakAddr<Server>,
pub(super) _network_manager: WeakAddr<NetworkManager>,
pub(super) _client_manager: WeakAddr<ClientManager>,
}
impl LuaManager {
pub fn create(
server: Addr<Server>,
network_manager: Addr<NetworkManager>,
client_manager: Addr<ClientManager>,
server: WeakAddr<Server>,
network_manager: WeakAddr<NetworkManager>,
client_manager: WeakAddr<ClientManager>,
) -> Builder {
Builder::new(server, network_manager, client_manager)
}

View File

@ -11,7 +11,6 @@ pub(crate) mod scripting;
pub(crate) mod server;
use server::Server;
use tokio::time::{sleep, Duration};
/// The main function

View File

@ -0,0 +1,262 @@
use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
use actix::{
clock::timeout,
fut::wrap_future,
Actor,
ActorContext,
ActorFutureExt,
Addr,
AsyncContext,
Context,
Handler,
SpawnHandle,
WeakRecipient,
};
use futures::{future::join_all, stream::Buffered, Future, FutureExt};
use tokio::{
io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::TcpStream,
sync::Mutex,
};
use super::{ConnectionMessage, ConnectionObservableOutput};
use crate::{
network::connection::messages::ConnectionPrivateMessage,
prelude::messages::ObservableMessage,
};
/// # Connection
/// This manages a TcpStream for a given connection.
///
/// ## Fields
/// - read_half: A temporary store fr the read half of the connection.
/// - write_half: The write half of the connection.
/// - address: The socket address of the conneciton.
/// - observers: A list of observers to events created by the connection.
/// - loop_future: the future holding the receiving loop.
pub struct Connection {
write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
address: SocketAddr,
observers: Vec<WeakRecipient<ConnectionObservableOutput>>,
}
impl Connection {
/// Creates a new Conneciton actor from a Tokio TcpStream,
/// and start's its execution.
/// returns: the Addr of the connection.
pub(crate) fn new(stream: TcpStream, address: SocketAddr) -> Addr<Self> {
let (read_half, write_half) = split(stream);
let addr = Connection {
write_half: Arc::new(Mutex::new(write_half)),
address,
observers: Vec::new(),
}
.start();
addr.do_send(ConnectionPrivateMessage::DoRead(BufReader::new(read_half)));
addr
}
#[inline]
fn broadcast(
&self,
ctx: &mut <Self as Actor>::Context,
data: ConnectionObservableOutput,
) {
let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.observers
.iter()
.cloned()
.map(|r| {
let data = data.clone();
async move {
if let Some(r) = r.upgrade() {
let _ = r.send(data).await;
}
}
.boxed()
})
.collect();
let _ = ctx.spawn(wrap_future(async {
join_all(futs).await;
}));
}
#[inline]
fn do_read(
&mut self,
ctx: &mut <Self as Actor>::Context,
mut buf_reader: BufReader<ReadHalf<TcpStream>>,
) {
let address = self.address;
let weak_addr = ctx.address().downgrade();
let read_fut = async move {
let dur = Duration::from_millis(100);
let mut buffer_string: String = Default::default();
let read_fut = buf_reader.read_line(&mut buffer_string);
let Ok(Ok(len)) = timeout(dur, read_fut).await else {
println!("[Connection] timeout reached");
if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader));
}
return;
};
if len == 0 {
println!("[Connection] readline returned 0");
return;
}
if let Some(addr) = weak_addr.upgrade() {
let _ = addr
.send(ConnectionPrivateMessage::Broadcast(
ConnectionObservableOutput::RecvData(
addr.downgrade(),
address,
buffer_string.clone(),
),
))
.await;
}
if let Some(addr) = weak_addr.upgrade() {
addr.do_send(ConnectionPrivateMessage::DoRead(buf_reader));
}
};
ctx.spawn(wrap_future(read_fut));
}
}
impl Actor for Connection {
type Context = Context<Self>;
/// runs when the actor is started.
/// takes out eh read_half ad turns it into a buffered reader
/// then eneters loop readling lines from the tcp stream
fn started(&mut self, ctx: &mut Self::Context) {
println!("[Connection] started");
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use ConnectionObservableOutput::ConnectionClosed;
println!("[Connection] stopped");
for recp in self.observers.iter() {
if let Some(recp) = recp.upgrade() {
recp.do_send(ConnectionClosed(ctx.address().downgrade()))
}
}
}
}
impl Handler<ObservableMessage<ConnectionObservableOutput>> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ObservableMessage<ConnectionObservableOutput>,
_ctx: &mut Self::Context,
) -> <Self as actix::Handler<ObservableMessage<ConnectionObservableOutput>>>::Result{
use ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Connection] adding subscriber");
self.observers.push(r);
}
Unsubscribe(r) => {
println!("[Connection] removing subscriber");
let r = r.upgrade();
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a.upgrade() != r)
.collect();
}
};
}
}
impl Handler<ConnectionMessage> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ConnectionMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
let writer = Arc::downgrade(&self.write_half);
match msg {
SendData(d) => {
ctx.spawn(wrap_future(async move {
let Some(writer) = writer.upgrade() else {
return;
};
println!("[Connection] sending data");
let mut lock = writer.lock().await;
let mut buffer = Vec::new();
let _ = writeln!(&mut buffer, "{}", d.as_str());
let _ = lock.write_all(&buffer).await;
}));
}
CloseConnection => ctx.stop(),
};
}
}
// impl Handler<SelfMessage> for Connection {
// type Result = ();
// fn handle(&mut self, msg: SelfMessage, ctx: &mut Self::Context) -> Self::Result {
// use ConnectionObservableOutput::RecvData;
// use SelfMessage::UpdateObserversWithData;
// match msg {
// UpdateObserversWithData(data) => {
// let send = ctx.address();
// let addr = self.address;
// // this is a mess
// let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
// .observers
// .iter()
// .cloned()
// .map(|r| {
// let send = send.clone();
// let data = data.clone();
// async move {
// let _ = r.send(RecvData(send, addr, data)).await;
// }
// .boxed()
// })
// .collect();
// let _ = ctx.spawn(wrap_future(async {
// join_all(futs).await;
// }));
// }
// };
// }
// }
impl Handler<ConnectionPrivateMessage> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ConnectionPrivateMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionPrivateMessage::Broadcast;
match msg {
Broadcast(data) => self.broadcast(ctx, data),
ConnectionPrivateMessage::DoRead(buf_reader) => {
self.do_read(ctx, buf_reader)
}
};
}
}
impl Drop for Connection {
fn drop(&mut self) {
println!("[Connection] Dropping value")
}
}

View File

@ -0,0 +1,31 @@
use std::net::SocketAddr;
use actix::{Message, WeakAddr};
use tokio::{
io::{BufReader, ReadHalf},
net::TcpStream,
};
use crate::prelude::actors::Connection;
/// This is a message that can be sent to the Connection.
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ConnectionMessage {
SendData(String),
CloseConnection,
}
#[derive(Message, Clone)]
#[rtype(result = "()")]
pub(crate) enum ConnectionObservableOutput {
RecvData(WeakAddr<Connection>, SocketAddr, String),
ConnectionClosed(WeakAddr<Connection>),
}
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ConnectionPrivateMessage {
Broadcast(ConnectionObservableOutput),
DoRead(BufReader<ReadHalf<TcpStream>>),
}

View File

@ -1,202 +1,5 @@
use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc};
mod actor;
mod messages;
use actix::{
fut::wrap_future,
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
Message,
Recipient,
SpawnHandle,
};
use futures::{future::join_all, Future, FutureExt};
use tokio::{
io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::TcpStream,
sync::Mutex,
};
use crate::prelude::messages::ObservableMessage;
/// This is a message that can be sent to the Connection.
#[derive(Message)]
#[rtype(result = "()")]
pub enum ConnectionMessage {
SendData(String),
CloseConnection,
}
#[derive(Message)]
#[rtype(result = "()")]
pub enum ConnectionOuput {
RecvData(Addr<Connection>, SocketAddr, String),
ConnectionClosed(Addr<Connection>),
}
#[derive(Message)]
#[rtype(result = "()")]
enum SelfMessage {
UpdateObserversWithData(String),
}
/// # Connection
/// This manages a TcpStream for a given connection.
///
/// ## Fields
/// - read_half: A temporary store fr the read half of the connection.
/// - write_half: The write half of the connection.
/// - address: The socket address of the conneciton.
/// - observers: A list of observers to events created by the connection.
/// - loop_future: the future holding the receiving loop.
pub struct Connection {
read_half: Option<ReadHalf<TcpStream>>,
write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
address: SocketAddr,
observers: Vec<Recipient<ConnectionOuput>>,
_loop_future: Option<SpawnHandle>,
}
impl Connection {
/// Creates a new Conneciton actor from a Tokio TcpStream,
/// and start's its execution.
/// returns: the Addr of the connection.
pub(super) fn new(stream: TcpStream, address: SocketAddr) -> Addr<Self> {
let (read_half, write_half) = split(stream);
Connection {
read_half: Some(read_half),
write_half: Arc::new(Mutex::new(write_half)),
address,
observers: Vec::new(),
_loop_future: None,
}
.start()
}
}
impl Actor for Connection {
type Context = Context<Self>;
/// runs when the actor is started.
/// takes out eh read_half ad turns it into a buffered reader
/// then eneters loop readling lines from the tcp stream
fn started(&mut self, ctx: &mut Self::Context) {
println!("[Connection] started");
let addr = ctx.address();
let read_half = self
.read_half
.take()
.expect("What the hell did yu do wrong");
ctx.spawn(wrap_future(async move {
let mut reader = BufReader::new(read_half);
let mut buffer_string = String::new();
while let Ok(len) = reader.read_line(&mut buffer_string).await {
use ConnectionMessage::CloseConnection;
use SelfMessage::UpdateObserversWithData;
if len == 0 {
println!("[Connection] connection closed");
addr
.send(CloseConnection)
.await
.expect("[Connection] failed to send close message to self");
return;
}
println!("[Connection] read line");
let _ = addr
.send(UpdateObserversWithData(buffer_string.clone()))
.await;
buffer_string.clear();
}
}));
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use ConnectionOuput::ConnectionClosed;
println!("[Connection] stopped");
for recp in self.observers.iter() {
recp.do_send(ConnectionClosed(ctx.address()));
}
}
}
impl Handler<ObservableMessage<ConnectionOuput>> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ObservableMessage<ConnectionOuput>,
_ctx: &mut Self::Context,
) -> <Self as actix::Handler<ObservableMessage<ConnectionOuput>>>::Result {
use ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Connection] adding subscriber");
self.observers.push(r);
}
Unsubscribe(r) => {
println!("[Connection] removing subscriber");
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a != &r)
.collect();
}
};
}
}
impl Handler<ConnectionMessage> for Connection {
type Result = ();
fn handle(&mut self, msg: ConnectionMessage, ctx: &mut Self::Context) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
let writer = self.write_half.clone();
match msg {
SendData(d) => {
ctx.spawn(wrap_future(async move {
println!("[Connection] sending data");
let mut lock = writer.lock().await;
let mut buffer = Vec::new();
let _ = writeln!(&mut buffer, "{}", d.as_str());
let _ = lock.write_all(&buffer).await;
}));
}
CloseConnection => ctx.stop(),
};
}
}
impl Handler<SelfMessage> for Connection {
type Result = ();
fn handle(&mut self, msg: SelfMessage, ctx: &mut Self::Context) -> Self::Result {
use ConnectionOuput::RecvData;
use SelfMessage::UpdateObserversWithData;
match msg {
UpdateObserversWithData(data) => {
let send = ctx.address();
let addr = self.address;
// this is a mess
let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.observers
.iter()
.cloned()
.map(|r| {
let send = send.clone();
let data = data.clone();
async move {
let _ = r.send(RecvData(send, addr, data)).await;
}
.boxed()
})
.collect();
let _ = ctx.spawn(wrap_future(async {
join_all(futs).await;
}));
}
};
}
}
pub(crate) use actor::Connection;
pub(crate) use messages::{ConnectionMessage, ConnectionObservableOutput};

View File

@ -0,0 +1,171 @@
use std::net::SocketAddr;
use actix::{
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
WeakAddr,
WeakRecipient,
};
use foundation::{
messages::{
client::{ClientStreamOut, ClientStreamOut::Error},
network::{NetworkSockIn, NetworkSockOut},
},
ClientDetails,
};
use serde_json::{from_str, to_string};
use crate::{
network::InitiatorOutput,
prelude::{
actors::Connection,
messages::{
ConnectionMessage,
ConnectionObservableOutput,
ObservableMessage,
},
},
};
/// # ConnectionInitiator
/// Handles the initiatin of a new connection.
///
/// This will do one of two things:
/// - Create a new client and send it to the network manager.
/// - Request the eserver info and send it to the connection.
pub struct ConnectionInitiator {
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
}
impl ConnectionInitiator {
pub(crate) fn new(
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
) -> Addr<Self> {
ConnectionInitiator {
connection,
delegate,
}
.start()
}
fn handle_request(
&mut self,
sender: WeakAddr<Connection>,
ctx: &mut <Self as Actor>::Context,
_address: SocketAddr,
data: String,
) {
use InitiatorOutput::{ClientRequest, InfoRequest};
use NetworkSockIn::{Connect, Info};
let msg = from_str::<NetworkSockIn>(data.as_str());
if let Err(e) = msg.as_ref() {
println!("[ConnectionInitiator] error decoding message {}", e);
self.error(ctx, sender);
return;
}
let msg = msg.unwrap();
println!("[ConnectionInitiator] matching request");
if let (Some(delegate), Some(sender)) =
(self.delegate.upgrade(), sender.upgrade())
{
match msg {
Info => {
delegate.do_send(InfoRequest(ctx.address().downgrade(), sender))
}
Connect {
uuid,
username,
address,
} => delegate.do_send(ClientRequest(
ctx.address().downgrade(),
sender,
ClientDetails {
uuid,
username,
address,
public_key: None,
},
)),
};
ctx.stop();
}
}
fn error(
&mut self,
ctx: &mut <Self as Actor>::Context,
sender: WeakAddr<Connection>,
) {
use ConnectionMessage::{CloseConnection, SendData};
if let Some(sender) = sender.upgrade() {
sender.do_send(SendData(
to_string::<ClientStreamOut>(&Error {
msg: "Error in connection initiator?".to_owned(),
})
.unwrap(),
));
sender.do_send(CloseConnection);
}
ctx.stop()
}
}
impl Actor for ConnectionInitiator {
type Context = Context<Self>;
/// on start initiate the protocol.
/// also add self as a subscriber to the connection.
fn started(&mut self, ctx: &mut Self::Context) {
use ConnectionMessage::SendData;
use NetworkSockOut::Request;
use ObservableMessage::Subscribe;
println!("[ConnectionInitiator] started");
self
.connection
.do_send(Subscribe(ctx.address().recipient().downgrade()));
self
.connection
.do_send(SendData(to_string(&Request).unwrap()));
}
/// once stopped remove self from the connection subscribers
fn stopped(&mut self, ctx: &mut Self::Context) {
use ObservableMessage::Unsubscribe;
println!("[ConnectionInitiator] stopped");
self
.connection
.do_send(Unsubscribe(ctx.address().recipient().downgrade()));
}
}
impl Handler<ConnectionObservableOutput> for ConnectionInitiator {
type Result = ();
fn handle(
&mut self,
msg: ConnectionObservableOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionObservableOutput::RecvData;
if let RecvData(sender, addr, data) = msg {
self.handle_request(sender, ctx, addr, data)
}
}
}
impl Drop for ConnectionInitiator {
fn drop(&mut self) {
println!("[ConnectionInitiator] Dropping value")
}
}

View File

@ -0,0 +1,15 @@
use actix::{Addr, Message, WeakAddr};
use foundation::ClientDetails;
use crate::prelude::actors::{Connection, ConnectionInitiator};
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum InitiatorOutput {
InfoRequest(WeakAddr<ConnectionInitiator>, Addr<Connection>),
ClientRequest(
WeakAddr<ConnectionInitiator>,
Addr<Connection>,
ClientDetails,
),
}

View File

@ -1,142 +1,5 @@
use std::net::SocketAddr;
mod actor;
mod messages;
use actix::{
Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, WeakRecipient,
};
use foundation::{
messages::{
client::{ClientStreamOut, ClientStreamOut::Error},
network::{NetworkSockIn, NetworkSockOut},
},
ClientDetails,
};
use serde_json::{from_str, to_string};
use crate::{
network::{connection::ConnectionOuput, Connection, ConnectionMessage},
prelude::messages::ObservableMessage,
};
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum InitiatorOutput {
InfoRequest(Addr<ConnectionInitiator>, Addr<Connection>),
ClientRequest(Addr<ConnectionInitiator>, Addr<Connection>, ClientDetails),
}
/// # ConnectionInitiator
/// Handles the initiatin of a new connection.
///
/// This will do one of two things:
/// - Create a new client and send it to the network manager.
/// - Request the eserver info and send it to the connection.
pub struct ConnectionInitiator {
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
}
impl ConnectionInitiator {
pub(crate) fn new(
delegate: WeakRecipient<InitiatorOutput>,
connection: Addr<Connection>,
) -> Addr<Self> {
ConnectionInitiator {
connection,
delegate,
}
.start()
}
fn handle_request(
&mut self,
sender: Addr<Connection>,
ctx: &mut <Self as Actor>::Context,
_address: SocketAddr,
data: String,
) {
use InitiatorOutput::{ClientRequest, InfoRequest};
use NetworkSockIn::{Connect, Info};
let msg = from_str::<NetworkSockIn>(data.as_str());
if let Err(e) = msg.as_ref() {
println!("[ConnectionInitiator] error decoding message {}", e);
self.error(ctx, sender);
return;
}
let msg = msg.unwrap();
println!("[ConnectionInitiator] matching request");
if let Some(delegate) = self.delegate.upgrade() {
match msg {
Info => delegate.do_send(InfoRequest(ctx.address(), sender)),
Connect {
uuid,
username,
address,
} => delegate.do_send(ClientRequest(
ctx.address(),
sender,
ClientDetails {
uuid,
username,
address,
public_key: None,
},
)),
};
ctx.stop();
}
}
fn error(&mut self, ctx: &mut <Self as Actor>::Context, sender: Addr<Connection>) {
use ConnectionMessage::{CloseConnection, SendData};
sender.do_send(SendData(
to_string::<ClientStreamOut>(&Error).expect("failed to convert error to string"),
));
sender.do_send(CloseConnection);
ctx.stop()
}
}
impl Actor for ConnectionInitiator {
type Context = Context<Self>;
/// on start initiate the protocol.
/// also add self as a subscriber to the connection.
fn started(&mut self, ctx: &mut Self::Context) {
use NetworkSockOut::Request;
use ObservableMessage::Subscribe;
use super::ConnectionMessage::SendData;
println!("[ConnectionInitiator] started");
self
.connection
.do_send(Subscribe(ctx.address().recipient()));
self
.connection
.do_send(SendData(to_string(&Request).unwrap()));
}
/// once stopped remove self from the connection subscribers
fn stopped(&mut self, ctx: &mut Self::Context) {
use ObservableMessage::Unsubscribe;
println!("[ConnectionInitiator] stopped");
self
.connection
.do_send(Unsubscribe(ctx.address().recipient()));
}
}
impl Handler<ConnectionOuput> for ConnectionInitiator {
type Result = ();
fn handle(&mut self, msg: ConnectionOuput, ctx: &mut Self::Context) -> Self::Result {
use ConnectionOuput::RecvData;
if let RecvData(sender, addr, data) = msg {
self.handle_request(sender, ctx, addr, data)
}
}
}
pub(crate) use actor::ConnectionInitiator;
pub(crate) use messages::InitiatorOutput;

View File

@ -8,8 +8,8 @@ use actix::{
Context,
Handler,
Message,
Recipient,
SpawnHandle,
WeakRecipient,
};
use tokio::net::TcpListener;
@ -30,14 +30,14 @@ pub(super) enum ListenerOutput {
pub(super) struct NetworkListener {
address: SocketAddr,
delegate: Recipient<ListenerOutput>,
delegate: WeakRecipient<ListenerOutput>,
looper: Option<SpawnHandle>,
}
impl NetworkListener {
pub(crate) fn new<T: ToSocketAddrs>(
address: T,
delegate: Recipient<ListenerOutput>,
delegate: WeakRecipient<ListenerOutput>,
) -> Addr<NetworkListener> {
NetworkListener {
address: address
@ -61,7 +61,12 @@ impl NetworkListener {
while let Ok((stream, addr)) = listener.accept().await {
println!("[NetworkListener] accepted socket");
let conn = Connection::new(stream, addr);
delegate.do_send(NewConnection(conn));
let Some(delegate) = delegate.upgrade() else {
break;
};
delegate.do_send(NewConnection(conn))
}
}));
}

View File

@ -1,38 +1,42 @@
//! # Network
//!
//! This module contains network code for the server.
//!
//! This includes:
//! - The network manager: For that handles all server network connections.
//! - The network listener: For listening for connections on a port.
//! - The conneciton: An abstraction over sockets sockets, for actix.
//! - The connection initiator: For initiating new connections to the server
//!
//! ## Diagrams
//!
//! ```mermaid
//! sequenceDiagram
//! Server->>NetworkManager: creates
//! NetworkManager->>NetworkListener: create
//! NetworkManager->>+NetworkListener: start listening
//!
//! loop async tcp listen
//! NetworkListener->>NetworkListener: check for new connections
//! end
//!
//! NetworkListener->>Connection: create from socket
//! NetworkListener->>NetworkManager: new connection
//! NetworkManager->>Server: new connection
//!
//! Server->>ConnectionInitiator: create with connection
//! ```
#![doc = r"# Network
This module contains network code for the server.
This includes:
- The network manager: For that handles all server network connections.
- The network listener: For listening for connections on a port.
- The conneciton: An abstraction over sockets sockets, for actix.
- The connection initiator: For initiating new connections to the server
## Diagrams
```mermaid
sequenceDiagram
Server->>NetworkManager: creates
NetworkManager->>NetworkListener: create
NetworkManager->>+NetworkListener: start listening
loop async tcp listen
NetworkListener->>NetworkListener: check for new connections
end
NetworkListener->>Connection: create from socket
NetworkListener->>NetworkManager: new connection
NetworkManager->>Server: new connection
Server->>ConnectionInitiator: create with connection
```"]
mod connection;
mod connection_initiator;
mod listener;
mod network_manager;
pub(crate) use connection::{Connection, ConnectionMessage, ConnectionOuput};
pub(crate) use connection::{
Connection,
ConnectionMessage,
ConnectionObservableOutput,
};
pub(crate) use connection_initiator::{ConnectionInitiator, InitiatorOutput};
// use listener::{ListenerMessage, ListenerOutput, NetworkListener};
pub(crate) use network_manager::{

View File

@ -80,16 +80,20 @@ impl NetworkManager {
) {
println!("[NetworkManager] Got new connection");
let init =
ConnectionInitiator::new(ctx.address().recipient().downgrade(), connection);
let init = ConnectionInitiator::new(
ctx.address().recipient().downgrade(),
connection,
);
self.initiators.push(init);
}
#[inline]
fn remove_initiator(&mut self, sender: Addr<ConnectionInitiator>) {
let index = self.initiators.iter().position(|i| *i == sender).unwrap();
println!("[NetworkManager] removed initiator at:{}", index);
self.initiators.remove(index);
fn remove_initiator(&mut self, sender: WeakAddr<ConnectionInitiator>) {
if let Some(sender) = sender.upgrade() {
let index = self.initiators.iter().position(|i| *i == sender).unwrap();
println!("[NetworkManager] removed initiator at:{}", index);
let _ = self.initiators.remove(index);
}
}
/// handles a initiator client request
@ -100,7 +104,7 @@ impl NetworkManager {
fn client_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: Addr<ConnectionInitiator>,
sender: WeakAddr<ConnectionInitiator>,
connection: Addr<Connection>,
client_details: ClientDetails,
) {
@ -119,7 +123,7 @@ impl NetworkManager {
fn info_request(
&mut self,
_ctx: &mut <Self as Actor>::Context,
sender: Addr<ConnectionInitiator>,
sender: WeakAddr<ConnectionInitiator>,
connection: Addr<Connection>,
) {
use NetworkOutput::InfoRequested;
@ -139,9 +143,9 @@ impl Actor for NetworkManager {
let config_mgr = self.config_manager.clone().upgrade();
if let Some(config_mgr) = config_mgr {
let fut = wrap_future(config_mgr.send(ConfigManagerDataMessage::GetValue(
"Network.Port".to_owned(),
)))
let fut = wrap_future(config_mgr.send(
ConfigManagerDataMessage::GetValue("Network.Port".to_owned()),
))
.map(
|out, actor: &mut NetworkManager, ctx: &mut Context<NetworkManager>| {
use crate::config_manager::ConfigManagerDataResponse::GotValue;
@ -150,13 +154,17 @@ impl Actor for NetworkManager {
let recipient = ctx.address().recipient();
let port = if let Ok(GotValue(Some(ConfigValue::Number(port)))) = out {
let port = if let Ok(GotValue(Some(ConfigValue::Number(port)))) = out
{
port
} else {
5600
};
println!("[NetworkManager] got port: {:?}", port);
let nl = NetworkListener::new(format!("0.0.0.0:{}", port), recipient);
let nl = NetworkListener::new(
format!("0.0.0.0:{}", port),
recipient.downgrade(),
);
nl.do_send(ListenerMessage::StartListening);
actor.listener_addr.replace(nl);
},
@ -200,7 +208,11 @@ impl Handler<NetworkDataMessage> for NetworkManager {
impl Handler<ListenerOutput> for NetworkManager {
type Result = ();
fn handle(&mut self, msg: ListenerOutput, ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: ListenerOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ListenerOutput::NewConnection;
match msg {
NewConnection(connection) => self.new_connection(ctx, connection),
@ -210,7 +222,11 @@ impl Handler<ListenerOutput> for NetworkManager {
impl Handler<InitiatorOutput> for NetworkManager {
type Result = ();
fn handle(&mut self, msg: InitiatorOutput, ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: InitiatorOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use InitiatorOutput::{ClientRequest, InfoRequest};
match msg {
ClientRequest(sender, addr, client_details) => {

View File

@ -1,7 +1,10 @@
use crate::network::network_manager::messages::NetworkOutput;
use crate::network::NetworkManager;
use actix::{Actor, Addr, WeakRecipient};
use crate::network::{
network_manager::messages::NetworkOutput,
NetworkManager,
};
pub struct Builder {
pub(super) delegate: WeakRecipient<NetworkOutput>,
}

View File

@ -1,8 +1,8 @@
use crate::network::Connection;
use actix::Addr;
use actix::{Message, MessageResponse};
use actix::{Addr, Message, MessageResponse};
use foundation::ClientDetails;
use crate::network::Connection;
#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)]
#[rtype(result = "()")]
pub enum NetworkMessage {

View File

@ -6,18 +6,24 @@ mod observer;
#[allow(unused_imports)]
pub mod actors {
//! exports all actors used in the program.
pub(crate) use crate::client_management::client::Client;
pub(crate) use crate::client_management::ClientManager;
pub(crate) use crate::network::{Connection, ConnectionInitiator, NetworkManager};
pub use crate::server::Server;
pub(crate) use crate::{
client_management::{client::Client, ClientManager},
network::{Connection, ConnectionInitiator, NetworkManager},
};
}
#[allow(unused_imports)]
pub mod messages {
//! exports all messages used in the program.
pub(crate) use super::observer::ObservableMessage;
pub(crate) use crate::client_management::{ClientManagerMessage, ClientManagerOutput};
pub(crate) use crate::network::{
ConnectionMessage, ConnectionOuput, NetworkMessage, NetworkOutput,
pub(crate) use crate::{
client_management::{ClientManagerMessage, ClientManagerOutput},
network::{
ConnectionMessage,
ConnectionObservableOutput,
NetworkMessage,
NetworkOutput,
},
};
}

View File

@ -1,7 +1,7 @@
//! # observer.rs
//! crates a message type for the observer pattern.
use actix::{Message, Recipient};
use actix::{Message, WeakRecipient};
/// # ObservableMessage
/// represents common messages for observers
@ -12,6 +12,6 @@ where
M: Message + Send,
M::Result: Send,
{
Subscribe(Recipient<M>),
Unsubscribe(Recipient<M>),
Subscribe(WeakRecipient<M>),
Unsubscribe(WeakRecipient<M>),
}

View File

@ -1,24 +1,26 @@
use actix::{Actor, Addr};
use crate::client_management::ClientManager;
use crate::network::NetworkManager;
use crate::rhai::rhai_manager::RhaiManager;
use crate::Server;
use actix::{Actor, Addr, WeakAddr};
use rhai::{Engine, Scope};
use crate::{
client_management::ClientManager,
network::NetworkManager,
rhai::rhai_manager::RhaiManager,
Server,
};
pub struct Builder {
engine: Engine,
server: Addr<Server>,
network_manager: Addr<NetworkManager>,
client_manager: Addr<ClientManager>,
server: WeakAddr<Server>,
network_manager: WeakAddr<NetworkManager>,
client_manager: WeakAddr<ClientManager>,
scope: Scope<'static>,
}
impl Builder {
pub(super) fn new(
server: Addr<Server>,
network_manager: Addr<NetworkManager>,
client_manager: Addr<ClientManager>,
server: WeakAddr<Server>,
network_manager: WeakAddr<NetworkManager>,
client_manager: WeakAddr<ClientManager>,
) -> Self {
Builder {
engine: Engine::new(),

View File

@ -1,24 +1,26 @@
use crate::client_management::ClientManager;
use crate::network::NetworkManager;
use crate::rhai::builder::Builder;
use crate::Server;
use actix::{Actor, Addr, Context};
use actix::{Actor, Context, WeakAddr};
use rhai::{Engine, Scope};
use crate::{
client_management::ClientManager,
network::NetworkManager,
rhai::builder::Builder,
Server,
};
pub struct RhaiManager {
pub(super) engine: Engine,
pub(super) _scope: Scope<'static>,
pub(super) _server: Addr<Server>,
pub(super) _network_manager: Addr<NetworkManager>,
pub(super) _client_manager: Addr<ClientManager>,
pub(super) _server: WeakAddr<Server>,
pub(super) _network_manager: WeakAddr<NetworkManager>,
pub(super) _client_manager: WeakAddr<ClientManager>,
}
impl RhaiManager {
pub fn create(
server: Addr<Server>,
network_manager: Addr<NetworkManager>,
client_manager: Addr<ClientManager>,
server: WeakAddr<Server>,
network_manager: WeakAddr<NetworkManager>,
client_manager: WeakAddr<ClientManager>,
) -> Builder {
Builder::new(
server.clone(),

View File

@ -1,9 +1,13 @@
use crate::client_management::client::Client;
use crate::client_management::client::ClientDataResponse::{Username, Uuid};
use crate::client_management::client::{ClientDataMessage, ClientDataResponse};
use actix::Addr;
use mlua::{Error, UserData, UserDataMethods};
use crate::client_management::client::{
Client,
ClientDataMessage,
ClientDataResponse,
ClientDataResponse::{Username, Uuid},
};
#[derive(Clone)]
pub(crate) struct ScriptableClient {
addr: Addr<Client>,

View File

@ -1,8 +1,12 @@
use crate::network::NetworkDataOutput::IsListening;
use crate::network::{NetworkDataMessage, NetworkManager};
use actix::Addr;
use mlua::{Error, UserData, UserDataMethods};
use crate::network::{
NetworkDataMessage,
NetworkDataOutput::IsListening,
NetworkManager,
};
#[derive(Clone)]
pub(crate) struct ScriptableNetworkManager {
addr: Addr<NetworkManager>,
@ -11,7 +15,8 @@ pub(crate) struct ScriptableNetworkManager {
impl UserData for ScriptableNetworkManager {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("Listening", |_lua, obj, ()| async move {
let is_listening = obj.addr.send(NetworkDataMessage::IsListening).await.ok();
let is_listening =
obj.addr.send(NetworkDataMessage::IsListening).await.ok();
if let Some(IsListening(is_listening)) = is_listening {
Ok(is_listening)
} else {

View File

@ -1,70 +1,55 @@
use crate::scripting::scriptable_client_manager::ScriptableClientManager;
use crate::scripting::scriptable_network_manager::ScriptableNetworkManager;
use actix::Addr;
use actix::WeakAddr;
use mlua::{Error, UserData, UserDataMethods};
use crate::server::ServerDataResponse::{ClientManager, Name, NetworkManager, Owner};
use crate::server::*;
use crate::server::{ServerDataResponse::Name, *};
#[derive(Clone)]
pub(crate) struct ScriptableServer {
pub(super) addr: Addr<Server>,
pub(super) addr: WeakAddr<Server>,
}
impl UserData for ScriptableServer {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("name", |_lua, obj, ()| async move {
let name: Option<ServerDataResponse> =
obj.addr.send(ServerDataMessage::Name).await.ok();
if let Some(Name(name)) = name {
Ok(name)
} else {
Err(Error::RuntimeError(
"Name returned null or other value".to_string(),
let Some(send_fut) = obj.addr.upgrade().map(|addr| addr.send(ServerDataMessage::Name)) else {
return Err(Error::RuntimeError(
"[ScriptableServer:name] Server doesn't exist. Dunno how you got here".to_string(),
))
}
};
let name: Option<ServerDataResponse> = send_fut.await.ok();
let Some(Name(name)) = name else {
return Err(Error::RuntimeError(
"[ScriptableServer:name] Name returned nil".to_string(),
))
};
Ok(name)
});
methods.add_async_method("owner", |_lua, obj, ()| async move {
let owner: Option<ServerDataResponse> =
obj.addr.send(ServerDataMessage::Owner).await.ok();
if let Some(Owner(name)) = owner {
Ok(name)
} else {
Err(Error::RuntimeError(
"Name returned null or other value".to_string(),
let Some(send_fut) = obj.addr.upgrade().map(|addr| addr.send(ServerDataMessage::Owner)) else {
return Err(Error::RuntimeError(
"[ScriptableServer:owner] Server doesn't exist. Dunno how you got here".to_string(),
))
}
});
};
methods.add_async_method("client_manager", |_lua, obj, ()| async move {
let name: Option<ServerDataResponse> =
obj.addr.send(ServerDataMessage::ClientManager).await.ok();
if let Some(ClientManager(Some(cm))) = name {
Ok(ScriptableClientManager::from(cm))
} else {
Err(Error::RuntimeError(
"Name returned null or other value".to_string(),
))
}
});
let owner: Option<ServerDataResponse> = send_fut.await.ok();
methods.add_async_method("network_manager", |_lua, obj, ()| async move {
let name: Option<ServerDataResponse> =
obj.addr.send(ServerDataMessage::NetworkManager).await.ok();
if let Some(NetworkManager(Some(nm))) = name {
Ok(ScriptableNetworkManager::from(nm))
} else {
Err(Error::RuntimeError(
"Name returned null or other value".to_string(),
let Some(Name(owner)) = owner else {
return Err(Error::RuntimeError(
"[ScriptableServer:owner] Owner returned nil".to_string(),
))
}
};
Ok(owner)
});
}
}
impl From<Addr<Server>> for ScriptableServer {
fn from(addr: Addr<Server>) -> Self {
impl From<WeakAddr<Server>> for ScriptableServer {
fn from(addr: WeakAddr<Server>) -> Self {
Self { addr }
}
}

View File

@ -1,6 +1,7 @@
use super::*;
use actix::{Actor, Addr};
use super::*;
pub struct ServerBuilder {
pub(super) name: String,
pub(super) owner: String,

View File

@ -1,7 +1,7 @@
use crate::client_management::ClientManager;
use crate::network::NetworkManager;
use actix::{Addr, Message, MessageResponse};
use crate::{client_management::ClientManager, network::NetworkManager};
#[derive(Message, Clone)]
#[rtype(result = "ServerDataResponse")]
pub enum ServerDataMessage {

View File

@ -99,18 +99,27 @@ impl Actor for Server {
let addr = ctx.address().downgrade();
let nm = NetworkManager::create(addr.clone().recipient()).build();
nm.do_send(NetworkMessage::StartListening);
self.network_manager.replace(nm.clone());
let cm = ClientManager::new(addr.recipient());
let rm = RhaiManager::create(
ctx.address().downgrade(),
nm.downgrade(),
cm.downgrade(),
)
.build();
let lm = LuaManager::create(
ctx.address().downgrade(),
nm.downgrade(),
cm.downgrade(),
)
.build();
self.network_manager.replace(nm.clone());
self.client_manager.replace(cm.clone());
let rm = RhaiManager::create(ctx.address(), nm.clone(), cm.clone()).build();
self.rhai_manager.replace(rm);
let lm = LuaManager::create(ctx.address(), nm, cm).build();
self.lua_manager.replace(lm);
nm.do_send(NetworkMessage::StartListening);
let name_fut = wrap_future(
ConfigManager::shared().send(GetValue("Server.Name".to_owned())),
)
@ -137,7 +146,11 @@ impl Actor for Server {
impl Handler<ServerDataMessage> for Server {
type Result = ServerDataResponse;
fn handle(&mut self, msg: ServerDataMessage, _ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: ServerDataMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
println!("[Server] got data message");
match msg {
ServerDataMessage::Name => ServerDataResponse::Name(self.name.clone()),
@ -154,7 +167,11 @@ impl Handler<ServerDataMessage> for Server {
impl Handler<NetworkOutput> for Server {
type Result = ();
fn handle(&mut self, msg: NetworkOutput, ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: NetworkOutput,
ctx: &mut Self::Context,
) -> Self::Result {
println!("[ServerActor] received message");
match msg {
// This uses promise like funcionality to queue