server/state: add spawn_internal_task and use it for websockets

is a helper to spawn an internal tokio task without it showing up
in the task list

it is still tracked for reload and notifies the last_worker_listeners

this enables the console to survive a reload of proxmox-backup-proxy

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
Dominik Csapak 2020-07-23 15:20:13 +02:00 committed by Thomas Lamprecht
parent 224c65f8de
commit 33a88dafb9
2 changed files with 31 additions and 9 deletions

View File

@ -267,7 +267,7 @@ fn upgrade_to_websocket(
let (ws, response) = WebSocket::new(parts.headers)?; let (ws, response) = WebSocket::new(parts.headers)?;
tokio::spawn(async move { crate::server::spawn_internal_task(async move {
let conn: Upgraded = match req_body.on_upgrade().map_err(Error::from).await { let conn: Upgraded = match req_body.on_upgrade().map_err(Error::from).await {
Ok(upgraded) => upgraded, Ok(upgraded) => upgraded,
_ => bail!("error"), _ => bail!("error"),

View File

@ -19,6 +19,7 @@ pub struct ServerState {
pub shutdown_listeners: BroadcastData<()>, pub shutdown_listeners: BroadcastData<()>,
pub last_worker_listeners: BroadcastData<()>, pub last_worker_listeners: BroadcastData<()>,
pub worker_count: usize, pub worker_count: usize,
pub task_count: usize,
pub reload_request: bool, pub reload_request: bool,
} }
@ -28,6 +29,7 @@ lazy_static! {
shutdown_listeners: BroadcastData::new(), shutdown_listeners: BroadcastData::new(),
last_worker_listeners: BroadcastData::new(), last_worker_listeners: BroadcastData::new(),
worker_count: 0, worker_count: 0,
task_count: 0,
reload_request: false, reload_request: false,
}); });
} }
@ -101,20 +103,40 @@ pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
} }
pub fn set_worker_count(count: usize) { pub fn set_worker_count(count: usize) {
let mut data = SERVER_STATE.lock().unwrap(); SERVER_STATE.lock().unwrap().worker_count = count;
data.worker_count = count;
if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; } check_last_worker();
data.last_worker_listeners.notify_listeners(Ok(()));
} }
pub fn check_last_worker() { pub fn check_last_worker() {
let mut data = SERVER_STATE.lock().unwrap(); let mut data = SERVER_STATE.lock().unwrap();
if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; } if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.task_count == 0) { return; }
data.last_worker_listeners.notify_listeners(Ok(())); data.last_worker_listeners.notify_listeners(Ok(()));
} }
/// Spawns a tokio task that will be tracked for reload
/// and if it is finished, notify the last_worker_listener if we
/// are in shutdown mode
pub fn spawn_internal_task<T>(task: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let mut data = SERVER_STATE.lock().unwrap();
data.task_count += 1;
tokio::spawn(async move {
let _ = tokio::spawn(task).await; // ignore errors
{ // drop mutex
let mut data = SERVER_STATE.lock().unwrap();
if data.task_count > 0 {
data.task_count -= 1;
}
}
check_last_worker();
});
}