From 5751e49566e0e2e2a7c364ebb377558ceaf28319 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Thu, 7 May 2020 08:30:38 +0200 Subject: [PATCH] src/server/worker_task.rs: implement and use status command --- src/api2/node/tasks.rs | 12 +++---- src/server/worker_task.rs | 66 ++++++++++++++++++++++++++++++++------- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs index 3b28c251..30d91313 100644 --- a/src/api2/node/tasks.rs +++ b/src/api2/node/tasks.rs @@ -77,7 +77,7 @@ use crate::config::cached_user_info::CachedUserInfo; }, )] /// Get task status. -fn get_task_status( +async fn get_task_status( param: Value, rpcenv: &mut dyn RpcEnvironment, ) -> Result { @@ -102,7 +102,7 @@ fn get_task_status( "user": upid.username, }); - if crate::server::worker_is_active(&upid) { + if crate::server::worker_is_active(&upid).await? { result["status"] = Value::from("running"); } else { let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(String::from("unknown")); @@ -154,7 +154,7 @@ fn extract_upid(param: &Value) -> Result { }, )] /// Read task log. -fn read_task_log( +async fn read_task_log( param: Value, rpcenv: &mut dyn RpcEnvironment, ) -> Result { @@ -202,7 +202,7 @@ fn read_task_log( rpcenv.set_result_attrib("total", Value::from(count)); if test_status { - let active = crate::server::worker_is_active(&upid); + let active = crate::server::worker_is_active(&upid).await?; rpcenv.set_result_attrib("active", Value::from(active)); } @@ -241,9 +241,7 @@ fn stop_task( user_info.check_privs(&username, &["system", "tasks"], PRIV_SYS_MODIFY, false)?; } - if crate::server::worker_is_active(&upid) { - server::abort_worker_async(upid); - } + server::abort_worker_async(upid); Ok(Value::Null) } diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 3d2b4a04..202422e6 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -41,8 +41,38 @@ lazy_static! { } /// Test if the task is still running -pub fn worker_is_active(upid: &UPID) -> bool { +pub async fn worker_is_active(upid: &UPID) -> Result { + if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { + return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); + } + if !procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() { + return Ok(false); + } + + let socketname = format!( + "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, upid.pid); + + let cmd = json!({ + "command": "status", + "upid": upid.to_string(), + }); + + let status = super::send_command(socketname, cmd).await?; + + if let Some(active) = status.as_bool() { + Ok(active) + } else { + bail!("got unexpected result {:?} (expected bool)", status); + } +} + +/// Test if the task is still running (fast but inaccurate implementation) +/// +/// If the task is spanned from a different process, we simply return if +/// that process is still running. This information is good enough to detect +/// stale tasks... +fn worker_is_active_local(upid: &UPID) -> bool { if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) } else { @@ -60,11 +90,11 @@ pub fn create_task_control_socket() -> Result<(), Error> { .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?; if param.keys().count() != 2 { bail!("wrong number of parameters"); } - let command = param.get("command") + let command = param["command"].as_str() .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?; - // this is the only command for now - if command != "abort-task" { bail!("got unknown command '{}'", command); } + // we have only two commands for now + if !(command == "abort-task" || command == "status") { bail!("got unknown command '{}'", command); } let upid_str = param["upid"].as_str() .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?; @@ -76,12 +106,24 @@ pub fn create_task_control_socket() -> Result<(), Error> { } let hash = WORKER_TASK_LIST.lock().unwrap(); - if let Some(ref worker) = hash.get(&upid.task_id) { - worker.request_abort(); - } else { - // assume task is already stopped + + match command { + "abort-task" => { + if let Some(ref worker) = hash.get(&upid.task_id) { + worker.request_abort(); + } else { + // assume task is already stopped + } + Ok(Value::Null) + } + "status" => { + let active = hash.contains_key(&upid.task_id); + Ok(active.into()) + } + _ => { + bail!("got unknown command '{}'", command); + } } - Ok(Value::Null) })?; tokio::spawn(control_future); @@ -97,7 +139,7 @@ pub fn abort_worker_async(upid: UPID) { }); } -pub fn abort_worker(upid: UPID) -> impl Future> { +pub async fn abort_worker(upid: UPID) -> Result<(), Error> { let target_pid = upid.pid; @@ -109,7 +151,7 @@ pub fn abort_worker(upid: UPID) -> impl Future> { "upid": upid.to_string(), }); - super::send_command(socketname, cmd).map_ok(|_| ()) + super::send_command(socketname, cmd).map_ok(|_| ()).await } fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { @@ -228,7 +270,7 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result, E Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), Ok((upid_str, upid, state)) => { - let running = worker_is_active(&upid); + let running = worker_is_active_local(&upid); if running { active_list.push(TaskListInfo { upid, upid_str, state: None });