src/server/worker_task.rs: implement and use status command
This commit is contained in:
		@ -77,7 +77,7 @@ use crate::config::cached_user_info::CachedUserInfo;
 | 
			
		||||
    },
 | 
			
		||||
)]
 | 
			
		||||
/// Get task status.
 | 
			
		||||
fn get_task_status(
 | 
			
		||||
async fn get_task_status(
 | 
			
		||||
    param: Value,
 | 
			
		||||
    rpcenv: &mut dyn RpcEnvironment,
 | 
			
		||||
) -> Result<Value, Error> {
 | 
			
		||||
@ -102,7 +102,7 @@ fn get_task_status(
 | 
			
		||||
        "user": upid.username,
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    if crate::server::worker_is_active(&upid) {
 | 
			
		||||
    if crate::server::worker_is_active(&upid).await? {
 | 
			
		||||
        result["status"] = Value::from("running");
 | 
			
		||||
    } else {
 | 
			
		||||
        let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(String::from("unknown"));
 | 
			
		||||
@ -154,7 +154,7 @@ fn extract_upid(param: &Value) -> Result<UPID, Error> {
 | 
			
		||||
    },
 | 
			
		||||
)]
 | 
			
		||||
/// Read task log.
 | 
			
		||||
fn read_task_log(
 | 
			
		||||
async fn read_task_log(
 | 
			
		||||
    param: Value,
 | 
			
		||||
    rpcenv: &mut dyn RpcEnvironment,
 | 
			
		||||
) -> Result<Value, Error> {
 | 
			
		||||
@ -202,7 +202,7 @@ fn read_task_log(
 | 
			
		||||
    rpcenv.set_result_attrib("total", Value::from(count));
 | 
			
		||||
 | 
			
		||||
    if test_status {
 | 
			
		||||
        let active = crate::server::worker_is_active(&upid);
 | 
			
		||||
        let active = crate::server::worker_is_active(&upid).await?;
 | 
			
		||||
        rpcenv.set_result_attrib("active", Value::from(active));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -241,9 +241,7 @@ fn stop_task(
 | 
			
		||||
        user_info.check_privs(&username, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if crate::server::worker_is_active(&upid) {
 | 
			
		||||
        server::abort_worker_async(upid);
 | 
			
		||||
    }
 | 
			
		||||
    server::abort_worker_async(upid);
 | 
			
		||||
 | 
			
		||||
    Ok(Value::Null)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -41,8 +41,38 @@ lazy_static! {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Test if the task is still running
 | 
			
		||||
pub fn worker_is_active(upid: &UPID) -> bool {
 | 
			
		||||
pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
 | 
			
		||||
    if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
 | 
			
		||||
        return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if !procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() {
 | 
			
		||||
        return Ok(false);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let socketname = format!(
 | 
			
		||||
        "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, upid.pid);
 | 
			
		||||
 | 
			
		||||
    let cmd = json!({
 | 
			
		||||
        "command": "status",
 | 
			
		||||
        "upid": upid.to_string(),
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    let status = super::send_command(socketname, cmd).await?;
 | 
			
		||||
 | 
			
		||||
    if let Some(active) = status.as_bool() {
 | 
			
		||||
        Ok(active)
 | 
			
		||||
    } else {
 | 
			
		||||
        bail!("got unexpected result {:?} (expected bool)", status);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Test if the task is still running (fast but inaccurate implementation)
 | 
			
		||||
///
 | 
			
		||||
/// If the task is spanned from a different process, we simply return if
 | 
			
		||||
/// that process is still running. This information is good enough to detect
 | 
			
		||||
/// stale tasks...
 | 
			
		||||
fn worker_is_active_local(upid: &UPID) -> bool {
 | 
			
		||||
    if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
 | 
			
		||||
        WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
 | 
			
		||||
    } else {
 | 
			
		||||
@ -60,11 +90,11 @@ pub fn create_task_control_socket() -> Result<(), Error> {
 | 
			
		||||
            .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
 | 
			
		||||
        if param.keys().count() != 2 { bail!("wrong number of parameters"); }
 | 
			
		||||
 | 
			
		||||
        let command = param.get("command")
 | 
			
		||||
        let command = param["command"].as_str()
 | 
			
		||||
            .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
 | 
			
		||||
 | 
			
		||||
        // this is the only command for now
 | 
			
		||||
        if command != "abort-task" { bail!("got unknown command '{}'", command); }
 | 
			
		||||
        // we have only two commands for now
 | 
			
		||||
        if !(command == "abort-task" || command == "status") { bail!("got unknown command '{}'", command); }
 | 
			
		||||
 | 
			
		||||
        let upid_str = param["upid"].as_str()
 | 
			
		||||
            .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
 | 
			
		||||
@ -76,12 +106,24 @@ pub fn create_task_control_socket() -> Result<(), Error> {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let hash = WORKER_TASK_LIST.lock().unwrap();
 | 
			
		||||
        if let Some(ref worker) = hash.get(&upid.task_id) {
 | 
			
		||||
            worker.request_abort();
 | 
			
		||||
        } else {
 | 
			
		||||
            // assume task is already stopped
 | 
			
		||||
 | 
			
		||||
        match command {
 | 
			
		||||
            "abort-task" => {
 | 
			
		||||
                if let Some(ref worker) = hash.get(&upid.task_id) {
 | 
			
		||||
                    worker.request_abort();
 | 
			
		||||
                } else {
 | 
			
		||||
                    // assume task is already stopped
 | 
			
		||||
                }
 | 
			
		||||
                Ok(Value::Null)
 | 
			
		||||
            }
 | 
			
		||||
            "status" => {
 | 
			
		||||
                let active = hash.contains_key(&upid.task_id);
 | 
			
		||||
                Ok(active.into())
 | 
			
		||||
            }
 | 
			
		||||
            _ => {
 | 
			
		||||
                bail!("got unknown command '{}'", command);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        Ok(Value::Null)
 | 
			
		||||
    })?;
 | 
			
		||||
 | 
			
		||||
    tokio::spawn(control_future);
 | 
			
		||||
@ -97,7 +139,7 @@ pub fn abort_worker_async(upid: UPID) {
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn abort_worker(upid: UPID) -> impl Future<Output = Result<(), Error>> {
 | 
			
		||||
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
 | 
			
		||||
 | 
			
		||||
    let target_pid = upid.pid;
 | 
			
		||||
 | 
			
		||||
@ -109,7 +151,7 @@ pub fn abort_worker(upid: UPID) -> impl Future<Output = Result<(), Error>> {
 | 
			
		||||
        "upid": upid.to_string(),
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    super::send_command(socketname, cmd).map_ok(|_| ())
 | 
			
		||||
    super::send_command(socketname, cmd).map_ok(|_| ()).await
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> {
 | 
			
		||||
@ -228,7 +270,7 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
 | 
			
		||||
                Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err),
 | 
			
		||||
                Ok((upid_str, upid, state)) => {
 | 
			
		||||
 | 
			
		||||
                    let running = worker_is_active(&upid);
 | 
			
		||||
                    let running = worker_is_active_local(&upid);
 | 
			
		||||
 | 
			
		||||
                    if running {
 | 
			
		||||
                        active_list.push(TaskListInfo { upid, upid_str, state: None });
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user