diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index af3af0ad..c260b62d 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -518,7 +518,14 @@ pub fn verify(
             let failed_dirs = if let Some(backup_dir) = backup_dir {
                 let mut res = Vec::new();
-                if !verify_backup_dir(datastore, &backup_dir, verified_chunks, corrupt_chunks, worker.clone())? {
+                if !verify_backup_dir(
+                    datastore,
+                    &backup_dir,
+                    verified_chunks,
+                    corrupt_chunks,
+                    worker.clone(),
+                    worker.upid().clone(),
+                )? {
                     res.push(backup_dir.to_string());
                 }
                 res
@@ -530,10 +537,11 @@ pub fn verify(
                     corrupt_chunks,
                     None,
                     worker.clone(),
+                    worker.upid(),
                 )?;
                 failed_dirs
             } else {
-                verify_all_backups(datastore, worker.clone())?
+                verify_all_backups(datastore, worker.clone(), worker.upid())?
             };
             if failed_dirs.len() > 0 {
                 worker.log("Failed to verify following snapshots:");
@@ -770,7 +778,7 @@ fn start_garbage_collection(
         to_stdout,
         move |worker| {
             worker.log(format!("starting garbage collection on store {}", store));
-            datastore.garbage_collection(&worker)
+            datastore.garbage_collection(&*worker, worker.upid())
         },
     )?;
 
diff --git a/src/backup/chunk_store.rs b/src/backup/chunk_store.rs
index 9c81ff27..4397648b 100644
--- a/src/backup/chunk_store.rs
+++ b/src/backup/chunk_store.rs
@@ -11,7 +11,7 @@ use crate::tools;
 use crate::api2::types::GarbageCollectionStatus;
 use super::DataBlob;
-use crate::server::WorkerTask;
+use crate::task::TaskState;
 
 /// File system based chunk store
 pub struct ChunkStore {
@@ -278,7 +278,7 @@ impl ChunkStore {
         oldest_writer: i64,
         phase1_start_time: i64,
         status: &mut GarbageCollectionStatus,
-        worker: &WorkerTask,
+        worker: &dyn TaskState,
     ) -> Result<(), Error> {
         use nix::sys::stat::fstatat;
         use nix::unistd::{unlinkat, UnlinkatFlags};
@@ -297,10 +297,15 @@ impl ChunkStore {
         for (entry, percentage, bad) in self.get_chunk_iterator()? {
             if last_percentage != percentage {
                 last_percentage = percentage;
-                worker.log(format!("percentage done: phase2 {}% (processed {} chunks)", percentage, chunk_count));
+                crate::task_log!(
+                    worker,
+                    "percentage done: phase2 {}% (processed {} chunks)",
+                    percentage,
+                    chunk_count,
+                );
             }
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
 
             let (dirfd, entry) = match entry {
@@ -334,12 +339,13 @@ impl ChunkStore {
                     Ok(_) => {
                         match unlinkat(Some(dirfd), filename, UnlinkatFlags::NoRemoveDir) {
                             Err(err) =>
-                                worker.warn(format!(
+                                crate::task_warn!(
+                                    worker,
                                     "unlinking corrupt chunk {:?} failed on store '{}' - {}",
                                     filename,
                                     self.name,
                                     err,
-                                )),
+                                ),
                             Ok(_) => {
                                 status.removed_bad += 1;
                                 status.removed_bytes += stat.st_size as u64;
@@ -351,11 +357,12 @@ impl ChunkStore {
                     },
                     Err(err) => {
                         // some other error, warn user and keep .bad file around too
-                        worker.warn(format!(
+                        crate::task_warn!(
+                            worker,
                             "error during stat on '{:?}' - {}",
                             orig_filename,
                             err,
-                        ));
+                        );
                     }
                 }
             } else if stat.st_atime < min_atime {
diff --git a/src/backup/datastore.rs b/src/backup/datastore.rs
index 1b5f7f8a..1f708293 100644
--- a/src/backup/datastore.rs
+++ b/src/backup/datastore.rs
@@ -18,11 +18,12 @@ use super::manifest::{MANIFEST_BLOB_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest};
 use super::index::*;
 use super::{DataBlob, ArchiveType, archive_type};
 use crate::config::datastore;
-use crate::server::WorkerTask;
+use crate::task::TaskState;
 use crate::tools;
 use crate::tools::format::HumanByte;
 use crate::tools::fs::{lock_dir_noblock, DirLockGuard};
 use crate::api2::types::{GarbageCollectionStatus, Userid};
+use crate::server::UPID;
 
 lazy_static! {
     static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
@@ -411,25 +412,34 @@ impl DataStore {
         index: I,
         file_name: &Path, // only used for error reporting
         status: &mut GarbageCollectionStatus,
-        worker: &WorkerTask,
+        worker: &dyn TaskState,
     ) -> Result<(), Error> {
 
         status.index_file_count += 1;
         status.index_data_bytes += index.index_bytes();
 
         for pos in 0..index.index_count() {
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
             let digest = index.index_digest(pos).unwrap();
             if let Err(err) = self.chunk_store.touch_chunk(digest) {
-                worker.warn(&format!("warning: unable to access chunk {}, required by {:?} - {}",
-                                     proxmox::tools::digest_to_hex(digest), file_name, err));
+                crate::task_warn!(
+                    worker,
+                    "warning: unable to access chunk {}, required by {:?} - {}",
+                    proxmox::tools::digest_to_hex(digest),
+                    file_name,
+                    err,
+                );
             }
         }
 
         Ok(())
     }
 
-    fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus, worker: &WorkerTask) -> Result<(), Error> {
+    fn mark_used_chunks(
+        &self,
+        status: &mut GarbageCollectionStatus,
+        worker: &dyn TaskState,
+    ) -> Result<(), Error> {
 
         let image_list = self.list_images()?;
 
@@ -441,7 +451,7 @@ impl DataStore {
 
         for path in image_list {
 
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
 
             if let Ok(archive_type) = archive_type(&path) {
@@ -457,8 +467,13 @@ impl DataStore {
 
             let percentage = done*100/image_count;
             if percentage > last_percentage {
-                worker.log(format!("percentage done: phase1 {}% ({} of {} index files)",
-                                   percentage, done, image_count));
+                crate::task_log!(
+                    worker,
+                    "percentage done: phase1 {}% ({} of {} index files)",
+                    percentage,
+                    done,
+                    image_count,
+                );
                 last_percentage = percentage;
             }
         }
@@ -474,7 +489,7 @@ impl DataStore {
         if let Ok(_) = self.gc_mutex.try_lock() { false } else { true }
     }
 
-    pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {
+    pub fn garbage_collection(&self, worker: &dyn TaskState, upid: &UPID) -> Result<(), Error> {
 
         if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
 
@@ -487,36 +502,59 @@ impl DataStore {
             let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
 
             let mut gc_status = GarbageCollectionStatus::default();
-            gc_status.upid = Some(worker.to_string());
+            gc_status.upid = Some(upid.to_string());
 
-            worker.log("Start GC phase1 (mark used chunks)");
+            crate::task_log!(worker, "Start GC phase1 (mark used chunks)");
 
-            self.mark_used_chunks(&mut gc_status, &worker)?;
+            self.mark_used_chunks(&mut gc_status, worker)?;
 
-            worker.log("Start GC phase2 (sweep unused chunks)");
-            self.chunk_store.sweep_unused_chunks(oldest_writer, phase1_start_time, &mut gc_status, &worker)?;
+            crate::task_log!(worker, "Start GC phase2 (sweep unused chunks)");
+            self.chunk_store.sweep_unused_chunks(
+                oldest_writer,
+                phase1_start_time,
+                &mut gc_status,
+                worker,
+            )?;
 
-            worker.log(&format!("Removed garbage: {}", HumanByte::from(gc_status.removed_bytes)));
-            worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks));
+            crate::task_log!(
+                worker,
+                "Removed garbage: {}",
+                HumanByte::from(gc_status.removed_bytes),
+            );
+            crate::task_log!(worker, "Removed chunks: {}", gc_status.removed_chunks);
             if gc_status.pending_bytes > 0 {
-                worker.log(&format!("Pending removals: {} (in {} chunks)", HumanByte::from(gc_status.pending_bytes), gc_status.pending_chunks));
+                crate::task_log!(
+                    worker,
+                    "Pending removals: {} (in {} chunks)",
+                    HumanByte::from(gc_status.pending_bytes),
+                    gc_status.pending_chunks,
+                );
             }
             if gc_status.removed_bad > 0 {
-                worker.log(&format!("Removed bad files: {}", gc_status.removed_bad));
+                crate::task_log!(worker, "Removed bad files: {}", gc_status.removed_bad);
            }
 
-            worker.log(&format!("Original data usage: {}", HumanByte::from(gc_status.index_data_bytes)));
+            crate::task_log!(
+                worker,
+                "Original data usage: {}",
+                HumanByte::from(gc_status.index_data_bytes),
+            );
 
             if gc_status.index_data_bytes > 0 {
                 let comp_per = (gc_status.disk_bytes as f64 * 100.)/gc_status.index_data_bytes as f64;
-                worker.log(&format!("On-Disk usage: {} ({:.2}%)", HumanByte::from(gc_status.disk_bytes), comp_per));
+                crate::task_log!(
+                    worker,
+                    "On-Disk usage: {} ({:.2}%)",
+                    HumanByte::from(gc_status.disk_bytes),
+                    comp_per,
+                );
             }
 
-            worker.log(&format!("On-Disk chunks: {}", gc_status.disk_chunks));
+            crate::task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks);
 
             if gc_status.disk_chunks > 0 {
                 let avg_chunk = gc_status.disk_bytes/(gc_status.disk_chunks as u64);
-                worker.log(&format!("Average chunk size: {}", HumanByte::from(avg_chunk)));
+                crate::task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk));
             }
 
             *self.last_gc_status.lock().unwrap() = gc_status;
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 0c55305f..f2c38eae 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -6,9 +6,7 @@ use std::time::Instant;
 use anyhow::{bail, format_err, Error};
 
 use crate::{
-    server::WorkerTask,
     api2::types::*,
-    tools::ParallelHandler,
     backup::{
         DataStore,
         DataBlob,
@@ -21,6 +19,10 @@ use crate::{
         ArchiveType,
         archive_type,
     },
+    server::UPID,
+    task::TaskState,
+    task_log,
+    tools::ParallelHandler,
 };
 
 fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
@@ -51,7 +53,7 @@
 fn rename_corrupted_chunk(
     datastore: Arc<DataStore>,
     digest: &[u8;32],
-    worker: Arc<WorkerTask>,
+    worker: &dyn TaskState,
 ) {
     let (path, digest_str) = datastore.chunk_path(digest);
 
@@ -64,12 +66,12 @@ fn rename_corrupted_chunk(
 
     match std::fs::rename(&path, &new_path) {
         Ok(_) => {
-            worker.log(format!("corrupted chunk renamed to {:?}", &new_path));
+            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
         },
         Err(err) => {
             match err.kind() {
                 std::io::ErrorKind::NotFound => { /* ignored */ },
-                _ => worker.log(format!("could not rename corrupted chunk {:?} - {}", &path, err))
+                _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
             }
         }
     };
@@ -81,7 +83,7 @@ fn verify_index_chunks(
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     crypt_mode: CryptMode,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let errors = Arc::new(AtomicUsize::new(0));
@@ -103,7 +105,7 @@ fn verify_index_chunks(
             let chunk_crypt_mode = match chunk.crypt_mode() {
                 Err(err) => {
                     corrupt_chunks2.lock().unwrap().insert(digest);
-                    worker2.log(format!("can't verify chunk, unknown CryptMode - {}", err));
+                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                     errors2.fetch_add(1, Ordering::SeqCst);
                     return Ok(());
                 },
@@ -111,19 +113,20 @@ fn verify_index_chunks(
             };
 
             if chunk_crypt_mode != crypt_mode {
-                worker2.log(format!(
+                task_log!(
+                    worker2,
                     "chunk CryptMode {:?} does not match index CryptMode {:?}",
                     chunk_crypt_mode,
                     crypt_mode
-                ));
+                );
                 errors2.fetch_add(1, Ordering::SeqCst);
             }
 
             if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                 corrupt_chunks2.lock().unwrap().insert(digest);
-                worker2.log(format!("{}", err));
+                task_log!(worker2, "{}", err);
                 errors2.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(datastore2.clone(), &digest, worker2.clone());
+                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
             } else {
                 verified_chunks2.lock().unwrap().insert(digest);
             }
@@ -134,7 +137,7 @@ fn verify_index_chunks(
 
     for pos in 0..index.index_count() {
 
-        worker.fail_on_abort()?;
+        worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
 
         let info = index.chunk_info(pos).unwrap();
@@ -146,7 +149,7 @@ fn verify_index_chunks(
 
         if corrupt_chunks.lock().unwrap().contains(&info.digest) {
             let digest_str = proxmox::tools::digest_to_hex(&info.digest);
-            worker.log(format!("chunk {} was marked as corrupt", digest_str));
+            task_log!(worker, "chunk {} was marked as corrupt", digest_str);
             errors.fetch_add(1, Ordering::SeqCst);
             continue;
         }
@@ -154,9 +157,9 @@ fn verify_index_chunks(
         match datastore.load_chunk(&info.digest) {
             Err(err) => {
                 corrupt_chunks.lock().unwrap().insert(info.digest);
-                worker.log(format!("can't verify chunk, load failed - {}", err));
+                task_log!(worker, "can't verify chunk, load failed - {}", err);
                 errors.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone());
+                rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
                 continue;
             }
             Ok(chunk) => {
@@ -179,8 +182,16 @@ fn verify_index_chunks(
 
     let error_count = errors.load(Ordering::SeqCst);
 
-    worker.log(format!(" verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
-        read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));
+    task_log!(
+        worker,
+        " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
+        read_bytes_mib,
+        decoded_bytes_mib,
+        elapsed,
+        read_speed,
+        decode_speed,
+        error_count,
+    );
 
     if errors.load(Ordering::SeqCst) > 0 {
         bail!("chunks could not be verified");
@@ -195,7 +206,7 @@ fn verify_fixed_index(
     info: &FileInfo,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let mut path = backup_dir.relative_path();
@@ -212,7 +223,14 @@ fn verify_fixed_index(
         bail!("wrong index checksum");
     }
 
-    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+    verify_index_chunks(
+        datastore,
+        Box::new(index),
+        verified_chunks,
+        corrupt_chunks,
+        info.chunk_crypt_mode(),
+        worker,
+    )
 }
 
 fn verify_dynamic_index(
@@ -221,7 +239,7 @@ fn verify_dynamic_index(
     info: &FileInfo,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let mut path = backup_dir.relative_path();
@@ -238,7 +256,14 @@ fn verify_dynamic_index(
         bail!("wrong index checksum");
     }
 
-    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+    verify_index_chunks(
+        datastore,
+        Box::new(index),
+        verified_chunks,
+        corrupt_chunks,
+        info.chunk_crypt_mode(),
+        worker,
+    )
 }
 
 /// Verify a single backup snapshot
@@ -255,25 +280,32 @@ pub fn verify_backup_dir(
     backup_dir: &BackupDir,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: UPID,
 ) -> Result<bool, Error> {
 
     let mut manifest = match datastore.load_manifest(&backup_dir) {
         Ok((manifest, _)) => manifest,
         Err(err) => {
-            worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
+            task_log!(
+                worker,
+                "verify {}:{} - manifest load error: {}",
+                datastore.name(),
+                backup_dir,
+                err,
+            );
             return Ok(false);
         }
     };
 
-    worker.log(format!("verify {}:{}", datastore.name(), backup_dir));
+    task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);
 
     let mut error_count = 0;
 
     let mut verify_result = VerifyState::Ok;
     for info in manifest.files() {
         let result = proxmox::try_block!({
-            worker.log(format!("  check {}", info.filename));
+            task_log!(worker, "  check {}", info.filename);
             match archive_type(&info.filename)? {
                 ArchiveType::FixedIndex => verify_fixed_index(
@@ -297,11 +329,18 @@ pub fn verify_backup_dir(
             }
         });
 
-        worker.fail_on_abort()?;
+        worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
 
         if let Err(err) = result {
-            worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
+            task_log!(
+                worker,
+                "verify {}:{}/{} failed: {}",
+                datastore.name(),
+                backup_dir,
+                info.filename,
+                err,
+            );
             error_count += 1;
             verify_result = VerifyState::Failed;
         }
@@ -310,7 +349,7 @@ pub fn verify_backup_dir(
 
     let verify_state = SnapshotVerifyState {
         state: verify_result,
-        upid: worker.upid().clone(),
+        upid,
     };
     manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
     datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
@@ -332,19 +371,26 @@ pub fn verify_backup_group(
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     progress: Option<(usize, usize)>, // (done, snapshot_count)
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: &UPID,
 ) -> Result<(usize, Vec<String>), Error> {
 
     let mut errors = Vec::new();
     let mut list = match group.list_backups(&datastore.base_path()) {
         Ok(list) => list,
         Err(err) => {
-            worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
+            task_log!(
+                worker,
+                "verify group {}:{} - unable to list backups: {}",
+                datastore.name(),
+                group,
+                err,
+            );
             return Ok((0, errors));
         }
     };
 
-    worker.log(format!("verify group {}:{}", datastore.name(), group));
+    task_log!(worker, "verify group {}:{}", datastore.name(), group);
 
     let (done, snapshot_count) = progress.unwrap_or((0, list.len()));
 
@@ -352,13 +398,26 @@ pub fn verify_backup_group(
     BackupInfo::sort_list(&mut list, false); // newest first
     for info in list {
         count += 1;
-        if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{
+        if !verify_backup_dir(
+            datastore.clone(),
+            &info.backup_dir,
+            verified_chunks.clone(),
+            corrupt_chunks.clone(),
+            worker.clone(),
+            upid.clone(),
+        )? {
             errors.push(info.backup_dir.to_string());
         }
         if snapshot_count != 0 {
             let pos = done + count;
             let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64);
-            worker.log(format!("percentage done: {:.2}% ({} of {} snapshots)", percentage, pos, snapshot_count));
+            task_log!(
+                worker,
+                "percentage done: {:.2}% ({} of {} snapshots)",
+                percentage,
+                pos,
+                snapshot_count,
+            );
         }
     }
 
@@ -372,8 +431,11 @@ pub fn verify_backup_group(
 /// Returns
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
-
+pub fn verify_all_backups(
+    datastore: Arc<DataStore>,
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: &UPID,
+) -> Result<Vec<String>, Error> {
     let mut errors = Vec::new();
 
     let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
@@ -382,7 +444,12 @@ pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) ->
             .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
             .collect::<Vec<BackupGroup>>(),
         Err(err) => {
-            worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
+            task_log!(
+                worker,
+                "verify datastore {} - unable to list backups: {}",
+                datastore.name(),
+                err,
+            );
             return Ok(errors);
         }
     };
@@ -400,7 +467,7 @@ pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) ->
     // start with 64 chunks since we assume there are few corrupt ones
     let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
 
-    worker.log(format!("verify datastore {} ({} snapshots)", datastore.name(), snapshot_count));
+    task_log!(worker, "verify datastore {} ({} snapshots)", datastore.name(), snapshot_count);
 
     let mut done = 0;
     for group in list {
@@ -411,6 +478,7 @@ pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) ->
             corrupt_chunks.clone(),
             Some((done, snapshot_count)),
             worker.clone(),
+            upid,
         )?;
         errors.append(&mut group_errors);
 
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 67fbc541..b28ac035 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -306,7 +306,7 @@ async fn schedule_datastore_garbage_collection() {
                 worker.log(format!("starting garbage collection on store {}", store));
                 worker.log(format!("task triggered by schedule '{}'", event_str));
 
-                let result = datastore.garbage_collection(&worker);
+                let result = datastore.garbage_collection(&*worker, worker.upid());
 
                 let status = worker.create_state(&result);
@@ -557,7 +557,8 @@ async fn schedule_datastore_verification() {
                 worker.log(format!("starting verification on store {}", store2));
                 worker.log(format!("task triggered by schedule '{}'", event_str));
                 let result = try_block!({
-                    let failed_dirs = verify_all_backups(datastore, worker.clone())?;
+                    let failed_dirs =
+                        verify_all_backups(datastore, worker.clone(), worker.upid())?;
                     if failed_dirs.len() > 0 {
                         worker.log("Failed to verify following snapshots:");
                         for dir in failed_dirs {
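
Note: this patch relies on a `crate::task` module that is not part of the diff itself. The sketch below is inferred purely from the call sites above (`worker.check_abort()?`, `crate::task_log!`/`task_warn!`, `&dyn TaskState`, `Arc<dyn TaskState + Send + Sync>`); the `log(level, message)` method, the macro bodies, and everything else beyond those names are assumptions, not the actual contents of src/task.rs.

// Hypothetical src/task.rs sketch -- inferred from this patch, not copied
// from the repository. Assumes the `anyhow` and `log` crates are available.
use anyhow::Error;

/// Object-safe task abstraction. GC takes it as `&dyn TaskState`, the
/// verify code as `Arc<dyn TaskState + Send + Sync>`.
pub trait TaskState {
    /// Replaces `WorkerTask::fail_on_abort()` at the call sites above;
    /// returns an error once the task was asked to abort.
    fn check_abort(&self) -> Result<(), Error>;

    /// Log sink used by the macros below (assumed signature).
    fn log(&self, level: log::Level, message: &std::fmt::Arguments);
}

/// `task_log!(worker, "fmt", args...)`: the `&*$task` reborrow lets one
/// macro accept `&dyn TaskState`, `Arc<dyn TaskState + Send + Sync>`, or a
/// concrete `Arc<WorkerTask>`, and defers formatting to the sink instead of
/// building an intermediate `String` like the old `worker.log(format!(..))`.
#[macro_export]
macro_rules! task_log {
    ($task:expr, $($fmt:tt)+) => {{
        $crate::task::TaskState::log(&*$task, log::Level::Info, &format_args!($($fmt)+))
    }};
}

#[macro_export]
macro_rules! task_warn {
    ($task:expr, $($fmt:tt)+) => {{
        $crate::task::TaskState::log(&*$task, log::Level::Warn, &format_args!($($fmt)+))
    }};
}

Presumably `WorkerTask` implements the trait by forwarding `check_abort` to its existing `fail_on_abort` and routing `log` into its task log, which is why `&*worker` and `worker.clone()` keep working unchanged at the call sites. The explicit `upid: UPID`/`upid: &UPID` parameters are needed because the trait object no longer exposes `WorkerTask::upid()`: callers that still hold the concrete `WorkerTask` (the API handlers and the proxy schedulers) fetch the UPID once and thread it down for `gc_status.upid` and `SnapshotVerifyState`.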