implemented GetInfo for actix server

This commit is contained in:
michael-bailey 2022-05-30 08:57:57 +01:00
parent 7871e8d6a6
commit 63181ec9b5
8 changed files with 446 additions and 136 deletions

View File

@ -1,63 +1,63 @@
use futures::lock::Mutex;
use serverlib::plugin::WeakPluginInterface;
use std::sync::Mutex as StdMutex;
use std::thread::sleep;
use std::time::Duration;
// use futures::lock::Mutex;
// use serverlib::plugin::WeakPluginInterface;
// use std::sync::Mutex as StdMutex;
// use std::thread::sleep;
// use std::time::Duration;
use serverlib::plugin::IPlugin;
use serverlib::plugin::PluginDetails;
// use serverlib::plugin::IPlugin;
// use serverlib::plugin::PluginDetails;
#[derive(Debug)]
pub struct ExamplePlugin {
number: Mutex<u8>,
interface: StdMutex<Option<WeakPluginInterface>>,
}
// #[derive(Debug)]
// pub struct ExamplePlugin {
// number: Mutex<u8>,
// interface: StdMutex<Option<WeakPluginInterface>>,
// }
impl Default for ExamplePlugin {
fn default() -> Self {
ExamplePlugin {
number: Mutex::new(0),
interface: StdMutex::default(),
}
}
}
// impl Default for ExamplePlugin {
// fn default() -> Self {
// ExamplePlugin {
// number: Mutex::new(0),
// interface: StdMutex::default(),
// }
// }
// }
#[async_trait::async_trait]
impl IPlugin for ExamplePlugin {
fn details(&self) -> PluginDetails {
PluginDetails {
display_name: "ExamplePlugin",
id: "io.github.michael-bailey.ExamplePlugin",
version: "0.0.1",
contacts: vec!["bailey-michael1@outlook.com"],
}
}
// #[async_trait::async_trait]
// impl IPlugin for ExamplePlugin {
// fn details(&self) -> PluginDetails {
// PluginDetails {
// display_name: "ExamplePlugin",
// id: "io.github.michael-bailey.ExamplePlugin",
// version: "0.0.1",
// contacts: vec!["bailey-michael1@outlook.com"],
// }
// }
fn set_interface(&self, interface: WeakPluginInterface) {
if let Ok(mut lock) = self.interface.lock() {
*lock = Some(interface);
}
}
// fn set_interface(&self, interface: WeakPluginInterface) {
// if let Ok(mut lock) = self.interface.lock() {
// *lock = Some(interface);
// }
// }
async fn event(&self) {
println!("Not Implemented");
}
// async fn event(&self) {
// println!("Not Implemented");
// }
fn init(&self) {
println!("[ExamplePlugin]: example init")
}
// fn init(&self) {
// println!("[ExamplePlugin]: example init")
// }
async fn run(&self) {
println!("Example!!!");
sleep(Duration::new(1, 0));
let mut a = self.number.lock().await;
*a = a.overflowing_add(1).0;
println!("[ExamplePlugin]: example run {}", *a);
}
// async fn run(&self) {
// println!("Example!!!");
// sleep(Duration::new(1, 0));
// let mut a = self.number.lock().await;
// *a = a.overflowing_add(1).0;
// println!("[ExamplePlugin]: example run {}", *a);
// }
fn deinit(&self) {
if let Some(mut lock) = self.number.try_lock() {
*lock = 0;
}
}
}
// fn deinit(&self) {
// if let Some(mut lock) = self.number.try_lock() {
// *lock = 0;
// }
// }
// }

View File

@ -2,13 +2,13 @@ mod example;
use std::sync::Arc;
use serverlib::plugin::Plugin;
// use serverlib::plugin::Plugin;
use crate::example::ExamplePlugin;
use serverlib::plugin::plugin::Plugin;
use std::sync::Arc;
// use crate::example::ExamplePlugin;
// use serverlib::plugin::plugin::Plugin;
// use std::sync::Arc;
#[no_mangle]
pub extern "C" fn get_plugin() -> Plugin {
Arc::new(ExamplePlugin::default())
}
// #[no_mangle]
// pub extern "C" fn get_plugin() -> Plugin {
// Arc::new(ExamplePlugin::default())
// }

View File

@ -1,18 +1,89 @@
mod network;
pub(crate) mod prelude;
use network::NetworkManager;
use network::NetworkMessage::Ping;
use network::NetworkResponse::Pong;
use crate::network::ConnectionMessage;
use crate::network::NetworkOutput;
use actix::clock::sleep;
use actix::fut::wrap_future;
use actix::Actor;
use actix::ActorFutureExt;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use foundation::messages::network::NetworkSockOut;
use network::{NetworkManager, NetworkMessage};
use std::time::Duration;
/// This struct is the main actor of teh server.
/// all other actors are ran through here.
struct Server {
network_manager: Option<Addr<NetworkManager>>,
}
impl Server {
pub(crate) fn new() -> Addr<Self> {
Server {
network_manager: None,
}
.start()
}
}
impl Actor for Server {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self
.network_manager
.replace(NetworkManager::new(ctx.address().recipient()));
if let Some(net_mgr) = self.network_manager.as_ref() {
net_mgr.do_send(NetworkMessage::StartListening);
}
}
}
impl Handler<NetworkOutput> for Server {
type Result = ();
fn handle(
&mut self,
msg: NetworkOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
use NetworkOutput::{InfoRequested, NewClient};
use NetworkSockOut::GotInfo;
println!("server received message");
match msg {
// This uses promise like funcionality to queue
// a set of async operations,
// so they occur in the right order
InfoRequested(sender) => {
let fut = wrap_future(
sender.send(SendData(
serde_json::to_string(&GotInfo {
server_name: "String".to_owned(),
server_owner: "String".to_owned(),
})
.expect("Failed to serialise"),
)),
)
// equivalent to using .then() in js
.map(move |out, act: &mut Self, ctx| {
sender.do_send(CloseConnection);
});
ctx.spawn(fut);
}
NewClient(_, _) => todo!(),
};
}
}
#[actix::main()]
async fn main() {
let network = NetworkManager::new();
let pong = network.send(Ping).await;
if let Ok(Pong) = pong {
println!("received pong");
} else {
println!("error occurred")
let server = Server::new();
loop {
sleep(Duration::from_millis(500)).await;
}
}

View File

@ -1,8 +1,7 @@
/// # connection.rs
/// An actor that handles a TcpStream.
use crate::prelude::ObservableMessage;
use actix::fut::wrap_future;
use actix::Actor;
use actix::ActorContext;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
@ -10,17 +9,27 @@ use actix::Handler;
use actix::Message;
use actix::Recipient;
use actix::SpawnHandle;
use futures::future::join_all;
use futures::Future;
use futures::FutureExt;
use serde::Serialize;
use std::io::Write;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::split;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::ReadHalf;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
/// This is a message that can be sent to the Connection.
#[derive(Message)]
#[rtype(result = "()")]
enum ConnectionMessage {
pub(crate) enum ConnectionMessage {
SendData(String),
CloseConnection,
}
@ -28,8 +37,8 @@ enum ConnectionMessage {
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum ConnectionOuput {
RecvData(String),
NoMessage,
RecvData(Addr<Connection>, SocketAddr, String),
ConnectionClosed(Addr<Connection>),
}
#[derive(Message)]
@ -49,7 +58,7 @@ enum SelfMessage {
/// - loop_future: the future holding the receiving loop.
pub(crate) struct Connection {
read_half: Option<ReadHalf<TcpStream>>,
write_half: WriteHalf<TcpStream>,
write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
address: SocketAddr,
observers: Vec<Recipient<ConnectionOuput>>,
loop_future: Option<SpawnHandle>,
@ -63,7 +72,7 @@ impl Connection {
let (read_half, write_half) = split(stream);
Connection {
read_half: Some(read_half),
write_half,
write_half: Arc::new(Mutex::new(write_half)),
address,
observers: Vec::new(),
loop_future: None,
@ -128,12 +137,21 @@ impl Handler<ConnectionMessage> for Connection {
fn handle(
&mut self,
msg: ConnectionMessage,
_ctx: &mut Self::Context,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionMessage::{CloseConnection, SendData};
let writer = self.write_half.clone();
match msg {
SendData(d) => {}
CloseConnection => {}
SendData(d) => {
ctx.spawn(wrap_future(async move {
let mut lock = writer.lock().await;
let mut buffer = Vec::new();
writeln!(&mut buffer, "{}", d.as_str());
lock.write_all(&buffer).await;
}));
}
CloseConnection => ctx.stop(),
};
}
}
@ -143,15 +161,31 @@ impl Handler<SelfMessage> for Connection {
fn handle(
&mut self,
msg: SelfMessage,
_ctx: &mut Self::Context,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionOuput::RecvData;
use SelfMessage::UpdateObserversWithData;
match msg {
UpdateObserversWithData(data) => {
for o in self.observers.clone() {
o.do_send(RecvData(data.clone()));
let send = ctx.address();
let addr = self.address.clone();
// this is a mess
let futs: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.observers
.iter()
.cloned()
.map(|r| {
let send = send.clone();
let data = data.clone();
async move {
let _ = r.send(RecvData(send, addr, data)).await;
}
.boxed()
})
.collect();
let _ = ctx.spawn(wrap_future(async {
join_all(futs).await;
}));
}
};
}

View File

@ -0,0 +1,139 @@
use crate::network::connection::ConnectionOuput;
use crate::network::Connection;
use crate::prelude::ObservableMessage;
use actix::fut::wrap_future;
use actix::Actor;
use actix::ActorContext;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::Recipient;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
use foundation::ClientDetails;
use serde_json::{from_str, to_string};
use std::net::SocketAddr;
#[derive(Debug, Clone, Copy)]
enum ConnectionPhase {
Started,
Requested,
}
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum InitiatorOutput {
InfoRequest(Addr<Connection>),
ClientRequest(Addr<Connection>, ClientDetails),
}
/// # ConnectionInitiator
/// Handles the initiatin of a new connection.
///
/// This will do one of two things:
/// - Create a new client and send it to the network manager.
/// - Request the eserver info and send it to the connection.
pub(crate) struct ConnectionInitiator {
delegate: Recipient<InitiatorOutput>,
connection: Addr<Connection>,
}
impl ConnectionInitiator {
pub(crate) fn new(
delegate: Recipient<InitiatorOutput>,
connection: Addr<Connection>,
) -> Addr<Self> {
ConnectionInitiator {
connection,
delegate,
}
.start()
}
fn handle_request(
&mut self,
sender: Addr<Connection>,
ctx: &mut <Self as Actor>::Context,
address: SocketAddr,
data: String,
) {
use InitiatorOutput::{ClientRequest, InfoRequest};
use NetworkSockIn::{Connect, Info};
use NetworkSockOut::{Connecting, GotInfo};
use ObservableMessage::Unsubscribe;
let msg = from_str::<NetworkSockIn>(data.as_str())
.expect("error deserialising incomming message");
match msg {
Info => self
.delegate
.do_send(InfoRequest(sender))
.expect("Failed to send info request Message"),
Connect {
uuid,
username,
address,
} => self
.delegate
.do_send(ClientRequest(
sender,
ClientDetails {
uuid,
username,
address,
public_key: None,
},
))
.expect("Failed to send connect request"),
};
ctx.stop();
}
}
impl Actor for ConnectionInitiator {
type Context = Context<Self>;
/// on start initiate the protocol.
/// also add self as a subscriber to the connection.
fn started(&mut self, ctx: &mut Self::Context) {
use super::ConnectionMessage::SendData;
use NetworkSockOut::Request;
use ObservableMessage::Subscribe;
self
.connection
.do_send(Subscribe(ctx.address().recipient()));
self
.connection
.do_send(SendData(to_string(&Request).unwrap()));
}
/// once stopped remove self from the connection subscribers
fn stopped(&mut self, ctx: &mut Self::Context) {
use ObservableMessage::Unsubscribe;
self
.connection
.do_send(Unsubscribe(ctx.address().recipient()));
}
}
impl Handler<ConnectionOuput> for ConnectionInitiator {
type Result = ();
fn handle(
&mut self,
msg: ConnectionOuput,
ctx: &mut Self::Context,
) -> Self::Result {
use ConnectionOuput::{ConnectionClosed, RecvData};
use ConnectionPhase::Requested;
match msg {
RecvData(sender, addr, data) => {
self.handle_request(sender, ctx, addr, data)
}
ConnectionClosed(_) => todo!(),
}
}
}

View File

@ -1,6 +1,6 @@
/// # listener.rs
/// An actor for listening for new connections from the network
use crate::network::connection::Connection;
use crate::network::ConnectionInitiator;
use crate::network::InitiatorOutput;
use actix::fut::wrap_future;
use actix::Actor;
use actix::Addr;
@ -12,7 +12,6 @@ use actix::Recipient;
use actix::SpawnHandle;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
@ -26,46 +25,51 @@ pub(super) enum ListenerMessage {
#[derive(Message)]
#[rtype(result = "()")]
pub(super) enum ListenerOutput {
Started,
StartFailed,
NewConnection(Addr<Connection>),
NoConnection,
Error,
Stopped,
InfoRequest(Addr<Connection>),
}
pub(super) struct NetworkListener {
address: SocketAddr,
// delegate: Arc<RwLock<Recipient<ListenerOutput>>>,
delegate: Recipient<ListenerOutput>,
looper: Option<SpawnHandle>,
}
impl NetworkListener {
pub(crate) fn new<T: ToSocketAddrs>(
address: T,
// delegate: Recipient<ListenerOutput>,
delegate: Recipient<ListenerOutput>,
) -> Addr<NetworkListener> {
NetworkListener {
address: address
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()[0],
// delegate: Arc::new(RwLock::new(delegate)),
delegate,
looper: None,
}
.start()
}
/// called when the actor is to start listening
fn start_listening(&mut self, ctx: &mut <Self as Actor>::Context) {
println!("Network listener started listening");
let addr = self.address.clone();
let self_addr = ctx.address();
let loop_future = ctx.spawn(wrap_future(async move {
let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, addr)) = listener.accept().await {
println!("new tcp connection");
let conn = Connection::new(stream, addr);
let connection_initiator =
ConnectionInitiator::new(self_addr.clone().recipient(), conn);
}
}));
}
/// called when the actor is to stop listening
fn stop_listening(&mut self, ctx: &mut <Self as Actor>::Context) {
println!("Network listener stopped listening");
if let Some(fut) = self.looper.take() {
ctx.cancel_future(fut);
}
@ -75,7 +79,9 @@ impl NetworkListener {
impl Actor for NetworkListener {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {}
fn started(&mut self, ctx: &mut Self::Context) {
println!("Network listener started");
}
}
impl Handler<ListenerMessage> for NetworkListener {
@ -92,3 +98,21 @@ impl Handler<ListenerMessage> for NetworkListener {
}
}
}
impl Handler<InitiatorOutput> for NetworkListener {
type Result = ();
fn handle(
&mut self,
msg: InitiatorOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use InitiatorOutput::{ClientRequest, InfoRequest};
match msg {
ClientRequest(addr, client_details) => {}
InfoRequest(addr) => {
println!("Got Info request");
self.delegate.do_send(ListenerOutput::InfoRequest(addr));
}
}
}
}

View File

@ -1,7 +1,11 @@
mod connection;
mod connection_initiator;
mod listener;
mod network_manager;
use connection::Connection;
use listener::{ListenerMessage, NetworkListener};
pub(crate) use network_manager::{NetworkManager, NetworkMessage, NetworkResponse};
pub(crate) use connection::{Connection, ConnectionMessage, ConnectionOuput};
pub(crate) use connection_initiator::{ConnectionInitiator, InitiatorOutput};
use listener::{ListenerMessage, ListenerOutput, NetworkListener};
pub(crate) use network_manager::{
NetworkManager, NetworkMessage, NetworkOutput,
};

View File

@ -1,82 +1,120 @@
use crate::network::connection::ConnectionOuput;
use crate::network::listener::ListenerOutput;
use crate::network::Connection;
use crate::network::ListenerMessage;
use crate::network::NetworkListener;
use crate::prelude::ObservableMessage;
use actix::fut::wrap_future;
use actix::Actor;
use actix::Addr;
use actix::AsyncContext;
use actix::Context;
use actix::Handler;
use actix::Message;
use actix::MessageResponse;
use actix::SpawnHandle;
use std::time::Duration;
use tokio::net::TcpListener;
use actix::Recipient;
use foundation::ClientDetails;
#[derive(Message)]
#[rtype(result = "NetworkResponse")]
#[derive(Message, Debug, Ord, PartialOrd, Eq, PartialEq)]
#[rtype(result = "()")]
pub(crate) enum NetworkMessage {
StartListening,
StopListening,
Ping,
}
#[derive(MessageResponse, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum NetworkResponse {
Listening,
NotListening,
Pong,
None,
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) enum NetworkOutput {
NewClient(Addr<Connection>, ClientDetails),
InfoRequested(Addr<Connection>),
}
pub(crate) struct NetworkManager {
listener_addr: Addr<NetworkListener>,
listener_addr: Option<Addr<NetworkListener>>,
delegate: Recipient<NetworkOutput>,
}
impl NetworkManager {
pub(crate) fn new() -> Addr<NetworkManager> {
pub(crate) fn new(
delegate: Recipient<NetworkOutput>,
) -> Addr<NetworkManager> {
NetworkManager {
listener_addr: NetworkListener::new("0.0.0.0:5600"),
listener_addr: None,
delegate,
}
.start()
}
fn start_listener(
&mut self,
_ctx: &mut <Self as actix::Actor>::Context,
) -> NetworkResponse {
NetworkResponse::Listening
fn start_listener(&mut self, _ctx: &mut <Self as actix::Actor>::Context) {
use ListenerMessage::StartListening;
if let Some(addr) = self.listener_addr.as_ref() {
addr.do_send(StartListening);
}
}
fn stop_listener(
&mut self,
_ctx: &mut <Self as actix::Actor>::Context,
) -> NetworkResponse {
fn stop_listener(&mut self, _ctx: &mut <Self as actix::Actor>::Context) {
use ListenerMessage::StopListening;
use NetworkResponse::NotListening;
self.listener_addr.do_send(StopListening);
NotListening
if let Some(addr) = self.listener_addr.as_ref() {
addr.do_send(StopListening);
}
}
fn new_connection(
&mut self,
ctx: &mut <Self as Actor>::Context,
connection: Addr<Connection>,
) {
println!("Got new connection");
let delegate = self.delegate.clone();
ctx.spawn(wrap_future(async move {
// delegate.send(NewConnection(recipient))
// delegate.send().await;
// delegate.send().await;
}));
}
}
impl Actor for NetworkManager {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {}
fn started(&mut self, ctx: &mut Self::Context) {
println!("started network manager");
let recipient = ctx.address().recipient();
self
.listener_addr
.replace(NetworkListener::new("0.0.0.0:5600", recipient));
}
}
impl Handler<NetworkMessage> for NetworkManager {
type Result = NetworkResponse;
type Result = ();
fn handle(
&mut self,
msg: NetworkMessage,
ctx: &mut <Self as actix::Actor>::Context,
) -> <Self as Handler<NetworkMessage>>::Result {
use NetworkMessage::{Ping, StartListening, StopListening};
use NetworkResponse::{None, Pong};
use NetworkMessage::{StartListening, StopListening};
match msg {
StartListening => self.start_listener(ctx),
StopListening => None,
Ping => Pong,
StopListening => self.stop_listener(ctx),
}
}
}
impl Handler<ListenerOutput> for NetworkManager {
type Result = ();
fn handle(
&mut self,
msg: ListenerOutput,
ctx: &mut Self::Context,
) -> Self::Result {
use ListenerOutput::{InfoRequest, NewConnection};
match msg {
NewConnection(connection) => self.new_connection(ctx, connection),
InfoRequest(connection) => self
.delegate
.do_send(NetworkOutput::InfoRequested(connection))
.expect("failed to send message"),
_ => todo!(),
};
}
}