From e45afdff9cbb0d1854ea112b2c6dab120791f036 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Tue, 30 Apr 2019 10:21:48 +0200 Subject: [PATCH] src/server/state.rs: use new BroadcastData helper --- src/server/state.rs | 53 ++++++++++----------------------------------- 1 file changed, 12 insertions(+), 41 deletions(-) diff --git a/src/server/state.rs b/src/server/state.rs index f7ee4300..3f8e47e6 100644 --- a/src/server/state.rs +++ b/src/server/state.rs @@ -5,10 +5,9 @@ use std::sync::Mutex; use futures::*; use futures::stream::Stream; -use tokio::sync::oneshot; use tokio_signal::unix::{Signal, SIGHUP, SIGINT}; -use crate::tools; +use crate::tools::{self, BroadcastData}; #[derive(PartialEq, Copy, Clone, Debug)] pub enum ServerMode { @@ -18,8 +17,8 @@ pub enum ServerMode { pub struct ServerState { pub mode: ServerMode, - pub shutdown_listeners: Vec>, - pub last_worker_listeners: Vec>, + pub shutdown_listeners: BroadcastData<()>, + pub last_worker_listeners: BroadcastData<()>, pub worker_count: usize, pub reload_request: bool, } @@ -28,8 +27,8 @@ pub struct ServerState { lazy_static! { static ref SERVER_STATE: Mutex = Mutex::new(ServerState { mode: ServerMode::Normal, - shutdown_listeners: vec![], - last_worker_listeners: vec![], + shutdown_listeners: BroadcastData::new(), + last_worker_listeners: BroadcastData::new(), worker_count: 0, reload_request: false, }); @@ -85,36 +84,22 @@ pub fn server_shutdown() { data.mode = ServerMode::Shutdown; - notify_listeners(&mut data.shutdown_listeners); + data.shutdown_listeners.notify_listeners(Ok(())); drop(data); // unlock check_last_worker(); } -pub fn shutdown_future() -> oneshot::Receiver<()> { - let (tx, rx) = oneshot::channel::<()>(); - +pub fn shutdown_future() -> impl Future { let mut data = SERVER_STATE.lock().unwrap(); - match data.mode { - ServerMode::Normal => { data.shutdown_listeners.push(tx); }, - ServerMode::Shutdown => { let _ = tx.send(()); }, - } - - rx + data.shutdown_listeners.listen() } -pub fn last_worker_future() -> oneshot::Receiver<()> { - let (tx, rx) = oneshot::channel::<()>(); +pub fn last_worker_future() -> impl Future { let mut data = SERVER_STATE.lock().unwrap(); - if data.mode == ServerMode::Shutdown && data.worker_count == 0 { - let _ = tx.send(()); - } else { - data.last_worker_listeners.push(tx); - } - - rx + data.last_worker_listeners.listen() } pub fn set_worker_count(count: usize) { @@ -123,7 +108,7 @@ pub fn set_worker_count(count: usize) { if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; } - notify_listeners(&mut data.last_worker_listeners); + data.last_worker_listeners.notify_listeners(Ok(())); } @@ -133,19 +118,5 @@ pub fn check_last_worker() { if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; } - notify_listeners(&mut data.last_worker_listeners); -} - -fn notify_listeners(list: &mut Vec>) { - loop { - match list.pop() { - None => { break; }, - Some(ch) => { - println!("SEND ABORT"); - if let Err(_) = ch.send(()) { - eprintln!("SEND ABORT failed"); - } - }, - } - } + data.last_worker_listeners.notify_listeners(Ok(())); }