diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 56d63dbb..04180274 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -8,7 +8,9 @@ use futures::*; use std::sync::{Arc, Mutex}; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; -use serde_json::{json, Value}; +use serde_json::json; +use std::io::{BufRead, BufReader}; +use std::fs::File; use crate::tools::{self, FileLogger}; @@ -104,6 +106,190 @@ pub fn running_worker_tasks() -> Vec { list } +pub fn read_active_tasks() -> Result<(), Error> { + + let data = tools::file_get_json(PROXMOX_BACKUP_ACTIVE_TASK_FN!(), Some(json!([])))?; + + println!("GOT {:?}", data); + + + Ok(()) +} + +fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { + + let data = line.splitn(3, ' ').collect::>(); + + let len = data.len(); + + match len { + 1 => Ok((data[0].to_owned(), data[0].parse::()?, None)), + 3 => { + let endtime = i64::from_str_radix(data[1], 16)?; + Ok((data[0].to_owned(), data[0].parse::()?, Some((endtime, data[2].to_owned())))) + } + _ => bail!("wrong number of components"), + } +} + +pub fn upid_log_path(upid: &UPID) -> std::path::PathBuf { + let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR!()); + path.push(format!("{:02X}", upid.pstart % 256)); + path.push(upid.to_string()); + path +} + +fn upid_read_status(upid: &UPID) -> Result { + let mut status = String::from("unknown"); + + let path = upid_log_path(upid); + + let file = File::open(path)?; + let reader = BufReader::new(file); + + for line in reader.lines() { + let line = line?; + + let mut iter = line.splitn(2, ": TASK "); + if iter.next() == None { continue; } + match iter.next() { + None => continue, + Some(rest) => { + if rest == "OK" { + status = String::from(rest); + } else if rest.starts_with("ERROR: ") { + status = String::from(rest); + } + } + } + } + + Ok(status) +} + +fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> { + + let my_pid = unsafe { libc::getpid() }; + let my_pid_stat = tools::procfs::read_proc_pid_stat(my_pid)?; + + let lock = tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN!(), std::time::Duration::new(10, 0))?; + + let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN!()) { + Ok(f) => Some(BufReader::new(f)), + Err(err) => { + if err.kind() == std::io::ErrorKind::NotFound { + None + } else { + bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN!(), err); + } + } + }; + + #[derive(Debug)] + struct TaskListInfo { + upid: UPID, + upid_str: String, + state: Option<(i64, String)>, // endtime, status + }; + + let mut active_list = vec![]; + let mut finish_list = vec![]; + + if let Some(lines) = reader.map(|r| r.lines()) { + + for line in lines { + let line = line?; + match parse_worker_status_line(&line) { + Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), + Ok((upid_str, upid, state)) => { + + let running = if (upid.pid == my_pid) && (upid.pstart == my_pid_stat.starttime) { + if WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) { + true + } else { + false + } + } else { + match tools::procfs::check_process_running_pstart(upid.pid, upid.pstart) { + Some(_) => true, + _ => false, + } + }; + + if running { + active_list.push(TaskListInfo { upid, upid_str, state: None }); + } else { + match state { + None => { + println!("Detected stoped UPID {}", upid_str); + let status = upid_read_status(&upid).unwrap_or(String::from("unknown")); + finish_list.push(TaskListInfo { + upid, upid_str, state: Some((Local::now().timestamp(), status)) + }); + } + Some((endtime, status)) => { + finish_list.push(TaskListInfo { + upid, upid_str, state: Some((endtime, status)) + }) + } + } + } + } + } + } + } + + if let Some(upid) = new_upid { + active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); + } + + // assemble list without duplicates + // we include all active tasks, + // and fill up to 1000 entries with finished tasks + + let max = 1000; + + let mut task_hash = HashMap::new(); + + for info in active_list { + task_hash.insert(info.upid_str.clone(), info); + } + + for info in finish_list { + if task_hash.len() > max { break; } + if !task_hash.contains_key(&info.upid_str) { + task_hash.insert(info.upid_str.clone(), info); + } + } + + let mut task_list: Vec<&TaskListInfo> = task_hash.values().collect(); + task_list.sort_unstable_by(|a, b| { + match (&a.state, &b.state) { + (Some(s1), Some(s2)) => s1.0.cmp(&s2.0), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + _ => a.upid.starttime.cmp(&b.upid.starttime), + } + }); + + let mut raw = String::new(); + for info in &task_list { + if let Some((endtime, status)) = &info.state { + raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, endtime, status)); + } else { + raw.push_str(&info.upid_str); + raw.push('\n'); + } + } + + tools::file_set_contents(PROXMOX_BACKUP_ACTIVE_TASK_FN!(), raw.as_bytes(), None)?; + + drop(lock); + + Ok(()) +} + + #[derive(Debug)] pub struct WorkerTask { upid: UPID, @@ -162,6 +348,8 @@ impl WorkerTask { let logger = FileLogger::new(path, to_stdout)?; + update_active_workers(Some(&upid))?; + let worker = Arc::new(Self { upid: upid, abort_requested: AtomicBool::new(false), @@ -178,13 +366,15 @@ impl WorkerTask { pub fn spawn(worker_type: &str, worker_id: Option, username: &str, to_stdout: bool, f: F) -> Result<(), Error> where F: Send + 'static + FnOnce(Arc) -> T, - T: Send + 'static + Future, + T: Send + 'static + Future, { let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?; let task_id = worker.upid.task_id; - tokio::spawn(f(worker).then(move |_| { + tokio::spawn(f(worker.clone()).then(move |result| { WORKER_TASK_LIST.lock().unwrap().remove(&task_id); + worker.log_result(result); + let _ = update_active_workers(None); Ok(()) })); @@ -192,7 +382,7 @@ impl WorkerTask { } pub fn new_thread(worker_type: &str, worker_id: Option, username: &str, to_stdout: bool, f: F) -> Result<(), Error> - where F: Send + 'static + FnOnce(Arc) -> () + where F: Send + 'static + FnOnce(Arc) -> Result<(), Error> { println!("register worker thread"); @@ -202,14 +392,10 @@ impl WorkerTask { let task_id = worker.upid.task_id; let _child = std::thread::spawn(move || { - - - println!("start worker thread"); - f(worker); - println!("end worker thread"); - + let result = f(worker.clone()); WORKER_TASK_LIST.lock().unwrap().remove(&task_id); - + worker.log_result(result); + let _ = update_active_workers(None); p.send(()).unwrap(); }); @@ -218,6 +404,14 @@ impl WorkerTask { Ok(()) } + fn log_result(&self, result: Result<(), Error>) { + if let Err(err) = result { + self.log(&format!("TASK ERROR: {}", err)); + } else { + self.log("TASK OK"); + } + } + pub fn log>(&self, msg: S) { let mut data = self.data.lock().unwrap(); data.logger.log(msg);