Pulling basic server functionality into development (#9)

* removed redundant files

* moved files to new foundation library

* added new foundation crate

* added new client program crate

* added new server program crate

* added new serverctl program crate

* change toml to be a workspace instead of a project

* implementing more connection to network functionality

* implementing more connection to network functionality

* Implemented IMessageable for client, client manager as well as basic commands in netmgr

* fixing blocking issues with componenets

* adding network stream queuefor handling connections in a non blocking way

* ffixing blocking calls in network manager

* adding threading support to prevent blocking calls

* running rust formatter

* Created Client threads and implemented connect command

* fixed client not disconnecting issue

* adding messaging support between clients

* Implemented client messaging through the server

* removing unnecessary prints and thread delays

* adding support for updating clients
This commit is contained in:
michael bailey 2021-04-13 18:17:58 +01:00 committed by GitHub
parent 0572d0d0e9
commit 5aa4f8caf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 833 additions and 2599 deletions

View File

@ -1,31 +1,7 @@
[package]
name = "rust-chat-server"
version = "0.1.5"
authors = ["Mitchell <mitchellhardie1@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
regex = "1"
crossbeam = "0.8.0"
crossbeam-channel = "0.5.0"
crossbeam-queue = "0.3.1"
parking_lot = "0.11.1"
dashmap = "4.0.2"
rayon = "1.3.1"
zeroize = "1.1.0"
crossterm = "0.19.0"
clap = "2.33.3"
log = "0.4"
url = "2.2.0"
uuid = {version = "0.8", features = ["serde", "v4"]}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
[profile.dev]
opt-level = 0
[profile.release]
opt-level = 3
[workspace]
members = [
'foundation',
'server',
'client',
'serverctl'
]

9
client/Cargo.toml Normal file
View File

@ -0,0 +1,9 @@
[package]
name = "client"
version = "0.1.0"
authors = ["michael-bailey <mickyb18a@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

3
client/src/main.rs Normal file
View File

@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}

25
foundation/Cargo.toml Normal file
View File

@ -0,0 +1,25 @@
[package]
name = "foundation"
version = "0.1.0"
authors = ["Mitchell <mitchellhardie1@gmail.com>","michael-bailey <mickyb18a@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
[dependencies]
regex = "1"
crossbeam = "0.8.0"
crossbeam-channel = "0.5.0"
crossbeam-queue = "0.3.1"
parking_lot = "0.11.1"
dashmap = "4.0.2"
rayon = "1.3.1"
zeroize = "1.1.0"
crossterm = "0.19.0"
log = "0.4"
url = "2.2.0"
uuid = {version = "0.8", features = ["serde", "v4"]}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

12
foundation/src/lib.rs Normal file
View File

@ -0,0 +1,12 @@
pub mod messages;
pub mod prelude;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ClientDetails {
pub uuid: Uuid,
pub username: String,
pub address: String,
}

View File

@ -0,0 +1,31 @@
use crate::ClientDetails;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// # ClientMessage
/// This enum defined the message that a client can receive from the server
/// This uses the serde library to transform to and from json.
///
#[derive(Serialize, Deserialize)]
pub enum ClientStreamIn {
Connected,
Update,
SendMessage { to: Uuid, content: String },
SendGlobalMessage { content: String },
Disconnect,
}
#[derive(Serialize, Deserialize)]
pub enum ClientStreamOut {
Connected,
UserMessage { from: Uuid, content: String },
GlobalMessage { content: String },
ConnectedClients {clients: Vec<ClientDetails>},
Disconnected,
}

View File

@ -0,0 +1,2 @@
pub mod client;
pub mod network;

View File

@ -0,0 +1,22 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub enum NetworkSockIn {
Info,
Connect {
uuid: String,
username: String,
address: String,
},
}
#[derive(Serialize, Deserialize)]
pub enum NetworkSockOut<'a> {
Request,
GotInfo {
server_name: &'a str,
server_owner: &'a str,
},
Connecting,
}

15
foundation/src/prelude.rs Normal file
View File

@ -0,0 +1,15 @@
use std::sync::Arc;
pub trait IMessagable<TMessage, TSender> {
fn send_message(&self, msg: TMessage);
fn set_sender(&self, sender: TSender);
}
pub trait ICooperative {
fn tick(&self);
}
pub trait IPreemptive {
fn run(arc: &Arc<Self>);
fn start(arc: &Arc<Self>);
}

2
rustfmt.toml Normal file
View File

@ -0,0 +1,2 @@
hard_tabs = true
max_width = 90

18
server/Cargo.toml Normal file
View File

@ -0,0 +1,18 @@
[package]
name = "server"
version = "0.1.0"
authors = ["michael-bailey <mickyb18a@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = "2.33.3"
uuid = {version = "0.8", features = ["serde", "v4"]}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
crossbeam = "0.8.0"
crossbeam-channel = "0.5.0"
[dependencies.foundation]
path = '../foundation'

280
server/src/client.rs Normal file
View File

@ -0,0 +1,280 @@
use crate::messages::ClientMessage;
use crate::messages::ServerMessage;
use foundation::prelude::IPreemptive;
use std::cmp::Ordering;
use std::io::BufRead;
use std::io::Write;
use std::io::{BufReader, BufWriter};
use std::mem::replace;
use std::net::TcpStream;
use std::sync::Arc;
use std::sync::Mutex;
use crossbeam_channel::{unbounded, Receiver, Sender};
use serde::Serialize;
use uuid::Uuid;
use foundation::ClientDetails;
use foundation::messages::client::{ClientStreamIn, ClientStreamOut};
use foundation::prelude::IMessagable;
/// # Client
/// This struct represents a connected user.
///
/// ## Attrubutes
/// - uuid: The id of the connected user.
/// - username: The username of the connected user.
/// - address: The the address of the connected client.
///
/// - stream: The socket for the connected client.
/// - stream_reader: the buffered reader used to receive messages
/// - stream_writer: the buffered writer used to send messages
/// - owner: An optional reference to the owning object.
#[derive(Debug, Serialize)]
pub struct Client {
pub uuid: Uuid,
username: String,
address: String,
pub details: ClientDetails,
// non serializable
#[serde(skip)]
server_channel: Mutex<Option<Sender<ServerMessage>>>,
#[serde(skip)]
input: Sender<ClientMessage>,
#[serde(skip)]
output: Receiver<ClientMessage>,
#[serde(skip)]
stream: Mutex<Option<TcpStream>>,
#[serde(skip)]
stream_reader: Mutex<Option<BufReader<TcpStream>>>,
#[serde(skip)]
stream_writer: Mutex<Option<BufWriter<TcpStream>>>,
}
// client funciton implmentations
impl Client {
pub fn new(
uuid: String,
username: String,
address: String,
stream: TcpStream,
server_channel: Sender<ServerMessage>,
) -> Arc<Client> {
let (sender, receiver) = unbounded();
let out_stream = stream.try_clone().unwrap();
let in_stream = stream.try_clone().unwrap();
Arc::new(Client {
username: username.clone(),
uuid: Uuid::parse_str(&uuid).expect("invalid id"),
address: address.clone(),
details: ClientDetails {
uuid: Uuid::parse_str(&uuid).expect("invalid id"),
username,
address,
},
server_channel: Mutex::new(Some(server_channel)),
input: sender,
output: receiver,
stream: Mutex::new(Some(stream)),
stream_reader: Mutex::new(Some(BufReader::new(in_stream))),
stream_writer: Mutex::new(Some(BufWriter::new(out_stream))),
})
}
}
impl IMessagable<ClientMessage, Sender<ServerMessage>> for Client {
fn send_message(&self, msg: ClientMessage) {
self.input
.send(msg)
.expect("failed to send message to client.");
}
fn set_sender(&self, sender: Sender<ServerMessage>) {
let mut server_lock = self.server_channel.lock().unwrap();
let _ = replace(&mut *server_lock, Some(sender));
}
}
// cooperative multitasking implementation
impl IPreemptive for Client {
fn run(arc: &Arc<Self>) {
let arc1 = arc.clone();
let arc2 = arc.clone();
// read thread
let _ = std::thread::Builder::new()
.name(format!("client thread recv [{:?}]", &arc.uuid))
.spawn(move || {
use ClientMessage::{Disconnect};
let arc = arc1;
let mut buffer = String::new();
let mut reader_lock = arc.stream_reader.lock().unwrap();
let reader = reader_lock.as_mut().unwrap();
'main: while let Ok(size) = reader.read_line(&mut buffer) {
if size == 0 {
arc.send_message(Disconnect);
break 'main;
}
let command = serde_json::from_str::<ClientStreamIn>(buffer.as_str());
match command {
Ok(ClientStreamIn::Disconnect) => {
println!("[Client {:?}]: Disconnect recieved", &arc.uuid);
arc.send_message(Disconnect);
break 'main;
}
Ok(ClientStreamIn::SendMessage { to, content }) => {
println!(
"[Client {:?}]: send message to: {:?}",
&arc.uuid, &to
);
let lock = arc.server_channel.lock().unwrap();
let sender = lock.as_ref().unwrap();
let _ = sender.send(ServerMessage::ClientSendMessage {
from: arc.uuid,
to,
content,
});
}
_ => println!("[Client {:?}]: command not found", &arc.uuid),
}
}
println!("[Client {:?}] exited thread 1", &arc.uuid);
});
// write thread
let _ = std::thread::Builder::new()
.name(format!("client thread msg [{:?}]", &arc.uuid))
.spawn(move || {
let arc = arc2;
let mut writer_lock = arc.stream_writer.lock().unwrap();
let writer = writer_lock.as_mut().unwrap();
let mut buffer: Vec<u8> = Vec::new();
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(&ClientStreamOut::Connected).unwrap()
);
let _ = writer.write_all(&buffer);
let _ = writer.flush();
'main: loop {
for message in arc.output.iter() {
use ClientMessage::{Disconnect,Message, Update};
println!("[Client {:?}]: {:?}", &arc.uuid, message);
match message {
Disconnect => {
arc.server_channel
.lock()
.unwrap()
.as_mut()
.unwrap()
.send(ServerMessage::ClientDisconnected(arc.uuid))
.unwrap();
break 'main;
}
Message { from, content } => {
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(
&ClientStreamOut::UserMessage { from, content }
)
.unwrap()
);
let _ = writer.write_all(&buffer);
let _ = writer.flush();
}
Update {clients} => {
let client_details_vec: Vec<ClientDetails> = clients.iter().map(|client| &client.details).cloned().collect();
let _ = writeln!(
buffer,
"{}",
serde_json::to_string(
&ClientStreamOut::ConnectedClients {clients: client_details_vec}
).unwrap()
);
let _ = writer.write_all(&buffer);
let _ = writer.flush();
}
}
}
}
println!("[Client {:?}]: exited thread 2", &arc.uuid);
});
}
fn start(arc: &Arc<Self>) {
Client::run(arc)
}
}
// default value implementation
impl Default for Client {
fn default() -> Self {
let (sender, reciever) = unbounded();
Client {
username: "generic_client".to_string(),
uuid: Uuid::new_v4(),
address: "127.0.0.1".to_string(),
details: ClientDetails {
uuid: Uuid::new_v4(),
username: "generic_client".to_string(),
address: "127.0.0.1".to_string(),
},
output: reciever,
input: sender,
server_channel: Mutex::new(None),
stream: Mutex::new(None),
stream_reader: Mutex::new(None),
stream_writer: Mutex::new(None),
}
}
}
// MARK: - used for sorting.
impl PartialEq for Client {
fn eq(&self, other: &Self) -> bool {
self.uuid == other.uuid
}
}
impl Eq for Client {}
impl PartialOrd for Client {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Client {
fn cmp(&self, other: &Self) -> Ordering {
self.uuid.cmp(&other.uuid)
}
}
impl Drop for Client {
fn drop(&mut self) {
println!("[Client] dropped!");
}
}

View File

@ -0,0 +1,114 @@
// use crate::lib::server::ServerMessages;
use foundation::prelude::IPreemptive;
use std::collections::HashMap;
use std::mem::replace;
use std::sync::Arc;
use std::sync::Mutex;
use crossbeam_channel::{unbounded, Receiver, Sender};
use uuid::Uuid;
use crate::client::Client;
use crate::messages::ClientMessage;
use crate::messages::ClientMgrMessage;
use crate::messages::ServerMessage;
use foundation::prelude::IMessagable;
/// # ClientManager
/// This struct manages all connected users
#[derive(Debug)]
pub struct ClientManager {
clients: Mutex<HashMap<Uuid, Arc<Client>>>,
server_channel: Mutex<Sender<ServerMessage>>,
sender: Sender<ClientMgrMessage>,
receiver: Receiver<ClientMgrMessage>,
}
impl ClientManager {
pub fn new(server_channel: Sender<ServerMessage>) -> Arc<Self> {
let (sender, receiver) = unbounded();
Arc::new(ClientManager {
clients: Mutex::default(),
server_channel: Mutex::new(server_channel),
sender,
receiver,
})
}
}
impl IMessagable<ClientMgrMessage, Sender<ServerMessage>> for ClientManager {
fn send_message(&self, msg: ClientMgrMessage) {
self.sender.send(msg).unwrap();
}
fn set_sender(&self, sender: Sender<ServerMessage>) {
let mut server_lock = self.server_channel.lock().unwrap();
let _ = replace(&mut *server_lock, sender);
}
}
impl IPreemptive for ClientManager {
fn run(arc: &Arc<Self>) {
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
if !arc.receiver.is_empty() {
for message in arc.receiver.try_iter() {
println!("[Client manager]: recieved message: {:?}", message);
use ClientMgrMessage::{Add, Remove, SendMessage, SendClients};
match message {
Add(client) => {
println!("[Client Manager]: adding new client");
Client::start(&client);
let mut lock = arc.clients.lock().unwrap();
if lock.insert(client.uuid, client).is_none() {
println!("value is new");
}
},
Remove(uuid) => {
println!("[Client Manager]: removing client: {:?}", &uuid);
if let Some(client) =
arc.clients.lock().unwrap().remove(&uuid)
{
client.send_message(ClientMessage::Disconnect);
}
},
SendMessage { to, from, content } => {
let lock = arc.clients.lock().unwrap();
if let Some(client) = lock.get(&to) {
client.send_message(ClientMessage::Message {
from,
content,
})
}
},
SendClients {to} => {
let lock = arc.clients.lock().unwrap();
if let Some(client) = lock.get(&to) {
let clients_vec: Vec<Arc<Client>> = lock.values().cloned().collect();
client.send_message(ClientMessage::Update {
clients: clients_vec,
})
}
},
#[allow(unreachable_patterns)]
_ => println!("[Client manager]: not implemented"),
}
}
}
}
}
fn start(arc: &Arc<Self>) {
let arc = arc.clone();
std::thread::spawn(move || ClientManager::run(&arc));
}
}

29
server/src/main.rs Normal file
View File

@ -0,0 +1,29 @@
pub mod client;
pub mod client_manager;
pub mod messages;
pub mod network_manager;
pub mod server;
use clap::{App, Arg};
use foundation::prelude::IPreemptive;
use server::Server;
fn main() {
let _args = App::new("--rust chat server--")
.version("0.1.5")
.author("Mitchel Hardie <mitch161>, Michael Bailey <michael-bailey>")
.about("this is a chat server developed in rust, depending on the version one of two implementations will be used")
.arg(
Arg::with_name("config")
.short("p")
.long("port")
.value_name("PORT")
.help("sets the port the server runs on.")
.takes_value(true))
.get_matches();
let server = Server::new();
Server::run(&server);
}

37
server/src/messages.rs Normal file
View File

@ -0,0 +1,37 @@
use std::sync::Arc;
use uuid::Uuid;
use crate::client::Client;
#[derive(Debug)]
pub enum ClientMessage {
Message { from: Uuid, content: String },
Update {clients: Vec<Arc<Client>>},
Disconnect,
}
#[derive(Debug)]
pub enum ClientMgrMessage {
Remove(Uuid),
Add(Arc<Client>),
SendClients {to: Uuid},
SendMessage {
from: Uuid,
to: Uuid,
content: String,
},
}
#[derive(Debug)]
pub enum ServerMessage {
ClientConnected(Arc<Client>),
ClientSendMessage {
from: Uuid,
to: Uuid,
content: String,
},
ClientDisconnected(Uuid),
ClientUpdate(Uuid),
}

View File

@ -0,0 +1,132 @@
use foundation::prelude::IPreemptive;
use std::io::BufRead;
use std::io::BufReader;
use std::io::BufWriter;
use std::io::Write;
use std::net::TcpListener;
use std::sync::Arc;
use std::thread;
use crossbeam_channel::Sender;
use crate::client::Client;
use crate::messages::ServerMessage;
use foundation::messages::network::{NetworkSockIn, NetworkSockOut};
pub struct NetworkManager {
listener: TcpListener,
server_channel: Sender<ServerMessage>,
}
impl NetworkManager {
pub fn new(
port: String,
server_channel: Sender<ServerMessage>,
) -> Arc<NetworkManager> {
let mut address = "0.0.0.0:".to_string();
address.push_str(&port);
let listener = TcpListener::bind(address).expect("Could not bind to address");
Arc::new(NetworkManager {
listener,
server_channel,
})
}
}
impl IPreemptive for NetworkManager {
fn run(_: &Arc<Self>) {}
fn start(arc: &Arc<Self>) {
let arc = arc.clone();
std::thread::spawn(move || {
// fetch new connections and add them to the client queue
for connection in arc.listener.incoming() {
println!("[NetworkManager]: New Connection!");
match connection {
Ok(stream) => {
let server_channel = arc.server_channel.clone();
// create readers
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
let _handle = thread::Builder::new()
.name("NetworkJoinThread".to_string())
.spawn(move || {
let mut out_buffer: Vec<u8> = Vec::new();
let mut in_buffer: String = String::new();
// send request message to connection
let _ = writeln!(
out_buffer,
"{}",
serde_json::to_string(&NetworkSockOut::Request)
.unwrap()
);
let _ = writer.write_all(&out_buffer);
let _ = writer.flush();
// try get response
let res = reader.read_line(&mut in_buffer);
if res.is_err() {
return;
}
//match the response
if let Ok(request) =
serde_json::from_str::<NetworkSockIn>(&in_buffer)
{
match request {
NetworkSockIn::Info => {
// send back server info to the connection
writer
.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael",
},
)
.unwrap()
.as_bytes(),
)
.unwrap();
writer.write_all(b"\n").unwrap();
writer.flush().unwrap();
}
NetworkSockIn::Connect {
uuid,
username,
address,
} => {
// create client and send to server
let new_client = Client::new(
uuid,
username,
address,
stream.try_clone().unwrap(),
server_channel.clone(),
);
server_channel
.send(ServerMessage::ClientConnected(
new_client,
))
.unwrap_or_default();
}
}
}
});
}
Err(e) => {
println!("[Network manager]: error getting stream: {:?}", e);
continue;
}
}
}
});
}
}

83
server/src/server.rs Normal file
View File

@ -0,0 +1,83 @@
use std::sync::Arc;
use crossbeam_channel::{unbounded, Receiver};
use uuid::Uuid;
use crate::client_manager::ClientManager;
use crate::messages::ClientMgrMessage;
use crate::messages::ServerMessage;
use crate::network_manager::NetworkManager;
use foundation::prelude::ICooperative;
use foundation::prelude::IMessagable;
use foundation::prelude::IPreemptive;
/// # ServerMessages
/// This is used internally to send messages to the server to be dispatched
#[derive(Debug)]
pub enum ServerMessages<TClient> {
ClientConnected(Arc<TClient>),
ClientDisconnected(Uuid),
}
pub struct Server {
client_manager: Arc<ClientManager>,
network_manager: Arc<NetworkManager>,
receiver: Receiver<ServerMessage>,
}
impl Server {
pub fn new() -> Arc<Server> {
let (sender, receiver) = unbounded();
Arc::new(Server {
client_manager: ClientManager::new(sender.clone()),
network_manager: NetworkManager::new("5600".to_string(), sender),
receiver,
})
}
}
impl ICooperative for Server {
fn tick(&self) {
use ClientMgrMessage::{Add, Remove, SendMessage};
// handle new messages loop
if !self.receiver.is_empty() {
for message in self.receiver.try_iter() {
println!("[server]: received message {:?}", &message);
match message {
ServerMessage::ClientConnected(client) => {
self.client_manager.send_message(Add(client))
}
ServerMessage::ClientDisconnected(uuid) => {
println!("disconnecting client {:?}", uuid);
self.client_manager.send_message(Remove(uuid));
}
ServerMessage::ClientSendMessage { from, to, content } => self
.client_manager
.send_message(SendMessage { from, to, content }),
ServerMessage::ClientUpdate (_uuid) => println!("not implemented"),
}
}
}
}
}
impl IPreemptive for Server {
fn run(arc: &std::sync::Arc<Self>) {
// start services
NetworkManager::start(&arc.network_manager);
ClientManager::start(&arc.client_manager);
loop {
arc.tick();
}
}
fn start(arc: &std::sync::Arc<Self>) {
let arc = arc.clone();
// start thread
std::thread::spawn(move || Server::run(&arc));
}
}

9
serverctl/Cargo.toml Normal file
View File

@ -0,0 +1,9 @@
[package]
name = "serverctl"
version = "0.1.0"
authors = ["michael-bailey <mickyb18a@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

3
serverctl/src/main.rs Normal file
View File

@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}

View File

@ -1,12 +0,0 @@
use url::Url
pub trait TBundle {
fn main() -> Result<Self>;
fn initWithURL(url: Url) -> Result<Self>;
fn initWithPath(path: String) -> Result<Self>;
fn urlForResource(name: String, extention: String, subDirectory: Option<Strign>) -> Result<[u8]>;
}

View File

@ -1,11 +0,0 @@
/**
* Bundle: inspired from NSBundle on macOS
*/
struct Bundle {
location:
}
impl Bundle {
}

View File

@ -1,2 +0,0 @@
pub mod bundle;
pub mod Traits

View File

@ -1,85 +0,0 @@
struct Request {}
struct Info {}
struct Connect {}
struct Disconnect {}
struct ClientUpdate {}
struct ClientInfo {}
struct ClientRemove {}
struct Client {}
struct Success {}
struct Error {}
trait ClientRunnables {
fn client_execution(client: &Client);
}
impl Runnables for Request {
fn run() {
}
}
impl ClientRunnables for Info {
fn client_execution(client: &Client) {
let params = client.get_server_info();
let command = Commands::Success(Some(params));
client.transmit_data(command.to_string().as_str());
}
}
impl Runnables for Connect {
fn run() {
}
}
impl Runnables for Disconnect {
fn run() {
}
}
impl ClientRunnables for ClientUpdate {
fn client_execution(client: &Client) {
let mut command = Commands::Success(None);
client.transmit_data(command.to_string().as_str());
let data: HashMap<String, String> = [(String::from("uuid"), client.get_uuid())].iter().cloned().collect();
let command = Commands::ClientUpdate(Some(data));
self.server.update_all_clients(self.uuid.as_str(), command);
}
}
impl Runnables for ClientInfo {
fn run() {
}
}
impl Runnables for ClientRemove {
fn run() {
}
}
impl Runnables for Client {
fn run() {
}
}
impl Runnables for Success {
fn run() {
}
}
impl Runnables for Error {
fn run() {
}
}

View File

@ -1,267 +0,0 @@
use std::borrow::Borrow;
use std::collections::HashMap;
use std::ops::Index;
use std::str::FromStr;
use std::string::ToString;
use log::info;
use regex::Regex;
use zeroize::Zeroize;
#[derive(Clone, Debug)]
pub enum Commands {
/* TODO: this is the new commands system but still needs work.
* Will be fixed soon, but continue with old version at the
* moment.
*
// Common fields:
executable: T,
params: Option<HashMap<String, String>>,
// Variants:
Request {},
Info {},
Connect {},
Disconnect {},
ClientUpdate {},
ClientInfo {},
ClientRemove {},
Client {},
Success {},
Error {},
*/
Request(Option<HashMap<String, String>>),
Info(Option<HashMap<String, String>>),
HeartBeat(Option<HashMap<String, String>>),
Connect(Option<HashMap<String, String>>),
Disconnect(Option<HashMap<String, String>>),
ClientUpdate(Option<HashMap<String, String>>),
ClientInfo(Option<HashMap<String, String>>),
ClientRemove(Option<HashMap<String, String>>),
Client(Option<HashMap<String, String>>),
Success(Option<HashMap<String, String>>),
Error(Option<HashMap<String, String>>),
}
#[allow(dead_code)]
#[derive(Debug)]
pub enum CommandParseError {
UnknownCommand,
NoString,
}
/*trait Operations {
fn execute(&self);
}*/
impl Commands {
/*fn get_executable(&self) -> &T {
self.executable
}
fn get_params(&self) -> &Option<HashMap<String,String>> {
self.params
}*/
fn compare_params(&self, params: &Option<HashMap<String, String>>, other_params: &Option<HashMap<String, String>>) -> bool {
match (params, other_params) {
(None, Some(_other_params)) => false,
(Some(_params), None) => false,
(None, None) => true,
(Some(params), Some(other_params)) => {
let mut result = false;
if params.len() == other_params.len() {
for (key, value) in params.iter() {
if let Some(other_value) = other_params.get(key) {
if value != other_value {
result = false;
break;
} else {
result = true;
}
}
}
}
result
},
}
}
}
/*impl<T> Operations for Commands<T> {
fn execute(&self) {
self.executable.run();
}
}*/
impl PartialEq for Commands {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Commands::Request(params), Commands::Request(other_params)) => self.compare_params(&params, &other_params),
(Commands::Info(params), Commands::Info(other_params)) => self.compare_params(&params, &other_params),
(Commands::Connect(params), Commands::Connect(other_params)) => self.compare_params(&params, &other_params),
(Commands::Disconnect(params), Commands::Disconnect(other_params)) => self.compare_params(&params, &other_params),
(Commands::ClientUpdate(params), Commands::ClientUpdate(other_params)) => self.compare_params(&params, &other_params),
(Commands::ClientInfo(params), Commands::ClientInfo(other_params)) => self.compare_params(&params, &other_params),
(Commands::ClientRemove(params), Commands::ClientRemove(other_params)) => self.compare_params(&params, &other_params),
(Commands::Client(params), Commands::Client(other_params)) => self.compare_params(&params, &other_params),
(Commands::Success(params), Commands::Success(other_params)) => self.compare_params(&params, &other_params),
(Commands::Error(params), Commands::Error(other_params)) => self.compare_params(&params, &other_params),
_ => false,
}
}
}
impl ToString for Commands {
fn to_string(&self) -> std::string::String {
let mut out_string = String::new();
let (command, parameters) = match self {
Commands::Request(arguments) => { ("!request:", arguments) },
Commands::Info(arguments) => { ("!info:", arguments) },
Commands::HeartBeat(arguments) => {("!heartbeat:", arguments)},
Commands::Connect(arguments) => { ("!connect:", arguments) },
Commands::Disconnect(arguments) => { ("!disconnect:", arguments) },
Commands::ClientUpdate(arguments) => { ("!clientUpdate:", arguments) },
Commands::ClientInfo(arguments) => { ("!clientInfo:", arguments) },
Commands::ClientRemove(arguments) => { ("!clientRemove", arguments) }
Commands::Client(arguments) => { ("!client:", arguments) },
Commands::Success(arguments) => { ("!success:", arguments) },
Commands::Error(arguments) => { ("!error:", arguments) },
};
out_string.push_str(command);
if parameters.is_some() {
let hash_map = parameters.borrow().as_ref().unwrap();
for (k, v) in hash_map.iter() {
out_string.push_str(" ");
out_string.push_str(k.as_str());
out_string.push_str(":");
if v.contains(":") {
out_string.push_str(format!("\"{}\"",v.as_str()).as_str())
} else {
out_string.push_str(v.as_str());
}
}
}
out_string
}
}
impl FromStr for Commands {
type Err = CommandParseError;
fn from_str(data: &str) -> std::result::Result<Self, Self::Err> {
let regex = Regex::new(r###"(\?|!)([a-zA-z0-9]*):|([a-zA-z]*):([a-zA-Z0-9@\-\+\[\]{}_=/.]+|("(.*?)")+)"###).unwrap();
let mut iter = regex.find_iter(data);
let command_opt = iter.next();
if command_opt.is_none() {
return Err(CommandParseError::NoString);
}
let command = command_opt.unwrap().as_str();
println!("command parsed to: {:?}", command);
let mut map: HashMap<String, String> = HashMap::new();
for i in iter {
let parameter = i.as_str().to_string();
let parts:Vec<&str> = parameter.split(":").collect();
map.insert(parts.index(0).to_string(), parts.index(1).to_string());
}
let params = if map.capacity() > 0 {Some(map)} else { None };
Ok(match command {
"!request:" => Commands::Request(params),
"!info:" => Commands::Info(params),
"!heartbeat:" => Commands::HeartBeat(params),
"!connect:" => Commands::Connect(params),
"!disconnect:" => Commands::Disconnect(params),
"!clientUpdate:" => Commands::ClientUpdate(params),
"!clientInfo:" => Commands::ClientInfo(params),
"!client:" => Commands::Client(params),
"!clientRemove:" => Commands::ClientRemove(params),
"!success:" => Commands::Success(params),
"!error:" => Commands::Error(params),
_ => Commands::Error(None),
})
}
}
impl From<String> for Commands {
fn from(data: String) -> Self {
if let Ok(data) = data.as_str().parse() {
data
} else {
info!("Command: failed to parse with");
Commands::Error(None)
}
}
}
impl From<&mut [u8; 1024]> for Commands {
fn from(data: &mut [u8; 1024]) -> Self {
let incoming_message = String::from(String::from_utf8_lossy(data));
data.zeroize();
Commands::from(incoming_message)
}
}
impl From<&mut Vec<u8>> for Commands {
fn from(data: &mut Vec<u8>) -> Self {
let incoming_message = String::from(String::from_utf8_lossy(data));
data.zeroize();
Commands::from(incoming_message)
}
}
// TODO: check if unit tests still work
/*#[cfg(test)]
mod test_commands_v2 {
#![feature(test)]
use super::Commands;
use std::collections::HashMap;
use std::str::FromStr;
use super::CommandParseError;
#[test]
fn test_creation_from_string() {
let command_result = Commands::from_str("!connect: name:bop host:127.0.0.1 uuid:123456-1234-1234-123456");
}
#[test]
fn test_to_string() {
let mut a: HashMap<String, String> = HashMap::new();
a.insert("name".to_string(), "michael".to_string());
a.insert("host".to_string(), "127.0.0.1".to_string());
a.insert("uuid".to_string(), "123456-1234-1234-123456".to_string());
let command = Commands::Connect(Some(a));
println!("{:?}", command.to_string())
}
}*/

View File

@ -1,7 +0,0 @@
pub trait IMessagable<M> {
fn send_message(&self, msg: M);
}
pub trait ICooperative {
fn tick(&self);
}

View File

@ -1,110 +0,0 @@
// pub mod commands;
pub mod prelude;
pub mod server;
pub mod foundation;
pub mod commands;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use crossbeam_channel::{unbounded, Receiver, Sender};
enum Message {
NewJob(Job),
Terminate,
}
#[derive(Debug)]
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Sender<Message>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
#[allow(dead_code)]
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = unbounded();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
// create some threads and store them in the vector
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
#[derive(Debug)]
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}

View File

View File

@ -1,218 +0,0 @@
extern crate regex;
use std::{
io,
io::Error,
io::prelude::*,
net::{Shutdown, TcpStream},
sync::Arc,
sync::Mutex,
time::{Duration, Instant},
};
use crossbeam_channel::{
Receiver,
Sender,
TryRecvError,
unbounded
};
use log::info;
use crate::{
commands::Commands,
server::{
//server_profile::Server,
server_profile::ServerMessages,
}
};
//use parking_lot::FairMutex;
//use dashmap::DashMap;
#[derive(Debug)]
pub struct Client {
uuid: String,
username: String,
address: String,
last_heartbeat: Arc<Mutex<Instant>>,
stream_arc: Arc<Mutex<TcpStream>>,
pub sender: Sender<Commands>,
receiver: Receiver<Commands>,
server_sender: Sender<ServerMessages>,
}
impl Client {
pub fn new(stream: TcpStream, server_sender: Sender<ServerMessages>, uuid: &str, username: &str, address: &str) -> Self {
let (sender, receiver): (Sender<Commands>, Receiver<Commands>) = unbounded();
stream.set_read_timeout(Some(Duration::from_secs(1))).unwrap();
Client {
stream_arc: Arc::new(Mutex::new(stream)),
uuid: uuid.to_string(),
username: username.to_string(),
address: address.to_string(),
sender,
receiver,
server_sender,
last_heartbeat: Arc::new(Mutex::new(Instant::now())),
}
}
#[allow(dead_code)]
pub fn get_sender(&self) -> &Sender<Commands> {
&self.sender
}
#[allow(dead_code)]
pub fn get_uuid(&self) -> String {
self.uuid.clone()
}
#[allow(dead_code)]
pub fn get_username(&self) -> String {
self.username.clone()
}
#[allow(dead_code)]
pub fn get_address(&self) -> String {
self.address.clone()
}
// TODO: - add heartbeat timer.
pub fn handle_connection(&mut self) {
let mut buffer = [0; 1024];
// TODO: - Check heartbeat
{
info!("heartbeat")
}
info!("{}: handling connection", self.uuid);
match self.read_data(&mut buffer) {
Ok(command) => {
// match incomming commands
println!("command");
match command {
Commands::Disconnect(None) => {
self.server_sender.send(ServerMessages::Disconnect(self.uuid.clone())).expect("sending message to server failed");
self.stream_arc.lock().unwrap().shutdown(Shutdown::Both).expect("shutdown call failed");
},
Commands::HeartBeat(None) => {
*self.last_heartbeat.lock().unwrap() = Instant::now();
self.transmit_data(Commands::Success(None).to_string().as_str());
},
Commands::ClientUpdate(None) => {
self.transmit_data(Commands::Success(None).to_string().as_str());
let _ = self.server_sender.send(ServerMessages::RequestUpdate(self.stream_arc.clone()));
},
Commands::ClientInfo(Some(params)) => {
let uuid = params.get("uuid").unwrap();
let _ = self.server_sender.send(ServerMessages::RequestInfo(uuid.clone(), self.stream_arc.clone()));
},
// TODO: may or may not be needed?
Commands::Error(None) => {
},
_ => {
self.transmit_data(Commands::Error(None).to_string().as_str());
},
}
},
Err(_) => {
// No data was read
},
}
println!("buffer");
// test to see if there is anything for the client to receive from its channel
match self.receiver.try_recv() {
/*command is on the channel*/
Ok(Commands::ClientRemove(Some(params))) => {
let mut retry: u8 = 3;
'retry_loop1: loop {
if retry < 1 {
self.transmit_data(Commands::Error(None).to_string().as_str());
break 'retry_loop1
} else {
self.transmit_data(Commands::ClientRemove(Some(params.clone())).to_string().as_str());
if self.read_data(&mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) {
break 'retry_loop1;
} else {
retry -= 1;
}
}
}
},
Ok(Commands::Client(Some(params))) => {
let mut retry: u8 = 3;
'retry_loop2: loop {
if retry < 1 {
self.transmit_data(Commands::Error(None).to_string().as_str());
break 'retry_loop2;
} else {
self.transmit_data(Commands::Client(Some(params.clone())).to_string().as_str());
if self.read_data(&mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) {
break 'retry_loop2;
} else {
retry -= 1;
}
}
}
},
/*No data available yet*/
Err(TryRecvError::Empty) => {},
_ => {},
}
println!("---Client Thread Exit---");
}
// move into a drop perhaps
#[allow(dead_code)]
pub fn disconnect(&mut self){
self.stream_arc.lock().unwrap().shutdown(Shutdown::Both).expect("shutdown call failed");
}
pub fn transmit_data(&self, data: &str) {
println!("Transmitting data: {}", data);
let error_result = self.stream_arc.lock().unwrap().write_all(data.to_string().as_bytes());
if let Some(error) = error_result.err(){
match error.kind() {
// handle disconnections
io::ErrorKind::NotConnected => {
let _ = self.server_sender.send(ServerMessages::Disconnect(self.uuid.clone()));
},
_ => { },
}
}
}
fn read_data(&mut self, buffer: &mut [u8; 1024]) -> Result<Commands, Error> {
let _ = self.stream_arc.lock().unwrap().read(buffer)?;
let command = Commands::from(buffer);
Ok(command)
}
}
impl ToString for Client {
fn to_string(&self) -> std::string::String { todo!() }
}
impl Drop for Client {
fn drop(&mut self) {
let _ = self.stream_arc.lock().unwrap().write_all(Commands::Disconnect(None).to_string().as_bytes());
let _ = self.stream_arc.lock().unwrap().shutdown(Shutdown::Both);
}
}

View File

@ -1,223 +0,0 @@
extern crate regex;
use std::{
io,
io::Error,
io::prelude::*,
net::{Shutdown, TcpStream},
sync::Mutex,
time::{Duration, Instant},
};
use crossbeam_channel::{
Receiver,
Sender,
TryRecvError,
unbounded
};
use log::info;
use crate::{
commands::Commands,
server::server_v3::ServerMessages,
};
#[derive(Debug)]
pub struct Client {
parent: Option<&ClientManager>
uuid: String,
username: String,
address: String,
last_heartbeat: Option<Instant>,
stream: Option<Mutex<TcpStream>>,
sender: Sender<Commands>,
receiver: Receiver<Commands>,
server_sender: Sender<ServerMessages>,
}
/// # client Struct
impl Client {
#[allow(dead_code)]
pub fn new(stream: TcpStream, server_sender: Sender<ServerMessages>, uuid: &str, username: &str, address: &str) -> Self {
let (sender, receiver): (Sender<Commands>, Receiver<Commands>) = unbounded();
stream.set_read_timeout(Some(Duration::from_secs(1))).unwrap();
Client {
stream: Some(Mutex::new(stream)),
uuid: uuid.to_string(),
username: username.to_string(),
address: address.to_string(),
sender,
receiver,
server_sender,
last_heartbeat: Some(Instant::now()),
}
}
#[allow(dead_code)]
pub fn get_sender(&self) -> &Sender<Commands> {
&self.sender
}
#[allow(dead_code)]
pub fn get_uuid(&self) -> String {
self.uuid.clone()
}
#[allow(dead_code)]
pub fn get_username(&self) -> String {
self.username.clone()
}
#[allow(dead_code)]
pub fn get_address(&self) -> String {
self.address.clone()
}
// TODO: - add heartbeat timer.
#[allow(dead_code)]
pub fn handle_connection(&mut self) {
let mut buffer = [0; 1024];
// TODO: - Check heartbeat
{
//info!("heartbeat")
}
info!("{}: handling connection", self.uuid);
match self.read_data(&mut buffer) {
Ok(Commands::Disconnect(None)) => {
self.server_sender.send(ServerMessages::Disconnect(self.uuid.clone())).expect("sending message to server failed");
self.stream.lock().unwrap().shutdown(Shutdown::Both).expect("shutdown call failed");
},
Ok(Commands::HeartBeat(None)) => {
self.last_heartbeat = Instant::now();
self.send_data(Commands::Success(None).to_string().as_str());
},
Ok(Commands::ClientUpdate(None)) => {
self.send_data(Commands::Success(None).to_string().as_str());
let _ = self.server_sender.send(ServerMessages::RequestUpdate(self.stream.clone()));
},
Ok(Commands::ClientInfo(Some(params))) => {
let uuid = params.get("uuid").unwrap();
let _ = self.server_sender.send(ServerMessages::RequestInfo(uuid.clone(), self.stream.clone()));
},
Ok(Commands::Error(None)) => {
self.send_data(Commands::Error(None).to_string().as_str());
},
_ => {
self.send_data(Commands::Error(None).to_string().as_str());
},
}
println!("buffer");
// test to see if there is anything for the client to receive from its channel
match self.receiver.try_recv() {
/*command is on the channel*/
Ok(Commands::ClientRemove(Some(params))) => {
let mut retry: u8 = 3;
'retry_loop1: loop {
if retry < 1 {
self.send_data(Commands::Error(None).to_string().as_str());
break 'retry_loop1
} else {
self.send_data(Commands::ClientRemove(Some(params.clone())).to_string().as_str());
if self.read_data(&mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) {
break 'retry_loop1;
} else {
retry -= 1;
}
}
}
},
Ok(Commands::Client(Some(params))) => {
let mut retry: u8 = 3;
'retry_loop2: loop {
if retry < 1 {
self.send_data(Commands::Error(None).to_string().as_str());
break 'retry_loop2;
} else {
self.send_data(Commands::Client(Some(params.clone())).to_string().as_str());
if self.read_data(&mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) {
break 'retry_loop2;
} else {
retry -= 1;
}
}
}
},
/*No data available yet*/
Err(TryRecvError::Empty) => {},
_ => {},
}
println!("---Client Thread Exit---");
}
// move into a drop perhaps
#[allow(dead_code)]
pub fn disconnect(&mut self){
self.stream.lock().unwrap().shutdown(Shutdown::Both).expect("shutdown call failed");
}
#[allow(dead_code)]
pub fn send_data(&self, data: &str) {
println!("Transmitting data: {}", data);
let error_result = self.stream.lock().unwrap().write_all(data.to_string().as_bytes());
if let Some(error) = error_result.err(){
match error.kind() {
// handle disconnections
io::ErrorKind::NotConnected => {
let _ = self.server_sender.send(ServerMessages::Disconnect(self.uuid.clone()));
},
_ => { },
}
}
}
#[allow(dead_code)]
fn read_data(&mut self, buffer: &mut [u8; 1024]) -> Result<Commands, Error> {
let _ = self.stream.lock().unwrap().read(buffer)?;
let command = Commands::from(buffer);
Ok(command)
}
}
impl ToString for Client {
fn to_string(&self) -> std::string::String { todo!() }
}
impl Drop for Client {
fn drop(&mut self) {
let _ = self.stream.lock().unwrap().write_all(Commands::Disconnect(None).to_string().as_bytes());
let _ = self.stream.lock().unwrap().shutdown(Shutdown::Both);
}
}
#[cfg(test)]
mod test {
}

View File

@ -1,195 +0,0 @@
// pub mod client_profile;
// pub mod client_v3;
pub mod traits;
use std::cmp::Ordering;
use std::net::TcpStream;
use std::sync::Mutex;
use std::sync::Arc;
use std::io::{BufReader, BufWriter};
use std::io::BufRead;
use uuid::Uuid;
use serde::{Serialize, Deserialize};
use crossbeam_channel::{Sender, Receiver, unbounded};
use traits::IClient;
use crate::lib::foundation::{ICooperative, IMessagable};
use crate::lib::server::ServerMessages;
/// # ClientMessage
/// This enum defined the message that a client can receive from the server
/// This uses the serde library to transform to and from json.
#[derive(Serialize, Deserialize)]
pub enum ClientMessage {
Disconnect {id: String},
Update {id: String},
ServerMessage {id: String, msg: String},
NewMessage {id: String, from_user_id: String, msg: String},
NewgroupMessage {id: String, from_group_id: String, from_user_id: String, msg: String},
}
/// # ClientSocketMessage
/// This enum defines a message that can be sent from a client to the server once connected
/// This uses the serde library to transform to and from json.
#[derive(Serialize, Deserialize)]
pub enum ClientSocketMessage {
Disconnect {id: String},
SendMessage {id: String, to_user_id: String, msg: String}
}
/// # Client
/// This struct represents a connected user.
///
/// ## Attrubutes
/// - uuid: The id of the connected user.
/// - username: The username of the connected user.
/// - address: The the address of the connected client.
///
/// - stream: The socket for the connected client.
/// - owner: An optional reference to the owning object.
#[derive(Debug, Serialize)]
pub struct Client {
pub uuid: Uuid,
username: String,
address: String,
// non serializable
#[serde(skip)]
server_channel: Option<Sender<ServerMessages>>,
#[serde(skip)]
input: Sender<ClientMessage>,
#[serde(skip)]
output: Receiver<ClientMessage>,
#[serde(skip)]
stream: Mutex<Option<TcpStream>>,
#[serde(skip)]
stream_reader: Mutex<Option<BufReader<TcpStream>>>,
#[serde(skip)]
stream_writer: Mutex<Option<BufWriter<TcpStream>>>,
}
// client funciton implmentations
impl IClient<ClientMessage> for Client {
fn new(
uuid: String,
username: String,
address: String,
stream: TcpStream,
server_channel: Sender<ServerMessages>
) -> Arc<Client> {
let (sender, receiver) = unbounded();
let out_stream = stream.try_clone().unwrap();
let in_stream = stream.try_clone().unwrap();
Arc::new(Client {
username,
uuid: Uuid::parse_str(&uuid).expect("invalid id"),
address,
server_channel: Some(server_channel),
input: sender,
output: receiver,
stream: Mutex::new(Some(stream)),
stream_reader: Mutex::new(Some(BufReader::new(in_stream))),
stream_writer: Mutex::new(Some(BufWriter::new(out_stream))),
})
}
// MARK: - removeable
fn send(&self, _bytes: Vec<u8>) -> Result<(), &str> { todo!() }
fn recv(&self) -> Option<Vec<u8>> { todo!() }
// Mark: end -
}
impl IMessagable<ClientMessage> for Client{
fn send_message(&self, msg: ClientMessage) {
self.input.send(msg).expect("failed to send message to client.");
}
}
// cooperative multitasking implementation
impl ICooperative for Client {
fn tick(&self) {
// aquire locks (so value isn't dropped)
let mut reader_lock = self.stream_reader.lock().unwrap();
let mut writer_lock = self.stream_writer.lock().unwrap();
// aquiring mutable buffers
let reader = reader_lock.as_mut().unwrap();
let _writer = writer_lock.as_mut().unwrap();
// create buffer
let mut buffer = String::new();
// loop over all lines that have been sent.
while let Ok(_size) = reader.read_line(&mut buffer) {
let command = serde_json::from_str::<ClientSocketMessage>(buffer.as_str()).unwrap();
match command {
ClientSocketMessage::Disconnect {id} => println!("got Disconnect from id: {:?}", id),
_ => println!("New command found"),
}
}
// handle incomming messages
}
}
// default value implementation
impl Default for Client {
fn default() -> Self {
let (sender, reciever) = unbounded();
Client {
username: "generic_client".to_string(),
uuid: Uuid::new_v4(),
address: "127.0.0.1".to_string(),
output: reciever,
input: sender,
server_channel: None,
stream: Mutex::new(None),
stream_reader: Mutex::new(None),
stream_writer: Mutex::new(None),
}
}
}
// MARK: - used for sorting.
impl PartialEq for Client {
fn eq(&self, other: &Self) -> bool {
self.uuid == other.uuid
}
}
impl Eq for Client {
}
impl PartialOrd for Client {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Client {
fn cmp(&self, other: &Self) -> Ordering {
self.uuid.cmp(&other.uuid)
}
}

View File

@ -1,29 +0,0 @@
use std::sync::Arc;
use std::net::TcpStream;
use crossbeam_channel::Sender;
use crate::lib::server::ServerMessages;
/// # TClient
/// This trait represents the methods that a client must implement
/// in order to be used with a client manager
///
/// # Methods
/// - new: creates a new client from an id, username and a address.
/// - send: send a message to the client.
/// - recv: if there is a message in the queue, returns the message
/// - send_msg: sends a event message to the client
/// - recv_msg: used by the client to receive and process event messages
pub trait IClient<TClientMessage> {
fn new(
uuid: String,
username: String,
address: String,
stream: TcpStream,
server_channel: Sender<ServerMessages>
) -> Arc<Self>;
fn send(&self, bytes: Vec<u8>) -> Result<(), &str>;
fn recv(&self) -> Option<Vec<u8>>;
}

View File

@ -1,116 +0,0 @@
pub mod client;
pub mod traits;
// use crate::lib::server::ServerMessages;
use std::sync::Arc;
use std::sync::Mutex;
use std::collections::HashMap;
use crossbeam_channel::{unbounded, Receiver, Sender};
use uuid::Uuid;
use self::client::Client;
use self::client::ClientMessage;
use self::traits::TClientManager;
use crate::lib::server::ServerMessages;
use crate::lib::foundation::IMessagable;
use crate::lib::foundation::ICooperative;
enum ClientManagerMessages {
#[allow(dead_code)]
DropAll,
#[allow(dead_code)]
MessageClient,
}
/// # ClientManager
/// This struct manages all connected users
#[derive(Debug)]
pub struct ClientManager {
clients: Mutex<HashMap<Uuid, Arc<Client>>>,
server_channel: Sender<ServerMessages>,
sender: Sender<ClientManagerMessages>,
receiver: Receiver<ClientManagerMessages>,
}
impl ClientManager {
pub fn new(server_channel: Sender<ServerMessages>) -> Arc<Self> {
let (sender, receiver) = unbounded();
Arc::new(ClientManager {
clients: Mutex::default(),
server_channel,
sender,
receiver,
})
}
}
impl TClientManager<Client, ClientMessage> for ClientManager {
fn add_client(&self, client: std::sync::Arc<Client>) {
self.clients.lock().unwrap().insert(client.uuid, client);
}
fn remove_client(&self, uuid: Uuid) {
let _ = self.clients.lock().unwrap().remove(&uuid);
}
fn send_message_to_client(&self, uuid: Uuid, msg: ClientMessage) {
let clients = self.clients.lock().unwrap();
let client = clients.get(&uuid).unwrap();
client.send_message(msg);
}
}
impl ICooperative for ClientManager {
fn tick(&self) {
for message in self.receiver.iter() {
match message {
ClientManagerMessages::DropAll => {
println!("cannot drop all clients yet")
}
_ => println!("[Client Manager]: method not implemented")
}
}
// allocate time for clients.
let clients = self.clients.lock().unwrap();
let _ = clients.iter().map(|(_uuid, client)| client.tick());
}
}
#[cfg(test)]
mod test {
// use super::ClientManager;
// use std::sync::Arc;
// use crate::lib::Foundation::{IOwner};
#[test]
fn test_get_ref() {
// let client_manager = ClientManager::new();
// let _cm_ref = client_manager.get_ref();
// assert_eq!(Arc::weak_count(&client_manager), 2);
}
#[test]
fn test_add_client() {
todo!()
}
#[test]
fn test_remove_client() {
todo!()
}
#[test]
fn test_remove_all_clients() {
todo!()
}
}

View File

@ -1,13 +0,0 @@
use crate::lib::server::client_management::client::ClientMessage;
use std::sync::Arc;
use uuid::Uuid;
/**
* @michael-bailey
*/
pub trait TClientManager<TClient,TClientMessage> {
fn add_client(&self, client: Arc<TClient>);
fn remove_client(&self, uuid: Uuid);
fn send_message_to_client(&self, uuid: Uuid, msg: ClientMessage);
}

View File

@ -1,9 +0,0 @@
pub struct ServerConfig {
pub name: String,
pub address: String,
pub owner: String,
pub host: String,
pub port: u16,
}

View File

@ -1,65 +0,0 @@
pub mod client_management;
pub mod network_manager;
use uuid::Uuid;
use crate::lib::server::network_manager::NetworkManager;
use std::sync::Arc;
use crossbeam_channel::{Receiver, unbounded};
use crate::lib::server::client_management::ClientManager;
use crate::lib::server::client_management::traits::TClientManager;
use crate::lib::foundation::{ICooperative};
use client_management::client::Client;
/// # ServerMessages
/// This is used internally
#[derive(Debug)]
pub enum ServerMessages {
ClientConnected(Arc<Client>),
#[allow(dead_code)]
ClientDisconnected(Uuid),
}
pub struct Server {
client_manager: Arc<ClientManager>,
network_manager: Arc<NetworkManager>,
receiver: Receiver<ServerMessages>,
}
impl Server {
pub fn new() -> Arc<Server> {
let (sender, receiver) = unbounded();
Arc::new(Server {
client_manager: ClientManager::new(sender.clone()),
network_manager: NetworkManager::new("5600".to_string(), sender.clone()),
receiver,
})
}
}
impl ICooperative for Server{
fn tick(&self) {
// handle new messages loop
for message in self.receiver.try_iter() {
match message {
ServerMessages::ClientConnected(client) => {
self.client_manager.add_client(client);
},
ServerMessages::ClientDisconnected(uuid) => {
self.client_manager.remove_client(uuid);
}
}
}
// alocate time for other components
self.network_manager.tick();
self.client_manager.tick();
}
}

View File

@ -1,117 +0,0 @@
use crate::lib::server::Client;
use std::net::TcpListener;
use std::sync::Arc;
use std::io::BufReader;
use std::io::BufWriter;
use std::io::Write;
use std::io::BufRead;
use serde::{Deserialize, Serialize};
use crossbeam_channel::Sender;
use crate::lib::server::ServerMessages;
use crate::lib::foundation::ICooperative;
use crate::lib::server::client_management::client::traits::IClient;
/// # NetworkSockIn
/// these messages can be sent by a client on connecting
#[derive(Serialize, Deserialize)]
enum NetworkSockIn {
Info,
Connect {uuid: String, username: String, address: String},
}
/// # NetworkSockOut
/// these messages are sent by the network manager on connecting and requesting
#[derive(Serialize, Deserialize)]
enum NetworkSockOut<'a> {
Request,
GotInfo {server_name: &'a str, server_owner: &'a str}
}
// these are control signals from the server.
// pub enum NetworkMessages {
// }
pub struct NetworkManager {
listener: TcpListener,
server_channel: Sender<ServerMessages>,
}
impl NetworkManager {
pub fn new(
port: String,
server_channel: Sender<ServerMessages>
) -> Arc<NetworkManager> {
let mut address = "0.0.0.0:".to_string();
address.push_str(&port);
let listener = TcpListener::bind(address)
.expect("Could not bind to address");
Arc::new(NetworkManager {
listener,
server_channel,
})
}
}
impl ICooperative for NetworkManager {
fn tick(&self) {
// get all new connections
// handle each request
for connection in self.listener.incoming() {
if let Ok(stream) = connection {
// create buffered writers
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
let mut buffer = String::new();
// request is always sent on new connection
writer.write_all(
serde_json::to_string(&NetworkSockOut::Request).unwrap().as_bytes()
).unwrap_or_default();
writer.write_all(b"\n").unwrap_or_default();
writer.flush().unwrap_or_default();
// read the new request into a buffer
let res = reader.read_line(&mut buffer);
if res.is_err() {continue;}
// turn into enum for pattern matching
if let Ok(request) = serde_json::from_str::<NetworkSockIn>(&buffer) {
// perform action based on the enum
match request {
NetworkSockIn::Info => {
writer.write_all(
serde_json::to_string(
&NetworkSockOut::GotInfo {
server_name: "oof",
server_owner: "michael"
}
).unwrap().as_bytes()
).unwrap();
writer.flush().unwrap();
}
NetworkSockIn::Connect { uuid, username, address } => {
let new_client = Client::new(
uuid,
username,
address,
stream.try_clone().unwrap(),
self.server_channel.clone()
);
self.server_channel.send(
ServerMessages::ClientConnected(new_client)
).unwrap_or_default();
}
}
}
}
}
}
}

View File

@ -1,633 +0,0 @@
// extern crate regex;
// extern crate rayon;
// use super::client_management::client::client_profile::Client;
// use crate::commands::Commands;
// use std::{
// sync::{Arc, Mutex},
// net::{TcpStream, TcpListener},
// collections::HashMap,
// io::prelude::*,
// time::Duration,
// io::Error,
// thread,
// io
// };
// use log::info;
// use crossbeam_channel::{Sender, Receiver, unbounded};
// #[deprecated(
// since = "0.1",
// note = "Please use server v3"
// )]
// #[derive(Debug)]
// pub enum ServerMessages {
// RequestUpdate(Arc<Mutex<TcpStream>>),
// RequestInfo(String, Arc<Mutex<TcpStream>>),
// Disconnect(String),
// Shutdown,
// }
// // MARK: - server struct
// #[deprecated(
// since = "0.1",
// note = "Please use server v3"
// )]
// pub struct Server {
// name: String,
// host: String,
// port: String,
// author: Option<String>,
// //connected_clients: Arc<Mutex<HashMap<String, Client>>>,
// sender: Sender<ServerMessages>,
// receiver: Receiver<ServerMessages>,
// pub running: bool,
// client_list_changed_handle: Box<dyn Fn(&Server)>,
// }
// // MARK: - server implemetation
// #[deprecated(
// since = "0.1",
// note = "Please use server v3"
// )]
// impl Server {
// pub fn new(name: &str, host: &str, port: &str) -> Self {
// let (sender, receiver) = unbounded();
// Self {
// name: name.to_string(),
// host: host.to_string(),
// port: port.to_string()
// author: author.to_string(),
// //connected_clients: Arc::new(Mutex::new(HashMap::new())),
// sender,
// receiver,
// running: false,
// client_list_changed_handle: Box::new(|_s| println!("help"))
// }
// }
// #[allow(dead_code)]
// pub fn get_name(&self) -> String {
// self.name.to_string()
// }
// pub fn set_host() {
// }
// pub fn set_port() {
// }
// #[allow(dead_code)]
// pub fn get_author(&self) -> String {
// self.author.to_string()
// }
// pub fn set_client_update_handle(function: Box<dyn Fn(&Server)>) {
// }
// pub fn start(&mut self) -> Result<(), io::Error> {
// println!("server: starting server...");
// self.running = true;
// // MARK: - creating clones of the server property references
// let name = self.name.clone();
// #[allow(dead_code)]
// let address = self.address.clone();
// let author = self.author.clone();
// let connected_clients = self.connected_clients.clone();
// let sender = self.sender.clone();
// let receiver = self.receiver.clone();
// // set up listener and buffer
// let mut buffer = [0; 1024];
// let listener = TcpListener::bind(self.get_address())?;
// listener.set_nonblocking(true)?;
// println!("server: spawning threads");
// let _ = thread::Builder::new().name("Server Thread".to_string()).spawn(move || {
// 'outer: loop {
// std::thread::sleep(Duration::from_millis(100));
// // get messages from the servers channel.
// println!("server: getting messages");
// for i in receiver.try_iter() {
// match i {
// ServerMessages::Shutdown => {
// // TODO: implement disconnecting all clients and shutting down the server.
// println!("server: shutting down...");
// break 'outer;
// },
// ServerMessages::RequestUpdate(stream_arc) => {
// for (_k, v) in connected_clients.lock().unwrap().iter() {
// let mut stream = stream_arc.lock().unwrap();
// let _ = Server::transmit_data(&mut stream, v.to_string().as_str());
// if Server::read_data(&mut stream, &mut buffer).unwrap_or(Commands::Error(None)) == Commands::Success(None) {
// println!("Success Confirmed");
// } else {
// println!("no success read");
// let error = Commands::Error(None);
// let _ = Server::transmit_data(&mut stream, error.to_string().as_str());
// }
// }
// },
// ServerMessages::RequestInfo(uuid, stream_arc) => {
// let mut stream = stream_arc.lock().unwrap();
// if let Some(client) = connected_clients.lock().unwrap().get(&uuid) {
// let params: HashMap<String, String> = [(String::from("uuid"), client.get_uuid()), (String::from("name"), client.get_username()), (String::from("host"), client.get_address())].iter().cloned().collect();
// let command = Commands::Success(Some(params));
// let _ = Server::transmit_data(&mut stream, command.to_string().as_str());
// } else {
// let command = Commands::Success(None);
// let _ = Server::transmit_data(&mut stream, command.to_string().as_str());
// }
// },
// ServerMessages::Disconnect(uuid) => {
// let mut clients = connected_clients.lock().unwrap();
// clients.remove(&uuid.to_string());
// let params: HashMap<String, String> = [(String::from("uuid"), uuid)].iter().cloned().collect();
// let command = Commands::ClientRemove(Some(params));
// let _ = connected_clients.lock().unwrap().iter().map(move |(_k, v)| {v.get_sender().send(command.clone())});
// },
// }
// }
// println!("server: checking for new connections");
// if let Ok((mut stream, _addr)) = listener.accept() {
// stream.set_read_timeout(Some(Duration::from_millis(1000))).unwrap();
// let _ = stream.set_nonblocking(false);
// let request = Commands::Request(None);
// let _ = Server::transmit_data(&mut stream, &request.to_string().as_str());
// match Server::read_data(&mut stream, &mut buffer) {
// Ok(command) => {
// println!("Server: new connection sent - {:?}", command);
// match command {
// Commands::Connect(Some(data)) => {
// let uuid = data.get("uuid").unwrap();
// let username = data.get("name").unwrap();
// let address = data.get("host").unwrap();
// println!("{}", format!("Server: new Client connection: _addr = {}", address ));
// let client = Client::new(stream, sender.clone(), &uuid, &username, &address);
// connected_clients.lock().unwrap().insert(uuid.to_string(), client);
// let params: HashMap<String, String> = [(String::from("name"), username.clone()), (String::from("host"), address.clone()), (String::from("uuid"), uuid.clone())].iter().cloned().collect();
// let new_client = Commands::Client(Some(params));
// let _ = connected_clients.lock().unwrap().iter().map(|(_k, v)| v.sender.send(new_client.clone()));
// },
// // TODO: - correct connection reset error when getting info.
// Commands::Info(None) => {
// println!("Server: info requested");
// let params: HashMap<String, String> = [(String::from("name"), name.to_string().clone()), (String::from("owner"), author.to_string().clone())].iter().cloned().collect();
// let command = Commands::Info(Some(params));
// let _ = Server::transmit_data(&mut stream, command.to_string().as_str());
// },
// _ => {
// println!("Server: Invalid command sent");
// let _ = Server::transmit_data(&mut stream, Commands::Error(None).to_string().as_str());
// },
// }
// },
// Err(_) => println!("ERROR: stream closed"),
// }
// }
// // TODO: end -
// // handle each client for messages
// println!("server: handing control to clients");
// for (_k, client) in connected_clients.lock().unwrap().iter_mut() {
// client.handle_connection();
// }
// }
// info!("server: stopped");
// });
// info!("server: started");
// Ok(())
// }
// pub fn stop(&mut self) {
// info!("server: sending stop message");
// let _ = self.sender.send(ServerMessages::Shutdown);
// self.running = false;
// }
// fn transmit_data(stream: &mut TcpStream, data: &str) -> Result<(), Error>{
// println!("Transmitting...");
// println!("data: {}", data);
// /*
// * This will throw an error and crash any thread, including the main thread, if
// * the connection is lost before transmitting. Maybe change to handle any exceptions
// * that may occur.
// */
// let _ = stream.write(data.to_string().as_bytes())?;
// stream.flush()?;
// Ok(())
// }
// fn read_data(stream: &mut TcpStream, buffer: &mut [u8; 1024]) -> Result<Commands, Error> {
// let _ = stream.read(buffer)?;
// let command = Commands::from(buffer);
// Ok(command)
// }
// }
// impl ToString for Server {
// fn to_string(&self) -> std::string::String { todo!() }
// }
// impl Drop for Server {
// fn drop(&mut self) {
// println!("server dropped");
// let _ = self.sender.send(ServerMessages::Shutdown);
// }
// }
// /* The new version of the server no long works with these unit
// * tests.
// * They will be fixed soon!
// * TODO: fix unit tests
// */
// /*#[cfg(test)]
// #[deprecated(
// since = "0.1",
// note = "Please use server v3"
// )]
// mod tests{
// use super::*;
// use std::{thread, time};
// use std::sync::Once;
// use std::time::Duration;
// lazy_static!{
// static ref SERVER_NAME: &'static str = "test";
// static ref SERVER_ADDRESS: &'static str = "0.0.0.0:6000";
// static ref SERVER_AUTHOR: &'static str = "test";
// static ref SERVER: Server<'static> = Server::new(&SERVER_NAME, &SERVER_ADDRESS, &SERVER_AUTHOR);
// }
// static START: Once = Once::new();
// /*
// * These tests must be executed individually to ensure that no errors
// * occur, this is due to the fact that the server is created everytime.
// * Setup a system for the server to close after every test.
// */
// fn setup_server(){
// unsafe{
// START.call_once(|| {
// thread::spawn(|| {
// SERVER.start();
// });
// });
// let millis = time::Duration::from_millis(1000);
// thread::sleep(millis);
// }
// }
// fn establish_client_connection(uuid: &str) -> TcpStream {
// let mut buffer = [0; 1024];
// let mut stream = TcpStream::connect("0.0.0.0:6000").unwrap();
// let mut command = read_data(&stream, &mut buffer);
// assert_eq!(command, Commands::Request(None));
// let msg: String = format!("!connect: uuid:{uuid} name:\"{name}\" host:\"{host}\"", uuid=uuid, name="alice", host="127.0.0.1");
// transmit_data(&stream, msg.as_str());
// command = read_data(&stream, &mut buffer);
// assert_eq!(command, Commands::Success(None));
// stream
// }
// fn transmit_data(mut stream: &TcpStream, data: &str){
// stream.write(data.to_string().as_bytes()).unwrap();
// stream.flush().unwrap();
// }
// fn read_data(mut stream: &TcpStream, buffer: &mut [u8; 1024]) -> Commands {
// match stream.read(buffer) {
// Ok(_) => Commands::from(buffer),
// Err(_) => Commands::Error(None),
// }
// }
// fn force_disconnect(mut stream: &TcpStream){
// let msg = "!disconnect:";
// transmit_data(&stream, msg);
// }
// #[test]
// fn test_server_connect(){
// let mut buffer = [0; 1024];
// setup_server();
// let mut stream = TcpStream::connect("0.0.0.0:6000").unwrap();
// stream.read(&mut buffer).unwrap();
// let mut command = Commands::from(&mut buffer);
// assert_eq!(command, Commands::Request(None));
// let msg = b"!connect: uuid:123456-1234-1234-123456 name:\"alice\" host:\"127.0.0.1\"";
// stream.write(msg).unwrap();
// stream.read(&mut buffer).unwrap();
// command = Commands::from(&mut buffer);
// assert_eq!(command, Commands::Success(None));
// let msg = b"!disconnect:";
// stream.write(msg).unwrap();
// let dur = time::Duration::from_millis(500);
// thread::sleep(dur);
// }
// #[test]
// fn test_server_info(){
// let mut buffer = [0; 1024];
// setup_server();
// let mut stream = TcpStream::connect("0.0.0.0:6000").unwrap();
// let command = read_data(&stream, &mut buffer);
// assert_eq!(command, Commands::Request(None));
// let msg = "!info:";
// transmit_data(&stream, msg);
// let command = read_data(&stream, &mut buffer);
// let params: HashMap<String, String> = [(String::from("name"), String::from("test")), (String::from("owner"), String::from("test"))].iter().cloned().collect();
// assert_eq!(command, Commands::Success(Some(params)));
// }
// #[test]
// fn test_client_info(){
// let mut buffer = [0; 1024];
// setup_server();
// let mut stream = establish_client_connection("1234-5542-2124-155");
// let msg = "!info:";
// transmit_data(&stream, msg);
// let command = read_data(&stream, &mut buffer);
// let params: HashMap<String, String> = [(String::from("name"), String::from("test")), (String::from("owner"), String::from("test"))].iter().cloned().collect();
// assert_eq!(command, Commands::Success(Some(params)));
// let msg = "!disconnect:";
// transmit_data(&stream, msg);
// let dur = time::Duration::from_millis(500);
// thread::sleep(dur);
// }
// #[test]
// fn test_clientUpdate_solo(){
// let mut buffer = [0; 1024];
// setup_server();
// let mut stream = establish_client_connection("1222-555-6-7");
// let msg = "!clientUpdate:";
// transmit_data(&stream, msg);
// let command = read_data(&stream, &mut buffer);
// assert_eq!(command, Commands::Success(None));
// let msg = "!disconnect:";
// transmit_data(&stream, msg);
// let dur = time::Duration::from_millis(500);
// thread::sleep(dur);
// }
// #[test]
// fn test_clientUpdate_multi(){
// let mut buffer = [0; 1024];
// setup_server();
// let mut stream_one = establish_client_connection("0001-776-6-5");
// let mut stream_two = establish_client_connection("0010-776-6-5");
// let mut stream_three = establish_client_connection("0011-776-6-5");
// let mut stream_four = establish_client_connection("0100-776-6-5");
// let client_uuids: [String; 3] = [String::from("0010-776-6-5"), String::from("0011-776-6-5"), String::from("0100-776-6-5")];
// let mut user_1 = true;
// let mut user_2 = true;
// let mut user_3 = true;
// for uuid in client_uuids.iter() {
// let command = read_data(&stream_one, &mut buffer);
// if *uuid == String::from("0010-776-6-5") && user_1 {
// let params: HashMap<String, String> = [(String::from("uuid"), String::from("0010-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
// assert_eq!(command, Commands::Client(Some(params)));
// user_1 = false;
// } else if *uuid == String::from("0011-776-6-5") && user_2 {
// let params: HashMap<String, String> = [(String::from("uuid"), String::from("0011-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
// assert_eq!(command, Commands::Client(Some(params)));
// user_2 = false;
// } else if *uuid == String::from("0100-776-6-5") && user_3 {
// let params: HashMap<String, String> = [(String::from("uuid"), String::from("0100-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
// assert_eq!(command, Commands::Client(Some(params)));
// user_3 = false;
// } else {
// assert!(false);
// }
// let msg = "!success:";
// transmit_data(&stream_one, msg);
// }
// stream_one.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
// let mut unsuccessful = true;
// while unsuccessful {
// let msg = "!clientUpdate:";
// transmit_data(&stream_one, msg);
// let command = read_data(&stream_one, &mut buffer);
// match command.clone() {
// Commands::Error(None) => println!("resending..."),
// _ => {
// assert_eq!(command, Commands::Success(None));
// unsuccessful = false;
// },
// }
// }
// stream_one.set_read_timeout(None).unwrap();
// for x in 0..3 {
// let command = read_data(&stream_one, &mut buffer);
// let command_clone = command.clone();
// match command{
// Commands::Client(Some(params)) => {
// let uuid = params.get("uuid").unwrap();
// if *uuid == String::from("0010-776-6-5") {
// let params: HashMap<String, String> = [(String::from("uuid"), String::from("0010-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
// assert_eq!(command_clone, Commands::Client(Some(params)));
// } else if *uuid == String::from("0011-776-6-5") {
// let params: HashMap<String, String> = [(String::from("uuid"), String::from("0011-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
// assert_eq!(command_clone, Commands::Client(Some(params)));
// } else if *uuid == String::from("0100-776-6-5") {
// let params: HashMap<String, String> = [(String::from("uuid"), String::from("0100-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
// assert_eq!(command_clone, Commands::Client(Some(params)));
// } else {
// assert!(false);
// }
// },
// _ => assert!(false),
// }
// let msg = "!success:";
// transmit_data(&stream_one, msg);
// }
// let dur = time::Duration::from_millis(500);
// thread::sleep(dur);
// let msg = "!disconnect:";
// transmit_data(&stream_one, msg);
// transmit_data(&stream_two, msg);
// transmit_data(&stream_three, msg);
// transmit_data(&stream_four, msg);
// let dur = time::Duration::from_millis(500);
// thread::sleep(dur);
// }
// #[test]
// fn test_clientInfo(){
// let mut buffer = [0; 1024];
// setup_server();
// let mut stream_one = establish_client_connection("0001-776-6-5");
// let mut stream_two = establish_client_connection("\"0010-776-6-5\"");
// let command = read_data(&stream_one, &mut buffer);
// let params: HashMap<String, String> = [(String::from("uuid"), String::from("\"0010-776-6-5\"")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
// assert_eq!(command, Commands::Client(Some(params)));
// let msg = "!success:";
// transmit_data(&stream_one, msg);
// stream_one.set_read_timeout(Some(Duration::from_millis(3000))).unwrap();
// let mut unsuccessful = true;
// while unsuccessful {
// let msg = "!clientInfo: uuid:\"0010-776-6-5\"";
// transmit_data(&stream_one, msg);
// let command = read_data(&stream_one, &mut buffer);
// match command.clone() {
// Commands::Error(None) => println!("resending..."),
// _ => {
// let params: HashMap<String, String> = [(String::from("uuid"), String::from("\"0010-776-6-5\"")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
// assert_eq!(command, Commands::Success(Some(params)));
// unsuccessful = false;
// },
// }
// }
// stream_one.set_read_timeout(None).unwrap();
// let msg = "!disconnect:";
// transmit_data(&stream_one, msg);
// transmit_data(&stream_two, msg);
// let dur = time::Duration::from_millis(500);
// thread::sleep(dur);
// }
// #[test]
// fn test_client_disconnect(){
// let mut buffer = [0; 1024];
// setup_server();
// let mut stream_one = establish_client_connection("0001-776-6-5");
// let mut stream_two = establish_client_connection("0010-776-6-5");
// let command = read_data(&stream_one, &mut buffer);
// let params: HashMap<String, String> = [(String::from("uuid"), String::from("0010-776-6-5")), (String::from("name"), String::from("\"alice\"")), (String::from("host"), String::from("\"127.0.0.1\""))].iter().cloned().collect();
// assert_eq!(command, Commands::Client(Some(params)));
// let msg = "!success:";
// transmit_data(&stream_one, msg);
// let msg = "!disconnect:";
// transmit_data(&stream_two, msg);
// let command = read_data(&stream_one, &mut buffer);
// let params: HashMap<String, String> = [(String::from("uuid"), String::from("0010-776-6-5"))].iter().cloned().collect();
// assert_eq!(command, Commands::Client(Some(params)));
// let msg = "!success:";
// transmit_data(&stream_one, msg);
// stream_one.set_read_timeout(Some(Duration::from_millis(2000))).unwrap();
// match stream_one.peek(&mut buffer) {
// Ok(_) => assert!(false),
// Err(_) => assert!(true),
// }
// stream_one.set_read_timeout(None).unwrap();
// let msg = "!disconnect:";
// transmit_data(&stream_one, msg);
// let dur = time::Duration::from_millis(500);
// thread::sleep(dur);
// }
// }*/

View File

@ -1,372 +0,0 @@
// use std::time::Duration;
// use std::{
// collections::HashMap,
// io,
// io::{Read, Write},
// net::{TcpListener, TcpStream},
// sync::{Arc, Mutex},
// };
// use crossbeam_channel::{unbounded, Receiver, SendError, Sender};
// use log::info;
// use crate::commands::Commands;
// use super::client_management;
// #[derive(Debug)]
// pub enum ServerMessages {
// RequestUpdate(Arc<Mutex<TcpStream>>),
// RequestInfo(String, Arc<Mutex<TcpStream>>),
// Disconnect(String),
// Shutdown,
// }
// pub enum ServerEvent {
// Stopped,
// Started,
// addedClient(Arc<Mutex<Client>>),
// }
// #[allow(dead_code)]
// #[derive(Eq, PartialEq, Debug)]
// pub enum ServerState {
// Starting,
// Started,
// Stopping,
// Stopped,
// }
// // MARK: - server struct
// #[allow(dead_code)]
// pub struct Server<T> {
// pub config: ,
// pub state: ServerState,
// // to be seperated into a different struct
// connected_clients: HashMap<String, Client>,
// server_event_sink: Sender<ServerEvent>,
// server_message_source: Receiver<T>,
// message_source_handler: fn(&Self, event: T) -> (),
// buffer: [u8; 1024],
// // metrics
// pub o2s_rqst: usize,
// pub c2s_msgs: usize,
// pub s2s_msgs: usize,
// pub s2c_msgs: usize,
// }
// // MARK: - server implemetation
// impl Server {
// pub fn new(name: &str, address: &str, author: &str) -> Result<Self, io::Error> {
// // creating server channels
// let (sender, receiver) = unbounded();
// Ok(Self {
// // server data
// name: name.to_string(),
// address: address.to_string(),
// owner: author.to_string(),
// connected_clients: HashMap::new(),
// state: ServerState::Stopped,
// // messages & connections
// sender,
// receiver,
// listener: None,
// buffer: [0; 1024],
// // metrics
// o2s_rqst: 0,
// c2s_msgs: 0,
// s2s_msgs: 0,
// s2c_msgs: 0,
// })
// }
// pub fn get_name(&self) -> String {
// self.name.clone()
// }
// pub fn get_address(&self) -> String {
// self.address.clone()
// }
// pub fn get_owner(&self) -> String {
// self.owner.clone()
// }
// fn handle_server_messages(&mut self) -> Result<(), Vec<Result<(), ServerError>>> {
// // check for any server messages in the channel
// println!("server: getting messages");
// self.receiver.try_iter().map(|msg| {
// let _ = match msg {
// // request the server to shutdown
// // TODO: - move this into the stop method
// ServerMessages::Shutdown => {
// println!("server: shutting down...");
// let results = self
// .connected_clients
// .iter()
// .map(|(_k, v)| v.sender.send(Commands::Disconnect(None)))
// .cloned()
// .collect();
// self.state = ServerState::Stopping;
// }
// // a client requests an updated list of clients
// ServerMessages::RequestUpdate(stream_arc) => {
// self.c2s_msgs += 1;
// self.connected_clients.iter().map(|(_k, v)| {
// let mut stream = stream_arc.lock().unwrap();
// let _ = Server::send_data(&mut stream, v.to_string().as_str());
// let data =
// Server::recv_data(&mut stream, &mut self.buffer).unwrap_or(Commands::Error(None));
// if data == Commands::Success(None) {
// println!("Success Confirmed");
// } else {
// println!("No success read");
// let error = Commands::Error(None);
// let _ = Server::send_data(&mut stream, error.to_string().as_str());
// }
// })
// }
// // a client requests for the servers info
// ServerMessages::RequestInfo(uuid, stream_arc) => {
// self.c2s_msgs += 1;
// let mut stream = stream_arc.lock().unwrap();
// if let Some(client) = self.connected_clients.get(&uuid) {
// let params: HashMap<String, String> = [
// (String::from("uuid"), client.get_uuid()),
// (String::from("name"), client.get_username()),
// (String::from("host"), client.get_address()),
// ]
// .iter()
// .cloned()
// .collect();
// let command = Commands::Success(Some(params));
// let _ = Server::send_data(&mut stream, command.to_string().as_str());
// } else {
// let command = Commands::Success(None);
// let _ = Server::send_data(&mut stream, command.to_string().as_str());
// }
// }
// // a client requests to disconnect
// ServerMessages::Disconnect(uuid) => {
// self.c2s_msgs += 1;
// self.connected_clients.remove(&uuid.to_string());
// let params: HashMap<String, String> =
// [(String::from("uuid"), uuid)].iter().cloned().collect();
// let command = Commands::ClientRemove(Some(params));
// let _ = self
// .connected_clients
// .iter()
// .map(move |(_k, v)| v.get_sender().send(command.clone()));
// }
// };
// });
// Ok(())
// }
// #[allow(dead_code)]
// pub fn tick(&mut self) -> Result<(), ServerError> {
// // check to see if this server is ready to execute things.
// if self.state == ServerState::Stopped {
// Err(ServerIsStopped)
// }
// self.handle_server_messages();
// println!("server: checking for new connections");
// if let Ok((mut stream, _addr)) = self
// .listener
// .as_ref()
// .expect("tcpListener not here")
// .accept()
// {
// let _ = stream.set_read_timeout(Some(Duration::from_millis(1000)));
// let _ = stream.set_nonblocking(false);
// let request = Commands::Request(None);
// let _ = Server::send_data(&mut stream, &request.to_string().as_str());
// match Server::recv_data(&mut stream, &mut self.buffer) {
// Ok(Commands::Connect(Some(data))) => {
// self.o2s_rqst += 1;
// let uuid = data.get("uuid").unwrap();
// let username = data.get("name").unwrap();
// let address = data.get("host").unwrap();
// info!("{}", format!("Server: new client from {}", address));
// let client = Client::new(stream, self.sender.clone(), &uuid, &username, &address);
// self.connected_clients.insert(uuid.to_string(), client);
// let params: HashMap<String, String> = [
// (String::from("name"), username.clone()),
// (String::from("host"), address.clone()),
// (String::from("uuid"), uuid.clone()),
// ]
// .iter()
// .cloned()
// .collect();
// let new_client = Commands::Client(Some(params));
// let _ = self
// .connected_clients
// .iter()
// .map(|(_k, v)| v.sender.send(new_client.clone()));
// }
// Ok(Commands::Info(None)) => {
// self.o2s_rqst += 1;
// println!("Server: info requested");
// let params: HashMap<String, String> = [
// (String::from("name"), self.name.to_string().clone()),
// (String::from("owner"), self.owner.to_string().clone()),
// ]
// .iter()
// .cloned()
// .collect();
// let command = Commands::Info(Some(params));
// let _ = Server::send_data(&mut stream, command.to_string().as_str());
// }
// Err(_) => println!("ERROR: stream closed"),
// // TODO: - correct connection reset error when getting info.
// _ => {
// println!("Server: Invalid command sent");
// let _ = Server::send_data(&mut stream, Commands::Error(None).to_string().as_str());
// }
// }
// }
// println!("server: handing control to clients");
// for (_k, client) in self.connected_clients.iter_mut() {
// client.handle_connection();
// }
// Ok(())
// }
// #[allow(dead_code)]
// pub fn start(&mut self) -> Result<(), io::Error> {
// let listener = TcpListener::bind(&self.address)?;
// listener.set_nonblocking(true)?;
// self.listener = Some(listener);
// self.state = ServerState::Started;
// Ok(())
// }
// #[allow(dead_code)]
// pub fn stop(&mut self) -> Result<(), SendError<ServerMessages>> {
// info!("server: sending stop message");
// self.sender.send(ServerMessages::Shutdown)?;
// self.state = ServerState::Stopping;
// Ok(())
// }
// #[allow(dead_code)]
// fn send_data(stream: &mut TcpStream, data: &str) -> Result<(), io::Error> {
// println!("Transmitting...");
// println!("data: {}", data);
// /*
// * This will throw an error and crash any thread, including the main thread, if
// * the connection is lost before transmitting. Maybe change to handle any exceptions
// * that may occur.
// */
// let _ = stream.write(data.to_string().as_bytes())?;
// stream.flush()?;
// Ok(())
// }
// #[allow(dead_code)]
// fn recv_data(stream: &mut TcpStream, buffer: &mut [u8; 1024]) -> Result<Commands, io::Error> {
// let _ = stream.read(buffer)?;
// let command = Commands::from(buffer);
// Ok(command)
// }
// }
// impl Drop for Server {
// // TODO: - implement the drop logic
// // this includes signaling all clients to disconnect
// fn drop(&mut self) {}
// }
// #[cfg(test)]
// mod server_v3_tests {
// use crate::server::server_v3::{Server, ServerState};
// #[test]
// fn test_creation_and_drop() {
// let server =
// Server::new("test server", "0.0.0.0:6000", "michael").expect("server creation failed");
// assert_eq!(server.name, "test server");
// assert_eq!(server.address, "0.0.0.0:6000");
// assert_eq!(server.owner, "michael");
// }
// #[test]
// fn test_server_start() {
// let mut server =
// Server::new("test server", "0.0.0.0:6000", "michael").expect("server creation failed");
// let result = server.start();
// assert!(result.is_ok());
// assert_eq!(server.state, ServerState::Started);
// }
// #[test]
// fn test_server_stop() {
// let mut server =
// Server::new("test server", "0.0.0.0:6000", "michael").expect("server creation failed");
// let _ = server.start();
// let result = server.stop();
// assert!(result.is_ok());
// assert_eq!(server.state, ServerState::Stopping);
// }
// #[test]
// fn test_server_start_stop_and_one_tick() {
// let mut server =
// Server::new("test server", "0.0.0.0:6000", "michael").expect("server creation failed");
// let _ = server.start();
// let result = server.stop();
// server.tick();
// assert!(result.is_ok());
// assert_eq!(server.state, ServerState::Stopped);
// }
// }

View File

@ -1,84 +0,0 @@
mod lib;
use clap::{App, Arg};
use crate::lib::server::Server;
use crate::lib::foundation::ICooperative;
fn main() {
let _args = App::new("--rust chat server--")
.version("0.1.5")
.author("Mitchel Hardie <mitch161>, Michael Bailey <michael-bailey>")
.about("this is a chat server developed in rust, depending on the version one of two implementations will be used")
.arg(
Arg::with_name("config")
.short("p")
.long("port")
.value_name("PORT")
.help("sets the port the server runs on.")
.takes_value(true))
.get_matches();
let server = Server::new();
loop {
server.tick();
}
}
// MARK: - general testing zone
// #[cfg(test)]
// mod tests {
// use crate::server::server_profile::Server;
// use crate::client_api::ClientApi;
// use std::collections::HashMap;
// use crate::commands::Commands;
// use std::{thread, time};
// #[test]
// fn test_server_info() {
// // setup the server
// let name = "Server-01";
// let address = "0.0.0.0:6000";
// let owner = "noreply@email.com";
// let mut server = Server::new(name, address, owner);
// let result = server.start();
// assert_eq!(result.is_ok(), true);
// let dur = time::Duration::from_millis(1000);
// thread::sleep(dur);
// let api = ClientApi::get_info("127.0.0.1:6000");
// assert_eq!(api.is_ok(), true);
// if let Ok(api) = api {
// println!("received: {:?}", api);
// let mut map = HashMap::new();
// map.insert("name".to_string(), name.to_string());
// map.insert("owner".to_string(), owner.to_string());
// let expected = Commands::Info(Some(map));
// println!("expected: {:?}", expected);
// assert_eq!(api, expected);
// }
// }
// #[test]
// fn test_server_connect() {
// let name = "Server-01";
// let address = "0.0.0.0:6001";
// let owner = "noreply@email.com";
// let mut server = Server::new(name, address, owner);
// let _ = server.start().unwrap();
// let api_result = ClientApi::new(address);
// assert_eq!(api_result.is_ok(), true);
// if api_result.is_ok() {
// std::thread::sleep(std::time::Duration::from_secs(2));
// }
// }
// }