worker task: allow to configure path and owner/group
And application now needs to call init_worker_tasks() before using worker tasks. Notable changes: - need to call init_worker_tasks() before using worker tasks. - create_task_log_dirs() ís called inside init_worker_tasks() - removed UpidExt trait - use atomic_open_or_create_file() - remove pbs_config and pbs_buildcfg dependency
This commit is contained in:
parent
049a22a3a3
commit
0a33fba49c
@ -16,7 +16,7 @@ use pbs_api_types::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use crate::api2::pull::check_pull_privs;
|
use crate::api2::pull::check_pull_privs;
|
||||||
use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
|
use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
|
||||||
use pbs_config::CachedUserInfo;
|
use pbs_config::CachedUserInfo;
|
||||||
|
|
||||||
// matches respective job execution privileges
|
// matches respective job execution privileges
|
||||||
@ -220,7 +220,7 @@ async fn get_task_status(
|
|||||||
if crate::server::worker_is_active(&upid).await? {
|
if crate::server::worker_is_active(&upid).await? {
|
||||||
result["status"] = Value::from("running");
|
result["status"] = Value::from("running");
|
||||||
} else {
|
} else {
|
||||||
let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
|
let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
|
||||||
result["status"] = Value::from("stopped");
|
result["status"] = Value::from("stopped");
|
||||||
result["exitstatus"] = Value::from(exitstatus.to_string());
|
result["exitstatus"] = Value::from(exitstatus.to_string());
|
||||||
};
|
};
|
||||||
@ -287,7 +287,7 @@ async fn read_task_log(
|
|||||||
|
|
||||||
let mut count: u64 = 0;
|
let mut count: u64 = 0;
|
||||||
|
|
||||||
let path = upid.log_path();
|
let path = upid_log_path(&upid)?;
|
||||||
|
|
||||||
let file = File::open(path)?;
|
let file = File::open(path)?;
|
||||||
|
|
||||||
|
@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> {
|
|||||||
bail!("unable to inititialize syslog - {}", err);
|
bail!("unable to inititialize syslog - {}", err);
|
||||||
}
|
}
|
||||||
|
|
||||||
server::create_task_log_dirs()?;
|
|
||||||
|
|
||||||
config::create_configdir()?;
|
config::create_configdir()?;
|
||||||
|
|
||||||
config::update_self_signed_cert(false)?;
|
config::update_self_signed_cert(false)?;
|
||||||
@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> {
|
|||||||
|
|
||||||
config.enable_auth_log(
|
config.enable_auth_log(
|
||||||
pbs_buildcfg::API_AUTH_LOG_FN,
|
pbs_buildcfg::API_AUTH_LOG_FN,
|
||||||
Some(dir_opts),
|
Some(dir_opts.clone()),
|
||||||
Some(file_opts),
|
Some(file_opts.clone()),
|
||||||
&mut commando_sock,
|
&mut commando_sock,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
|
||||||
let rest_server = RestServer::new(config);
|
let rest_server = RestServer::new(config);
|
||||||
|
proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
|
||||||
|
|
||||||
// http server future:
|
// http server future:
|
||||||
let server = daemon::create_daemon(
|
let server = daemon::create_daemon(
|
||||||
|
@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> {
|
|||||||
|
|
||||||
config.enable_auth_log(
|
config.enable_auth_log(
|
||||||
pbs_buildcfg::API_AUTH_LOG_FN,
|
pbs_buildcfg::API_AUTH_LOG_FN,
|
||||||
Some(dir_opts),
|
Some(dir_opts.clone()),
|
||||||
Some(file_opts),
|
Some(file_opts.clone()),
|
||||||
&mut commando_sock,
|
&mut commando_sock,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let rest_server = RestServer::new(config);
|
let rest_server = RestServer::new(config);
|
||||||
|
proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
|
||||||
|
|
||||||
//openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
|
//openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
|
||||||
|
|
||||||
|
@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String {
|
|||||||
ctrl_sock_from_pid(*PID)
|
ctrl_sock_from_pid(*PID)
|
||||||
}
|
}
|
||||||
|
|
||||||
mod upid;
|
|
||||||
pub use upid::*;
|
|
||||||
|
|
||||||
mod worker_task;
|
mod worker_task;
|
||||||
pub use worker_task::*;
|
pub use worker_task::*;
|
||||||
|
|
||||||
|
@ -1,18 +0,0 @@
|
|||||||
pub trait UPIDExt: private::Sealed {
|
|
||||||
/// Returns the absolute path to the task log file
|
|
||||||
fn log_path(&self) -> std::path::PathBuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
mod private {
|
|
||||||
pub trait Sealed {}
|
|
||||||
impl Sealed for pbs_api_types::UPID {}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UPIDExt for pbs_api_types::UPID {
|
|
||||||
fn log_path(&self) -> std::path::PathBuf {
|
|
||||||
let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR);
|
|
||||||
path.push(format!("{:02X}", self.pstart % 256));
|
|
||||||
path.push(self.to_string());
|
|
||||||
path
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,5 +1,6 @@
|
|||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, VecDeque};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
|
use std::path::PathBuf;
|
||||||
use std::io::{Read, Write, BufRead, BufReader};
|
use std::io::{Read, Write, BufRead, BufReader};
|
||||||
use std::panic::UnwindSafe;
|
use std::panic::UnwindSafe;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
@ -11,27 +12,267 @@ use lazy_static::lazy_static;
|
|||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
|
use nix::fcntl::OFlag;
|
||||||
|
use once_cell::sync::OnceCell;
|
||||||
|
|
||||||
use proxmox::sys::linux::procfs;
|
use proxmox::sys::linux::procfs;
|
||||||
use proxmox::try_block;
|
use proxmox::try_block;
|
||||||
use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
|
use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
|
||||||
|
|
||||||
use pbs_buildcfg;
|
|
||||||
use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
|
use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
|
||||||
use pbs_api_types::UPID;
|
use pbs_api_types::UPID;
|
||||||
use pbs_config::{open_backup_lockfile, BackupLockGuard};
|
|
||||||
use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
|
use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
|
||||||
|
|
||||||
use super::UPIDExt;
|
struct TaskListLockGuard(File);
|
||||||
|
|
||||||
macro_rules! taskdir {
|
struct WorkerTaskSetup {
|
||||||
($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
|
file_opts: CreateOptions,
|
||||||
|
taskdir: PathBuf,
|
||||||
|
task_lock_fn: PathBuf,
|
||||||
|
active_tasks_fn: PathBuf,
|
||||||
|
task_index_fn: PathBuf,
|
||||||
|
task_archive_fn: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
|
||||||
|
|
||||||
|
fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
|
||||||
|
WORKER_TASK_SETUP.get()
|
||||||
|
.ok_or_else(|| format_err!("WorkerTask library is not initialized"))
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WorkerTaskSetup {
|
||||||
|
|
||||||
|
fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
|
||||||
|
|
||||||
|
let mut taskdir = basedir.clone();
|
||||||
|
taskdir.push("tasks");
|
||||||
|
|
||||||
|
let mut task_lock_fn = taskdir.clone();
|
||||||
|
task_lock_fn.push(".active.lock");
|
||||||
|
|
||||||
|
let mut active_tasks_fn = taskdir.clone();
|
||||||
|
active_tasks_fn.push("active");
|
||||||
|
|
||||||
|
let mut task_index_fn = taskdir.clone();
|
||||||
|
task_index_fn.push("index");
|
||||||
|
|
||||||
|
let mut task_archive_fn = taskdir.clone();
|
||||||
|
task_archive_fn.push("archive");
|
||||||
|
|
||||||
|
Self {
|
||||||
|
file_opts,
|
||||||
|
taskdir,
|
||||||
|
task_lock_fn,
|
||||||
|
active_tasks_fn,
|
||||||
|
task_index_fn,
|
||||||
|
task_archive_fn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
|
||||||
|
let options = self.file_opts.clone()
|
||||||
|
.perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
|
||||||
|
|
||||||
|
let timeout = std::time::Duration::new(10, 0);
|
||||||
|
|
||||||
|
let file = proxmox::tools::fs::open_file_locked(
|
||||||
|
&self.task_lock_fn,
|
||||||
|
timeout,
|
||||||
|
exclusive,
|
||||||
|
options,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(TaskListLockGuard(file))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
|
||||||
|
let mut path = self.taskdir.clone();
|
||||||
|
path.push(format!("{:02X}", upid.pstart % 256));
|
||||||
|
path.push(upid.to_string());
|
||||||
|
path
|
||||||
|
}
|
||||||
|
|
||||||
|
// atomically read/update the task list, update status of finished tasks
|
||||||
|
// new_upid is added to the list when specified.
|
||||||
|
fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
|
||||||
|
|
||||||
|
let lock = self.lock_task_list_files(true)?;
|
||||||
|
|
||||||
|
// TODO remove with 1.x
|
||||||
|
let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
|
||||||
|
let had_index_file = !finish_list.is_empty();
|
||||||
|
|
||||||
|
// We use filter_map because one negative case wants to *move* the data into `finish_list`,
|
||||||
|
// clippy doesn't quite catch this!
|
||||||
|
#[allow(clippy::unnecessary_filter_map)]
|
||||||
|
let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|info| {
|
||||||
|
if info.state.is_some() {
|
||||||
|
// this can happen when the active file still includes finished tasks
|
||||||
|
finish_list.push(info);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !worker_is_active_local(&info.upid) {
|
||||||
|
// println!("Detected stopped task '{}'", &info.upid_str);
|
||||||
|
let now = proxmox::tools::time::epoch_i64();
|
||||||
|
let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
|
||||||
|
finish_list.push(TaskListInfo {
|
||||||
|
upid: info.upid,
|
||||||
|
upid_str: info.upid_str,
|
||||||
|
state: Some(status)
|
||||||
|
});
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(info)
|
||||||
|
}).collect();
|
||||||
|
|
||||||
|
if let Some(upid) = new_upid {
|
||||||
|
active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
|
||||||
|
}
|
||||||
|
|
||||||
|
let active_raw = render_task_list(&active_list);
|
||||||
|
|
||||||
|
let options = self.file_opts.clone()
|
||||||
|
.perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
|
||||||
|
|
||||||
|
replace_file(
|
||||||
|
&self.active_tasks_fn,
|
||||||
|
active_raw.as_bytes(),
|
||||||
|
options,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
finish_list.sort_unstable_by(|a, b| {
|
||||||
|
match (&a.state, &b.state) {
|
||||||
|
(Some(s1), Some(s2)) => s1.cmp(&s2),
|
||||||
|
(Some(_), None) => std::cmp::Ordering::Less,
|
||||||
|
(None, Some(_)) => std::cmp::Ordering::Greater,
|
||||||
|
_ => a.upid.starttime.cmp(&b.upid.starttime),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if !finish_list.is_empty() {
|
||||||
|
let options = self.file_opts.clone()
|
||||||
|
.perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
|
||||||
|
|
||||||
|
let mut writer = atomic_open_or_create_file(
|
||||||
|
&self.task_archive_fn,
|
||||||
|
OFlag::O_APPEND | OFlag::O_RDWR,
|
||||||
|
&[],
|
||||||
|
options,
|
||||||
|
)?;
|
||||||
|
for info in &finish_list {
|
||||||
|
writer.write_all(render_task_line(&info).as_bytes())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO Remove with 1.x
|
||||||
|
// for compatibility, if we had an INDEX file, we do not need it anymore
|
||||||
|
if had_index_file {
|
||||||
|
let _ = nix::unistd::unlink(&self.task_index_fn);
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(lock);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create task log directory with correct permissions
|
||||||
|
fn create_task_log_dirs(&self) -> Result<(), Error> {
|
||||||
|
|
||||||
|
try_block!({
|
||||||
|
let dir_opts = self.file_opts.clone()
|
||||||
|
.perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
|
||||||
|
|
||||||
|
create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
|
||||||
|
// fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
|
||||||
|
Ok(())
|
||||||
|
}).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initialize the WorkerTask library
|
||||||
|
pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
|
||||||
|
let setup = WorkerTaskSetup::new(basedir, file_opts);
|
||||||
|
setup.create_task_log_dirs()?;
|
||||||
|
WORKER_TASK_SETUP.set(setup)
|
||||||
|
.map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
|
||||||
|
/// rotates it if it is
|
||||||
|
pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
|
||||||
|
|
||||||
|
let setup = worker_task_setup()?;
|
||||||
|
|
||||||
|
let _lock = setup.lock_task_list_files(true)?;
|
||||||
|
|
||||||
|
let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
|
||||||
|
.ok_or_else(|| format_err!("could not get archive file names"))?;
|
||||||
|
|
||||||
|
logrotate.rotate(size_threshold, None, max_files)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Path to the worker log file
|
||||||
|
pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
|
||||||
|
let setup = worker_task_setup()?;
|
||||||
|
Ok(setup.log_path(upid))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read endtime (time of last log line) and exitstatus from task log file
|
||||||
|
/// If there is not a single line with at valid datetime, we assume the
|
||||||
|
/// starttime to be the endtime
|
||||||
|
pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
|
||||||
|
|
||||||
|
let setup = worker_task_setup()?;
|
||||||
|
|
||||||
|
let mut status = TaskState::Unknown { endtime: upid.starttime };
|
||||||
|
|
||||||
|
let path = setup.log_path(upid);
|
||||||
|
|
||||||
|
let mut file = File::open(path)?;
|
||||||
|
|
||||||
|
/// speedup - only read tail
|
||||||
|
use std::io::Seek;
|
||||||
|
use std::io::SeekFrom;
|
||||||
|
let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
|
||||||
|
|
||||||
|
let mut data = Vec::with_capacity(8192);
|
||||||
|
file.read_to_end(&mut data)?;
|
||||||
|
|
||||||
|
// strip newlines at the end of the task logs
|
||||||
|
while data.last() == Some(&b'\n') {
|
||||||
|
data.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
let last_line = match data.iter().rposition(|c| *c == b'\n') {
|
||||||
|
Some(start) if data.len() > (start+1) => &data[start+1..],
|
||||||
|
Some(_) => &data, // should not happen, since we removed all trailing newlines
|
||||||
|
None => &data,
|
||||||
|
};
|
||||||
|
|
||||||
|
let last_line = std::str::from_utf8(last_line)
|
||||||
|
.map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
|
||||||
|
|
||||||
|
let mut iter = last_line.splitn(2, ": ");
|
||||||
|
if let Some(time_str) = iter.next() {
|
||||||
|
if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
|
||||||
|
// set the endtime even if we cannot parse the state
|
||||||
|
status = TaskState::Unknown { endtime };
|
||||||
|
if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
|
||||||
|
if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
|
||||||
|
status = state;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(status)
|
||||||
}
|
}
|
||||||
pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
|
|
||||||
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
|
|
||||||
pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
|
|
||||||
pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
|
|
||||||
pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
|
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
|
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
|
||||||
@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create task log directory with correct permissions
|
|
||||||
pub fn create_task_log_dirs() -> Result<(), Error> {
|
|
||||||
|
|
||||||
try_block!({
|
|
||||||
let backup_user = pbs_config::backup_user()?;
|
|
||||||
let opts = CreateOptions::new()
|
|
||||||
.owner(backup_user.uid)
|
|
||||||
.group(backup_user.gid);
|
|
||||||
|
|
||||||
create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
|
|
||||||
create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
|
|
||||||
create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
|
|
||||||
Ok(())
|
|
||||||
}).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read endtime (time of last log line) and exitstatus from task log file
|
|
||||||
/// If there is not a single line with at valid datetime, we assume the
|
|
||||||
/// starttime to be the endtime
|
|
||||||
pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
|
|
||||||
|
|
||||||
let mut status = TaskState::Unknown { endtime: upid.starttime };
|
|
||||||
|
|
||||||
let path = upid.log_path();
|
|
||||||
|
|
||||||
let mut file = File::open(path)?;
|
|
||||||
|
|
||||||
/// speedup - only read tail
|
|
||||||
use std::io::Seek;
|
|
||||||
use std::io::SeekFrom;
|
|
||||||
let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
|
|
||||||
|
|
||||||
let mut data = Vec::with_capacity(8192);
|
|
||||||
file.read_to_end(&mut data)?;
|
|
||||||
|
|
||||||
// strip newlines at the end of the task logs
|
|
||||||
while data.last() == Some(&b'\n') {
|
|
||||||
data.pop();
|
|
||||||
}
|
|
||||||
|
|
||||||
let last_line = match data.iter().rposition(|c| *c == b'\n') {
|
|
||||||
Some(start) if data.len() > (start+1) => &data[start+1..],
|
|
||||||
Some(_) => &data, // should not happen, since we removed all trailing newlines
|
|
||||||
None => &data,
|
|
||||||
};
|
|
||||||
|
|
||||||
let last_line = std::str::from_utf8(last_line)
|
|
||||||
.map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
|
|
||||||
|
|
||||||
let mut iter = last_line.splitn(2, ": ");
|
|
||||||
if let Some(time_str) = iter.next() {
|
|
||||||
if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
|
|
||||||
// set the endtime even if we cannot parse the state
|
|
||||||
status = TaskState::Unknown { endtime };
|
|
||||||
if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
|
|
||||||
if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
|
|
||||||
status = state;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(status)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Task State
|
/// Task State
|
||||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
pub enum TaskState {
|
pub enum TaskState {
|
||||||
@ -323,107 +497,6 @@ impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
|
|
||||||
open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
|
|
||||||
/// rotates it if it is
|
|
||||||
pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
|
|
||||||
let _lock = lock_task_list_files(true)?;
|
|
||||||
|
|
||||||
let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
|
|
||||||
.ok_or_else(|| format_err!("could not get archive file names"))?;
|
|
||||||
|
|
||||||
logrotate.rotate(size_threshold, None, max_files)
|
|
||||||
}
|
|
||||||
|
|
||||||
// atomically read/update the task list, update status of finished tasks
|
|
||||||
// new_upid is added to the list when specified.
|
|
||||||
fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
|
|
||||||
|
|
||||||
let backup_user = pbs_config::backup_user()?;
|
|
||||||
|
|
||||||
let lock = lock_task_list_files(true)?;
|
|
||||||
|
|
||||||
// TODO remove with 1.x
|
|
||||||
let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
|
|
||||||
let had_index_file = !finish_list.is_empty();
|
|
||||||
|
|
||||||
// We use filter_map because one negative case wants to *move* the data into `finish_list`,
|
|
||||||
// clippy doesn't quite catch this!
|
|
||||||
#[allow(clippy::unnecessary_filter_map)]
|
|
||||||
let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|info| {
|
|
||||||
if info.state.is_some() {
|
|
||||||
// this can happen when the active file still includes finished tasks
|
|
||||||
finish_list.push(info);
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
if !worker_is_active_local(&info.upid) {
|
|
||||||
// println!("Detected stopped task '{}'", &info.upid_str);
|
|
||||||
let now = proxmox::tools::time::epoch_i64();
|
|
||||||
let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
|
|
||||||
finish_list.push(TaskListInfo {
|
|
||||||
upid: info.upid,
|
|
||||||
upid_str: info.upid_str,
|
|
||||||
state: Some(status)
|
|
||||||
});
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(info)
|
|
||||||
}).collect();
|
|
||||||
|
|
||||||
if let Some(upid) = new_upid {
|
|
||||||
active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
|
|
||||||
}
|
|
||||||
|
|
||||||
let active_raw = render_task_list(&active_list);
|
|
||||||
|
|
||||||
replace_file(
|
|
||||||
PROXMOX_BACKUP_ACTIVE_TASK_FN,
|
|
||||||
active_raw.as_bytes(),
|
|
||||||
CreateOptions::new()
|
|
||||||
.owner(backup_user.uid)
|
|
||||||
.group(backup_user.gid),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
finish_list.sort_unstable_by(|a, b| {
|
|
||||||
match (&a.state, &b.state) {
|
|
||||||
(Some(s1), Some(s2)) => s1.cmp(&s2),
|
|
||||||
(Some(_), None) => std::cmp::Ordering::Less,
|
|
||||||
(None, Some(_)) => std::cmp::Ordering::Greater,
|
|
||||||
_ => a.upid.starttime.cmp(&b.upid.starttime),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if !finish_list.is_empty() {
|
|
||||||
match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
|
|
||||||
Ok(mut writer) => {
|
|
||||||
for info in &finish_list {
|
|
||||||
writer.write_all(render_task_line(&info).as_bytes())?;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => bail!("could not write task archive - {}", err),
|
|
||||||
}
|
|
||||||
|
|
||||||
nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO Remove with 1.x
|
|
||||||
// for compatibility, if we had an INDEX file, we do not need it anymore
|
|
||||||
if had_index_file {
|
|
||||||
let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
|
|
||||||
}
|
|
||||||
|
|
||||||
drop(lock);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn render_task_line(info: &TaskListInfo) -> String {
|
fn render_task_line(info: &TaskListInfo) -> String {
|
||||||
let mut raw = String::new();
|
let mut raw = String::new();
|
||||||
if let Some(status) = &info.state {
|
if let Some(status) = &info.state {
|
||||||
@ -486,27 +559,30 @@ pub struct TaskListInfoIterator {
|
|||||||
list: VecDeque<TaskListInfo>,
|
list: VecDeque<TaskListInfo>,
|
||||||
end: bool,
|
end: bool,
|
||||||
archive: Option<LogRotateFiles>,
|
archive: Option<LogRotateFiles>,
|
||||||
lock: Option<BackupLockGuard>,
|
lock: Option<TaskListLockGuard>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TaskListInfoIterator {
|
impl TaskListInfoIterator {
|
||||||
pub fn new(active_only: bool) -> Result<Self, Error> {
|
pub fn new(active_only: bool) -> Result<Self, Error> {
|
||||||
|
|
||||||
|
let setup = worker_task_setup()?;
|
||||||
|
|
||||||
let (read_lock, active_list) = {
|
let (read_lock, active_list) = {
|
||||||
let lock = lock_task_list_files(false)?;
|
let lock = setup.lock_task_list_files(false)?;
|
||||||
let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
|
let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
|
||||||
|
|
||||||
let needs_update = active_list
|
let needs_update = active_list
|
||||||
.iter()
|
.iter()
|
||||||
.any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
|
.any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
|
||||||
|
|
||||||
// TODO remove with 1.x
|
// TODO remove with 1.x
|
||||||
let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
|
let index_exists = setup.task_index_fn.is_file();
|
||||||
|
|
||||||
if needs_update || index_exists {
|
if needs_update || index_exists {
|
||||||
drop(lock);
|
drop(lock);
|
||||||
update_active_workers(None)?;
|
setup.update_active_workers(None)?;
|
||||||
let lock = lock_task_list_files(false)?;
|
let lock = setup.lock_task_list_files(false)?;
|
||||||
let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
|
let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
|
||||||
(lock, active_list)
|
(lock, active_list)
|
||||||
} else {
|
} else {
|
||||||
(lock, active_list)
|
(lock, active_list)
|
||||||
@ -516,7 +592,7 @@ impl TaskListInfoIterator {
|
|||||||
let archive = if active_only {
|
let archive = if active_only {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
|
let logrotate = LogRotate::new(&setup.task_archive_fn, true)
|
||||||
.ok_or_else(|| format_err!("could not get archive file names"))?;
|
.ok_or_else(|| format_err!("could not get archive file names"))?;
|
||||||
Some(logrotate.files())
|
Some(logrotate.files())
|
||||||
};
|
};
|
||||||
@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator {
|
|||||||
/// persistently to files. Task should poll the `abort_requested`
|
/// persistently to files. Task should poll the `abort_requested`
|
||||||
/// flag, and stop execution when requested.
|
/// flag, and stop execution when requested.
|
||||||
pub struct WorkerTask {
|
pub struct WorkerTask {
|
||||||
|
setup: &'static WorkerTaskSetup,
|
||||||
upid: UPID,
|
upid: UPID,
|
||||||
data: Mutex<WorkerTaskData>,
|
data: Mutex<WorkerTaskData>,
|
||||||
abort_requested: AtomicBool,
|
abort_requested: AtomicBool,
|
||||||
@ -589,17 +666,26 @@ struct WorkerTaskData {
|
|||||||
|
|
||||||
impl WorkerTask {
|
impl WorkerTask {
|
||||||
|
|
||||||
pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
|
pub fn new(
|
||||||
|
worker_type: &str,
|
||||||
|
worker_id: Option<String>,
|
||||||
|
auth_id: String,
|
||||||
|
to_stdout: bool,
|
||||||
|
) -> Result<Arc<Self>, Error> {
|
||||||
|
|
||||||
|
let setup = worker_task_setup()?;
|
||||||
|
|
||||||
let upid = UPID::new(worker_type, worker_id, auth_id)?;
|
let upid = UPID::new(worker_type, worker_id, auth_id)?;
|
||||||
let task_id = upid.task_id;
|
let task_id = upid.task_id;
|
||||||
|
|
||||||
let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
|
let mut path = setup.taskdir.clone();
|
||||||
|
|
||||||
path.push(format!("{:02X}", upid.pstart & 255));
|
path.push(format!("{:02X}", upid.pstart & 255));
|
||||||
|
|
||||||
let backup_user = pbs_config::backup_user()?;
|
let dir_opts = setup.file_opts.clone()
|
||||||
|
.perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
|
||||||
|
|
||||||
create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
|
create_path(&path, None, Some(dir_opts))?;
|
||||||
|
|
||||||
path.push(upid.to_string());
|
path.push(upid.to_string());
|
||||||
|
|
||||||
@ -608,12 +694,13 @@ impl WorkerTask {
|
|||||||
exclusive: true,
|
exclusive: true,
|
||||||
prefix_time: true,
|
prefix_time: true,
|
||||||
read: true,
|
read: true,
|
||||||
|
file_opts: setup.file_opts.clone(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let logger = FileLogger::new(&path, logger_options)?;
|
let logger = FileLogger::new(&path, logger_options)?;
|
||||||
nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
|
|
||||||
|
|
||||||
let worker = Arc::new(Self {
|
let worker = Arc::new(Self {
|
||||||
|
setup,
|
||||||
upid: upid.clone(),
|
upid: upid.clone(),
|
||||||
abort_requested: AtomicBool::new(false),
|
abort_requested: AtomicBool::new(false),
|
||||||
data: Mutex::new(WorkerTaskData {
|
data: Mutex::new(WorkerTaskData {
|
||||||
@ -631,7 +718,7 @@ impl WorkerTask {
|
|||||||
proxmox_rest_server::set_worker_count(hash.len());
|
proxmox_rest_server::set_worker_count(hash.len());
|
||||||
}
|
}
|
||||||
|
|
||||||
update_active_workers(Some(&upid))?;
|
setup.update_active_workers(Some(&upid))?;
|
||||||
|
|
||||||
Ok(worker)
|
Ok(worker)
|
||||||
}
|
}
|
||||||
@ -714,7 +801,7 @@ impl WorkerTask {
|
|||||||
self.log(state.result_text());
|
self.log(state.result_text());
|
||||||
|
|
||||||
WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
|
WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
|
||||||
let _ = update_active_workers(None);
|
let _ = self.setup.update_active_workers(None);
|
||||||
proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
|
proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user