src/server/upid.rs: moved code into separate file
This commit is contained in:
		| @ -18,7 +18,7 @@ fn get_task_status( | ||||
|  | ||||
|     let upid = extract_upid(¶m)?; | ||||
|  | ||||
|     let result = if upid.is_active() { | ||||
|     let result = if crate::server::worker_is_active(&upid) { | ||||
|         json!({ | ||||
|             "status": "running", | ||||
|         }) | ||||
|  | ||||
| @ -7,6 +7,9 @@ | ||||
| mod environment; | ||||
| pub use environment::*; | ||||
|  | ||||
| mod upid; | ||||
| pub use upid::*; | ||||
|  | ||||
| mod state; | ||||
| pub use state::*; | ||||
|  | ||||
|  | ||||
							
								
								
									
										118
									
								
								src/server/upid.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										118
									
								
								src/server/upid.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,118 @@ | ||||
| use failure::*; | ||||
| use lazy_static::lazy_static; | ||||
| use regex::Regex; | ||||
| use chrono::Local; | ||||
|  | ||||
| use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; | ||||
|  | ||||
| use crate::tools; | ||||
|  | ||||
| /// Unique Process/Task Identifier | ||||
| /// | ||||
| /// We use this to uniquely identify worker task. UPIDs have a short | ||||
| /// string repesentaion, which gives additional information about the | ||||
| /// type of the task. for example: | ||||
| /// ```text | ||||
| /// UPID:{node}:{pid}:{pstart}:{task_id}:{starttime}:{worker_type}:{worker_id}:{username}: | ||||
| /// UPID:elsa:00004F37:0039E469:00000000:5CA78B83:garbage_collection::root@pam: | ||||
| /// ``` | ||||
| /// Please note that we use tokio, so a single thread can run multiple | ||||
| /// tasks. | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct UPID { | ||||
|     /// The Unix PID | ||||
|     pub pid: libc::pid_t, | ||||
|     /// The Unix process start time from `/proc/pid/stat` | ||||
|     pub pstart: u64, | ||||
|     /// The task start time (Epoch) | ||||
|     pub starttime: i64, | ||||
|     /// The task ID (inside the process/thread) | ||||
|     pub task_id: usize, | ||||
|     /// Worker type (arbitrary ASCII string) | ||||
|     pub worker_type: String, | ||||
|     /// Worker ID (arbitrary ASCII string) | ||||
|     pub worker_id: Option<String>, | ||||
|     /// The user who started the task | ||||
|     pub username: String, | ||||
|     /// The node name. | ||||
|     pub node: String, | ||||
| } | ||||
|  | ||||
| impl UPID { | ||||
|  | ||||
|     /// Create a new UPID | ||||
|     pub fn new(worker_type: &str, worker_id: Option<String>, username: &str) -> Result<Self, Error> { | ||||
|  | ||||
|         let pid = unsafe { libc::getpid() }; | ||||
|  | ||||
|         static WORKER_TASK_NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; | ||||
|  | ||||
|         let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst); | ||||
|  | ||||
|         Ok(UPID { | ||||
|             pid, | ||||
|             pstart: tools::procfs::read_proc_starttime(pid)?, | ||||
|             starttime: Local::now().timestamp(), | ||||
|             task_id, | ||||
|             worker_type: worker_type.to_owned(), | ||||
|             worker_id, | ||||
|             username: username.to_owned(), | ||||
|             node: tools::nodename().to_owned(), | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     /// Returns the absolute path to the task log file | ||||
|     pub fn log_path(&self) -> std::path::PathBuf { | ||||
|         let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR); | ||||
|         path.push(format!("{:02X}", self.pstart % 256)); | ||||
|         path.push(self.to_string()); | ||||
|         path | ||||
|     } | ||||
| } | ||||
|  | ||||
|  | ||||
| impl std::str::FromStr for UPID { | ||||
|     type Err = Error; | ||||
|  | ||||
|     fn from_str(s: &str) -> Result<Self, Self::Err> { | ||||
|  | ||||
|         lazy_static! { | ||||
|             static ref REGEX: Regex = Regex::new(concat!( | ||||
|                 r"^UPID:(?P<node>[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?):(?P<pid>[0-9A-Fa-f]{8}):", | ||||
|                 r"(?P<pstart>[0-9A-Fa-f]{8,9}):(?P<task_id>[0-9A-Fa-f]{8,16}):(?P<starttime>[0-9A-Fa-f]{8}):", | ||||
|                 r"(?P<wtype>[^:\s]+):(?P<wid>[^:\s]*):(?P<username>[^:\s]+):$" | ||||
|             )).unwrap(); | ||||
|         } | ||||
|  | ||||
|         if let Some(cap) = REGEX.captures(s) { | ||||
|  | ||||
|             return Ok(UPID { | ||||
|                 pid: i32::from_str_radix(&cap["pid"], 16).unwrap(), | ||||
|                 pstart: u64::from_str_radix(&cap["pstart"], 16).unwrap(), | ||||
|                 starttime: i64::from_str_radix(&cap["starttime"], 16).unwrap(), | ||||
|                 task_id: usize::from_str_radix(&cap["task_id"], 16).unwrap(), | ||||
|                 worker_type: cap["wtype"].to_string(), | ||||
|                 worker_id: if cap["wid"].is_empty() { None } else { Some(cap["wid"].to_string()) }, | ||||
|                 username: cap["username"].to_string(), | ||||
|                 node: cap["node"].to_string(), | ||||
|             }); | ||||
|         } else { | ||||
|             bail!("unable to parse UPID '{}'", s); | ||||
|         } | ||||
|  | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl std::fmt::Display for UPID { | ||||
|  | ||||
|     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | ||||
|  | ||||
|         let wid = if let Some(ref id) = self.worker_id { id } else { "" }; | ||||
|  | ||||
|         // Note: pstart can be > 32bit if uptime > 497 days, so this can result in | ||||
|         // more that 8 characters for pstart | ||||
|  | ||||
|         write!(f, "UPID:{}:{:08X}:{:08X}:{:08X}:{:08X}:{}:{}:{}:", | ||||
|                self.node, self.pid, self.pstart, self.task_id, self.starttime, self.worker_type, wid, self.username) | ||||
|     } | ||||
| } | ||||
| @ -1,139 +1,52 @@ | ||||
| use failure::*; | ||||
| use lazy_static::lazy_static; | ||||
| use regex::Regex; | ||||
| use chrono::Local; | ||||
|  | ||||
| use tokio::sync::oneshot; | ||||
| use futures::*; | ||||
| use std::sync::{Arc, Mutex}; | ||||
| use std::collections::HashMap; | ||||
| use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; | ||||
| use std::sync::atomic::{AtomicBool, Ordering}; | ||||
| use std::io::{BufRead, BufReader}; | ||||
| use std::fs::File; | ||||
|  | ||||
| use super::UPID; | ||||
|  | ||||
| use crate::tools::{self, FileLogger}; | ||||
|  | ||||
| macro_rules! PROXMOX_BACKUP_LOG_DIR { () => ("/var/log/proxmox-backup") } | ||||
| macro_rules! PROXMOX_BACKUP_TASK_DIR { () => (concat!( PROXMOX_BACKUP_LOG_DIR!(), "/tasks")) } | ||||
| macro_rules! PROXMOX_BACKUP_TASK_LOCK_FN { () => (concat!(PROXMOX_BACKUP_TASK_DIR!(), "/.active.lock")) } | ||||
| macro_rules! PROXMOX_BACKUP_ACTIVE_TASK_FN { () => (concat!(PROXMOX_BACKUP_TASK_DIR!(), "/active")) } | ||||
| macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") } | ||||
| macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) } | ||||
|  | ||||
| pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!(); | ||||
| pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!(); | ||||
| pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock"); | ||||
| pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active"); | ||||
|  | ||||
| lazy_static! { | ||||
|     static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new()); | ||||
| } | ||||
|  | ||||
| static WORKER_TASK_NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; | ||||
|  | ||||
| /// Unique Process/Task Identifier | ||||
| /// | ||||
| /// We use this to uniquely identify worker task. UPIDs have a short | ||||
| /// string repesentaion, which gives additional information about the | ||||
| /// type of the task. for example: | ||||
| /// ```text | ||||
| /// UPID:{node}:{pid}:{pstart}:{task_id}:{starttime}:{worker_type}:{worker_id}:{username}: | ||||
| /// UPID:elsa:00004F37:0039E469:00000000:5CA78B83:garbage_collection::root@pam: | ||||
| /// ``` | ||||
| /// Please note that we use tokio, so a single thread can run multiple | ||||
| /// tasks. | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct UPID { | ||||
|     /// The Unix PID | ||||
|     pub pid: libc::pid_t, | ||||
|     /// The Unix process start time from `/proc/pid/stat` | ||||
|     pub pstart: u64, | ||||
|     /// The task start time (Epoch) | ||||
|     pub starttime: i64, | ||||
|     /// The task ID (inside the process/thread) | ||||
|     pub task_id: usize, | ||||
|     /// Worker type (arbitrary ASCII string) | ||||
|     pub worker_type: String, | ||||
|     /// Worker ID (arbitrary ASCII string) | ||||
|     pub worker_id: Option<String>, | ||||
|     /// The user who started the task | ||||
|     pub username: String, | ||||
|     /// The node name. | ||||
|     pub node: String, | ||||
| } | ||||
|  | ||||
| impl UPID { | ||||
|  | ||||
|     /// Returns the absolute path to the task log file | ||||
|     pub fn log_path(&self) -> std::path::PathBuf { | ||||
|         let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR!()); | ||||
|         path.push(format!("{:02X}", self.pstart % 256)); | ||||
|         path.push(self.to_string()); | ||||
|         path | ||||
|     } | ||||
|  | ||||
| /// Test if the task is still running | ||||
|     pub fn is_active(&self) -> bool { | ||||
| pub fn worker_is_active(upid: &UPID) -> bool { | ||||
|  | ||||
|     lazy_static! { | ||||
|         static ref MY_PID: i32 = unsafe { libc::getpid() }; | ||||
|         static ref MY_PID_PSTART: u64 = tools::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime; | ||||
|     } | ||||
|  | ||||
|         if (self.pid == *MY_PID) && (self.pstart == *MY_PID_PSTART) { | ||||
|             if WORKER_TASK_LIST.lock().unwrap().contains_key(&self.task_id) { | ||||
|     if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { | ||||
|         if WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) { | ||||
|             true | ||||
|         } else { | ||||
|             false | ||||
|         } | ||||
|     } else { | ||||
|             match tools::procfs::check_process_running_pstart(self.pid, self.pstart) { | ||||
|         match tools::procfs::check_process_running_pstart(upid.pid, upid.pstart) { | ||||
|             Some(_) => true, | ||||
|             _ => false, | ||||
|         } | ||||
|     } | ||||
| } | ||||
| } | ||||
|  | ||||
|  | ||||
| impl std::str::FromStr for UPID { | ||||
|     type Err = Error; | ||||
|  | ||||
|     fn from_str(s: &str) -> Result<Self, Self::Err> { | ||||
|  | ||||
|         lazy_static! { | ||||
|             static ref REGEX: Regex = Regex::new(concat!( | ||||
|                 r"^UPID:(?P<node>[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?):(?P<pid>[0-9A-Fa-f]{8}):", | ||||
|                 r"(?P<pstart>[0-9A-Fa-f]{8,9}):(?P<task_id>[0-9A-Fa-f]{8,16}):(?P<starttime>[0-9A-Fa-f]{8}):", | ||||
|                 r"(?P<wtype>[^:\s]+):(?P<wid>[^:\s]*):(?P<username>[^:\s]+):$" | ||||
|             )).unwrap(); | ||||
|         } | ||||
|  | ||||
|         if let Some(cap) = REGEX.captures(s) { | ||||
|  | ||||
|             return Ok(UPID { | ||||
|                 pid: i32::from_str_radix(&cap["pid"], 16).unwrap(), | ||||
|                 pstart: u64::from_str_radix(&cap["pstart"], 16).unwrap(), | ||||
|                 starttime: i64::from_str_radix(&cap["starttime"], 16).unwrap(), | ||||
|                 task_id: usize::from_str_radix(&cap["task_id"], 16).unwrap(), | ||||
|                 worker_type: cap["wtype"].to_string(), | ||||
|                 worker_id: if cap["wid"].is_empty() { None } else { Some(cap["wid"].to_string()) }, | ||||
|                 username: cap["username"].to_string(), | ||||
|                 node: cap["node"].to_string(), | ||||
|             }); | ||||
|         } else { | ||||
|             bail!("unable to parse UPID '{}'", s); | ||||
|         } | ||||
|  | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl std::fmt::Display for UPID { | ||||
|  | ||||
|     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | ||||
|  | ||||
|         let wid = if let Some(ref id) = self.worker_id { id } else { "" }; | ||||
|  | ||||
|         // Note: pstart can be > 32bit if uptime > 497 days, so this can result in | ||||
|         // more that 8 characters for pstart | ||||
|  | ||||
|         write!(f, "UPID:{}:{:08X}:{:08X}:{:08X}:{:08X}:{}:{}:{}:", | ||||
|                self.node, self.pid, self.pstart, self.task_id, self.starttime, self.worker_type, wid, self.username) | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { | ||||
|  | ||||
| @ -159,8 +72,8 @@ pub fn create_task_log_dir() -> Result<(), Error> { | ||||
|         let uid = Some(nix::unistd::Uid::from_raw(backup_uid)); | ||||
|         let gid = Some(nix::unistd::Gid::from_raw(backup_gid)); | ||||
|  | ||||
|         tools::create_dir_chown(PROXMOX_BACKUP_LOG_DIR!(), None, uid, gid)?; | ||||
|         tools::create_dir_chown(PROXMOX_BACKUP_TASK_DIR!(), None, uid, gid)?; | ||||
|         tools::create_dir_chown(PROXMOX_BACKUP_LOG_DIR, None, uid, gid)?; | ||||
|         tools::create_dir_chown(PROXMOX_BACKUP_TASK_DIR, None, uid, gid)?; | ||||
|  | ||||
|         Ok(()) | ||||
|     }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; | ||||
| @ -220,16 +133,16 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E | ||||
|     let uid = Some(nix::unistd::Uid::from_raw(backup_uid)); | ||||
|     let gid = Some(nix::unistd::Gid::from_raw(backup_gid)); | ||||
|  | ||||
|     let lock = tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN!(), std::time::Duration::new(10, 0))?; | ||||
|     nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN!(), uid, gid)?; | ||||
|     let lock = tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?; | ||||
|     nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, uid, gid)?; | ||||
|  | ||||
|     let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN!()) { | ||||
|     let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) { | ||||
|         Ok(f) => Some(BufReader::new(f)), | ||||
|         Err(err) => { | ||||
|             if err.kind() ==  std::io::ErrorKind::NotFound { | ||||
|                  None | ||||
|             } else { | ||||
|                 bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN!(), err); | ||||
|                 bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err); | ||||
|             } | ||||
|         } | ||||
|     }; | ||||
| @ -245,7 +158,7 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E | ||||
|                 Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), | ||||
|                 Ok((upid_str, upid, state)) => { | ||||
|  | ||||
|                     let running = upid.is_active(); | ||||
|                     let running = worker_is_active(&upid); | ||||
|  | ||||
|                     if running { | ||||
|                         active_list.push(TaskListInfo { upid, upid_str, state: None }); | ||||
| @ -315,7 +228,7 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     tools::file_set_contents_full(PROXMOX_BACKUP_ACTIVE_TASK_FN!(), raw.as_bytes(), None, uid, gid)?; | ||||
|     tools::file_set_contents_full(PROXMOX_BACKUP_ACTIVE_TASK_FN, raw.as_bytes(), None, uid, gid)?; | ||||
|  | ||||
|     drop(lock); | ||||
|  | ||||
| @ -367,22 +280,10 @@ impl WorkerTask { | ||||
|     fn new(worker_type: &str, worker_id: Option<String>, username: &str, to_stdout: bool) -> Result<Arc<Self>, Error> { | ||||
|         println!("register worker"); | ||||
|  | ||||
|         let pid = unsafe { libc::getpid() }; | ||||
|         let upid = UPID::new(worker_type, worker_id, username)?; | ||||
|         let task_id = upid.task_id; | ||||
|  | ||||
|         let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst); | ||||
|  | ||||
|         let upid = UPID { | ||||
|             pid, | ||||
|             pstart: tools::procfs::read_proc_starttime(pid)?, | ||||
|             starttime: Local::now().timestamp(), | ||||
|             task_id, | ||||
|             worker_type: worker_type.to_owned(), | ||||
|             worker_id, | ||||
|             username: username.to_owned(), | ||||
|             node: tools::nodename().to_owned(), | ||||
|         }; | ||||
|  | ||||
|         let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR!()); | ||||
|         let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR); | ||||
|  | ||||
|         path.push(format!("{:02X}", upid.pstart % 256)); | ||||
|  | ||||
|  | ||||
		Reference in New Issue
	
	Block a user