Started work on actor conversion

This commit is contained in:
michael-bailey 2022-05-20 17:48:32 +01:00
parent 0624b568f9
commit 7871e8d6a6
13 changed files with 390 additions and 3 deletions

View File

@ -1,10 +1,9 @@
extern crate core;
pub mod connection;
pub mod encryption;
pub mod event;
pub mod messages;
pub mod prelude;
pub mod connection;
pub mod test;
use serde::{Deserialize, Serialize};

View File

@ -1,5 +1,5 @@
hard_tabs = true
max_width = 100
max_width = 80
imports_indent = "Block"
imports_layout = "HorizontalVertical"
imports_granularity = "Crate"

View File

@ -14,6 +14,10 @@ path = "src/lib.rs"
name = "server"
path = "src/main.rs"
[[bin]]
name = "server-actix"
path = "src/actor.rs"
[dependencies]
clap = "2.33.3"
uuid = {version = "0.8", features = ["serde", "v4"]}

18
server/src/actor.rs Normal file
View File

@ -0,0 +1,18 @@
mod network;
pub(crate) mod prelude;
use network::NetworkManager;
use network::NetworkMessage::Ping;
use network::NetworkResponse::Pong;
#[actix::main()]
async fn main() {
let network = NetworkManager::new();
let pong = network.send(Ping).await;
if let Ok(Pong) = pong {
println!("received pong");
} else {
println!("error occurred")
}
}

View File

@ -4,8 +4,10 @@ mod client_manager;
mod event_type;
mod lua;
mod messages;
mod network;
mod network_manager;
// pub mod plugin;
mod prelude;
mod server;
pub use server::Server;

View File

@ -4,8 +4,10 @@ pub mod client_manager;
mod event_type;
mod lua;
pub mod messages;
mod network;
pub mod network_manager;
// mod plugin;
mod prelude;
pub mod server;
use std::io;

View File

@ -0,0 +1,158 @@
/// # connection.rs
/// An actor that handles a TcpStream.
use crate::prelude::ObservableMessage;
use actix::fut::wrap_future;
use actix::Actor;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::Recipient;
use actix::SpawnHandle;
use std::net::SocketAddr;
use tokio::io::split;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::ReadHalf;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
#[derive(Message)]
#[rtype(result = "()")]
enum ConnectionMessage {
SendData(String),
CloseConnection,
}
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ConnectionOuput {
RecvData(String),
NoMessage,
}
#[derive(Message)]
#[rtype(result = "()")]
enum SelfMessage {
UpdateObserversWithData(String),
}
/// # Connection
/// This manages a TcpStream for a given connection.
///
/// ## Fields
/// - read_half: A temporary store fr the read half of the connection.
/// - write_half: The write half of the connection.
/// - address: The socket address of the conneciton.
/// - observers: A list of observers to events created by the connection.
/// - loop_future: the future holding the receiving loop.
pub(crate) struct Connection {
read_half: Option<ReadHalf<TcpStream>>,
write_half: WriteHalf<TcpStream>,
address: SocketAddr,
observers: Vec<Recipient<ConnectionOuput>>,
loop_future: Option<SpawnHandle>,
}
impl Connection {
/// Creates a new Conneciton actor from a Tokio TcpStream,
/// and start's its execution.
/// returns: the Addr of the connection.
pub(super) fn new(stream: TcpStream, address: SocketAddr) -> Addr<Self> {
let (read_half, write_half) = split(stream);
Connection {
read_half: Some(read_half),
write_half,
address,
observers: Vec::new(),
loop_future: None,
}
.start()
}
}
impl Actor for Connection {
type Context = Context<Self>;
/// runs when the actor is started.
/// takes out eh read_half ad turns it into a buffered reader
/// then eneters loop readling lines from the tcp stream
fn started(&mut self, ctx: &mut Self::Context) {
let addr = ctx.address();
let read_half = self
.read_half
.take()
.expect("What the hell did yu do wrong");
ctx.spawn(wrap_future(async move {
let mut reader = BufReader::new(read_half);
let mut buffer_string = String::new();
while let Ok(_) = reader.read_line(&mut buffer_string).await {
use SelfMessage::UpdateObserversWithData;
addr
.send(UpdateObserversWithData(buffer_string.clone()))
.await;
buffer_string.clear();
}
}));
}
}
impl Handler<ObservableMessage<ConnectionOuput>> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ObservableMessage<ConnectionOuput>,
_ctx: &mut Self::Context,
) -> <Self as actix::Handler<ObservableMessage<ConnectionOuput>>>::Result {
use ObservableMessage::{Subscribe, Unsubscribe};
match msg {
Subscribe(r) => {
self.observers.push(r);
}
Unsubscribe(r) => {
self.observers = self
.observers
.clone()
.into_iter()
.filter(|a| a != &r)
.collect();
}
};
}
}
impl Handler<ConnectionMessage> for Connection {
type Result = ();
fn handle(
&mut self,
msg: ConnectionMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
match msg {
SendData(d) => {}
CloseConnection => {}
};
}
}
impl Handler<SelfMessage> for Connection {
type Result = ();
fn handle(
&mut self,
msg: SelfMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionOuput::RecvData;
use SelfMessage::UpdateObserversWithData;
match msg {
UpdateObserversWithData(data) => {
for o in self.observers.clone() {
o.do_send(RecvData(data.clone()));
}
}
};
}
}

View File

@ -0,0 +1,94 @@
/// # listener.rs
/// An actor for listening for new connections from the network
use crate::network::connection::Connection;
use actix::fut::wrap_future;
use actix::Actor;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::Recipient;
use actix::SpawnHandle;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ListenerMessage {
StartListening,
StopListening,
}
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ListenerOutput {
Started,
StartFailed,
NewConnection(Addr<Connection>),
NoConnection,
Error,
Stopped,
}
pub(super) struct NetworkListener {
address: SocketAddr,
// delegate: Arc<RwLock<Recipient<ListenerOutput>>>,
looper: Option<SpawnHandle>,
}
impl NetworkListener {
pub(crate) fn new<T: ToSocketAddrs>(
address: T,
// delegate: Recipient<ListenerOutput>,
) -> Addr<NetworkListener> {
NetworkListener {
address: address
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()[0],
// delegate: Arc::new(RwLock::new(delegate)),
looper: None,
}
.start()
}
fn start_listening(&mut self, ctx: &mut <Self as Actor>::Context) {
let addr = self.address.clone();
let loop_future = ctx.spawn(wrap_future(async move {
let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, addr)) = listener.accept().await {
let conn = Connection::new(stream, addr);
}
}));
}
fn stop_listening(&mut self, ctx: &mut <Self as Actor>::Context) {
if let Some(fut) = self.looper.take() {
ctx.cancel_future(fut);
}
}
}
impl Actor for NetworkListener {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {}
}
impl Handler<ListenerMessage> for NetworkListener {
type Result = ();
fn handle(
&mut self,
msg: ListenerMessage,
ctx: &mut <Self as actix::Actor>::Context,
) -> Self::Result {
use ListenerMessage::{StartListening, StopListening};
match msg {
StartListening => self.start_listening(ctx),
StopListening => self.stop_listening(ctx),
}
}
}

View File

@ -0,0 +1,7 @@
mod connection;
mod listener;
mod network_manager;
use connection::Connection;
use listener::{ListenerMessage, NetworkListener};
pub(crate) use network_manager::{NetworkManager, NetworkMessage, NetworkResponse};

View File

@ -0,0 +1,82 @@
use crate::network::ListenerMessage;
use crate::network::NetworkListener;
use actix::Actor;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::MessageResponse;
use actix::SpawnHandle;
use std::time::Duration;
use tokio::net::TcpListener;
#[derive(Message)]
#[rtype(result = "NetworkResponse")]
pub(crate) enum NetworkMessage {
StartListening,
StopListening,
Ping,
}
#[derive(MessageResponse, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum NetworkResponse {
Listening,
NotListening,
Pong,
None,
}
pub(crate) struct NetworkManager {
listener_addr: Addr<NetworkListener>,
}
impl NetworkManager {
pub(crate) fn new() -> Addr<NetworkManager> {
NetworkManager {
listener_addr: NetworkListener::new("0.0.0.0:5600"),
}
.start()
}
fn start_listener(
&mut self,
_ctx: &mut <Self as actix::Actor>::Context,
) -> NetworkResponse {
NetworkResponse::Listening
}
fn stop_listener(
&mut self,
_ctx: &mut <Self as actix::Actor>::Context,
) -> NetworkResponse {
use ListenerMessage::StopListening;
use NetworkResponse::NotListening;
self.listener_addr.do_send(StopListening);
NotListening
}
}
impl Actor for NetworkManager {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {}
}
impl Handler<NetworkMessage> for NetworkManager {
type Result = NetworkResponse;
fn handle(
&mut self,
msg: NetworkMessage,
ctx: &mut <Self as actix::Actor>::Context,
) -> <Self as Handler<NetworkMessage>>::Result {
use NetworkMessage::{Ping, StartListening, StopListening};
use NetworkResponse::{None, Pong};
match msg {
StartListening => self.start_listener(ctx),
StopListening => None,
Ping => Pong,
}
}
}

View File

@ -0,0 +1,3 @@
mod observer;
pub(crate) use observer::ObservableMessage;

View File

@ -0,0 +1,15 @@
use actix::Message;
use actix::Recipient;
/// # ObservableMessage
/// represents common messages for observers
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ObservableMessage<M>
where
M: Message + Send,
M::Result: Send,
{
Subscribe(Recipient<M>),
Unsubscribe(Recipient<M>),
}

View File

@ -1,3 +1,4 @@
use foundation::connection::Connection;
use std::io::Error;
use std::sync::Arc;
@ -14,6 +15,8 @@ use crate::{
network_manager::{NetworkManager, NetworkManagerMessage},
};
use foundation::prelude::IManager;
#[derive(Debug, Clone)]
pub enum ServerMessage {
ClientConnected {