diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 73ce2bf5..797a7d0e 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -8,7 +8,6 @@ use futures::*; use std::sync::{Arc, Mutex}; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; -use serde_json::json; use std::io::{BufRead, BufReader}; use std::fs::File; @@ -101,41 +100,6 @@ impl std::fmt::Display for UPID { } } - -#[derive(Debug)] -pub struct WorkerTaskInfo { - upid: UPID, - progress: f64, // 0..1 - abort_requested: bool, -} - -pub fn running_worker_tasks() -> Vec { - - let mut list = vec![]; - - for (_task_id, worker) in WORKER_TASK_LIST.lock().unwrap().iter() { - let data = worker.data.lock().unwrap(); - let info = WorkerTaskInfo { - upid: worker.upid.clone(), - progress: data.progress, - abort_requested: worker.abort_requested.load(Ordering::SeqCst), - }; - list.push(info); - } - - list -} - -pub fn read_active_tasks() -> Result<(), Error> { - - let data = tools::file_get_json(PROXMOX_BACKUP_ACTIVE_TASK_FN!(), Some(json!([])))?; - - println!("GOT {:?}", data); - - - Ok(()) -} - fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { let data = line.splitn(3, ' ').collect::>(); @@ -152,6 +116,7 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, St } } +/// Returns the absolute path to the task log file pub fn upid_log_path(upid: &UPID) -> std::path::PathBuf { let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR!()); path.push(format!("{:02X}", upid.pstart % 256)); @@ -187,7 +152,25 @@ fn upid_read_status(upid: &UPID) -> Result { Ok(status) } -fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> { +/// Task details including parsed UPID +/// +/// If there is no `state`, the task is still running. +#[derive(Debug)] +pub struct TaskListInfo { + /// The parsed UPID + pub upid: UPID, + /// UPID string representation + pub upid_str: String, + /// Task `(endtime, status)` if already finished + /// + /// The `status` ise iether `unknown`, `OK`, or `ERROR: ...` + pub state: Option<(i64, String)>, // endtime, status +} + +// atomically read/update the task list, update status of finished tasks +// new_upid is added to the list when specified. +// Returns a sorted list of known tasks, +fn update_active_workers(new_upid: Option<&UPID>) -> Result, Error> { let my_pid = unsafe { libc::getpid() }; let my_pid_stat = tools::procfs::read_proc_pid_stat(my_pid)?; @@ -205,13 +188,6 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> { } }; - #[derive(Debug)] - struct TaskListInfo { - upid: UPID, - upid_str: String, - state: Option<(i64, String)>, // endtime, status - }; - let mut active_list = vec![]; let mut finish_list = vec![]; @@ -282,7 +258,9 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> { } } - let mut task_list: Vec<&TaskListInfo> = task_hash.values().collect(); + let mut task_list: Vec = vec![]; + for (_, info) in task_hash { task_list.push(info); } + task_list.sort_unstable_by(|a, b| { match (&a.state, &b.state) { (Some(s1), Some(s2)) => s1.0.cmp(&s2.0), @@ -306,9 +284,15 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> { drop(lock); - Ok(()) + Ok(task_list) } +/// Returns a sorted list of known tasks +/// +/// The list is sorted by `(starttime, endtime)` in ascending order +pub fn read_task_list() -> Result, Error> { + update_active_workers(None) +} /// Launch long running worker tasks. ///