Pulling basic server functionality into development #9

Merged
michael-bailey merged 22 commits from network-client into Development 2021-04-13 17:17:58 +00:00
6 changed files with 78 additions and 61 deletions
Showing only changes of commit 2af0fb8bf3 - Show all commits

View File

@ -9,7 +9,7 @@ pub trait ICooperative {
fn tick(&self);
}
pub trait IPreemtive {
pub trait IPreemptive {
fn run(arc: &Arc<Self>) {}
fn start(arc: &Arc<Self>);
}

View File

@ -1,7 +1,11 @@
use crate::messages::ClientMessage;
use crate::messages::ClientMessage::Disconnect;
use crate::messages::ServerMessage;
use foundation::prelude::IPreemptive;
use std::cmp::Ordering;
use std::io::BufRead;
use std::io::Read;
use std::io::Write;
use std::io::{BufReader, BufWriter};
use std::mem::replace;
use std::net::TcpStream;
@ -12,7 +16,7 @@ use crossbeam_channel::{unbounded, Receiver, Sender};
use serde::Serialize;
use uuid::Uuid;
use foundation::messages::client::ClientStreamIn;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::prelude::{ICooperative, IMessagable};
/// # Client
@ -81,15 +85,6 @@ impl Client {
stream_writer: Mutex::new(Some(BufWriter::new(out_stream))),
})
}
// MARK: - removeable
pub fn send(&self, _bytes: Vec<u8>) -> Result<(), &str> {
todo!()
}
pub fn recv(&self) -> Option<Vec<u8>> {
todo!()
}
// Mark: end -
}
impl IMessagable<ClientMessage, Sender<ServerMessage>> for Client {
@ -105,55 +100,75 @@ impl IMessagable<ClientMessage, Sender<ServerMessage>> for Client {
}
// cooperative multitasking implementation
impl ICooperative for Client {
fn tick(&self) {
println!("[client]: Tick!");
{
// aquire locks (so value isn't dropped)
let mut reader_lock = self.stream_reader.lock().unwrap();
let mut writer_lock = self.stream_writer.lock().unwrap();
impl IPreemptive for Client {
fn run(arc: &Arc<Self>) {
let arc1 = arc.clone();
let arc2 = arc.clone();
// aquiring mutable buffers
let reader = reader_lock.as_mut().unwrap();
let _writer = writer_lock.as_mut().unwrap();
// create buffer
// read thread
std::thread::spawn(move || {
let arc = arc1.clone();
let mut buffer = String::new();
let mut reader_lock = arc.stream_reader.lock().unwrap();
let reader = reader_lock.as_mut().unwrap();
while let Ok(size) = reader.read_line(&mut buffer) {
if size == 0 {
arc.send_message(Disconnect);
break;
}
// loop over all lines that have been sent.
while let Ok(_size) = reader.read_line(&mut buffer) {
let command =
serde_json::from_str::<ClientStreamIn>(buffer.as_str())
.unwrap();
match command {
ClientStreamIn::Disconnect => println!("got Disconnect"),
_ => println!("New command found"),
ClientStreamIn::Disconnect => arc.send_message(Disconnect),
_ => println!("[client]: command not found"),
}
}
}
});
{
for message in self.output.iter() {
use ClientMessage::Disconnect;
match message {
Disconnect => {
let lock = self.server_channel.lock().unwrap();
// write thread
std::thread::spawn(move || {
let arc = arc2.clone();
let mut writer_lock = arc.stream_writer.lock().unwrap();
let writer = writer_lock.as_mut().unwrap();
if let Some(sender) = lock.as_ref() {
sender
let mut buffer: Vec<u8> = Vec::new();
writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::Connected).unwrap()
);
writer.write_all(&buffer).unwrap();
writer.flush().unwrap();
loop {
for message in arc.output.iter() {
match message {
Disconnect => {
arc.server_channel
.lock()
.unwrap()
.as_mut()
.unwrap()
.send(ServerMessage::ClientDisconnected(
self.uuid,
arc.uuid,
))
.unwrap();
break;
}
_ => println!("[client]: message not implemented"),
}
_ => println!("command not implemneted yet"),
}
}
}
});
}
// handle incomming messages
fn start(arc: &Arc<Self>) {
Client::run(arc)
}
}
@ -199,3 +214,9 @@ impl Ord for Client {
self.uuid.cmp(&other.uuid)
}
}
impl Drop for Client {
fn drop(&mut self) {
println!("[Client] dropped!");
}
}

View File

@ -1,5 +1,5 @@
// use crate::lib::server::ServerMessages;
use foundation::prelude::IPreemtive;
use foundation::prelude::IPreemptive;
use std::collections::HashMap;
use std::mem::replace;
use std::sync::Arc;
@ -51,7 +51,7 @@ impl IMessagable<ClientMgrMessage, Sender<ServerMessage>> for ClientManager {
}
}
impl IPreemtive for ClientManager {
impl IPreemptive for ClientManager {
fn run(arc: &Arc<Self>) {
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
@ -64,6 +64,7 @@ impl IPreemtive for ClientManager {
match message {
Add(client) => {
Client::start(&client);
arc.clients
.lock()
.unwrap()

View File

@ -6,7 +6,7 @@ pub mod server;
use clap::{App, Arg};
use foundation::prelude::IPreemtive;
use foundation::prelude::IPreemptive;
use server::Server;
fn main() {

View File

@ -1,4 +1,4 @@
use foundation::prelude::IPreemtive;
use foundation::prelude::IPreemptive;
use std::io::BufRead;
use std::io::BufReader;
use std::io::BufWriter;
@ -36,7 +36,7 @@ impl NetworkManager {
}
}
impl IPreemtive for NetworkManager {
impl IPreemptive for NetworkManager {
fn start(arc: &Arc<Self>) {
let arc = arc.clone();
std::thread::spawn(move || {
@ -86,13 +86,13 @@ impl IPreemtive for NetworkManager {
NetworkSockIn::Info => {
// send back server info to the connection
writer.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael"
}
).unwrap().as_bytes()
).unwrap();
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael"
}
).unwrap().as_bytes()
).unwrap();
writer.write_all(b"\n").unwrap();
writer.flush().unwrap();
}
@ -110,8 +110,8 @@ impl IPreemtive for NetworkManager {
server_channel.clone(),
);
server_channel.send(
ServerMessage::ClientConnected(new_client)
).unwrap_or_default();
ServerMessage::ClientConnected(new_client)
).unwrap_or_default();
}
}
}

View File

@ -10,7 +10,7 @@ use crate::messages::ServerMessage;
use crate::network_manager::NetworkManager;
use foundation::prelude::ICooperative;
use foundation::prelude::IMessagable;
use foundation::prelude::IPreemtive;
use foundation::prelude::IPreemptive;
/// # ServerMessages
/// This is used internally
@ -48,7 +48,6 @@ impl ICooperative for Server {
use ClientMgrMessage::{Add, Remove};
// handle new messages loop
if !self.receiver.is_empty() {
println!("[server]: entering loop!");
for message in self.receiver.try_iter() {
@ -64,14 +63,10 @@ impl ICooperative for Server {
}
}
}
// alocate time for other components
println!("[server]: allocating time for others");
//
}
}
impl IPreemtive for Server {
impl IPreemptive for Server {
fn run(arc: &std::sync::Arc<Self>) {
// start services
NetworkManager::start(&arc.network_manager);