use std::fs::File; use std::io::{BufRead, BufReader}; use anyhow::{Error}; use serde_json::{json, Value}; use proxmox::api::{api, Router, RpcEnvironment, Permission}; use proxmox::api::router::SubdirMap; use proxmox::{identity, list_subdirs_api_method, sortable}; use crate::tools; use crate::api2::types::*; use crate::server::{self, UPID, TaskState, TaskListInfoIterator}; use crate::config::acl::{PRIV_SYS_AUDIT, PRIV_SYS_MODIFY}; use crate::config::cached_user_info::CachedUserInfo; fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> { let task_auth_id = &upid.auth_id; if auth_id == task_auth_id || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) { Ok(()) } else { let user_info = CachedUserInfo::new()?; user_info.check_privs(auth_id, &["system", "tasks"], PRIV_SYS_AUDIT, false) } } #[api( input: { properties: { node: { schema: NODE_SCHEMA, }, upid: { schema: UPID_SCHEMA, }, }, }, returns: { description: "Task status information.", properties: { node: { schema: NODE_SCHEMA, }, upid: { schema: UPID_SCHEMA, }, pid: { type: i64, description: "The Unix PID.", }, pstart: { type: u64, description: "The Unix process start time from `/proc/pid/stat`", }, starttime: { type: i64, description: "The task start time (Epoch)", }, "type": { type: String, description: "Worker type (arbitrary ASCII string)", }, id: { type: String, optional: true, description: "Worker ID (arbitrary ASCII string)", }, user: { type: Userid, description: "The user who started the task.", }, tokenid: { type: Tokenname, optional: true, }, status: { type: String, description: "'running' or 'stopped'", }, exitstatus: { type: String, optional: true, description: "'OK', 'Error: ', or 'unkwown'.", }, }, }, access: { description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.", permission: &Permission::Anybody, }, )] /// Get task status. async fn get_task_status( param: Value, rpcenv: &mut dyn RpcEnvironment, ) -> Result { let upid = extract_upid(¶m)?; let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; check_task_access(&auth_id, &upid)?; let mut result = json!({ "upid": param["upid"], "node": upid.node, "pid": upid.pid, "pstart": upid.pstart, "starttime": upid.starttime, "type": upid.worker_type, "id": upid.worker_id, "user": upid.auth_id.user(), }); if upid.auth_id.is_token() { result["tokenid"] = Value::from(upid.auth_id.tokenname().unwrap().as_str()); } if crate::server::worker_is_active(&upid).await? { result["status"] = Value::from("running"); } else { let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 }); result["status"] = Value::from("stopped"); result["exitstatus"] = Value::from(exitstatus.to_string()); }; Ok(result) } fn extract_upid(param: &Value) -> Result { let upid_str = tools::required_string_param(¶m, "upid")?; upid_str.parse::() } #[api( input: { properties: { node: { schema: NODE_SCHEMA, }, upid: { schema: UPID_SCHEMA, }, "test-status": { type: bool, optional: true, description: "Test task status, and set result attribute \"active\" accordingly.", }, start: { type: u64, optional: true, description: "Start at this line.", default: 0, }, limit: { type: u64, optional: true, description: "Only list this amount of lines.", default: 50, }, }, }, access: { description: "Users can access there own tasks, or need Sys.Audit on /system/tasks.", permission: &Permission::Anybody, }, )] /// Read task log. async fn read_task_log( param: Value, mut rpcenv: &mut dyn RpcEnvironment, ) -> Result { let upid = extract_upid(¶m)?; let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; check_task_access(&auth_id, &upid)?; let test_status = param["test-status"].as_bool().unwrap_or(false); let start = param["start"].as_u64().unwrap_or(0); let mut limit = param["limit"].as_u64().unwrap_or(50); let mut count: u64 = 0; let path = upid.log_path(); let file = File::open(path)?; let mut lines: Vec = vec![]; for line in BufReader::new(file).lines() { match line { Ok(line) => { count += 1; if count < start { continue }; if limit == 0 { continue }; lines.push(json!({ "n": count, "t": line })); limit -= 1; } Err(err) => { log::error!("reading task log failed: {}", err); break; } } } rpcenv["total"] = Value::from(count); if test_status { let active = crate::server::worker_is_active(&upid).await?; rpcenv["active"] = Value::from(active); } Ok(json!(lines)) } #[api( protected: true, input: { properties: { node: { schema: NODE_SCHEMA, }, upid: { schema: UPID_SCHEMA, }, }, }, access: { description: "Users can stop there own tasks, or need Sys.Modify on /system/tasks.", permission: &Permission::Anybody, }, )] /// Try to stop a task. fn stop_task( param: Value, rpcenv: &mut dyn RpcEnvironment, ) -> Result { let upid = extract_upid(¶m)?; let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; if auth_id != upid.auth_id { let user_info = CachedUserInfo::new()?; user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?; } server::abort_worker_async(upid); Ok(Value::Null) } #[api( input: { properties: { node: { schema: NODE_SCHEMA }, start: { type: u64, description: "List tasks beginning from this offset.", default: 0, optional: true, }, limit: { type: u64, description: "Only list this amount of tasks. (0 means no limit)", default: 50, optional: true, }, store: { schema: DATASTORE_SCHEMA, optional: true, }, running: { type: bool, description: "Only list running tasks.", optional: true, default: false, }, errors: { type: bool, description: "Only list erroneous tasks.", optional:true, default: false, }, userfilter: { optional: true, type: String, description: "Only list tasks from this user.", }, since: { type: i64, description: "Only list tasks since this UNIX epoch.", optional: true, }, until: { type: i64, description: "Only list tasks until this UNIX epoch.", optional: true, }, typefilter: { optional: true, type: String, description: "Only list tasks whose type contains this.", }, statusfilter: { optional: true, type: Array, description: "Only list tasks which have any one of the listed status.", items: { type: TaskStateType, }, }, }, }, returns: { description: "A list of tasks.", type: Array, items: { type: TaskListItem }, }, access: { description: "Users can only see there own tasks, unless the have Sys.Audit on /system/tasks.", permission: &Permission::Anybody, }, )] /// List tasks. pub fn list_tasks( start: u64, limit: u64, errors: bool, running: bool, userfilter: Option, since: Option, until: Option, typefilter: Option, statusfilter: Option>, param: Value, mut rpcenv: &mut dyn RpcEnvironment, ) -> Result, Error> { let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; let user_info = CachedUserInfo::new()?; let user_privs = user_info.lookup_privs(&auth_id, &["system", "tasks"]); let list_all = (user_privs & PRIV_SYS_AUDIT) != 0; let store = param["store"].as_str(); let list = TaskListInfoIterator::new(running)?; let limit = if limit > 0 { limit as usize } else { usize::MAX }; let result: Vec = list .skip_while(|info| { match (info, until) { (Ok(info), Some(until)) => info.upid.starttime > until, (Ok(_), None) => false, (Err(_), _) => false, } }) .take_while(|info| { match (info, since) { (Ok(info), Some(since)) => info.upid.starttime > since, (Ok(_), None) => true, (Err(_), _) => false, } }) .filter_map(|info| { let info = match info { Ok(info) => info, Err(_) => return None, }; if !list_all && check_task_access(&auth_id, &info.upid).is_err() { return None; } if let Some(needle) = &userfilter { if !info.upid.auth_id.to_string().contains(needle) { return None; } } if let Some(store) = store { // Note: useful to select all tasks spawned by proxmox-backup-client let worker_id = match &info.upid.worker_id { Some(w) => w, None => return None, // skip }; if info.upid.worker_type == "backup" || info.upid.worker_type == "restore" || info.upid.worker_type == "prune" { let prefix = format!("{}:", store); if !worker_id.starts_with(&prefix) { return None; } } else if info.upid.worker_type == "garbage_collection" { if worker_id != store { return None; } } else { return None; // skip } } if let Some(typefilter) = &typefilter { if !info.upid.worker_type.contains(typefilter) { return None; } } match (&info.state, &statusfilter) { (Some(_), _) if running => return None, (Some(crate::server::TaskState::OK { .. }), _) if errors => return None, (Some(state), Some(filters)) => { if !filters.contains(&state.tasktype()) { return None; } }, (None, Some(_)) => return None, _ => {}, } Some(info.into()) }).skip(start as usize) .take(limit) .collect(); let mut count = result.len() + start as usize; if result.len() > 0 && result.len() >= limit { // we have a 'virtual' entry as long as we have any new count += 1; } rpcenv["total"] = Value::from(count); Ok(result) } #[sortable] const UPID_API_SUBDIRS: SubdirMap = &sorted!([ ( "log", &Router::new() .get(&API_METHOD_READ_TASK_LOG) ), ( "status", &Router::new() .get(&API_METHOD_GET_TASK_STATUS) ) ]); pub const UPID_API_ROUTER: Router = Router::new() .get(&list_subdirs_api_method!(UPID_API_SUBDIRS)) .delete(&API_METHOD_STOP_TASK) .subdirs(&UPID_API_SUBDIRS); pub const ROUTER: Router = Router::new() .get(&API_METHOD_LIST_TASKS) .match_all("upid", &UPID_API_ROUTER);