diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs index 4ebacd4e..1016b7ca 100644 --- a/src/api2/node/tasks.rs +++ b/src/api2/node/tasks.rs @@ -18,7 +18,7 @@ fn get_task_status( let upid = extract_upid(¶m)?; - let result = if upid.is_active() { + let result = if crate::server::worker_is_active(&upid) { json!({ "status": "running", }) diff --git a/src/server.rs b/src/server.rs index 19e3b2d7..67325c5e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,6 +7,9 @@ mod environment; pub use environment::*; +mod upid; +pub use upid::*; + mod state; pub use state::*; diff --git a/src/server/upid.rs b/src/server/upid.rs new file mode 100644 index 00000000..5ecc281d --- /dev/null +++ b/src/server/upid.rs @@ -0,0 +1,118 @@ +use failure::*; +use lazy_static::lazy_static; +use regex::Regex; +use chrono::Local; + +use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; + +use crate::tools; + +/// Unique Process/Task Identifier +/// +/// We use this to uniquely identify worker task. UPIDs have a short +/// string repesentaion, which gives additional information about the +/// type of the task. for example: +/// ```text +/// UPID:{node}:{pid}:{pstart}:{task_id}:{starttime}:{worker_type}:{worker_id}:{username}: +/// UPID:elsa:00004F37:0039E469:00000000:5CA78B83:garbage_collection::root@pam: +/// ``` +/// Please note that we use tokio, so a single thread can run multiple +/// tasks. +#[derive(Debug, Clone)] +pub struct UPID { + /// The Unix PID + pub pid: libc::pid_t, + /// The Unix process start time from `/proc/pid/stat` + pub pstart: u64, + /// The task start time (Epoch) + pub starttime: i64, + /// The task ID (inside the process/thread) + pub task_id: usize, + /// Worker type (arbitrary ASCII string) + pub worker_type: String, + /// Worker ID (arbitrary ASCII string) + pub worker_id: Option, + /// The user who started the task + pub username: String, + /// The node name. + pub node: String, +} + +impl UPID { + + /// Create a new UPID + pub fn new(worker_type: &str, worker_id: Option, username: &str) -> Result { + + let pid = unsafe { libc::getpid() }; + + static WORKER_TASK_NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; + + let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst); + + Ok(UPID { + pid, + pstart: tools::procfs::read_proc_starttime(pid)?, + starttime: Local::now().timestamp(), + task_id, + worker_type: worker_type.to_owned(), + worker_id, + username: username.to_owned(), + node: tools::nodename().to_owned(), + }) + } + + /// Returns the absolute path to the task log file + pub fn log_path(&self) -> std::path::PathBuf { + let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR); + path.push(format!("{:02X}", self.pstart % 256)); + path.push(self.to_string()); + path + } +} + + +impl std::str::FromStr for UPID { + type Err = Error; + + fn from_str(s: &str) -> Result { + + lazy_static! { + static ref REGEX: Regex = Regex::new(concat!( + r"^UPID:(?P[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?):(?P[0-9A-Fa-f]{8}):", + r"(?P[0-9A-Fa-f]{8,9}):(?P[0-9A-Fa-f]{8,16}):(?P[0-9A-Fa-f]{8}):", + r"(?P[^:\s]+):(?P[^:\s]*):(?P[^:\s]+):$" + )).unwrap(); + } + + if let Some(cap) = REGEX.captures(s) { + + return Ok(UPID { + pid: i32::from_str_radix(&cap["pid"], 16).unwrap(), + pstart: u64::from_str_radix(&cap["pstart"], 16).unwrap(), + starttime: i64::from_str_radix(&cap["starttime"], 16).unwrap(), + task_id: usize::from_str_radix(&cap["task_id"], 16).unwrap(), + worker_type: cap["wtype"].to_string(), + worker_id: if cap["wid"].is_empty() { None } else { Some(cap["wid"].to_string()) }, + username: cap["username"].to_string(), + node: cap["node"].to_string(), + }); + } else { + bail!("unable to parse UPID '{}'", s); + } + + } +} + +impl std::fmt::Display for UPID { + + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + + let wid = if let Some(ref id) = self.worker_id { id } else { "" }; + + // Note: pstart can be > 32bit if uptime > 497 days, so this can result in + // more that 8 characters for pstart + + write!(f, "UPID:{}:{:08X}:{:08X}:{:08X}:{:08X}:{}:{}:{}:", + self.node, self.pid, self.pstart, self.task_id, self.starttime, self.worker_type, wid, self.username) + } +} diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 49e5a140..70cf95f3 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -1,137 +1,50 @@ use failure::*; use lazy_static::lazy_static; -use regex::Regex; use chrono::Local; use tokio::sync::oneshot; use futures::*; use std::sync::{Arc, Mutex}; use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::io::{BufRead, BufReader}; use std::fs::File; +use super::UPID; + use crate::tools::{self, FileLogger}; -macro_rules! PROXMOX_BACKUP_LOG_DIR { () => ("/var/log/proxmox-backup") } -macro_rules! PROXMOX_BACKUP_TASK_DIR { () => (concat!( PROXMOX_BACKUP_LOG_DIR!(), "/tasks")) } -macro_rules! PROXMOX_BACKUP_TASK_LOCK_FN { () => (concat!(PROXMOX_BACKUP_TASK_DIR!(), "/.active.lock")) } -macro_rules! PROXMOX_BACKUP_ACTIVE_TASK_FN { () => (concat!(PROXMOX_BACKUP_TASK_DIR!(), "/active")) } +macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") } +macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) } + +pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!(); +pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!(); +pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock"); +pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active"); lazy_static! { static ref WORKER_TASK_LIST: Mutex>> = Mutex::new(HashMap::new()); } -static WORKER_TASK_NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; +/// Test if the task is still running +pub fn worker_is_active(upid: &UPID) -> bool { -/// Unique Process/Task Identifier -/// -/// We use this to uniquely identify worker task. UPIDs have a short -/// string repesentaion, which gives additional information about the -/// type of the task. for example: -/// ```text -/// UPID:{node}:{pid}:{pstart}:{task_id}:{starttime}:{worker_type}:{worker_id}:{username}: -/// UPID:elsa:00004F37:0039E469:00000000:5CA78B83:garbage_collection::root@pam: -/// ``` -/// Please note that we use tokio, so a single thread can run multiple -/// tasks. -#[derive(Debug, Clone)] -pub struct UPID { - /// The Unix PID - pub pid: libc::pid_t, - /// The Unix process start time from `/proc/pid/stat` - pub pstart: u64, - /// The task start time (Epoch) - pub starttime: i64, - /// The task ID (inside the process/thread) - pub task_id: usize, - /// Worker type (arbitrary ASCII string) - pub worker_type: String, - /// Worker ID (arbitrary ASCII string) - pub worker_id: Option, - /// The user who started the task - pub username: String, - /// The node name. - pub node: String, -} - -impl UPID { - - /// Returns the absolute path to the task log file - pub fn log_path(&self) -> std::path::PathBuf { - let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR!()); - path.push(format!("{:02X}", self.pstart % 256)); - path.push(self.to_string()); - path + lazy_static! { + static ref MY_PID: i32 = unsafe { libc::getpid() }; + static ref MY_PID_PSTART: u64 = tools::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime; } - /// Test if the task is still running - pub fn is_active(&self) -> bool { - - lazy_static! { - static ref MY_PID: i32 = unsafe { libc::getpid() }; - static ref MY_PID_PSTART: u64 = tools::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime; - } - - if (self.pid == *MY_PID) && (self.pstart == *MY_PID_PSTART) { - if WORKER_TASK_LIST.lock().unwrap().contains_key(&self.task_id) { - true - } else { - false - } + if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { + if WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) { + true } else { - match tools::procfs::check_process_running_pstart(self.pid, self.pstart) { - Some(_) => true, - _ => false, - } + false } - } -} - - -impl std::str::FromStr for UPID { - type Err = Error; - - fn from_str(s: &str) -> Result { - - lazy_static! { - static ref REGEX: Regex = Regex::new(concat!( - r"^UPID:(?P[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?):(?P[0-9A-Fa-f]{8}):", - r"(?P[0-9A-Fa-f]{8,9}):(?P[0-9A-Fa-f]{8,16}):(?P[0-9A-Fa-f]{8}):", - r"(?P[^:\s]+):(?P[^:\s]*):(?P[^:\s]+):$" - )).unwrap(); + } else { + match tools::procfs::check_process_running_pstart(upid.pid, upid.pstart) { + Some(_) => true, + _ => false, } - - if let Some(cap) = REGEX.captures(s) { - - return Ok(UPID { - pid: i32::from_str_radix(&cap["pid"], 16).unwrap(), - pstart: u64::from_str_radix(&cap["pstart"], 16).unwrap(), - starttime: i64::from_str_radix(&cap["starttime"], 16).unwrap(), - task_id: usize::from_str_radix(&cap["task_id"], 16).unwrap(), - worker_type: cap["wtype"].to_string(), - worker_id: if cap["wid"].is_empty() { None } else { Some(cap["wid"].to_string()) }, - username: cap["username"].to_string(), - node: cap["node"].to_string(), - }); - } else { - bail!("unable to parse UPID '{}'", s); - } - - } -} - -impl std::fmt::Display for UPID { - - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - - let wid = if let Some(ref id) = self.worker_id { id } else { "" }; - - // Note: pstart can be > 32bit if uptime > 497 days, so this can result in - // more that 8 characters for pstart - - write!(f, "UPID:{}:{:08X}:{:08X}:{:08X}:{:08X}:{}:{}:{}:", - self.node, self.pid, self.pstart, self.task_id, self.starttime, self.worker_type, wid, self.username) } } @@ -159,8 +72,8 @@ pub fn create_task_log_dir() -> Result<(), Error> { let uid = Some(nix::unistd::Uid::from_raw(backup_uid)); let gid = Some(nix::unistd::Gid::from_raw(backup_gid)); - tools::create_dir_chown(PROXMOX_BACKUP_LOG_DIR!(), None, uid, gid)?; - tools::create_dir_chown(PROXMOX_BACKUP_TASK_DIR!(), None, uid, gid)?; + tools::create_dir_chown(PROXMOX_BACKUP_LOG_DIR, None, uid, gid)?; + tools::create_dir_chown(PROXMOX_BACKUP_TASK_DIR, None, uid, gid)?; Ok(()) }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; @@ -220,16 +133,16 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result, E let uid = Some(nix::unistd::Uid::from_raw(backup_uid)); let gid = Some(nix::unistd::Gid::from_raw(backup_gid)); - let lock = tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN!(), std::time::Duration::new(10, 0))?; - nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN!(), uid, gid)?; + let lock = tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?; + nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, uid, gid)?; - let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN!()) { + let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) { Ok(f) => Some(BufReader::new(f)), Err(err) => { if err.kind() == std::io::ErrorKind::NotFound { None } else { - bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN!(), err); + bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err); } } }; @@ -245,7 +158,7 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result, E Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), Ok((upid_str, upid, state)) => { - let running = upid.is_active(); + let running = worker_is_active(&upid); if running { active_list.push(TaskListInfo { upid, upid_str, state: None }); @@ -315,7 +228,7 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result, E } } - tools::file_set_contents_full(PROXMOX_BACKUP_ACTIVE_TASK_FN!(), raw.as_bytes(), None, uid, gid)?; + tools::file_set_contents_full(PROXMOX_BACKUP_ACTIVE_TASK_FN, raw.as_bytes(), None, uid, gid)?; drop(lock); @@ -367,22 +280,10 @@ impl WorkerTask { fn new(worker_type: &str, worker_id: Option, username: &str, to_stdout: bool) -> Result, Error> { println!("register worker"); - let pid = unsafe { libc::getpid() }; + let upid = UPID::new(worker_type, worker_id, username)?; + let task_id = upid.task_id; - let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst); - - let upid = UPID { - pid, - pstart: tools::procfs::read_proc_starttime(pid)?, - starttime: Local::now().timestamp(), - task_id, - worker_type: worker_type.to_owned(), - worker_id, - username: username.to_owned(), - node: tools::nodename().to_owned(), - }; - - let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR!()); + let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR); path.push(format!("{:02X}", upid.pstart % 256));