cleanup worker task logging

In order to avoid name conflicts with WorkerTaskContext

- renamed WorkerTask::log to WorkerTask::log_message

Note: Methods have different fuction signatures

Also renamed WorkerTask::warn to WorkerTask::log_warning for
consistency reasons.

Use the task_log!() and task_warn!() macros more often.
This commit is contained in:
Dietmar Maurer 2021-09-24 09:30:00 +02:00
parent c8449217dc
commit 1ec0d70d09
21 changed files with 210 additions and 183 deletions

View File

@ -33,7 +33,7 @@ impl <E: RpcEnvironment + Clone> H2Service<E> {
} }
pub fn debug<S: AsRef<str>>(&self, msg: S) { pub fn debug<S: AsRef<str>>(&self, msg: S) {
if self.debug { self.worker.log(msg); } if self.debug { self.worker.log_message(msg); }
} }
fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture { fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
@ -77,7 +77,14 @@ impl <E: RpcEnvironment + Clone> H2Service<E> {
message = &data.0; message = &data.0;
} }
worker.log(format!("{} {}: {} {}: {}", method.as_str(), path, status.as_str(), reason, message)); worker.log_message(format!(
"{} {}: {} {}: {}",
method.as_str(),
path,
status.as_str(),
reason,
message
));
} }
} }
} }

View File

@ -779,7 +779,7 @@ impl WorkerTask {
/// Log task result, remove task from running list /// Log task result, remove task from running list
pub fn log_result(&self, result: &Result<(), Error>) { pub fn log_result(&self, result: &Result<(), Error>) {
let state = self.create_state(result); let state = self.create_state(result);
self.log(state.result_text()); self.log_message(state.result_text());
WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
let _ = self.setup.update_active_workers(None); let _ = self.setup.update_active_workers(None);
@ -787,13 +787,13 @@ impl WorkerTask {
} }
/// Log a message. /// Log a message.
pub fn log<S: AsRef<str>>(&self, msg: S) { pub fn log_message<S: AsRef<str>>(&self, msg: S) {
let mut data = self.data.lock().unwrap(); let mut data = self.data.lock().unwrap();
data.logger.log(msg); data.logger.log(msg);
} }
/// Log a message as warning. /// Log a message as warning.
pub fn warn<S: AsRef<str>>(&self, msg: S) { pub fn log_warning<S: AsRef<str>>(&self, msg: S) {
let mut data = self.data.lock().unwrap(); let mut data = self.data.lock().unwrap();
data.logger.log(format!("WARN: {}", msg.as_ref())); data.logger.log(format!("WARN: {}", msg.as_ref()));
data.warn_count += 1; data.warn_count += 1;
@ -815,7 +815,7 @@ impl WorkerTask {
let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
if !prev_abort { // log abort one time if !prev_abort { // log abort one time
self.log(format!("received abort request ...")); self.log_message(format!("received abort request ..."));
} }
// noitify listeners // noitify listeners
let mut data = self.data.lock().unwrap(); let mut data = self.data.lock().unwrap();
@ -867,11 +867,11 @@ impl WorkerTaskContext for WorkerTask {
fn log(&self, level: log::Level, message: &std::fmt::Arguments) { fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
match level { match level {
log::Level::Error => self.warn(&message.to_string()), log::Level::Error => self.log_warning(&message.to_string()),
log::Level::Warn => self.warn(&message.to_string()), log::Level::Warn => self.log_warning(&message.to_string()),
log::Level::Info => self.log(&message.to_string()), log::Level::Info => self.log_message(&message.to_string()),
log::Level::Debug => self.log(&format!("DEBUG: {}", message)), log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)),
log::Level::Trace => self.log(&format!("TRACE: {}", message)), log::Level::Trace => self.log_message(&format!("TRACE: {}", message)),
} }
} }
} }

View File

@ -82,7 +82,7 @@ async fn pipe_to_tasklog<T: AsyncRead + Unpin>(
line.clear(); line.clear();
match pipe.read_line(&mut line).await { match pipe.read_line(&mut line).await {
Ok(0) => return Ok(()), Ok(0) => return Ok(()),
Ok(_) => task.log(line.as_str()), Ok(_) => task.log_message(line.as_str()),
Err(err) => return Err(err), Err(err) => return Err(err),
} }
} }
@ -150,7 +150,7 @@ impl DnsPlugin {
Ok(((), (), ())) => (), Ok(((), (), ())) => (),
Err(err) => { Err(err) => {
if let Err(err) = child.kill().await { if let Err(err) = child.kill().await {
task.log(format!( task.log_message(format!(
"failed to kill '{} {}' command: {}", "failed to kill '{} {}' command: {}",
PROXMOX_ACME_SH_PATH, action, err PROXMOX_ACME_SH_PATH, action, err
)); ));
@ -188,7 +188,7 @@ impl AcmePlugin for DnsPlugin {
let validation_delay = self.core.validation_delay.unwrap_or(30) as u64; let validation_delay = self.core.validation_delay.unwrap_or(30) as u64;
if validation_delay > 0 { if validation_delay > 0 {
task.log(format!( task.log_message(format!(
"Sleeping {} seconds to wait for TXT record propagation", "Sleeping {} seconds to wait for TXT record propagation",
validation_delay validation_delay
)); ));

View File

@ -53,6 +53,7 @@ use pbs_datastore::prune::compute_prune_info;
use pbs_tools::blocking::WrappedReaderStream; use pbs_tools::blocking::WrappedReaderStream;
use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter}; use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter};
use pbs_tools::json::{required_integer_param, required_string_param}; use pbs_tools::json::{required_integer_param, required_string_param};
use pbs_tools::{task_log, task_warn};
use pbs_config::CachedUserInfo; use pbs_config::CachedUserInfo;
use proxmox_rest_server::{WorkerTask, formatter}; use proxmox_rest_server::{WorkerTask, formatter};
@ -770,9 +771,9 @@ pub fn verify(
)? )?
}; };
if !failed_dirs.is_empty() { if !failed_dirs.is_empty() {
worker.log("Failed to verify the following snapshots/groups:"); task_log!(worker, "Failed to verify the following snapshots/groups:");
for dir in failed_dirs { for dir in failed_dirs {
worker.log(format!("\t{}", dir)); task_log!(worker, "\t{}", dir);
} }
bail!("verification failed - please check the log for details"); bail!("verification failed - please check the log for details");
} }
@ -865,11 +866,11 @@ pub fn prune(
let worker = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?; let worker = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?;
if keep_all { if keep_all {
worker.log("No prune selection - keeping all files."); task_log!(worker, "No prune selection - keeping all files.");
} else { } else {
worker.log(format!("retention options: {}", pbs_datastore::prune::cli_options_string(&prune_options))); task_log!(worker, "retention options: {}", pbs_datastore::prune::cli_options_string(&prune_options));
worker.log(format!("Starting prune on store \"{}\" group \"{}/{}\"", task_log!(worker, "Starting prune on store \"{}\" group \"{}/{}\"",
store, backup_type, backup_id)); store, backup_type, backup_id);
} }
for (info, mut keep) in prune_info { for (info, mut keep) in prune_info {
@ -888,7 +889,7 @@ pub fn prune(
if keep { "keep" } else { "remove" }, if keep { "keep" } else { "remove" },
); );
worker.log(msg); task_log!(worker, "{}", msg);
prune_result.push(json!({ prune_result.push(json!({
"backup-type": group.backup_type(), "backup-type": group.backup_type(),
@ -899,11 +900,11 @@ pub fn prune(
if !(dry_run || keep) { if !(dry_run || keep) {
if let Err(err) = datastore.remove_backup_dir(&info.backup_dir, false) { if let Err(err) = datastore.remove_backup_dir(&info.backup_dir, false) {
worker.warn( task_warn!(
format!( worker,
"failed to remove dir {:?}: {}", "failed to remove dir {:?}: {}",
info.backup_dir.relative_path(), err info.backup_dir.relative_path(),
) err,
); );
} }
} }

View File

@ -528,7 +528,7 @@ impl BackupEnvironment {
self.auth_id.to_string(), self.auth_id.to_string(),
false, false,
move |worker| { move |worker| {
worker.log("Automatically verifying newly added snapshot"); worker.log_message("Automatically verifying newly added snapshot");
let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore); let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
@ -548,11 +548,11 @@ impl BackupEnvironment {
} }
pub fn log<S: AsRef<str>>(&self, msg: S) { pub fn log<S: AsRef<str>>(&self, msg: S) {
self.worker.log(msg); self.worker.log_message(msg);
} }
pub fn debug<S: AsRef<str>>(&self, msg: S) { pub fn debug<S: AsRef<str>>(&self, msg: S) {
if self.debug { self.worker.log(msg); } if self.debug { self.worker.log_message(msg); }
} }
pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> { pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {

View File

@ -18,6 +18,7 @@ use proxmox_acme_rs::Account;
use pbs_api_types::{Authid, PRIV_SYS_MODIFY}; use pbs_api_types::{Authid, PRIV_SYS_MODIFY};
use pbs_tools::ops::ControlFlow; use pbs_tools::ops::ControlFlow;
use pbs_tools::{task_log, task_warn};
use crate::acme::AcmeClient; use crate::acme::AcmeClient;
use crate::api2::types::{AcmeAccountName, AcmeChallengeSchema, KnownAcmeDirectory}; use crate::api2::types::{AcmeAccountName, AcmeChallengeSchema, KnownAcmeDirectory};
@ -220,15 +221,16 @@ fn register_account(
move |worker| async move { move |worker| async move {
let mut client = AcmeClient::new(directory); let mut client = AcmeClient::new(directory);
worker.log(format!("Registering ACME account '{}'...", &name)); task_log!(worker, "Registering ACME account '{}'...", &name);
let account = let account =
do_register_account(&mut client, &name, tos_url.is_some(), contact, None).await?; do_register_account(&mut client, &name, tos_url.is_some(), contact, None).await?;
worker.log(format!( task_log!(
worker,
"Registration successful, account URL: {}", "Registration successful, account URL: {}",
account.location account.location
)); );
Ok(()) Ok(())
}, },
@ -331,10 +333,11 @@ pub fn deactivate_account(
Ok(_account) => (), Ok(_account) => (),
Err(err) if !force => return Err(err), Err(err) if !force => return Err(err),
Err(err) => { Err(err) => {
worker.warn(format!( task_warn!(
worker,
"error deactivating account {}, proceedeing anyway - {}", "error deactivating account {}, proceedeing anyway - {}",
name, err, name, err,
)); );
} }
} }
crate::config::acme::mark_account_deactivated(&name)?; crate::config::acme::mark_account_deactivated(&name)?;

View File

@ -89,7 +89,7 @@ fn read_and_update_proxy_config() -> Result<Option<ProxyConfig>, Error> {
} }
fn do_apt_update(worker: &WorkerTask, quiet: bool) -> Result<(), Error> { fn do_apt_update(worker: &WorkerTask, quiet: bool) -> Result<(), Error> {
if !quiet { worker.log("starting apt-get update") } if !quiet { worker.log_message("starting apt-get update") }
read_and_update_proxy_config()?; read_and_update_proxy_config()?;
@ -101,7 +101,7 @@ fn do_apt_update(worker: &WorkerTask, quiet: bool) -> Result<(), Error> {
.map_err(|err| format_err!("failed to execute {:?} - {}", command, err))?; .map_err(|err| format_err!("failed to execute {:?} - {}", command, err))?;
if !quiet { if !quiet {
worker.log(String::from_utf8(output.stdout)?); worker.log_message(String::from_utf8(output.stdout)?);
} }
// TODO: improve run_command to allow outputting both, stderr and stdout // TODO: improve run_command to allow outputting both, stderr and stdout
@ -110,7 +110,7 @@ fn do_apt_update(worker: &WorkerTask, quiet: bool) -> Result<(), Error> {
let msg = String::from_utf8(output.stderr) let msg = String::from_utf8(output.stderr)
.map(|m| if m.is_empty() { String::from("no error message") } else { m }) .map(|m| if m.is_empty() { String::from("no error message") } else { m })
.unwrap_or_else(|_| String::from("non utf8 error message (suppressed)")); .unwrap_or_else(|_| String::from("non utf8 error message (suppressed)"));
worker.warn(msg); worker.log_warning(msg);
} else { } else {
bail!("terminated by signal"); bail!("terminated by signal");
} }

View File

@ -13,7 +13,7 @@ use proxmox::list_subdirs_api_method;
use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_MODIFY}; use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_MODIFY};
use pbs_buildcfg::configdir; use pbs_buildcfg::configdir;
use pbs_tools::cert; use pbs_tools::{task_log, task_warn, cert};
use crate::acme::AcmeClient; use crate::acme::AcmeClient;
use crate::api2::types::AcmeDomain; use crate::api2::types::AcmeDomain;
@ -303,7 +303,7 @@ async fn order_certificate(
}; };
if domains.is_empty() { if domains.is_empty() {
worker.log("No domains configured to be ordered from an ACME server."); task_log!(worker, "No domains configured to be ordered from an ACME server.");
return Ok(None); return Ok(None);
} }
@ -311,11 +311,11 @@ async fn order_certificate(
let mut acme = node_config.acme_client().await?; let mut acme = node_config.acme_client().await?;
worker.log("Placing ACME order"); task_log!(worker, "Placing ACME order");
let order = acme let order = acme
.new_order(domains.iter().map(|d| d.domain.to_ascii_lowercase())) .new_order(domains.iter().map(|d| d.domain.to_ascii_lowercase()))
.await?; .await?;
worker.log(format!("Order URL: {}", order.location)); task_log!(worker, "Order URL: {}", order.location);
let identifiers: Vec<String> = order let identifiers: Vec<String> = order
.data .data
@ -327,7 +327,7 @@ async fn order_certificate(
.collect(); .collect();
for auth_url in &order.data.authorizations { for auth_url in &order.data.authorizations {
worker.log(format!("Getting authorization details from '{}'", auth_url)); task_log!(worker, "Getting authorization details from '{}'", auth_url);
let mut auth = acme.get_authorization(&auth_url).await?; let mut auth = acme.get_authorization(&auth_url).await?;
let domain = match &mut auth.identifier { let domain = match &mut auth.identifier {
@ -335,11 +335,11 @@ async fn order_certificate(
}; };
if auth.status == Status::Valid { if auth.status == Status::Valid {
worker.log(format!("{} is already validated!", domain)); task_log!(worker, "{} is already validated!", domain);
continue; continue;
} }
worker.log(format!("The validation for {} is pending", domain)); task_log!(worker, "The validation for {} is pending", domain);
let domain_config: &AcmeDomain = get_domain_config(&domain)?; let domain_config: &AcmeDomain = get_domain_config(&domain)?;
let plugin_id = domain_config.plugin.as_deref().unwrap_or("standalone"); let plugin_id = domain_config.plugin.as_deref().unwrap_or("standalone");
let mut plugin_cfg = let mut plugin_cfg =
@ -347,7 +347,7 @@ async fn order_certificate(
format_err!("plugin '{}' for domain '{}' not found!", plugin_id, domain) format_err!("plugin '{}' for domain '{}' not found!", plugin_id, domain)
})?; })?;
worker.log("Setting up validation plugin"); task_log!(worker, "Setting up validation plugin");
let validation_url = plugin_cfg let validation_url = plugin_cfg
.setup(&mut acme, &auth, domain_config, Arc::clone(&worker)) .setup(&mut acme, &auth, domain_config, Arc::clone(&worker))
.await?; .await?;
@ -358,17 +358,18 @@ async fn order_certificate(
.teardown(&mut acme, &auth, domain_config, Arc::clone(&worker)) .teardown(&mut acme, &auth, domain_config, Arc::clone(&worker))
.await .await
{ {
worker.warn(format!( task_warn!(
worker,
"Failed to teardown plugin '{}' for domain '{}' - {}", "Failed to teardown plugin '{}' for domain '{}' - {}",
plugin_id, domain, err plugin_id, domain, err
)); );
} }
let _: () = result?; let _: () = result?;
} }
worker.log("All domains validated"); task_log!(worker, "All domains validated");
worker.log("Creating CSR"); task_log!(worker, "Creating CSR");
let csr = proxmox_acme_rs::util::Csr::generate(&identifiers, &Default::default())?; let csr = proxmox_acme_rs::util::Csr::generate(&identifiers, &Default::default())?;
let mut finalize_error_cnt = 0u8; let mut finalize_error_cnt = 0u8;
@ -381,7 +382,7 @@ async fn order_certificate(
match order.status { match order.status {
Status::Pending => { Status::Pending => {
worker.log("still pending, trying to finalize anyway"); task_log!(worker, "still pending, trying to finalize anyway");
let finalize = order let finalize = order
.finalize .finalize
.as_deref() .as_deref()
@ -396,7 +397,7 @@ async fn order_certificate(
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(5)).await;
} }
Status::Ready => { Status::Ready => {
worker.log("order is ready, finalizing"); task_log!(worker, "order is ready, finalizing");
let finalize = order let finalize = order
.finalize .finalize
.as_deref() .as_deref()
@ -405,18 +406,18 @@ async fn order_certificate(
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(5)).await;
} }
Status::Processing => { Status::Processing => {
worker.log("still processing, trying again in 30 seconds"); task_log!(worker, "still processing, trying again in 30 seconds");
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(30)).await;
} }
Status::Valid => { Status::Valid => {
worker.log("valid"); task_log!(worker, "valid");
break; break;
} }
other => bail!("order status: {:?}", other), other => bail!("order status: {:?}", other),
} }
} }
worker.log("Downloading certificate"); task_log!(worker, "Downloading certificate");
let certificate = acme let certificate = acme
.get_certificate( .get_certificate(
order order
@ -438,10 +439,10 @@ async fn request_validation(
auth_url: &str, auth_url: &str,
validation_url: &str, validation_url: &str,
) -> Result<(), Error> { ) -> Result<(), Error> {
worker.log("Triggering validation"); task_log!(worker, "Triggering validation");
acme.request_challenge_validation(&validation_url).await?; acme.request_challenge_validation(&validation_url).await?;
worker.log("Sleeping for 5 seconds"); task_log!(worker, "Sleeping for 5 seconds");
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(5)).await;
loop { loop {
@ -450,7 +451,7 @@ async fn request_validation(
let auth = acme.get_authorization(&auth_url).await?; let auth = acme.get_authorization(&auth_url).await?;
match auth.status { match auth.status {
Status::Pending => { Status::Pending => {
worker.log("Status is still 'pending', trying again in 10 seconds"); task_log!(worker, "Status is still 'pending', trying again in 10 seconds");
tokio::time::sleep(Duration::from_secs(10)).await; tokio::time::sleep(Duration::from_secs(10)).await;
} }
Status::Valid => return Ok(()), Status::Valid => return Ok(()),
@ -567,11 +568,11 @@ pub fn revoke_acme_cert(rpcenv: &mut dyn RpcEnvironment) -> Result<String, Error
auth_id, auth_id,
true, true,
move |worker| async move { move |worker| async move {
worker.log("Loading ACME account"); task_log!(worker, "Loading ACME account");
let mut acme = node_config.acme_client().await?; let mut acme = node_config.acme_client().await?;
worker.log("Revoking old certificate"); task_log!(worker, "Revoking old certificate");
acme.revoke_certificate(cert_pem.as_bytes(), None).await?; acme.revoke_certificate(cert_pem.as_bytes(), None).await?;
worker.log("Deleting certificate and regenerating a self-signed one"); task_log!(worker, "Deleting certificate and regenerating a self-signed one");
delete_custom_certificate().await?; delete_custom_certificate().await?;
Ok(()) Ok(())
}, },

View File

@ -10,6 +10,7 @@ use pbs_api_types::{
DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA, DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
DATASTORE_SCHEMA, UPID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY, DATASTORE_SCHEMA, UPID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
}; };
use pbs_tools::task_log;
use crate::tools::disks::{ use crate::tools::disks::{
DiskManage, FileSystemType, DiskUsageType, DiskManage, FileSystemType, DiskUsageType,
@ -169,7 +170,7 @@ pub fn create_datastore_disk(
let upid_str = WorkerTask::new_thread( let upid_str = WorkerTask::new_thread(
"dircreate", Some(name.clone()), auth_id, to_stdout, move |worker| "dircreate", Some(name.clone()), auth_id, to_stdout, move |worker|
{ {
worker.log(format!("create datastore '{}' on disk {}", name, disk)); task_log!(worker, "create datastore '{}' on disk {}", name, disk);
let add_datastore = add_datastore.unwrap_or(false); let add_datastore = add_datastore.unwrap_or(false);
let filesystem = filesystem.unwrap_or(FileSystemType::Ext4); let filesystem = filesystem.unwrap_or(FileSystemType::Ext4);

View File

@ -16,6 +16,7 @@ use crate::tools::disks::{
get_disks, get_smart_data, get_disk_usage_info, inititialize_gpt_disk, get_disks, get_smart_data, get_disk_usage_info, inititialize_gpt_disk,
}; };
use proxmox_rest_server::WorkerTask; use proxmox_rest_server::WorkerTask;
use pbs_tools::task_log;
pub mod directory; pub mod directory;
pub mod zfs; pub mod zfs;
@ -155,7 +156,7 @@ pub fn initialize_disk(
let upid_str = WorkerTask::new_thread( let upid_str = WorkerTask::new_thread(
"diskinit", Some(disk.clone()), auth_id, to_stdout, move |worker| "diskinit", Some(disk.clone()), auth_id, to_stdout, move |worker|
{ {
worker.log(format!("initialize disk {}", disk)); task_log!(worker, "initialize disk {}", disk);
let disk_manager = DiskManage::new(); let disk_manager = DiskManage::new();
let disk_info = disk_manager.disk_by_name(&disk)?; let disk_info = disk_manager.disk_by_name(&disk)?;

View File

@ -13,6 +13,7 @@ use pbs_api_types::{
DISK_LIST_SCHEMA, ZFS_ASHIFT_SCHEMA, UPID_SCHEMA, DISK_LIST_SCHEMA, ZFS_ASHIFT_SCHEMA, UPID_SCHEMA,
PRIV_SYS_AUDIT, PRIV_SYS_MODIFY, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
}; };
use pbs_tools::task_log;
use crate::tools::disks::{ use crate::tools::disks::{
zpool_list, zpool_status, parse_zpool_status_config_tree, vdev_list_to_tree, zpool_list, zpool_status, parse_zpool_status_config_tree, vdev_list_to_tree,
@ -231,7 +232,7 @@ pub fn create_zpool(
let upid_str = WorkerTask::new_thread( let upid_str = WorkerTask::new_thread(
"zfscreate", Some(name.clone()), auth_id, to_stdout, move |worker| "zfscreate", Some(name.clone()), auth_id, to_stdout, move |worker|
{ {
worker.log(format!("create {:?} zpool '{}' on devices '{}'", raidlevel, name, devices_text)); task_log!(worker, "create {:?} zpool '{}' on devices '{}'", raidlevel, name, devices_text);
let mut command = std::process::Command::new("zpool"); let mut command = std::process::Command::new("zpool");
@ -265,10 +266,10 @@ pub fn create_zpool(
} }
} }
worker.log(format!("# {:?}", command)); task_log!(worker, "# {:?}", command);
let output = pbs_tools::run_command(command, None)?; let output = pbs_tools::run_command(command, None)?;
worker.log(output); task_log!(worker, "{}", output);
if std::path::Path::new("/lib/systemd/system/zfs-import@.service").exists() { if std::path::Path::new("/lib/systemd/system/zfs-import@.service").exists() {
let import_unit = format!("zfs-import@{}.service", proxmox::tools::systemd::escape_unit(&name, false)); let import_unit = format!("zfs-import@{}.service", proxmox::tools::systemd::escape_unit(&name, false));
@ -278,9 +279,9 @@ pub fn create_zpool(
if let Some(compression) = compression { if let Some(compression) = compression {
let mut command = std::process::Command::new("zfs"); let mut command = std::process::Command::new("zfs");
command.args(&["set", &format!("compression={}", compression), &name]); command.args(&["set", &format!("compression={}", compression), &name]);
worker.log(format!("# {:?}", command)); task_log!(worker, "# {:?}", command);
let output = pbs_tools::run_command(command, None)?; let output = pbs_tools::run_command(command, None)?;
worker.log(output); task_log!(worker, "{}", output);
} }
if add_datastore { if add_datastore {

View File

@ -183,7 +183,7 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu
let stdout_fut = async move { let stdout_fut = async move {
let mut reader = BufReader::new(stdout).lines(); let mut reader = BufReader::new(stdout).lines();
while let Some(line) = reader.next_line().await? { while let Some(line) = reader.next_line().await? {
worker_stdout.log(line); worker_stdout.log_message(line);
} }
Ok::<(), Error>(()) Ok::<(), Error>(())
}; };
@ -192,7 +192,7 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu
let stderr_fut = async move { let stderr_fut = async move {
let mut reader = BufReader::new(stderr).lines(); let mut reader = BufReader::new(stderr).lines();
while let Some(line) = reader.next_line().await? { while let Some(line) = reader.next_line().await? {
worker_stderr.warn(line); worker_stderr.log_warning(line);
} }
Ok::<(), Error>(()) Ok::<(), Error>(())
}; };
@ -224,9 +224,9 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu
} }
if let Err(err) = child.kill().await { if let Err(err) = child.kill().await {
worker.warn(format!("error killing termproxy: {}", err)); worker.log_warning(format!("error killing termproxy: {}", err));
} else if let Err(err) = child.wait().await { } else if let Err(err) = child.wait().await {
worker.warn(format!("error awaiting termproxy: {}", err)); worker.log_warning(format!("error awaiting termproxy: {}", err));
} }
} }

View File

@ -13,11 +13,12 @@ use pbs_api_types::{
DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
}; };
use pbs_tools::task_log;
use proxmox_rest_server::WorkerTask; use proxmox_rest_server::WorkerTask;
use pbs_config::CachedUserInfo;
use crate::server::{jobstate::Job, pull::pull_store}; use crate::server::{jobstate::Job, pull::pull_store};
use crate::backup::DataStore; use crate::backup::DataStore;
use pbs_config::CachedUserInfo;
pub fn check_pull_privs( pub fn check_pull_privs(
auth_id: &Authid, auth_id: &Authid,
@ -97,16 +98,21 @@ pub fn do_sync_job(
let sync_owner = sync_job.owner.unwrap_or_else(|| Authid::root_auth_id().clone()); let sync_owner = sync_job.owner.unwrap_or_else(|| Authid::root_auth_id().clone());
let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?; let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?;
worker.log(format!("Starting datastore sync job '{}'", job_id)); task_log!(worker, "Starting datastore sync job '{}'", job_id);
if let Some(event_str) = schedule { if let Some(event_str) = schedule {
worker.log(format!("task triggered by schedule '{}'", event_str)); task_log!(worker, "task triggered by schedule '{}'", event_str);
} }
worker.log(format!("Sync datastore '{}' from '{}/{}'", task_log!(
sync_job.store, sync_job.remote, sync_job.remote_store)); worker,
"sync datastore '{}' from '{}/{}'",
sync_job.store,
sync_job.remote,
sync_job.remote_store,
);
pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?; pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?;
worker.log(format!("sync job '{}' end", &job_id)); task_log!(worker, "sync job '{}' end", &job_id);
Ok(()) Ok(())
}; };
@ -186,7 +192,7 @@ async fn pull (
// fixme: set to_stdout to false? // fixme: set to_stdout to false?
let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move { let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
worker.log(format!("sync datastore '{}' start", store)); task_log!(worker, "sync datastore '{}' start", store);
let pull_future = pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, auth_id); let pull_future = pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, auth_id);
let future = select!{ let future = select!{
@ -196,7 +202,7 @@ async fn pull (
let _ = future?; let _ = future?;
worker.log(format!("sync datastore '{}' end", store)); task_log!(worker, "sync datastore '{}' end", store);
Ok(()) Ok(())
})?; })?;

View File

@ -52,11 +52,11 @@ impl ReaderEnvironment {
} }
pub fn log<S: AsRef<str>>(&self, msg: S) { pub fn log<S: AsRef<str>>(&self, msg: S) {
self.worker.log(msg); self.worker.log_message(msg);
} }
pub fn debug<S: AsRef<str>>(&self, msg: S) { pub fn debug<S: AsRef<str>>(&self, msg: S) {
if self.debug { self.worker.log(msg); } if self.debug { self.worker.log_message(msg); }
} }

View File

@ -35,7 +35,7 @@ use pbs_tape::{
sg_tape::tape_alert_flags_critical, sg_tape::tape_alert_flags_critical,
linux_list_drives::{lto_tape_device_list, lookup_device_identification, open_lto_tape_device}, linux_list_drives::{lto_tape_device_list, lookup_device_identification, open_lto_tape_device},
}; };
use pbs_tools::task_log; use pbs_tools::{task_log, task_warn};
use proxmox_rest_server::WorkerTask; use proxmox_rest_server::WorkerTask;
use crate::{ use crate::{
@ -548,7 +548,7 @@ fn write_media_label(
let media_id = if let Some(ref pool) = pool { let media_id = if let Some(ref pool) = pool {
// assign media to pool by writing special media set label // assign media to pool by writing special media set label
worker.log(format!("Label media '{}' for pool '{}'", label.label_text, pool)); task_log!(worker, "Label media '{}' for pool '{}'", label.label_text, pool);
let set = MediaSetLabel::with_data(&pool, [0u8; 16].into(), 0, label.ctime, None); let set = MediaSetLabel::with_data(&pool, [0u8; 16].into(), 0, label.ctime, None);
drive.write_media_set_label(&set, None)?; drive.write_media_set_label(&set, None)?;
@ -563,7 +563,7 @@ fn write_media_label(
media_id media_id
} else { } else {
worker.log(format!("Label media '{}' (no pool assignment)", label.label_text)); task_log!(worker, "Label media '{}' (no pool assignment)", label.label_text);
let media_id = MediaId { label, media_set_label: None }; let media_id = MediaId { label, media_set_label: None };
@ -771,7 +771,7 @@ pub fn clean_drive(
move |worker, config| { move |worker, config| {
let (mut changer, _changer_name) = required_media_changer(&config, &drive)?; let (mut changer, _changer_name) = required_media_changer(&config, &drive)?;
worker.log("Starting drive clean"); task_log!(worker, "Starting drive clean");
changer.clean_drive()?; changer.clean_drive()?;
@ -782,7 +782,7 @@ pub fn clean_drive(
// test for critical tape alert flags // test for critical tape alert flags
if let Ok(alert_flags) = handle.tape_alert_flags() { if let Ok(alert_flags) = handle.tape_alert_flags() {
if !alert_flags.is_empty() { if !alert_flags.is_empty() {
worker.log(format!("TapeAlertFlags: {:?}", alert_flags)); task_log!(worker, "TapeAlertFlags: {:?}", alert_flags);
if tape_alert_flags_critical(alert_flags) { if tape_alert_flags_critical(alert_flags) {
bail!("found critical tape alert flags: {:?}", alert_flags); bail!("found critical tape alert flags: {:?}", alert_flags);
} }
@ -791,13 +791,13 @@ pub fn clean_drive(
// test wearout (max. 50 mounts) // test wearout (max. 50 mounts)
if let Ok(volume_stats) = handle.volume_statistics() { if let Ok(volume_stats) = handle.volume_statistics() {
worker.log(format!("Volume mounts: {}", volume_stats.volume_mounts)); task_log!(worker, "Volume mounts: {}", volume_stats.volume_mounts);
let wearout = volume_stats.volume_mounts * 2; // (*100.0/50.0); let wearout = volume_stats.volume_mounts * 2; // (*100.0/50.0);
worker.log(format!("Cleaning tape wearout: {}%", wearout)); task_log!(worker, "Cleaning tape wearout: {}%", wearout);
} }
} }
worker.log("Drive cleaned successfully"); task_log!(worker, "Drive cleaned successfully");
Ok(()) Ok(())
}, },
@ -921,7 +921,7 @@ pub fn update_inventory(
let label_text_list = changer.online_media_label_texts()?; let label_text_list = changer.online_media_label_texts()?;
if label_text_list.is_empty() { if label_text_list.is_empty() {
worker.log("changer device does not list any media labels".to_string()); task_log!(worker, "changer device does not list any media labels");
} }
let state_path = Path::new(TAPE_STATUS_DIR); let state_path = Path::new(TAPE_STATUS_DIR);
@ -932,36 +932,36 @@ pub fn update_inventory(
for label_text in label_text_list.iter() { for label_text in label_text_list.iter() {
if label_text.starts_with("CLN") { if label_text.starts_with("CLN") {
worker.log(format!("skip cleaning unit '{}'", label_text)); task_log!(worker, "skip cleaning unit '{}'", label_text);
continue; continue;
} }
let label_text = label_text.to_string(); let label_text = label_text.to_string();
if !read_all_labels.unwrap_or(false) && inventory.find_media_by_label_text(&label_text).is_some() { if !read_all_labels.unwrap_or(false) && inventory.find_media_by_label_text(&label_text).is_some() {
worker.log(format!("media '{}' already inventoried", label_text)); task_log!(worker, "media '{}' already inventoried", label_text);
continue; continue;
} }
if let Err(err) = changer.load_media(&label_text) { if let Err(err) = changer.load_media(&label_text) {
worker.warn(format!("unable to load media '{}' - {}", label_text, err)); task_warn!(worker, "unable to load media '{}' - {}", label_text, err);
continue; continue;
} }
let mut drive = open_drive(&config, &drive)?; let mut drive = open_drive(&config, &drive)?;
match drive.read_label() { match drive.read_label() {
Err(err) => { Err(err) => {
worker.warn(format!("unable to read label form media '{}' - {}", label_text, err)); task_warn!(worker, "unable to read label form media '{}' - {}", label_text, err);
} }
Ok((None, _)) => { Ok((None, _)) => {
worker.log(format!("media '{}' is empty", label_text)); task_log!(worker, "media '{}' is empty", label_text);
} }
Ok((Some(media_id), _key_config)) => { Ok((Some(media_id), _key_config)) => {
if label_text != media_id.label.label_text { if label_text != media_id.label.label_text {
worker.warn(format!("label text mismatch ({} != {})", label_text, media_id.label.label_text)); task_warn!(worker, "label text mismatch ({} != {})", label_text, media_id.label.label_text);
continue; continue;
} }
worker.log(format!("inventorize media '{}' with uuid '{}'", label_text, media_id.label.uuid)); task_log!(worker, "inventorize media '{}' with uuid '{}'", label_text, media_id.label.uuid);
if let Some(MediaSetLabel { ref pool, ref uuid, ..}) = media_id.media_set_label { if let Some(MediaSetLabel { ref pool, ref uuid, ..}) = media_id.media_set_label {
let _pool_lock = lock_media_pool(state_path, pool)?; let _pool_lock = lock_media_pool(state_path, pool)?;
@ -1057,14 +1057,14 @@ fn barcode_label_media_worker(
inventory.reload()?; inventory.reload()?;
if inventory.find_media_by_label_text(&label_text).is_some() { if inventory.find_media_by_label_text(&label_text).is_some() {
worker.log(format!("media '{}' already inventoried (already labeled)", label_text)); task_log!(worker, "media '{}' already inventoried (already labeled)", label_text);
continue; continue;
} }
worker.log(format!("checking/loading media '{}'", label_text)); task_log!(worker, "checking/loading media '{}'", label_text);
if let Err(err) = changer.load_media(&label_text) { if let Err(err) = changer.load_media(&label_text) {
worker.warn(format!("unable to load media '{}' - {}", label_text, err)); task_warn!(worker, "unable to load media '{}' - {}", label_text, err);
continue; continue;
} }
@ -1073,13 +1073,13 @@ fn barcode_label_media_worker(
match drive.read_next_file() { match drive.read_next_file() {
Ok(_reader) => { Ok(_reader) => {
worker.log(format!("media '{}' is not empty (format it first)", label_text)); task_log!(worker, "media '{}' is not empty (format it first)", label_text);
continue; continue;
} }
Err(BlockReadError::EndOfFile) => { /* EOF mark at BOT, assume tape is empty */ }, Err(BlockReadError::EndOfFile) => { /* EOF mark at BOT, assume tape is empty */ },
Err(BlockReadError::EndOfStream) => { /* tape is empty */ }, Err(BlockReadError::EndOfStream) => { /* tape is empty */ },
Err(_err) => { Err(_err) => {
worker.warn(format!("media '{}' read error (maybe not empty - format it first)", label_text)); task_warn!(worker, "media '{}' read error (maybe not empty - format it first)", label_text);
continue; continue;
} }
} }
@ -1249,15 +1249,17 @@ pub fn catalog_media(
let media_id = match drive.read_label()? { let media_id = match drive.read_label()? {
(Some(media_id), key_config) => { (Some(media_id), key_config) => {
worker.log(format!( task_log!(
worker,
"found media label: {}", "found media label: {}",
serde_json::to_string_pretty(&serde_json::to_value(&media_id)?)? serde_json::to_string_pretty(&serde_json::to_value(&media_id)?)?
)); );
if key_config.is_some() { if key_config.is_some() {
worker.log(format!( task_log!(
worker,
"encryption key config: {}", "encryption key config: {}",
serde_json::to_string_pretty(&serde_json::to_value(&key_config)?)? serde_json::to_string_pretty(&serde_json::to_value(&key_config)?)?
)); );
} }
media_id media_id
}, },
@ -1270,7 +1272,7 @@ pub fn catalog_media(
let (_media_set_lock, media_set_uuid) = match media_id.media_set_label { let (_media_set_lock, media_set_uuid) = match media_id.media_set_label {
None => { None => {
worker.log("media is empty"); task_log!(worker, "media is empty");
let _lock = lock_unassigned_media_pool(status_path)?; let _lock = lock_unassigned_media_pool(status_path)?;
MediaCatalog::destroy(status_path, &media_id.label.uuid)?; MediaCatalog::destroy(status_path, &media_id.label.uuid)?;
inventory.store(media_id.clone(), false)?; inventory.store(media_id.clone(), false)?;
@ -1278,7 +1280,7 @@ pub fn catalog_media(
} }
Some(ref set) => { Some(ref set) => {
if set.uuid.as_ref() == [0u8;16] { // media is empty if set.uuid.as_ref() == [0u8;16] { // media is empty
worker.log("media is empty"); task_log!(worker, "media is empty");
let _lock = lock_unassigned_media_pool(status_path)?; let _lock = lock_unassigned_media_pool(status_path)?;
MediaCatalog::destroy(status_path, &media_id.label.uuid)?; MediaCatalog::destroy(status_path, &media_id.label.uuid)?;
inventory.store(media_id.clone(), false)?; inventory.store(media_id.clone(), false)?;

View File

@ -20,6 +20,7 @@ use proxmox::sys::linux::socket::set_tcp_keepalive;
use proxmox::tools::fs::CreateOptions; use proxmox::tools::fs::CreateOptions;
use proxmox_rest_server::{rotate_task_log_archive, ApiConfig, RestServer, WorkerTask}; use proxmox_rest_server::{rotate_task_log_archive, ApiConfig, RestServer, WorkerTask};
use pbs_tools::task_log;
use proxmox_backup::{ use proxmox_backup::{
backup::DataStore, backup::DataStore,
@ -748,16 +749,16 @@ async fn schedule_task_log_rotate() {
false, false,
move |worker| { move |worker| {
job.start(&worker.upid().to_string())?; job.start(&worker.upid().to_string())?;
worker.log("starting task log rotation".to_string()); task_log!(worker, "starting task log rotation");
let result = try_block!({ let result = try_block!({
let max_size = 512 * 1024 - 1; // an entry has ~ 100b, so > 5000 entries/file let max_size = 512 * 1024 - 1; // an entry has ~ 100b, so > 5000 entries/file
let max_files = 20; // times twenty files gives > 100000 task entries let max_files = 20; // times twenty files gives > 100000 task entries
let has_rotated = rotate_task_log_archive(max_size, true, Some(max_files))?; let has_rotated = rotate_task_log_archive(max_size, true, Some(max_files))?;
if has_rotated { if has_rotated {
worker.log("task log archive was rotated".to_string()); task_log!(worker, "task log archive was rotated");
} else { } else {
worker.log("task log archive was not rotated".to_string()); task_log!(worker, "task log archive was not rotated");
} }
let max_size = 32 * 1024 * 1024 - 1; let max_size = 32 * 1024 * 1024 - 1;
@ -768,9 +769,9 @@ async fn schedule_task_log_rotate() {
if logrotate.rotate(max_size, None, Some(max_files))? { if logrotate.rotate(max_size, None, Some(max_files))? {
println!("rotated access log, telling daemons to re-open log file"); println!("rotated access log, telling daemons to re-open log file");
pbs_runtime::block_on(command_reopen_access_logfiles())?; pbs_runtime::block_on(command_reopen_access_logfiles())?;
worker.log("API access log was rotated".to_string()); task_log!(worker, "API access log was rotated");
} else { } else {
worker.log("API access log was not rotated".to_string()); task_log!(worker, "API access log was not rotated");
} }
let mut logrotate = LogRotate::new(pbs_buildcfg::API_AUTH_LOG_FN, true) let mut logrotate = LogRotate::new(pbs_buildcfg::API_AUTH_LOG_FN, true)
@ -779,9 +780,9 @@ async fn schedule_task_log_rotate() {
if logrotate.rotate(max_size, None, Some(max_files))? { if logrotate.rotate(max_size, None, Some(max_files))? {
println!("rotated auth log, telling daemons to re-open log file"); println!("rotated auth log, telling daemons to re-open log file");
pbs_runtime::block_on(command_reopen_auth_logfiles())?; pbs_runtime::block_on(command_reopen_auth_logfiles())?;
worker.log("API authentication log was rotated".to_string()); task_log!(worker, "API authentication log was rotated");
} else { } else {
worker.log("API authentication log was not rotated".to_string()); task_log!(worker, "API authentication log was not rotated");
} }
Ok(()) Ok(())

View File

@ -2,6 +2,7 @@ use std::sync::Arc;
use anyhow::Error; use anyhow::Error;
use pbs_api_types::Authid; use pbs_api_types::Authid;
use pbs_tools::task_log;
use proxmox_rest_server::WorkerTask; use proxmox_rest_server::WorkerTask;
use crate::{ use crate::{
@ -31,9 +32,9 @@ pub fn do_garbage_collection_job(
move |worker| { move |worker| {
job.start(&worker.upid().to_string())?; job.start(&worker.upid().to_string())?;
worker.log(format!("starting garbage collection on store {}", store)); task_log!(worker, "starting garbage collection on store {}", store);
if let Some(event_str) = schedule { if let Some(event_str) = schedule {
worker.log(format!("task triggered by schedule '{}'", event_str)); task_log!(worker, "task triggered by schedule '{}'", event_str);
} }
let result = datastore.garbage_collection(&*worker, worker.upid()); let result = datastore.garbage_collection(&*worker, worker.upid());

View File

@ -89,10 +89,10 @@ async fn pull_index_chunks<I: IndexFile>(
target.cond_touch_chunk(&info.digest, false) target.cond_touch_chunk(&info.digest, false)
})?; })?;
if chunk_exists { if chunk_exists {
//worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest));
return Ok::<_, Error>(()); return Ok::<_, Error>(());
} }
//worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest));
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
let raw_size = chunk.raw_size() as usize; let raw_size = chunk.raw_size() as usize;
@ -118,11 +118,12 @@ async fn pull_index_chunks<I: IndexFile>(
let bytes = bytes.load(Ordering::SeqCst); let bytes = bytes.load(Ordering::SeqCst);
worker.log(format!( task_log!(
worker,
"downloaded {} bytes ({:.2} MiB/s)", "downloaded {} bytes ({:.2} MiB/s)",
bytes, bytes,
(bytes as f64) / (1024.0 * 1024.0 * elapsed) (bytes as f64) / (1024.0 * 1024.0 * elapsed)
)); );
Ok(()) Ok(())
} }
@ -181,7 +182,8 @@ async fn pull_single_archive(
let mut tmp_path = path.clone(); let mut tmp_path = path.clone();
tmp_path.set_extension("tmp"); tmp_path.set_extension("tmp");
worker.log(format!("sync archive {}", archive_name)); task_log!(worker, "sync archive {}", archive_name);
let mut tmpfile = std::fs::OpenOptions::new() let mut tmpfile = std::fs::OpenOptions::new()
.write(true) .write(true)
.create(true) .create(true)
@ -256,7 +258,7 @@ async fn try_client_log_download(
if let Err(err) = std::fs::rename(&tmp_path, &path) { if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err); bail!("Atomic rename file {:?} failed - {}", path, err);
} }
worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME)); task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
} }
Ok(()) Ok(())
@ -287,10 +289,11 @@ async fn pull_snapshot(
match err.downcast_ref::<HttpError>() { match err.downcast_ref::<HttpError>() {
Some(HttpError { code, message }) => match *code { Some(HttpError { code, message }) => match *code {
StatusCode::NOT_FOUND => { StatusCode::NOT_FOUND => {
worker.log(format!( task_log!(
worker,
"skipping snapshot {} - vanished since start of sync", "skipping snapshot {} - vanished since start of sync",
snapshot snapshot
)); );
return Ok(()); return Ok(());
} }
_ => { _ => {
@ -330,7 +333,7 @@ async fn pull_snapshot(
if !client_log_name.exists() { if !client_log_name.exists() {
try_client_log_download(worker, reader, &client_log_name).await?; try_client_log_download(worker, reader, &client_log_name).await?;
} }
worker.log("no data changes"); task_log!(worker, "no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name); let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(()); // nothing changed return Ok(()); // nothing changed
} }
@ -351,7 +354,7 @@ async fn pull_snapshot(
match manifest.verify_file(&item.filename, &csum, size) { match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue, Ok(_) => continue,
Err(err) => { Err(err) => {
worker.log(format!("detected changed file {:?} - {}", path, err)); task_log!(worker, "detected changed file {:?} - {}", path, err);
} }
} }
} }
@ -361,7 +364,7 @@ async fn pull_snapshot(
match manifest.verify_file(&item.filename, &csum, size) { match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue, Ok(_) => continue,
Err(err) => { Err(err) => {
worker.log(format!("detected changed file {:?} - {}", path, err)); task_log!(worker, "detected changed file {:?} - {}", path, err);
} }
} }
} }
@ -371,7 +374,7 @@ async fn pull_snapshot(
match manifest.verify_file(&item.filename, &csum, size) { match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue, Ok(_) => continue,
Err(err) => { Err(err) => {
worker.log(format!("detected changed file {:?} - {}", path, err)); task_log!(worker, "detected changed file {:?} - {}", path, err);
} }
} }
} }
@ -421,7 +424,7 @@ pub async fn pull_snapshot_from(
let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
if is_new { if is_new {
worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); task_log!(worker, "sync snapshot {:?}", snapshot.relative_path());
if let Err(err) = pull_snapshot( if let Err(err) = pull_snapshot(
worker, worker,
@ -433,13 +436,13 @@ pub async fn pull_snapshot_from(
.await .await
{ {
if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
worker.log(format!("cleanup error - {}", cleanup_err)); task_log!(worker, "cleanup error - {}", cleanup_err);
} }
return Err(err); return Err(err);
} }
worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path());
} else { } else {
worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path());
pull_snapshot( pull_snapshot(
worker, worker,
reader, reader,
@ -448,10 +451,7 @@ pub async fn pull_snapshot_from(
downloaded_chunks, downloaded_chunks,
) )
.await?; .await?;
worker.log(format!( task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path());
"re-sync snapshot {:?} done",
snapshot.relative_path()
));
} }
Ok(()) Ok(())
@ -547,10 +547,7 @@ pub async fn pull_group(
// in-progress backups can't be synced // in-progress backups can't be synced
if item.size.is_none() { if item.size.is_none() {
worker.log(format!( task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot);
"skipping snapshot {} - in-progress backup",
snapshot
));
continue; continue;
} }
@ -598,7 +595,7 @@ pub async fn pull_group(
.await; .await;
progress.done_snapshots = pos as u64 + 1; progress.done_snapshots = pos as u64 + 1;
worker.log(format!("percentage done: {}", progress)); task_log!(worker, "percentage done: {}", progress);
result?; // stop on error result?; // stop on error
} }
@ -610,10 +607,7 @@ pub async fn pull_group(
if remote_snapshots.contains(&backup_time) { if remote_snapshots.contains(&backup_time) {
continue; continue;
} }
worker.log(format!( task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
"delete vanished snapshot {:?}",
info.backup_dir.relative_path()
));
tgt_store.remove_backup_dir(&info.backup_dir, false)?; tgt_store.remove_backup_dir(&info.backup_dir, false)?;
} }
} }
@ -645,7 +639,7 @@ pub async fn pull_store(
let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?; let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
worker.log(format!("found {} groups to sync", list.len())); task_log!(worker, "found {} groups to sync", list.len());
list.sort_unstable_by(|a, b| { list.sort_unstable_by(|a, b| {
let type_order = a.backup_type.cmp(&b.backup_type); let type_order = a.backup_type.cmp(&b.backup_type);
@ -675,10 +669,11 @@ pub async fn pull_store(
let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) { let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
Ok(result) => result, Ok(result) => result,
Err(err) => { Err(err) => {
worker.log(format!( task_log!(
worker,
"sync group {}/{} failed - group lock failed: {}", "sync group {}/{} failed - group lock failed: {}",
item.backup_type, item.backup_id, err item.backup_type, item.backup_id, err
)); );
errors = true; // do not stop here, instead continue errors = true; // do not stop here, instead continue
continue; continue;
} }
@ -687,10 +682,11 @@ pub async fn pull_store(
// permission check // permission check
if auth_id != owner { if auth_id != owner {
// only the owner is allowed to create additional snapshots // only the owner is allowed to create additional snapshots
worker.log(format!( task_log!(
worker,
"sync group {}/{} failed - owner check failed ({} != {})", "sync group {}/{} failed - owner check failed ({} != {})",
item.backup_type, item.backup_id, auth_id, owner item.backup_type, item.backup_id, auth_id, owner
)); );
errors = true; // do not stop here, instead continue errors = true; // do not stop here, instead continue
} else if let Err(err) = pull_group( } else if let Err(err) = pull_group(
worker, worker,
@ -703,10 +699,11 @@ pub async fn pull_store(
) )
.await .await
{ {
worker.log(format!( task_log!(
worker,
"sync group {}/{} failed - {}", "sync group {}/{} failed - {}",
item.backup_type, item.backup_id, err, item.backup_type, item.backup_id, err,
)); );
errors = true; // do not stop here, instead continue errors = true; // do not stop here, instead continue
} }
} }
@ -718,20 +715,21 @@ pub async fn pull_store(
if new_groups.contains(&local_group) { if new_groups.contains(&local_group) {
continue; continue;
} }
worker.log(format!( task_log!(
worker,
"delete vanished group '{}/{}'", "delete vanished group '{}/{}'",
local_group.backup_type(), local_group.backup_type(),
local_group.backup_id() local_group.backup_id()
)); );
if let Err(err) = tgt_store.remove_backup_group(&local_group) { if let Err(err) = tgt_store.remove_backup_group(&local_group) {
worker.log(err.to_string()); task_log!(worker, "{}", err.to_string());
errors = true; errors = true;
} }
} }
Ok(()) Ok(())
}); });
if let Err(err) = result { if let Err(err) = result {
worker.log(format!("error during cleanup: {}", err)); task_log!(worker, "error during cleanup: {}", err);
errors = true; errors = true;
}; };
} }

View File

@ -58,9 +58,9 @@ pub fn do_verification_job(
let job_result = match result { let job_result = match result {
Ok(ref failed_dirs) if failed_dirs.is_empty() => Ok(()), Ok(ref failed_dirs) if failed_dirs.is_empty() => Ok(()),
Ok(ref failed_dirs) => { Ok(ref failed_dirs) => {
worker.log("Failed to verify the following snapshots/groups:"); task_log!(worker, "Failed to verify the following snapshots/groups:");
for dir in failed_dirs { for dir in failed_dirs {
worker.log(format!("\t{}", dir)); task_log!(worker, "\t{}", dir);
} }
Err(format_err!("verification failed - please check the log for details")) Err(format_err!("verification failed - please check the log for details"))

View File

@ -13,7 +13,7 @@ use anyhow::{bail, Error};
use proxmox::tools::Uuid; use proxmox::tools::Uuid;
use pbs_tools::task_log; use pbs_tools::{task_log, task_warn};
use pbs_config::tape_encryption_keys::load_key_configs; use pbs_config::tape_encryption_keys::load_key_configs;
use pbs_tape::{ use pbs_tape::{
TapeWrite, TapeWrite,
@ -135,13 +135,13 @@ impl PoolWriter {
let (drive_config, _digest) = pbs_config::drive::config()?; let (drive_config, _digest) = pbs_config::drive::config()?;
if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? { if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
worker.log("eject media"); task_log!(worker, "eject media");
status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
drop(status); // close drive drop(status); // close drive
worker.log("unload media"); task_log!(worker, "unload media");
changer.unload_media(None)?; //eject and unload changer.unload_media(None)?; //eject and unload
} else { } else {
worker.log("standalone drive - ejecting media"); task_log!(worker, "standalone drive - ejecting media");
status.drive.eject_media()?; status.drive.eject_media()?;
} }
@ -157,26 +157,26 @@ impl PoolWriter {
if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? { if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
if let Some(ref mut status) = status { if let Some(ref mut status) = status {
worker.log("eject media"); task_log!(worker, "eject media");
status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
} }
drop(status); // close drive drop(status); // close drive
worker.log("unload media"); task_log!(worker, "unload media");
changer.unload_media(None)?; changer.unload_media(None)?;
for media_uuid in self.pool.current_media_list()? { for media_uuid in self.pool.current_media_list()? {
let media = self.pool.lookup_media(media_uuid)?; let media = self.pool.lookup_media(media_uuid)?;
let label_text = media.label_text(); let label_text = media.label_text();
if let Some(slot) = changer.export_media(label_text)? { if let Some(slot) = changer.export_media(label_text)? {
worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot)); task_log!(worker, "exported media '{}' to import/export slot {}", label_text, slot);
} else { } else {
worker.warn(format!("export failed - media '{}' is not online", label_text)); task_warn!(worker, "export failed - media '{}' is not online", label_text);
} }
} }
} else if let Some(mut status) = status { } else if let Some(mut status) = status {
worker.log("standalone drive - ejecting media instead of export"); task_log!(worker, "standalone drive - ejecting media instead of export");
status.drive.eject_media()?; status.drive.eject_media()?;
} }
@ -233,7 +233,7 @@ impl PoolWriter {
// test for critical tape alert flags // test for critical tape alert flags
if let Ok(alert_flags) = drive.tape_alert_flags() { if let Ok(alert_flags) = drive.tape_alert_flags() {
if !alert_flags.is_empty() { if !alert_flags.is_empty() {
worker.log(format!("TapeAlertFlags: {:?}", alert_flags)); task_log!(worker, "TapeAlertFlags: {:?}", alert_flags);
if tape_alert_flags_critical(alert_flags) { if tape_alert_flags_critical(alert_flags) {
self.pool.set_media_status_damaged(&media_uuid)?; self.pool.set_media_status_damaged(&media_uuid)?;
bail!("aborting due to critical tape alert flags: {:?}", alert_flags); bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
@ -297,7 +297,7 @@ impl PoolWriter {
) -> Result<u64, Error> { ) -> Result<u64, Error> {
if !status.at_eom { if !status.at_eom {
worker.log(String::from("moving to end of media")); task_log!(worker, "moving to end of media");
status.drive.move_to_eom(true)?; status.drive.move_to_eom(true)?;
status.at_eom = true; status.at_eom = true;
} }
@ -499,12 +499,13 @@ impl PoolWriter {
status.bytes_written += bytes_written; status.bytes_written += bytes_written;
let elapsed = start_time.elapsed()?.as_secs_f64(); let elapsed = start_time.elapsed()?.as_secs_f64();
worker.log(format!( task_log!(
worker,
"wrote {} chunks ({:.2} MB at {:.2} MB/s)", "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
saved_chunks.len(), saved_chunks.len(),
bytes_written as f64 /1_000_000.0, bytes_written as f64 /1_000_000.0,
(bytes_written as f64)/(1_000_000.0*elapsed), (bytes_written as f64)/(1_000_000.0*elapsed),
)); );
let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE; let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
@ -571,7 +572,7 @@ fn write_chunk_archive<'a>(
} }
if writer.bytes_written() > max_size { if writer.bytes_written() > max_size {
//worker.log("Chunk Archive max size reached, closing archive".to_string()); //task_log!(worker, "Chunk Archive max size reached, closing archive");
break; break;
} }
} }
@ -614,7 +615,7 @@ fn update_media_set_label(
let new_media = match old_set { let new_media = match old_set {
None => { None => {
worker.log("writing new media set label".to_string()); task_log!(worker, "writing new media set label");
drive.write_media_set_label(new_set, key_config.as_ref())?; drive.write_media_set_label(new_set, key_config.as_ref())?;
media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?; media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
true true
@ -634,9 +635,11 @@ fn update_media_set_label(
false false
} else { } else {
worker.log( task_log!(
format!("writing new media set label (overwrite '{}/{}')", worker,
media_set_label.uuid.to_string(), media_set_label.seq_nr) "writing new media set label (overwrite '{}/{}')",
media_set_label.uuid.to_string(),
media_set_label.seq_nr,
); );
drive.write_media_set_label(new_set, key_config.as_ref())?; drive.write_media_set_label(new_set, key_config.as_ref())?;

View File

@ -9,22 +9,23 @@ use proxmox::try_block;
use proxmox::tools::fs::CreateOptions; use proxmox::tools::fs::CreateOptions;
use pbs_api_types::{Authid, UPID}; use pbs_api_types::{Authid, UPID};
use pbs_tools::task_log;
use proxmox_rest_server::{flog, CommandoSocket, WorkerTask}; use proxmox_rest_server::{CommandoSocket, WorkerTask};
fn garbage_collection(worker: &WorkerTask) -> Result<(), Error> { fn garbage_collection(worker: &WorkerTask) -> Result<(), Error> {
worker.log("start garbage collection"); task_log!(worker, "start garbage collection");
for i in 0..50 { for i in 0..50 {
worker.check_abort()?; worker.check_abort()?;
flog!(worker, "progress {}", i); task_log!(worker, "progress {}", i);
std::thread::sleep(std::time::Duration::from_millis(10)); std::thread::sleep(std::time::Duration::from_millis(10));
} }
worker.log("end garbage collection"); task_log!(worker, "end garbage collection");
Ok(()) Ok(())
} }