Grpc-manager #22

Merged
michael-bailey merged 28 commits from grpc-manager into master 2024-05-30 19:42:42 +00:00
11 changed files with 4 additions and 464 deletions
Showing only changes of commit 32a93e0c8a - Show all commits

View File

@ -23,4 +23,6 @@ serde_json = "1.0"
openssl = "0.10"
uuid = {version = "1.1.2", features = ["serde", "v4"]}
tokio = { version = "1.9.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
protocol = { path = '../protocol' }

View File

@ -1,146 +0,0 @@
use std::{
io::{Error, ErrorKind, Write},
mem,
sync::Arc,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
io,
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
net::{TcpStream, ToSocketAddrs},
sync::Mutex,
};
#[derive(Debug)]
pub struct Connection {
stream_rx: Mutex<Option<BufReader<ReadHalf<tokio::net::TcpStream>>>>,
stream_tx: Mutex<Option<WriteHalf<tokio::net::TcpStream>>>,
}
impl Connection {
pub fn new() -> Arc<Self> {
Arc::new(Connection {
stream_rx: Mutex::new(None),
stream_tx: Mutex::new(None),
})
}
pub async fn connect<T: ToSocketAddrs>(&self, host: T) -> Result<(), Error> {
let connection = TcpStream::connect(host).await?;
let (rd, wd) = io::split(connection);
let mut writer_lock = self.stream_tx.lock().await;
let mut reader_lock = self.stream_rx.lock().await;
let _ = mem::replace(&mut *writer_lock, Some(wd));
let _ = mem::replace(&mut *reader_lock, Some(BufReader::new(rd)));
Ok(())
}
pub async fn write<T>(&self, message: T) -> Result<(), Error>
where
T: Serialize,
{
let mut out_buffer = Vec::new();
let out = serde_json::to_string(&message).unwrap();
let mut writer_lock = self.stream_tx.lock().await;
let old = mem::replace(&mut *writer_lock, None);
writeln!(&mut out_buffer, "{}", out)?;
let Some(mut writer) = old else {
return Err(Error::new(ErrorKind::Interrupted, "Writer does not exist"));
};
writer.write_all(&out_buffer).await?;
writer.flush().await?;
let _ = mem::replace(&mut *writer_lock, Some(writer));
Ok(())
}
pub async fn read<T>(&self) -> Result<T, Error>
where
T: DeserializeOwned,
{
let mut buffer = String::new();
let mut reader_lock = self.stream_rx.lock().await;
let old = mem::replace(&mut *reader_lock, None);
if let Some(mut reader) = old {
let _ = reader.read_line(&mut buffer).await?;
let _ = mem::replace(&mut *reader_lock, Some(reader));
Ok(serde_json::from_str(&buffer).unwrap())
} else {
Err(Error::new(ErrorKind::Interrupted, "Reader does not exist"))
}
}
}
impl From<TcpStream> for Connection {
fn from(stream: TcpStream) -> Self {
let (rd, wd) = io::split(stream);
Connection {
stream_tx: Mutex::new(Some(wd)),
stream_rx: Mutex::new(Some(BufReader::new(rd))),
}
}
}
#[cfg(test)]
mod test {
use std::{future::Future, io::Error, panic};
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use crate::connection::Connection;
#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum TestMessages {
Ping,
Pong,
}
#[tokio::test]
async fn a() -> Result<(), Error> {
wrap_setup(|port| async move {
println!("{}", port);
let connection = Connection::new();
connection
.connect(format!("localhost:{}", &port))
.await
.unwrap();
connection.write(&TestMessages::Ping).await.unwrap();
let res = connection.read::<TestMessages>().await.unwrap();
assert_eq!(res, TestMessages::Pong);
})
.await
}
async fn wrap_setup<T, F>(test: T) -> Result<(), std::io::Error>
where
T: FnOnce(u16) -> F + panic::UnwindSafe,
F: Future,
{
let server = TcpListener::bind("localhost:0").await?;
let addr = server.local_addr()?;
// create tokio server execution
tokio::spawn(async move {
while let Ok((stream, addr)) = server.accept().await {
use TestMessages::{Ping, Pong};
println!("[server]: Connected {}", &addr);
let connection = Connection::from(stream);
if let Ok(Ping) = connection.read::<TestMessages>().await {
connection.write::<TestMessages>(Pong).await.unwrap()
}
}
});
test(addr.port()).await;
Ok(())
}
}

View File

@ -1,41 +0,0 @@
// use openssl::sha::sha256;
// use openssl::symm::{Cipher, Crypter, Mode};
#[cfg(test)]
mod test {
use openssl::{
sha::sha256,
symm::{Cipher, Crypter, Mode},
};
#[test]
fn testEncryption() {
let plaintext = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.".as_bytes();
let key = sha256(b"This is a key");
let IV = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
let encrypter =
Crypter::new(Cipher::aes_256_gcm(), Mode::Encrypt, &key, Some(IV));
let mut ciphertext = vec![0u8; 1024];
let cipherlen = encrypter
.unwrap()
.update(plaintext, ciphertext.as_mut_slice())
.unwrap();
let decrypter =
Crypter::new(Cipher::aes_256_gcm(), Mode::Decrypt, &key, Some(IV));
let mut decrypted = vec![0u8; 1024];
decrypter
.unwrap()
.update(&ciphertext[..cipherlen], decrypted.as_mut_slice())
.unwrap();
println!("{:?}", plaintext);
println!("{:?}", ciphertext.as_slice());
println!("{:?}", decrypted.as_slice());
println!("{:?}", plaintext.len());
println!("{:?}", ciphertext.len());
println!("{:?}", decrypted.len());
}
}

View File

@ -1,97 +0,0 @@
use std::collections::HashMap;
use futures::channel::oneshot::{channel, Receiver, Sender};
use crate::event::{
event_result::EventResultBuilder,
EventResult,
EventResultType,
};
/// # Eventw
/// Object that holds details about an event being passed through the application.
///
/// ## Properties
/// - r#type: The event type
/// - args: A hashmap of arguments to be carried by the event
/// - sender: The sender to send the result for the event.
/// - receiver: The reciever of the event result from the event.
pub struct Event<T>
where
T: Sync + Send,
{
pub r#type: T,
args: HashMap<String, String>,
sender: Sender<EventResult>,
receiver: Option<Receiver<EventResult>>,
}
impl<T> Event<T>
where
T: Sync + Send,
{
/// Fetches an argument from the arguments of the event.
pub fn get_arg(&self, key: String) -> Option<String> {
self.args.get(&key).cloned()
}
/// Creates an event result using the sender of the event.
/// This consumes the event.
pub fn respond(self, result_type: EventResultType) -> EventResultBuilder {
EventResult::create(result_type, self.sender)
}
/// Used to await the result of the event if required.
pub fn get_reciever(&mut self) -> Receiver<EventResult> {
self.receiver.take().unwrap()
}
}
pub struct EventBuilder<T> {
#[allow(dead_code)]
r#type: T,
#[allow(dead_code)]
args: HashMap<String, String>,
#[allow(dead_code)]
sender: Sender<EventResult>,
#[allow(dead_code)]
receiver: Option<Receiver<EventResult>>,
}
impl<T> EventBuilder<T> {
#[allow(dead_code)]
pub(super) fn new(r#type: T) -> EventBuilder<T> {
let (sender, receiver) = channel();
EventBuilder {
r#type,
args: HashMap::new(),
sender,
receiver: Some(receiver),
}
}
pub fn add_arg<K: Into<String>, V: Into<String>>(
mut self,
key: K,
value: V,
) -> Self {
self.args.insert(key.into(), value.into());
self
}
#[allow(dead_code)]
pub(crate) fn build(self) -> Event<T>
where
T: Sync + Send,
{
Event {
r#type: self.r#type,
args: self.args,
sender: self.sender,
receiver: self.receiver,
}
}
}

View File

@ -1,61 +0,0 @@
use std::collections::HashMap;
use futures::channel::oneshot::Sender;
pub enum EventResultType {
Success,
NoResponse,
InvalidArgs,
InvalidCode,
Other(String),
}
pub struct EventResult {
code: EventResultType,
args: HashMap<String, String>,
}
impl EventResult {
pub fn create(
result_type: EventResultType,
sender: Sender<EventResult>,
) -> EventResultBuilder {
EventResultBuilder::new(result_type, sender)
}
}
/// # EventResultBuilder
/// Builds the result of an event
pub struct EventResultBuilder {
code: EventResultType,
args: HashMap<String, String>,
sender: Sender<EventResult>,
}
impl EventResultBuilder {
pub(self) fn new(
result_type: EventResultType,
sender: Sender<EventResult>,
) -> Self {
Self {
code: result_type,
args: HashMap::default(),
sender,
}
}
pub fn add_arg(mut self, key: String, value: String) -> Self {
self.args.insert(key, value);
self
}
pub fn send(self) {
self
.sender
.send(EventResult {
code: self.code,
args: self.args,
})
.ok();
}
}

View File

@ -1,9 +0,0 @@
#[allow(clippy::module_inception)]
mod event;
mod event_result;
mod responder;
pub use event::{Event, EventBuilder};
pub use event_result::{EventResult, EventResultType};
pub use self::responder::IResponder;

View File

@ -1,21 +0,0 @@
use std::sync::Weak;
use crate::event::Event;
pub trait IResponder<T>
where
T: Sync + Send,
{
fn post_event(&self, event: Event<T>) {
if let Some(next) = self.get_next() {
if let Some(next) = next.upgrade() {
next.post_event(event);
return;
}
}
self.r#final(event);
}
fn get_next(&self) -> Option<Weak<dyn IResponder<T>>>;
fn on_event(&self, event: Event<T>);
fn r#final(&self, _event: Event<T>) {}
}

View File

@ -1,11 +1,6 @@
extern crate core;
pub mod connection;
pub mod encryption;
pub mod event;
pub mod messages;
pub mod models;
pub mod prelude;
pub mod test;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

View File

@ -1,55 +1 @@
use std::{
sync::{Arc, Weak},
time::Duration,
};
use async_trait::async_trait;
use tokio::time::sleep;
/// # IManager
/// This is used with all managers to implement multitasking
///
/// ## Methods
/// - init: gets executed once before a tokio task is created
/// - run: gets called once every tick in a tokio task
/// - start: runs the init function then creates the tokio task for the run function
#[async_trait]
pub trait IManager {
/// This defines some setup before the tokio loop is started
async fn init(self: &Arc<Self>)
where
Self: Send + Sync + 'static,
{
}
/// this is used to get a future that can be awaited
async fn run(self: &Arc<Self>);
/// This is used to start a future through tokio
fn start(self: &Arc<Self>)
where
Self: Send + Sync + 'static,
{
let weak_self: Weak<Self> = Arc::downgrade(self);
// this looks horrid but works
tokio::spawn(async move {
let weak_self = weak_self.clone();
let a = weak_self.upgrade().unwrap();
a.init().await;
drop(a);
loop {
sleep(Duration::new(1, 0)).await;
if let Some(manager) = Weak::upgrade(&weak_self) {
manager.run().await
}
}
});
}
}
trait Visitor<T: IManager> {
fn visit(&self, message: T);
}
use protocol::prelude::*;

View File

@ -1,25 +0,0 @@
use std::{io::Error, net::SocketAddr, sync::Arc};
use tokio::{
join,
net::{TcpListener, TcpStream},
};
use crate::connection::Connection;
pub async fn create_connection_pair(
) -> Result<(Arc<Connection>, (Arc<Connection>, SocketAddr)), Error> {
let listener: TcpListener = TcpListener::bind("localhost:0000").await?;
let port = listener.local_addr()?.port();
let (server_res, client_res) = join!(
async { TcpStream::connect(format!("localhost:{}", port)).await },
async { listener.accept().await }
);
let (client, addr) = client_res?;
let server = Arc::new(Connection::from(server_res?));
let client = Arc::new(Connection::from(client));
Ok((server, (client, addr)))
}

View File

@ -1,3 +0,0 @@
mod connection_pair;
pub use connection_pair::create_connection_pair;