src/server/worker_task.rs: implement read_task_list()

This commit is contained in:
Dietmar Maurer 2019-04-06 10:17:11 +02:00
parent 4a36a2c999
commit 93aebb38bc

View File

@ -8,7 +8,6 @@ use futures::*;
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use serde_json::json;
use std::io::{BufRead, BufReader};
use std::fs::File;
@ -101,41 +100,6 @@ impl std::fmt::Display for UPID {
}
}
#[derive(Debug)]
pub struct WorkerTaskInfo {
upid: UPID,
progress: f64, // 0..1
abort_requested: bool,
}
pub fn running_worker_tasks() -> Vec<WorkerTaskInfo> {
let mut list = vec![];
for (_task_id, worker) in WORKER_TASK_LIST.lock().unwrap().iter() {
let data = worker.data.lock().unwrap();
let info = WorkerTaskInfo {
upid: worker.upid.clone(),
progress: data.progress,
abort_requested: worker.abort_requested.load(Ordering::SeqCst),
};
list.push(info);
}
list
}
pub fn read_active_tasks() -> Result<(), Error> {
let data = tools::file_get_json(PROXMOX_BACKUP_ACTIVE_TASK_FN!(), Some(json!([])))?;
println!("GOT {:?}", data);
Ok(())
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> {
let data = line.splitn(3, ' ').collect::<Vec<&str>>();
@ -152,6 +116,7 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, St
}
}
/// Returns the absolute path to the task log file
pub fn upid_log_path(upid: &UPID) -> std::path::PathBuf {
let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR!());
path.push(format!("{:02X}", upid.pstart % 256));
@ -187,7 +152,25 @@ fn upid_read_status(upid: &UPID) -> Result<String, Error> {
Ok(status)
}
fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
/// The parsed UPID
pub upid: UPID,
/// UPID string representation
pub upid_str: String,
/// Task `(endtime, status)` if already finished
///
/// The `status` ise iether `unknown`, `OK`, or `ERROR: ...`
pub state: Option<(i64, String)>, // endtime, status
}
// atomically read/update the task list, update status of finished tasks
// new_upid is added to the list when specified.
// Returns a sorted list of known tasks,
fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, Error> {
let my_pid = unsafe { libc::getpid() };
let my_pid_stat = tools::procfs::read_proc_pid_stat(my_pid)?;
@ -205,13 +188,6 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
}
};
#[derive(Debug)]
struct TaskListInfo {
upid: UPID,
upid_str: String,
state: Option<(i64, String)>, // endtime, status
};
let mut active_list = vec![];
let mut finish_list = vec![];
@ -282,7 +258,9 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
}
}
let mut task_list: Vec<&TaskListInfo> = task_hash.values().collect();
let mut task_list: Vec<TaskListInfo> = vec![];
for (_, info) in task_hash { task_list.push(info); }
task_list.sort_unstable_by(|a, b| {
match (&a.state, &b.state) {
(Some(s1), Some(s2)) => s1.0.cmp(&s2.0),
@ -306,9 +284,15 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
drop(lock);
Ok(())
Ok(task_list)
}
/// Returns a sorted list of known tasks
///
/// The list is sorted by `(starttime, endtime)` in ascending order
pub fn read_task_list() -> Result<Vec<TaskListInfo>, Error> {
update_active_workers(None)
}
/// Launch long running worker tasks.
///