reformatted project

This commit is contained in:
michael-bailey 2022-06-15 18:20:57 +02:00
parent b45fd9a130
commit 1d90e480be
29 changed files with 613 additions and 411 deletions

View File

@ -1,33 +1,33 @@
mod worker;
mod managers;
mod worker;
mod worker_message;
use cursive::{
menu::{Item, Tree},
traits::Nameable,
views::{Dialog, TextView},
Cursive,
CursiveExt,
};
use worker::Worker;
use cursive::{Cursive, CursiveExt};
use cursive::menu::{Item, Tree};
use cursive::traits::Nameable;
use cursive::views::{Dialog, TextView};
fn main() {
let mut app = Cursive::default();
let worker_stream =
Worker::new(app.cb_sink().clone()).start();
let worker_stream = Worker::new(app.cb_sink().clone()).start();
app.set_user_data(worker_stream);
app.add_layer(Dialog::new()
.content(TextView::new("Hello world").with_name("TextView"))
.button("close", |s| s.quit()));
app.add_layer(
Dialog::new()
.content(TextView::new("Hello world").with_name("TextView"))
.button("close", |s| s.quit()),
);
app.menubar().autohide = false;
app.menubar().add_subtree(
"Application",
Tree::new()
.item(
Item::leaf("About", |s| s.quit())
).delimiter().item(
Item::leaf("Quit",|s| s.quit())
)
.item(Item::leaf("About", |s| s.quit()))
.delimiter()
.item(Item::leaf("Quit", |s| s.quit())),
);
app.set_fps(30);
app.run();

View File

@ -1,18 +1,25 @@
use std::{
io::{Error, ErrorKind},
mem,
sync::{atomic::AtomicBool, Arc},
};
use async_trait::async_trait;
use std::io::{Error, ErrorKind};
use std::mem;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::net::ToSocketAddrs;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
use foundation::{
connection::Connection,
messages::{
client::{ClientStreamIn, ClientStreamOut},
network::{NetworkSockIn, NetworkSockOut},
},
prelude::IManager,
};
use tokio::{
net::ToSocketAddrs,
sync::{mpsc::Sender, Mutex},
};
use uuid::Uuid;
use crate::managers::NetworkManagerMessage;
use foundation::connection::Connection;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::prelude::IManager;
pub struct NetworkManager<M>
where
@ -144,13 +151,14 @@ where
#[cfg(test)]
mod test {
use crate::managers::network::NetworkManagerMessage;
use crate::managers::NetworkManager;
use serverlib::Server;
use std::future::Future;
use serverlib::Server;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use crate::managers::{network::NetworkManagerMessage, NetworkManager};
async fn wrap_setup<T, F>(test: T)
where
T: FnOnce(u16) -> F,

View File

@ -1,5 +1,4 @@
use foundation::ClientDetails;
use foundation::messages::network::NetworkSockOut;
use foundation::{messages::network::NetworkSockOut, ClientDetails};
#[derive(Debug)]
pub enum NetworkManagerMessage {
@ -9,16 +8,22 @@ pub enum NetworkManagerMessage {
server_name: String,
server_owner: String,
},
Error(&'static str)
Error(&'static str),
}
impl From<NetworkSockOut> for NetworkManagerMessage {
fn from(other: NetworkSockOut) -> Self {
use NetworkSockOut::{GotInfo as OldInfo};
use NetworkManagerMessage::{Info as NewInfo, Error};
use NetworkManagerMessage::{Error, Info as NewInfo};
use NetworkSockOut::GotInfo as OldInfo;
match other {
OldInfo {server_name,server_owner} => NewInfo {server_name,server_owner},
_ => Error("Error occurred with conversion")
OldInfo {
server_name,
server_owner,
} => NewInfo {
server_name,
server_owner,
},
_ => Error("Error occurred with conversion"),
}
}
}
@ -27,13 +32,21 @@ impl PartialEq for NetworkManagerMessage {
fn eq(&self, other: &Self) -> bool {
use NetworkManagerMessage::Info;
match self {
Info {server_owner, server_name} => {
if let Info {server_owner: other_owner,server_name: other_name} = other {
return server_owner == other_owner && server_name == other_name;
Info {
server_owner,
server_name,
} => {
if let Info {
server_owner: other_owner,
server_name: other_name,
} = other
{
return server_owner == other_owner
&& server_name == other_name;
}
false
}
_ => {false}
_ => false,
}
}
}
}

View File

@ -3,5 +3,5 @@ mod network;
#[path = "message.rs"]
mod message;
pub use network::NetworkManager;
pub use message::NetworkManagerMessage;
pub use network::NetworkManager;

View File

@ -1,18 +1,25 @@
use std::{
io::{Error, ErrorKind},
mem,
sync::{atomic::AtomicBool, Arc},
};
use async_trait::async_trait;
use std::io::{Error, ErrorKind};
use std::mem;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::net::ToSocketAddrs;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
use foundation::{
connection::Connection,
messages::{
client::{ClientStreamIn, ClientStreamOut},
network::{NetworkSockIn, NetworkSockOut},
},
prelude::IManager,
};
use tokio::{
net::ToSocketAddrs,
sync::{mpsc::Sender, Mutex},
};
use uuid::Uuid;
use crate::managers::NetworkManagerMessage;
use foundation::connection::Connection;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::prelude::IManager;
pub struct NetworkManager<M>
where
@ -144,13 +151,14 @@ where
#[cfg(test)]
mod test {
use crate::managers::network::NetworkManagerMessage;
use crate::managers::NetworkManager;
use serverlib::Server;
use std::future::Future;
use serverlib::Server;
use tokio::sync::mpsc::channel;
use uuid::Uuid;
use crate::managers::{network::NetworkManagerMessage, NetworkManager};
async fn wrap_setup<T, F>(test: T)
where
T: FnOnce(u16) -> F,

View File

@ -1,27 +1,30 @@
use std::sync::Arc;
use std::thread::spawn;
use std::time::Duration;
use std::{sync::Arc, thread::spawn, time::Duration};
use crossbeam_channel::Sender as CrossSender;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Sender as TokioSender};
use tokio::sync::Mutex;
use tokio::time::sleep;
use foundation::ClientDetails;
use crate::{Cursive, TextView};
use crate::managers::{NetworkManager};
use crate::worker_message::WorkerMessage;
use tokio::{
runtime::Runtime,
sync::{
mpsc::{channel, Sender as TokioSender},
Mutex,
},
time::sleep,
};
use crate::{
managers::NetworkManager,
worker_message::WorkerMessage,
Cursive,
TextView,
};
pub type CursiveSender = CrossSender<Box<dyn FnOnce(&mut Cursive) + Send>>;
pub struct Worker
{
pub struct Worker {
cursive_sender: CursiveSender,
network_manager: Arc<NetworkManager<WorkerMessage>>,
number: Arc<Mutex<usize>>,
#[allow(unused)]
@ -31,24 +34,22 @@ pub struct Worker
impl Worker {
pub fn new(sender: CursiveSender) -> Worker {
#[allow(unused)]
let (tx,rx) = channel::<WorkerMessage>(16);
let (tx, rx) = channel::<WorkerMessage>(16);
Worker {
network_manager: NetworkManager::new(tx.clone()),
number: Arc::new(Mutex::new(0)),
user_details: Mutex::new(None),
cursive_sender: sender
cursive_sender: sender,
}
}
pub fn start(self) -> TokioSender<WorkerMessage> {
#[allow(unused)]
let (tx,rx) = channel::<WorkerMessage>(16);
let (tx, rx) = channel::<WorkerMessage>(16);
spawn(move || {
let sender = self.cursive_sender.clone();
let rt = Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
let tmp_num = self.number.clone();
#[allow(unused)]
let network_manager = self.network_manager.clone();
@ -56,17 +57,19 @@ impl Worker {
let a = &tmp_num;
loop {
let num = Arc::clone(&a);
sleep(Duration::new(1,0)).await;
let _ = sender.send(Box::new( move |s| {
sleep(Duration::new(1, 0)).await;
let _ = sender.send(Box::new(move |s| {
let num = &num.clone();
let mut num_lock = num.blocking_lock();
*num_lock += 1;
let a = *num_lock;
s.find_name::<TextView>("TextView").unwrap().set_content(a.to_string());
s.find_name::<TextView>("TextView")
.unwrap()
.set_content(a.to_string());
}));
}
})
});
tx
}
}
}

View File

@ -12,13 +12,18 @@ pub enum WorkerMessage {
impl From<NetworkManagerMessage> for WorkerMessage {
fn from(other: NetworkManagerMessage) -> Self {
#[allow(unused)]
use WorkerMessage::{Info as NewInfo, Error as NewError};
use NetworkManagerMessage::{Error, Info as OldInfo};
#[allow(unused)]
use NetworkManagerMessage::{Info as OldInfo, Error};
use WorkerMessage::{Error as NewError, Info as NewInfo};
match other {
OldInfo {server_name, server_owner}
=> NewInfo {server_owner,server_name},
_ => todo!()
OldInfo {
server_name,
server_owner,
} => NewInfo {
server_owner,
server_name,
},
_ => todo!(),
}
}
}

View File

@ -1,13 +1,16 @@
use std::io::{Error, ErrorKind};
use std::io::Write;
use std::mem;
use std::sync::Arc;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io;
use tokio::io::{AsyncWriteExt, BufReader, AsyncBufReadExt, ReadHalf, WriteHalf};
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::sync::Mutex;
use std::{
io::{Error, ErrorKind, Write},
mem,
sync::Arc,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
io,
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::{TcpStream, ToSocketAddrs},
sync::Mutex,
};
#[derive(Debug)]
pub struct Connection {
@ -22,24 +25,28 @@ impl Connection {
stream_tx: Mutex::new(None),
})
}
pub async fn connect<T: ToSocketAddrs>(&self, host: T) -> Result<(), Error> {
pub async fn connect<T: ToSocketAddrs>(
&self,
host: T,
) -> Result<(), Error> {
let connection = TcpStream::connect(host).await?;
let (rd, wd) = io::split(connection);
let mut writer_lock = self.stream_tx.lock().await;
let mut reader_lock = self.stream_rx.lock().await;
let _ = mem::replace(&mut *writer_lock, Some(wd));
let _ = mem::replace(&mut *reader_lock, Some(BufReader::new(rd)));
Ok(())
}
pub async fn write<T>(&self, message: T) -> Result<(), Error>
where T: Serialize {
let mut out_buffer = Vec::new();
pub async fn write<T>(&self, message: T) -> Result<(), Error>
where
T: Serialize,
{
let mut out_buffer = Vec::new();
let out = serde_json::to_string(&message).unwrap();
@ -56,11 +63,13 @@ impl Connection {
Ok(())
} else {
Err(Error::new(ErrorKind::Interrupted, "Writer does not exist"))
}
};
}
pub async fn read<T>(&self) -> Result<T,Error>
where T: DeserializeOwned {
pub async fn read<T>(&self) -> Result<T, Error>
where
T: DeserializeOwned,
{
let mut buffer = String::new();
let mut reader_lock = self.stream_rx.lock().await;
let old = mem::replace(&mut *reader_lock, None);
@ -87,48 +96,49 @@ impl From<TcpStream> for Connection {
#[cfg(test)]
mod test {
use std::future::Future;
use std::io::Error;
use std::panic;
use std::{future::Future, io::Error, panic};
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use serde::{Serialize,Deserialize};
use crate::connection::Connection;
#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum TestMessages {
Ping,
Pong
Pong,
}
#[tokio::test]
async fn a() -> Result<(), Error> {
wrap_setup(|port| {
async move {
println!("{}", port);
let connection = Connection::new();
connection.connect(format!("localhost:{}", &port)).await.unwrap();
connection.write(&TestMessages::Ping).await.unwrap();
let res = connection.read::<TestMessages>().await.unwrap();
assert_eq!(res, TestMessages::Pong);
}
}).await
wrap_setup(|port| async move {
println!("{}", port);
let connection = Connection::new();
connection
.connect(format!("localhost:{}", &port))
.await
.unwrap();
connection.write(&TestMessages::Ping).await.unwrap();
let res = connection.read::<TestMessages>().await.unwrap();
assert_eq!(res, TestMessages::Pong);
})
.await
}
async fn wrap_setup<T,F>(test: T) -> Result<(), std::io::Error>
where T: FnOnce(u16) -> F + panic::UnwindSafe,
F: Future
async fn wrap_setup<T, F>(test: T) -> Result<(), std::io::Error>
where
T: FnOnce(u16) -> F + panic::UnwindSafe,
F: Future,
{
let server = TcpListener::bind("localhost:0").await?;
let addr = server.local_addr()?;
// create tokio server execution
tokio::spawn(async move {
while let Ok((stream, addr)) = server.accept().await {
use TestMessages::{Ping,Pong};
use TestMessages::{Ping, Pong};
println!("[server]: Connected {}", &addr);
let connection = Connection::from(stream);
if let Ok(Ping) = connection.read::<TestMessages>().await {
@ -136,7 +146,7 @@ mod test {
}
}
});
test(addr.port()).await;
Ok(())
}

View File

@ -3,8 +3,10 @@
#[cfg(test)]
mod test {
use openssl::sha::sha256;
use openssl::symm::{Cipher, Crypter, Mode};
use openssl::{
sha::sha256,
symm::{Cipher, Crypter, Mode},
};
#[test]
fn testEncryption() {

View File

@ -1,10 +1,13 @@
use crate::event::event_result::EventResultBuilder;
use crate::event::EventResult;
use crate::event::EventResultType;
use std::collections::HashMap;
use futures::channel::oneshot::{channel, Receiver, Sender};
use crate::event::{
event_result::EventResultBuilder,
EventResult,
EventResultType,
};
/// # Eventw
/// Object that holds details about an event being passed through the application.
///
@ -70,7 +73,11 @@ impl<T> EventBuilder<T> {
}
}
pub fn add_arg<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
pub fn add_arg<K: Into<String>, V: Into<String>>(
mut self,
key: K,
value: V,
) -> Self {
self.args.insert(key.into(), value.into());
self
}

View File

@ -1,6 +1,7 @@
use futures::channel::oneshot::Sender;
use std::collections::HashMap;
use futures::channel::oneshot::Sender;
pub enum EventResultType {
Success,
NoResponse,
@ -15,7 +16,10 @@ pub struct EventResult {
}
impl EventResult {
pub fn create(result_type: EventResultType, sender: Sender<EventResult>) -> EventResultBuilder {
pub fn create(
result_type: EventResultType,
sender: Sender<EventResult>,
) -> EventResultBuilder {
EventResultBuilder::new(result_type, sender)
}
}
@ -29,7 +33,10 @@ pub struct EventResultBuilder {
}
impl EventResultBuilder {
pub(self) fn new(result_type: EventResultType, sender: Sender<EventResult>) -> Self {
pub(self) fn new(
result_type: EventResultType,
sender: Sender<EventResult>,
) -> Self {
Self {
code: result_type,
args: HashMap::default(),
@ -43,8 +50,7 @@ impl EventResultBuilder {
}
pub fn send(self) {
self
.sender
self.sender
.send(EventResult {
code: self.code,
args: self.args,

View File

@ -3,6 +3,7 @@ mod event;
mod event_result;
mod responder;
pub use self::responder::IResponder;
pub use event::{Event, EventBuilder};
pub use event_result::{EventResult, EventResultType};
pub use self::responder::IResponder;

View File

@ -1,9 +1,11 @@
use crate::event::Event;
use std::sync::Weak;
use crate::event::Event;
pub trait IResponder<T>
where
T: Sync + Send {
T: Sync + Send,
{
fn post_event(&self, event: Event<T>) {
if let Some(next) = self.get_next() {
if let Some(next) = next.upgrade() {

View File

@ -1,8 +1,8 @@
use crate::ClientDetails;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::ClientDetails;
/// This enum defined the message that the server will receive from a client
/// This uses the serde library to transform to and from json.
#[derive(Serialize, Deserialize)]
@ -11,7 +11,6 @@ pub enum ClientStreamIn {
Connected,
Update,
SendMessage { to: Uuid, content: String },
SendGlobalMessage { content: String },
@ -40,7 +39,7 @@ impl PartialEq for ClientStreamOut {
match (self, other) {
(Connected, Connected) => true,
(Disconnected, Disconnected) => true,
_ => false
_ => false,
}
}
}

View File

@ -24,19 +24,26 @@ pub enum NetworkSockOut {
server_owner: String,
},
Connecting,
Error
Error,
}
impl PartialEq for NetworkSockOut {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(NetworkSockOut::Request, NetworkSockOut::Request) => true,
(NetworkSockOut::GotInfo {server_name,server_owner},
NetworkSockOut::GotInfo {server_owner: owner_other,server_name: name_other})
=> server_name == name_other && server_owner == owner_other,
(
NetworkSockOut::GotInfo {
server_name,
server_owner,
},
NetworkSockOut::GotInfo {
server_owner: owner_other,
server_name: name_other,
},
) => server_name == name_other && server_owner == owner_other,
(NetworkSockOut::Connecting, NetworkSockOut::Connecting) => true,
_ => false
_ => false,
}
}
}

View File

@ -1,6 +1,9 @@
use std::{
sync::{Arc, Weak},
time::Duration,
};
use async_trait::async_trait;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::time::sleep;
/// # IManager

View File

@ -1,23 +1,25 @@
use std::io::{Error};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::join;
use tokio::net::{TcpStream,TcpListener};
use std::{io::Error, net::SocketAddr, sync::Arc};
use tokio::{
join,
net::{TcpListener, TcpStream},
};
use crate::connection::Connection;
pub async fn create_connection_pair()
-> Result<(Arc<Connection>, (Arc<Connection>, SocketAddr )), Error> {
pub async fn create_connection_pair(
) -> Result<(Arc<Connection>, (Arc<Connection>, SocketAddr)), Error> {
let listener: TcpListener = TcpListener::bind("localhost:0000").await?;
let port = listener.local_addr()?.port();
let (server_res,client_res) = join!(
let (server_res, client_res) = join!(
async { TcpStream::connect(format!("localhost:{}", port)).await },
async { listener.accept().await }
);
let (client,addr) = client_res?;
let (client, addr) = client_res?;
let server = Arc::new(Connection::from(server_res?));
let client = Arc::new(Connection::from(client));
Ok((server,(client,addr)))
}
Ok((server, (client, addr)))
}

View File

@ -1,3 +1,3 @@
mod connection_pair;
pub use connection_pair::create_connection_pair;
pub use connection_pair::create_connection_pair;

View File

@ -1,26 +1,35 @@
//! # actix_server
//! this holds the server actor
//! the server acts as teh main actor
//! the server acts as teh main actor
//! and supervisor to the actor system.
use crate::client_management::{Client};
use crate::client_management::ClientManager;
use crate::client_management::ClientManagerOutput;
use crate::network::Connection;
use crate::network::ConnectionInitiator;
use crate::network::ConnectionMessage;
use crate::network::NetworkOutput;
use actix::fut::wrap_future;
use actix::Actor;
use actix::ActorFutureExt;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use crate::client_management::ClientManagerMessage;
use foundation::messages::network::NetworkSockOut;
use foundation::ClientDetails;
use crate::network::{NetworkManager, NetworkMessage};
use actix::{
fut::wrap_future,
Actor,
ActorFutureExt,
Addr,
AsyncContext,
Context,
Handler,
};
use foundation::{messages::network::NetworkSockOut, ClientDetails};
use crate::{
client_management::{
Client,
ClientManager,
ClientManagerMessage,
ClientManagerOutput,
},
network::{
Connection,
ConnectionInitiator,
ConnectionMessage,
NetworkManager,
NetworkMessage,
NetworkOutput,
},
};
/// This struct is the main actor of the server.
/// all other actors are ran through here.
@ -42,9 +51,9 @@ impl ServerActor {
&mut self,
_ctx: &mut <Self as Actor>::Context,
addr: Addr<Connection>,
details: ClientDetails
details: ClientDetails,
) {
use ClientManagerMessage::{AddClient};
use ClientManagerMessage::AddClient;
if let Some(mgr) = self.client_management.as_ref() {
let client = Client::new(addr, details.clone());
mgr.do_send(AddClient(details.uuid, client));
@ -56,8 +65,8 @@ impl ServerActor {
ctx: &mut <Self as Actor>::Context,
sender: Addr<Connection>,
) {
use NetworkSockOut::GotInfo;
use ConnectionMessage::{CloseConnection, SendData};
use NetworkSockOut::GotInfo;
let fut = wrap_future(
sender.send(SendData(
serde_json::to_string(&GotInfo {
@ -81,13 +90,12 @@ impl Actor for ServerActor {
fn started(&mut self, ctx: &mut Self::Context) {
let addr = ctx.address();
self
.network_manager
self.network_manager
.replace(NetworkManager::new(addr.clone().recipient().downgrade()));
self
.client_management
.replace(ClientManager::new(addr.clone().recipient::<ClientManagerOutput>().downgrade()));
self.client_management.replace(ClientManager::new(
addr.clone().recipient::<ClientManagerOutput>().downgrade(),
));
if let Some(net_mgr) = self.network_manager.as_ref() {
net_mgr.do_send(NetworkMessage::StartListening);
@ -120,7 +128,11 @@ impl Handler<NetworkOutput> for ServerActor {
impl Handler<ClientManagerOutput> for ServerActor {
type Result = ();
fn handle(&mut self, msg: ClientManagerOutput, ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: ClientManagerOutput,
ctx: &mut Self::Context,
) -> Self::Result {
todo!()
}
}
}

View File

@ -9,9 +9,7 @@ pub(crate) mod network;
pub(crate) mod prelude;
use actix_server::ServerActor;
use tokio::time::sleep;
use tokio::time::Duration;
use tokio::time::{sleep, Duration};
#[actix::main()]
async fn main() {

View File

@ -1,29 +1,50 @@
use std::net::SocketAddr;
use crate::network::{Connection, ConnectionOuput};
use crate::prelude::ObservableMessage;
use actix::{Actor, Addr, Context, Handler, Message, MessageResponse, WeakAddr, Recipient, Running, ArbiterHandle, AsyncContext};
use actix::{
Actor,
Addr,
ArbiterHandle,
AsyncContext,
Context,
Handler,
Message,
MessageResponse,
Recipient,
Running,
WeakAddr,
};
use foundation::{
messages::client::{ClientStreamIn, ClientStreamOut},
ClientDetails,
};
use serde_json::{from_str, to_string};
use foundation::ClientDetails;
use crate::network::ConnectionMessage;
use uuid::Uuid;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use crate::client_management::client::ClientObservableMessage::{SendGlobalMessageRequest, SendMessageRequest, UpdateRequest};
use crate::network::ConnectionMessage::SendData;
use crate::prelude::ObservableMessage::{Subscribe, Unsubscribe};
use crate::{
client_management::client::ClientObservableMessage::{
SendGlobalMessageRequest,
SendMessageRequest,
UpdateRequest,
},
network::{
Connection,
ConnectionMessage,
ConnectionMessage::SendData,
ConnectionOuput,
},
prelude::{
ObservableMessage,
ObservableMessage::{Subscribe, Unsubscribe},
},
};
/// Message sent ot the clients delegate
#[derive(Message)]
#[rtype(result = "()")]
pub enum ClientMessage {
SendUpdate(Vec<ClientDetails>),
SendMessage {
from: Uuid,
content: String,
},
SendGlobalMessage {
from: Uuid,
content: String,
}
SendMessage { from: Uuid, content: String },
SendGlobalMessage { from: Uuid, content: String },
}
#[derive(Message)]
@ -35,7 +56,7 @@ pub struct ClientDetailsResponse(pub ClientDetails);
/// messages the client will send to itself
enum SelfMessage {
ReceivedMessage(ClientStreamIn)
ReceivedMessage(ClientStreamIn),
}
/// message that is sent to all observers of the current client.
@ -53,7 +74,7 @@ pub enum ClientObservableMessage {
pub struct Client {
connection: Addr<Connection>,
details: ClientDetails,
observers: Vec<Recipient<ClientObservableMessage>>
observers: Vec<Recipient<ClientObservableMessage>>,
}
impl Client {
@ -74,38 +95,52 @@ impl Client {
ctx: &mut Context<Client>,
sender: Addr<Connection>,
addr: SocketAddr,
data: String
data: String,
) {
use ClientStreamIn::{Update, SendMessage, SendGlobalMessage, Disconnect};
let msg = from_str::<ClientStreamIn>(data.as_str()).expect("[Client] failed to decode incoming message");
use ClientStreamIn::{
Disconnect,
SendGlobalMessage,
SendMessage,
Update,
};
let msg = from_str::<ClientStreamIn>(data.as_str())
.expect("[Client] failed to decode incoming message");
match msg {
Update => self.handle_update(ctx),
SendMessage { to, content } => self.handle_send(ctx, to, content),
SendGlobalMessage { content } => self.handle_global_send(ctx, content),
SendGlobalMessage { content } => {
self.handle_global_send(ctx, content)
}
Disconnect => self.handle_disconnect(ctx),
_ => todo!()
_ => todo!(),
}
}
#[inline]
fn handle_update(&self,
ctx: &mut Context<Client>,
) {
fn handle_update(&self, ctx: &mut Context<Client>) {
self.broadcast(UpdateRequest(ctx.address().downgrade()));
}
#[inline]
fn handle_send(&self, ctx: &mut Context<Client>, to: Uuid, content: String) {
fn handle_send(
&self,
ctx: &mut Context<Client>,
to: Uuid,
content: String,
) {
self.broadcast(SendMessageRequest(
ctx.address().downgrade(),
to,
content
content,
));
}
#[inline]
fn handle_global_send(&self, ctx: &mut Context<Client>, content: String) {
self.broadcast(SendGlobalMessageRequest(ctx.address().downgrade(), content));
self.broadcast(SendGlobalMessageRequest(
ctx.address().downgrade(),
content,
));
}
#[inline]
@ -127,23 +162,33 @@ impl Actor for Client {
// tells the client that it has been connected.
fn started(&mut self, ctx: &mut Self::Context) {
use ClientStreamOut::Connected;
use ConnectionMessage::{SendData};
use ConnectionMessage::SendData;
println!("[Client] started");
self.connection.do_send(Subscribe(ctx.address().recipient()));
self.connection.do_send(SendData(to_string::<ClientStreamOut>(&Connected).unwrap()));
self.connection
.do_send(Subscribe(ctx.address().recipient()));
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&Connected).unwrap(),
));
}
fn stopped(&mut self, ctx: &mut Self::Context) {
use ClientStreamOut::Disconnected;
use ConnectionMessage::{SendData};
self.connection.do_send(Unsubscribe(ctx.address().recipient()));
self.connection.do_send(SendData(to_string::<ClientStreamOut>(&Disconnected).unwrap()));
use ConnectionMessage::SendData;
self.connection
.do_send(Unsubscribe(ctx.address().recipient()));
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&Disconnected).unwrap(),
));
}
}
impl Handler<ClientDataMessage> for Client {
type Result = ClientDetailsResponse;
fn handle(&mut self, msg: ClientDataMessage, ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: ClientDataMessage,
ctx: &mut Self::Context,
) -> Self::Result {
ClientDetailsResponse(self.details.clone())
}
}
@ -156,22 +201,27 @@ impl Handler<ClientMessage> for Client {
msg: ClientMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use ClientMessage::{SendUpdate, SendMessage, SendGlobalMessage};
use ClientStreamOut::{ConnectedClients, UserMessage, GlobalMessage};
use ClientMessage::{SendGlobalMessage, SendMessage, SendUpdate};
use ClientStreamOut::{ConnectedClients, GlobalMessage, UserMessage};
match msg {
SendUpdate(clients) => self.connection.do_send(
SendData(to_string::<ClientStreamOut>(
&ConnectedClients { clients }
).expect("[Client] Failed to encode string"))),
SendMessage {content, from} => self.connection.do_send(
SendData(to_string::<ClientStreamOut>(
&UserMessage {from,content}
).expect("[Client] Failed to encode string"))),
SendGlobalMessage { from, content } => self.connection.do_send(
SendData(to_string::<ClientStreamOut>(
&GlobalMessage {from,content}
).expect("[Client] Failed to encode string"))),
SendUpdate(clients) => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&ConnectedClients { clients })
.expect("[Client] Failed to encode string"),
)),
SendMessage { content, from } => self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&UserMessage { from, content })
.expect("[Client] Failed to encode string"),
)),
SendGlobalMessage { from, content } => {
self.connection.do_send(SendData(
to_string::<ClientStreamOut>(&GlobalMessage {
from,
content,
})
.expect("[Client] Failed to encode string"),
))
}
_ => todo!(),
}
}
@ -184,13 +234,15 @@ impl Handler<ConnectionOuput> for Client {
fn handle(
&mut self,
msg: ConnectionOuput,
ctx: &mut Self::Context
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionOuput::RecvData;
match msg {
RecvData(sender, addr, data) => self.handle_request(ctx, sender, addr, data),
RecvData(sender, addr, data) => {
self.handle_request(ctx, sender, addr, data)
}
_ => todo!()
_ => todo!(),
}
}
}
@ -198,8 +250,12 @@ impl Handler<ConnectionOuput> for Client {
impl Handler<ObservableMessage<ClientObservableMessage>> for Client {
type Result = ();
fn handle(&mut self, msg: ObservableMessage<ClientObservableMessage>, ctx: &mut Self::Context) -> Self::Result {
use ObservableMessage::{Subscribe,Unsubscribe};
fn handle(
&mut self,
msg: ObservableMessage<ClientObservableMessage>,
ctx: &mut Self::Context,
) -> Self::Result {
use ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
println!("[Client] adding subscriber");

View File

@ -1,24 +1,45 @@
use crate::client_management::Client;
use actix::{Actor, ActorFutureExt, ActorStreamExt, ArbiterHandle, MailboxError, Recipient, Running, StreamHandler, WeakAddr};
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::{Message, MessageResponse};
use actix::WeakRecipient;
use std::collections::HashMap;
use actix::fut::{wrap_future, wrap_stream};
use actix::{
fut::{wrap_future, wrap_stream},
Actor,
ActorFutureExt,
ActorStreamExt,
Addr,
ArbiterHandle,
AsyncContext,
Context,
Handler,
MailboxError,
Message,
MessageResponse,
Recipient,
Running,
StreamHandler,
WeakAddr,
WeakRecipient,
};
use foundation::{
messages::client::{ClientStreamIn, ClientStreamIn::SendGlobalMessage},
ClientDetails,
};
use futures::{SinkExt, TryStreamExt};
use uuid::Uuid;
use tokio_stream::StreamExt;
use foundation::ClientDetails;
use foundation::messages::client::ClientStreamIn;
use foundation::messages::client::ClientStreamIn::SendGlobalMessage;
use crate::client_management::client::ClientMessage;
use crate::client_management::client::{ClientDataMessage, ClientObservableMessage};
use crate::client_management::client::ClientMessage::SendMessage;
use crate::network::NetworkOutput;
use crate::prelude::ObservableMessage;
use uuid::Uuid;
use crate::{
client_management::{
client::{
ClientDataMessage,
ClientMessage,
ClientMessage::SendMessage,
ClientObservableMessage,
},
Client,
},
network::NetworkOutput,
prelude::ObservableMessage,
};
#[derive(Message)]
#[rtype(result = "()")]
@ -39,19 +60,19 @@ pub struct ClientManager {
}
impl ClientManager {
pub(crate) fn send_update(&mut self, ctx: &mut Context<Self>, addr: WeakAddr<Client>) {
pub(crate) fn send_update(
&mut self,
ctx: &mut Context<Self>,
addr: WeakAddr<Client>,
) {
println!("[ClientManager] sending update to client");
use ClientMessage::SendUpdate;
let self_addr = ctx.address();
if let Some(to_send) = addr.upgrade() {
let client_addr: Vec<Addr<Client>> = self.clients
.iter()
.map(|(_, v)| v)
.cloned()
.collect();
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
let collection =
tokio_stream::iter(client_addr)
let collection = tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage))
.map(|val| val.unwrap().0)
// .filter(|val| )
@ -71,28 +92,25 @@ impl ClientManager {
ctx: &mut Context<ClientManager>,
sender: WeakAddr<Client>,
uuid: Uuid,
content: String
content: String,
) {
println!("[ClientManager] sending message to client");
let client_addr: Vec<Addr<Client>> = self.clients
.iter()
.map(|(_, v)| v)
.cloned()
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
let collection = tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage))
.map(|val| val.unwrap().0)
.collect();
let collection =
tokio_stream::iter(client_addr)
.then(|addr| addr.send(ClientDataMessage))
.map(|val| val.unwrap().0)
.collect();
let fut = wrap_future(async move {
if let Some(sender)= sender.upgrade() {
let from: Uuid = sender.send(ClientDataMessage).await.unwrap().0.uuid;
if let Some(sender) = sender.upgrade() {
let from: Uuid =
sender.send(ClientDataMessage).await.unwrap().0.uuid;
let client_details: Vec<ClientDetails> = collection.await;
let pos = client_details.iter().position(|i| i.uuid == from);
if let Some(pos) = pos {
sender.send(SendMessage {content, from}).await;
sender.send(SendMessage { content, from }).await;
}
}
});
@ -104,23 +122,24 @@ impl ClientManager {
&self,
ctx: &mut Context<ClientManager>,
sender: WeakAddr<Client>,
content: String
content: String,
) {
use ClientMessage::SendGlobalMessage;
let client_addr: Vec<Addr<Client>> = self.clients
.iter()
.map(|(_, v)| v)
.cloned()
.collect();
let client_addr: Vec<Addr<Client>> =
self.clients.iter().map(|(_, v)| v).cloned().collect();
if let Some(sender)= sender.upgrade() {
if let Some(sender) = sender.upgrade() {
let fut = wrap_future(async move {
let from: Uuid = sender.send(ClientDataMessage).await.unwrap().0.uuid;
let collection =
tokio_stream::iter(client_addr)
.then(move |addr| addr.send(SendGlobalMessage { content: content.clone(), from }))
.collect();
let from: Uuid =
sender.send(ClientDataMessage).await.unwrap().0.uuid;
let collection = tokio_stream::iter(client_addr)
.then(move |addr| {
addr.send(SendGlobalMessage {
content: content.clone(),
from,
})
})
.collect();
let a: Vec<_> = collection.await;
});
ctx.spawn(fut);
@ -139,7 +158,12 @@ impl ClientManager {
.start()
}
fn add_client(&mut self, ctx: &mut Context<ClientManager>, uuid: Uuid, addr: Addr<Client>) {
fn add_client(
&mut self,
ctx: &mut Context<ClientManager>,
uuid: Uuid,
addr: Addr<Client>,
) {
println!("[ClientManager] adding client");
use crate::prelude::ObservableMessage::Subscribe;
let recp = ctx.address().recipient::<ClientObservableMessage>();
@ -185,13 +209,25 @@ impl Handler<ClientManagerMessage> for ClientManager {
impl Handler<ClientObservableMessage> for ClientManager {
type Result = ();
fn handle(&mut self, msg: ClientObservableMessage, ctx: &mut Self::Context) -> Self::Result {
use ClientObservableMessage::{SendMessageRequest, UpdateRequest, SendGlobalMessageRequest};
fn handle(
&mut self,
msg: ClientObservableMessage,
ctx: &mut Self::Context,
) -> Self::Result {
use ClientObservableMessage::{
SendGlobalMessageRequest,
SendMessageRequest,
UpdateRequest,
};
match msg {
SendMessageRequest(addr, uuid, content) => self.send_message_request(ctx, addr, uuid, content),
SendGlobalMessageRequest(addr,content) => self.send_global_message_request(ctx, addr, content),
SendMessageRequest(addr, uuid, content) => {
self.send_message_request(ctx, addr, uuid, content)
}
SendGlobalMessageRequest(addr, content) => {
self.send_global_message_request(ctx, addr, content)
}
UpdateRequest(addr) => self.send_update(ctx, addr),
_ => todo!()
_ => todo!(),
}
}
}
}

View File

@ -3,5 +3,7 @@ mod client_manager;
pub(crate) use client::Client;
pub(crate) use client_manager::{
ClientManager, ClientManagerMessage, ClientManagerOutput,
ClientManager,
ClientManagerMessage,
ClientManagerOutput,
};

View File

@ -1,30 +1,33 @@
use crate::prelude::ObservableMessage;
use actix::fut::wrap_future;
use actix::Actor;
use actix::ActorContext;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::Recipient;
use actix::SpawnHandle;
use futures::future::join_all;
use futures::Future;
use futures::FutureExt;
use std::{io::Write, net::SocketAddr, pin::Pin, sync::Arc};
use actix::{
fut::wrap_future,
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
Message,
Recipient,
SpawnHandle,
};
use futures::{future::join_all, Future, FutureExt};
use serde::Serialize;
use std::io::Write;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::split;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::ReadHalf;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::{
io::{
split,
AsyncBufReadExt,
AsyncWriteExt,
BufReader,
ReadHalf,
WriteHalf,
},
net::TcpStream,
sync::Mutex,
};
use crate::prelude::ObservableMessage;
/// This is a message that can be sent to the Connection.
#[derive(Message)]
@ -99,19 +102,18 @@ impl Actor for Connection {
let mut buffer_string = String::new();
while let Ok(len) = reader.read_line(&mut buffer_string).await {
use SelfMessage::{UpdateObserversWithData};
use ConnectionMessage::CloseConnection;
use SelfMessage::UpdateObserversWithData;
if len == 0 {
println!("[Connection] connection closed");
addr.send(CloseConnection)
.await
.expect("[Connection] failed to send close message to self");
return
addr.send(CloseConnection).await.expect(
"[Connection] failed to send close message to self",
);
return;
}
println!("[Connection] read line");
addr
.send(UpdateObserversWithData(buffer_string.clone()))
addr.send(UpdateObserversWithData(buffer_string.clone()))
.await;
buffer_string.clear();
}

View File

@ -1,22 +1,30 @@
use crate::network::connection::ConnectionOuput;
use crate::network::{Connection, ConnectionMessage};
use crate::prelude::ObservableMessage;
use actix::Actor;
use actix::ActorContext;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::Recipient;
use actix::WeakRecipient;
use foundation::messages::client::ClientStreamOut;
use foundation::messages::client::ClientStreamOut::Error;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::ClientDetails;
use serde_json::{from_str, to_string};
use std::net::SocketAddr;
use actix::{
Actor,
ActorContext,
Addr,
AsyncContext,
Context,
Handler,
Message,
Recipient,
WeakRecipient,
};
use foundation::{
messages::{
client::{ClientStreamOut, ClientStreamOut::Error},
network::{NetworkSockIn, NetworkSockOut},
},
ClientDetails,
};
use serde_json::{from_str, to_string};
use crate::{
network::{connection::ConnectionOuput, Connection, ConnectionMessage},
prelude::ObservableMessage,
};
#[derive(Debug, Clone, Copy)]
enum ConnectionPhase {
Started,
@ -117,10 +125,11 @@ impl Actor for ConnectionInitiator {
/// on start initiate the protocol.
/// also add self as a subscriber to the connection.
fn started(&mut self, ctx: &mut Self::Context) {
use super::ConnectionMessage::SendData;
use NetworkSockOut::Request;
use ObservableMessage::Subscribe;
use super::ConnectionMessage::SendData;
println!("[ConnectionInitiator] started");
self.connection

View File

@ -1,19 +1,24 @@
use crate::network::connection::Connection;
use crate::network::ConnectionInitiator;
use crate::network::InitiatorOutput;
use actix::fut::wrap_future;
use actix::Actor;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::Recipient;
use actix::SpawnHandle;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use std::net::{SocketAddr, ToSocketAddrs};
use actix::{
fut::wrap_future,
Actor,
Addr,
AsyncContext,
Context,
Handler,
Message,
Recipient,
SpawnHandle,
};
use tokio::net::TcpListener;
use crate::network::{
connection::Connection,
ConnectionInitiator,
InitiatorOutput,
};
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ListenerMessage {

View File

@ -36,5 +36,7 @@ pub(crate) use connection::{Connection, ConnectionMessage, ConnectionOuput};
pub(crate) use connection_initiator::{ConnectionInitiator, InitiatorOutput};
use listener::{ListenerMessage, ListenerOutput, NetworkListener};
pub(crate) use network_manager::{
NetworkManager, NetworkMessage, NetworkOutput,
NetworkManager,
NetworkMessage,
NetworkOutput,
};

View File

@ -2,22 +2,27 @@
//! This module contains the network manager actor
//! it's role involves handling new oncomming network connections
use crate::network::listener::ListenerOutput;
use crate::network::Connection;
use crate::network::ConnectionInitiator;
use crate::network::InitiatorOutput;
use crate::network::InitiatorOutput::ClientRequest;
use crate::network::ListenerMessage;
use crate::network::NetworkListener;
use actix::Actor;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::WeakRecipient;
use actix::{
Actor,
Addr,
AsyncContext,
Context,
Handler,
Message,
WeakRecipient,
};
use foundation::ClientDetails;
use crate::network::{
listener::ListenerOutput,
Connection,
ConnectionInitiator,
InitiatorOutput,
InitiatorOutput::ClientRequest,
ListenerMessage,
NetworkListener,
};
#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)]
#[rtype(result = "()")]
pub enum NetworkMessage {

View File

@ -1,5 +1,4 @@
use actix::Message;
use actix::Recipient;
use actix::{Message, Recipient};
/// # ObservableMessage
/// represents common messages for observers