Grpc-manager #22

Merged
michael-bailey merged 28 commits from grpc-manager into master 2024-05-30 19:42:42 +00:00
7 changed files with 226 additions and 49 deletions
Showing only changes of commit 1feec18697 - Show all commits

View File

@ -21,14 +21,14 @@ path = "src/main.rs"
[dependencies]
chrono = "0.4"
clap = {version = "4.4.8", features = ["derive"]}
uuid = {version = "1.1.2", features = ["serde", "v4"]}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid.workspace = true
serde.workspace = true
serde_json.workspace = true
crossbeam = "0.8.0"
crossbeam-channel = "0.5.0"
zeroize = "1.1.0"
openssl = "0.10.33"
tokio = { version = "1.9.0", features = ["full"] }
tokio.workspace = true
futures = "0.3.16"
async-trait = "0.1.52"
actix = "0.13"
@ -39,4 +39,8 @@ toml = "0.8.8"
aquamarine = "0.3.2"
tokio-stream = "0.1.9"
# protobuf
bytes.workspace = true
prost.workspace = true
foundation = {path = '../foundation'}

View File

@ -0,0 +1,51 @@
use std::net::SocketAddr;
use tokio::{
net::{TcpListener, TcpStream},
select,
sync::mpsc::UnboundedSender,
};
use crate::server_va::ServerMessages;
/// # Listener Manager
/// This stores and awaits for connections from listeners.
/// When a connection is received, it is passed to the server
pub struct ListenerManager {
protobuf_listener: TcpListener,
sender: UnboundedSender<ServerMessages>,
}
impl ListenerManager {
/// Binds listeners and stores them in the ListenerManager
pub async fn new(channel: UnboundedSender<ServerMessages>) -> Self {
let protobuf_listener = TcpListener::bind("0.0.0.0:6500")
.await
.expect("[ListenerManager] failed to bind to 0.0.0.0:6500");
Self {
protobuf_listener,
sender: channel,
}
}
pub async fn run(&self) {
loop {
let accept_protobuf = self.protobuf_listener.accept();
let msg = select! {
Ok((stream, addr)) = accept_protobuf => {
println!("[ListenerManager] accepted connection");
ServerMessages::NewConnection(ConnectionType::ProtobufConnection(stream, addr))
}
};
println!("[ListenerManager] passing message to server");
self.sender.send(msg).unwrap();
println!("[ListenerManager] looping to accept new");
}
}
}
pub enum ConnectionType {
ProtobufConnection(TcpStream, SocketAddr),
}

View File

@ -3,40 +3,24 @@
pub(crate) mod client_management;
pub(crate) mod config_manager;
pub(crate) mod lua;
pub(crate) mod network;
pub(crate) mod prelude;
pub(crate) mod rhai;
pub(crate) mod scripting;
pub(crate) mod server;
use server::Server;
use tokio::{
net::TcpListener,
select,
time::{sleep, Duration},
};
pub mod listener_manager;
pub mod os_signal_manager;
pub mod protobuf_listener;
pub mod server_va;
use crate::server_va::Server;
/// The main function
#[actix::main()]
async fn main() {
// creating listeners
let protobuf_listener = TcpListener::bind("127.0.0.1:6500").await.unwrap();
// todo: convert the actix stuff to whatever this is.
// let json_listener = TcpListener::bind("127.0.0.1:5601").await.unwrap();
let _init = Server::create().build();
select! {
Ok((stream, addr)) = protobuf_listener.accept() => {
},
};
loop {
sleep(Duration::from_millis(1000)).await;
}
Server::default().run().await;
}

View File

@ -0,0 +1,25 @@
use tokio::sync::mpsc::UnboundedSender;
use crate::server_va::ServerMessages;
pub struct OSSignalManager {
server_channel: UnboundedSender<ServerMessages>,
}
impl OSSignalManager {
pub fn new(channel: UnboundedSender<ServerMessages>) -> Self {
Self {
server_channel: channel,
}
}
pub async fn run(&self) {
loop {
tokio::signal::ctrl_c().await.unwrap();
self
.server_channel
.send(ServerMessages::Exit)
.expect("[OSSignalManager] server channel closed");
}
}
}

View File

@ -0,0 +1,3 @@
pub struct ProtobufListener;
impl ProtobufListener {}

View File

@ -25,7 +25,6 @@ use crate::{
ConfigManagerDataResponse,
ConfigValue,
},
lua::LuaManager,
network::{
Connection,
ConnectionMessage::{CloseConnection, SendData},
@ -34,7 +33,6 @@ use crate::{
NetworkOutput::{InfoRequested, NewClient},
},
prelude::messages::NetworkMessage,
rhai::RhaiManager,
server::{builder, ServerBuilder, ServerDataMessage, ServerDataResponse},
};
@ -46,8 +44,6 @@ pub struct Server {
network_manager: Option<Addr<NetworkManager>>,
client_manager: Option<Addr<ClientManager>>,
rhai_manager: Option<Addr<RhaiManager>>,
lua_manager: Option<Addr<LuaManager>>,
}
impl Server {
@ -100,23 +96,9 @@ impl Actor for Server {
let nm = NetworkManager::create(addr.clone().recipient()).build();
let cm = ClientManager::new(addr.recipient());
let rm = RhaiManager::create(
ctx.address().downgrade(),
nm.downgrade(),
cm.downgrade(),
)
.build();
let lm = LuaManager::create(
ctx.address().downgrade(),
nm.downgrade(),
cm.downgrade(),
)
.build();
self.network_manager.replace(nm.clone());
self.client_manager.replace(cm.clone());
self.rhai_manager.replace(rm);
self.lua_manager.replace(lm);
nm.do_send(NetworkMessage::StartListening);
@ -204,8 +186,6 @@ impl From<ServerBuilder> for Server {
network_manager: None,
client_manager: None,
rhai_manager: None,
lua_manager: None,
}
}
}

130
server/src/server_va.rs Normal file
View File

@ -0,0 +1,130 @@
use std::net::SocketAddr;
use bytes::{BufMut, BytesMut};
use foundation::{
networking::{read_message, write_message},
prelude::{
network_client_message,
network_server_message,
GetInfo,
Info,
NetworkClientMessage,
NetworkServerMessage,
Request,
},
};
use prost::Message as pMessage;
use tokio::{
io::AsyncWriteExt,
net::TcpStream,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
Mutex,
},
task::JoinHandle,
};
use crate::{
listener_manager::{ConnectionType, ListenerManager},
os_signal_manager::OSSignalManager,
};
pub struct Server {
os_event_manager_task: JoinHandle<()>,
listener_task: JoinHandle<()>,
receiver: Mutex<UnboundedReceiver<ServerMessages>>,
}
impl Server {
pub async fn run(&self) {
loop {
let mut lock = self.receiver.lock().await;
let msg = lock.recv().await;
drop(lock);
match msg {
Some(ServerMessages::Exit) | None => {
println!("[Server] Shutting down");
self.shutdown();
return;
}
Some(ServerMessages::NewConnection(
ConnectionType::ProtobufConnection(stream, addr),
)) => {
println!("[Server] New protobuf connection");
self.handle_protobuf_connection(stream, addr).await;
}
};
}
}
async fn handle_protobuf_connection(
&self,
mut stream: TcpStream,
addr: SocketAddr,
) {
let message = NetworkServerMessage {
message: Some(network_server_message::Message::Request(Request {
a: true,
})),
};
println!("[Server] made message {:?}", message);
write_message(&mut stream, message).await.unwrap();
let request = read_message::<NetworkClientMessage>(&mut stream)
.await
.unwrap();
match request {
NetworkClientMessage {
message: Some(network_client_message::Message::GetInfo(GetInfo {})),
} => {
let message = NetworkServerMessage {
message: Some(network_server_message::Message::GotInfo(Info {
server_name: "Test server".into(),
owner: "mickyb18a@gmail.com".into(),
})),
};
write_message(&mut stream, message).await.unwrap();
}
_ => {
println!("[Server] message not supported");
}
}
let _ = stream.flush().await;
}
fn shutdown(&self) {
self.os_event_manager_task.abort();
self.listener_task.abort();
}
}
impl Default for Server {
fn default() -> Self {
let (tx, rx) = unbounded_channel();
let tx1 = tx.clone();
let tx2 = tx.clone();
let os_event_manager_task = tokio::spawn(async move {
OSSignalManager::new(tx1).run().await;
});
let listener_task = tokio::spawn(async move {
ListenerManager::new(tx2).await.run().await;
});
Self {
os_event_manager_task,
receiver: Mutex::new(rx),
listener_task,
}
}
}
pub enum ServerMessages {
Exit,
NewConnection(ConnectionType),
}