diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 34d31f99..2ce71136 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -31,6 +31,9 @@ pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!(); pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!(); pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock"); pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active"); +pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/index"); + +const MAX_INDEX_TASKS: usize = 1000; lazy_static! { static ref WORKER_TASK_LIST: Mutex>> = Mutex::new(HashMap::new()); @@ -343,76 +346,47 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result, E let lock = lock_task_list_files(true)?; - let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) { - Ok(f) => Some(BufReader::new(f)), - Err(err) => { - if err.kind() == std::io::ErrorKind::NotFound { - None - } else { - bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err); + let mut finish_list: Vec = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?; + let mut active_list: Vec = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)? + .into_iter() + .filter_map(|info| { + if info.state.is_some() { + // this can happen when the active file still includes finished tasks + finish_list.push(info); + return None; } - } - }; - let mut active_list = vec![]; - let mut finish_list = vec![]; - - if let Some(lines) = reader.map(|r| r.lines()) { - - for line in lines { - let line = line?; - match parse_worker_status_line(&line) { - Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), - Ok((upid_str, upid, state)) => match state { - None if worker_is_active_local(&upid) => { - active_list.push(TaskListInfo { upid, upid_str, state: None }); - }, - None => { - println!("Detected stopped UPID {}", upid_str); - let now = proxmox::tools::time::epoch_i64(); - let status = upid_read_status(&upid) - .unwrap_or_else(|_| TaskState::Unknown { endtime: now }); - finish_list.push(TaskListInfo { - upid, upid_str, state: Some(status) - }); - }, - Some(status) => { - finish_list.push(TaskListInfo { - upid, upid_str, state: Some(status) - }) - } - } + if !worker_is_active_local(&info.upid) { + println!("Detected stopped UPID {}", &info.upid_str); + let now = proxmox::tools::time::epoch_i64(); + let status = upid_read_status(&info.upid) + .unwrap_or_else(|_| TaskState::Unknown { endtime: now }); + finish_list.push(TaskListInfo { + upid: info.upid, + upid_str: info.upid_str, + state: Some(status) + }); + return None; } - } - } + + Some(info) + }).collect(); if let Some(upid) = new_upid { active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); } - // assemble list without duplicates - // we include all active tasks, - // and fill up to 1000 entries with finished tasks + let active_raw = render_task_list(&active_list); - let max = 1000; + replace_file( + PROXMOX_BACKUP_ACTIVE_TASK_FN, + active_raw.as_bytes(), + CreateOptions::new() + .owner(backup_user.uid) + .group(backup_user.gid), + )?; - let mut task_hash = HashMap::new(); - - for info in active_list { - task_hash.insert(info.upid_str.clone(), info); - } - - for info in finish_list { - if task_hash.len() > max { break; } - if !task_hash.contains_key(&info.upid_str) { - task_hash.insert(info.upid_str.clone(), info); - } - } - - let mut task_list: Vec = vec![]; - for (_, info) in task_hash { task_list.push(info); } - - task_list.sort_unstable_by(|b, a| { // lastest on top + finish_list.sort_unstable_by(|a, b| { match (&a.state, &b.state) { (Some(s1), Some(s2)) => s1.cmp(&s2), (Some(_), None) => std::cmp::Ordering::Less, @@ -421,11 +395,13 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result, E } }); - let raw = render_task_list(&task_list[..]); + let start = (finish_list.len()-MAX_INDEX_TASKS).max(0); + let end = (start+MAX_INDEX_TASKS).min(finish_list.len()); + let index_raw = render_task_list(&finish_list[start..end]); replace_file( - PROXMOX_BACKUP_ACTIVE_TASK_FN, - raw.as_bytes(), + PROXMOX_BACKUP_INDEX_TASK_FN, + index_raw.as_bytes(), CreateOptions::new() .owner(backup_user.uid) .group(backup_user.gid), @@ -433,7 +409,9 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result, E drop(lock); - Ok(task_list) + finish_list.append(&mut active_list); + finish_list.reverse(); + Ok(finish_list) } /// Returns a sorted list of known tasks @@ -463,6 +441,44 @@ fn render_task_list(list: &[TaskListInfo]) -> String { raw } +// note this is not locked, caller has to make sure it is +// this will skip (and log) lines that are not valid status lines +fn read_task_file(reader: R) -> Result, Error> +{ + let reader = BufReader::new(reader); + let mut list = Vec::new(); + for line in reader.lines() { + let line = line?; + match parse_worker_status_line(&line) { + Ok((upid_str, upid, state)) => list.push(TaskListInfo { + upid_str, + upid, + state + }), + Err(err) => { + eprintln!("unable to parse worker status '{}' - {}", line, err); + continue; + } + }; + } + + Ok(list) +} + +// note this is not locked, caller has to make sure it is +fn read_task_file_from_path

(path: P) -> Result, Error> +where + P: AsRef + std::fmt::Debug, +{ + let file = match File::open(&path) { + Ok(f) => f, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), + Err(err) => bail!("unable to open task list {:?} - {}", path, err), + }; + + read_task_file(file) +} + /// Launch long running worker tasks. /// /// A worker task can either be a whole thread, or a simply tokio