src/server/state.rs: use new BroadcastData helper

This commit is contained in:
Dietmar Maurer 2019-04-30 10:21:48 +02:00
parent 824b5ee4ee
commit e45afdff9c

View File

@ -5,10 +5,9 @@ use std::sync::Mutex;
use futures::*; use futures::*;
use futures::stream::Stream; use futures::stream::Stream;
use tokio::sync::oneshot;
use tokio_signal::unix::{Signal, SIGHUP, SIGINT}; use tokio_signal::unix::{Signal, SIGHUP, SIGINT};
use crate::tools; use crate::tools::{self, BroadcastData};
#[derive(PartialEq, Copy, Clone, Debug)] #[derive(PartialEq, Copy, Clone, Debug)]
pub enum ServerMode { pub enum ServerMode {
@ -18,8 +17,8 @@ pub enum ServerMode {
pub struct ServerState { pub struct ServerState {
pub mode: ServerMode, pub mode: ServerMode,
pub shutdown_listeners: Vec<oneshot::Sender<()>>, pub shutdown_listeners: BroadcastData<()>,
pub last_worker_listeners: Vec<oneshot::Sender<()>>, pub last_worker_listeners: BroadcastData<()>,
pub worker_count: usize, pub worker_count: usize,
pub reload_request: bool, pub reload_request: bool,
} }
@ -28,8 +27,8 @@ pub struct ServerState {
lazy_static! { lazy_static! {
static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState { static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
mode: ServerMode::Normal, mode: ServerMode::Normal,
shutdown_listeners: vec![], shutdown_listeners: BroadcastData::new(),
last_worker_listeners: vec![], last_worker_listeners: BroadcastData::new(),
worker_count: 0, worker_count: 0,
reload_request: false, reload_request: false,
}); });
@ -85,36 +84,22 @@ pub fn server_shutdown() {
data.mode = ServerMode::Shutdown; data.mode = ServerMode::Shutdown;
notify_listeners(&mut data.shutdown_listeners); data.shutdown_listeners.notify_listeners(Ok(()));
drop(data); // unlock drop(data); // unlock
check_last_worker(); check_last_worker();
} }
pub fn shutdown_future() -> oneshot::Receiver<()> { pub fn shutdown_future() -> impl Future<Item=(), Error=Error> {
let (tx, rx) = oneshot::channel::<()>();
let mut data = SERVER_STATE.lock().unwrap(); let mut data = SERVER_STATE.lock().unwrap();
match data.mode { data.shutdown_listeners.listen()
ServerMode::Normal => { data.shutdown_listeners.push(tx); },
ServerMode::Shutdown => { let _ = tx.send(()); },
}
rx
} }
pub fn last_worker_future() -> oneshot::Receiver<()> { pub fn last_worker_future() -> impl Future<Item=(), Error=Error> {
let (tx, rx) = oneshot::channel::<()>();
let mut data = SERVER_STATE.lock().unwrap(); let mut data = SERVER_STATE.lock().unwrap();
if data.mode == ServerMode::Shutdown && data.worker_count == 0 { data.last_worker_listeners.listen()
let _ = tx.send(());
} else {
data.last_worker_listeners.push(tx);
}
rx
} }
pub fn set_worker_count(count: usize) { pub fn set_worker_count(count: usize) {
@ -123,7 +108,7 @@ pub fn set_worker_count(count: usize) {
if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; } if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; }
notify_listeners(&mut data.last_worker_listeners); data.last_worker_listeners.notify_listeners(Ok(()));
} }
@ -133,19 +118,5 @@ pub fn check_last_worker() {
if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; } if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; }
notify_listeners(&mut data.last_worker_listeners); data.last_worker_listeners.notify_listeners(Ok(()));
}
fn notify_listeners(list: &mut Vec<oneshot::Sender<()>>) {
loop {
match list.pop() {
None => { break; },
Some(ch) => {
println!("SEND ABORT");
if let Err(_) = ch.send(()) {
eprintln!("SEND ABORT failed");
}
},
}
}
} }