diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index ba8f3417..3d5b6af6 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -3,7 +3,6 @@
 use std::collections::HashSet;
 use std::ffi::OsStr;
 use std::os::unix::ffi::OsStrExt;
-use std::sync::{Arc, Mutex};
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
 
@@ -672,17 +671,12 @@ pub fn verify(
         auth_id.clone(),
         to_stdout,
         move |worker| {
-            let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
-            let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
-
+            let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
             let failed_dirs = if let Some(backup_dir) = backup_dir {
                 let mut res = Vec::new();
                 if !verify_backup_dir(
-                    datastore,
+                    &verify_worker,
                     &backup_dir,
-                    verified_chunks,
-                    corrupt_chunks,
-                    worker.clone(),
                     worker.upid().clone(),
                     None,
                 )? {
@@ -691,12 +685,9 @@ pub fn verify(
                 res
             } else if let Some(backup_group) = backup_group {
                 let failed_dirs = verify_backup_group(
-                    datastore,
+                    &verify_worker,
                     &backup_group,
-                    verified_chunks,
-                    corrupt_chunks,
                     &mut StoreProgress::new(1),
-                    worker.clone(),
                     worker.upid(),
                     None,
                 )?;
@@ -711,7 +702,7 @@ pub fn verify(
                     None
                 };
 
-                verify_all_backups(datastore, worker.clone(), worker.upid(), owner, None)?
+                verify_all_backups(&verify_worker, worker.upid(), owner, None)?
             };
             if !failed_dirs.is_empty() {
                 worker.log("Failed to verify the following snapshots/groups:");
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 38061816..c8f52b6e 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -1,6 +1,6 @@
 use anyhow::{bail, format_err, Error};
 use std::sync::{Arc, Mutex};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 
 use nix::dir::Dir;
 use ::serde::{Serialize};
@@ -525,15 +525,11 @@ impl BackupEnvironment {
             move |worker| {
                 worker.log("Automatically verifying newly added snapshot");
 
-                let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
-                let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
 
+                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
                 if !verify_backup_dir_with_lock(
-                    datastore,
+                    &verify_worker,
                     &backup_dir,
-                    verified_chunks,
-                    corrupt_chunks,
-                    worker.clone(),
                     worker.upid().clone(),
                     None,
                     snap_lock,
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 5e4bc7fb..ac4a6c29 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -29,6 +29,29 @@ use crate::{
     tools::fs::lock_dir_noblock_shared,
 };
 
+/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
+/// already been verified or detected as corrupt.
+pub struct VerifyWorker {
+    worker: Arc<dyn TaskState + Send + Sync>,
+    datastore: Arc<DataStore>,
+    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+}
+
+impl VerifyWorker {
+    /// Creates a new VerifyWorker for a given task worker and datastore.
+    pub fn new(worker: Arc<dyn TaskState + Send + Sync>, datastore: Arc<DataStore>) -> Self {
+        Self {
+            worker,
+            datastore,
+            // start with 16k chunks == up to 64G data
+            verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16*1024))),
+            // start with 64 chunks since we assume there are few corrupt ones
+            corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
+        }
+    }
+}
+
 fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
     let blob = datastore.load_blob(backup_dir, &info.filename)?;
 
@@ -82,12 +105,9 @@ fn rename_corrupted_chunk(
 }
 
 fn verify_index_chunks(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     index: Box<dyn IndexFile + Send>,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     crypt_mode: CryptMode,
-    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let errors = Arc::new(AtomicUsize::new(0));
@@ -97,10 +117,10 @@
     let mut read_bytes = 0;
     let mut decoded_bytes = 0;
 
-    let worker2 = Arc::clone(&worker);
-    let datastore2 = Arc::clone(&datastore);
-    let corrupt_chunks2 = Arc::clone(&corrupt_chunks);
-    let verified_chunks2 = Arc::clone(&verified_chunks);
+    let worker2 = Arc::clone(&verify_worker.worker);
+    let datastore2 = Arc::clone(&verify_worker.datastore);
+    let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
+    let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
     let errors2 = Arc::clone(&errors);
 
     let decoder_pool = ParallelHandler::new(
@@ -141,29 +161,29 @@
 
     for pos in 0..index.index_count() {
 
-        worker.check_abort()?;
+        verify_worker.worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
 
         let info = index.chunk_info(pos).unwrap();
         let size = info.size();
 
-        if verified_chunks.lock().unwrap().contains(&info.digest) {
+        if verify_worker.verified_chunks.lock().unwrap().contains(&info.digest) {
             continue; // already verified
         }
 
-        if corrupt_chunks.lock().unwrap().contains(&info.digest) {
+        if verify_worker.corrupt_chunks.lock().unwrap().contains(&info.digest) {
             let digest_str = proxmox::tools::digest_to_hex(&info.digest);
-            task_log!(worker, "chunk {} was marked as corrupt", digest_str);
+            task_log!(verify_worker.worker, "chunk {} was marked as corrupt", digest_str);
             errors.fetch_add(1, Ordering::SeqCst);
             continue;
         }
 
-        match datastore.load_chunk(&info.digest) {
+        match verify_worker.datastore.load_chunk(&info.digest) {
             Err(err) => {
-                corrupt_chunks.lock().unwrap().insert(info.digest);
-                task_log!(worker, "can't verify chunk, load failed - {}", err);
+                verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
+                task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
                 errors.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
+                rename_corrupted_chunk(verify_worker.datastore.clone(), &info.digest, &verify_worker.worker);
                 continue;
             }
             Ok(chunk) => {
@@ -187,7 +207,7 @@
     let error_count = errors.load(Ordering::SeqCst);
 
     task_log!(
-        worker,
+        verify_worker.worker,
         "  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
         read_bytes_mib,
         decoded_bytes_mib,
@@ -205,18 +225,15 @@
 }
 
 fn verify_fixed_index(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
     info: &FileInfo,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
     let mut path = backup_dir.relative_path();
     path.push(&info.filename);
 
-    let index = datastore.open_fixed_reader(&path)?;
+    let index = verify_worker.datastore.open_fixed_reader(&path)?;
 
     let (csum, size) = index.compute_csum();
 
     if size != info.size {
@@ -228,28 +245,22 @@
     }
 
     verify_index_chunks(
-        datastore,
+        verify_worker,
         Box::new(index),
-        verified_chunks,
-        corrupt_chunks,
         info.chunk_crypt_mode(),
-        worker,
     )
 }
 
 fn verify_dynamic_index(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
     info: &FileInfo,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
     let mut path = backup_dir.relative_path();
     path.push(&info.filename);
 
-    let index = datastore.open_dynamic_reader(&path)?;
+    let index = verify_worker.datastore.open_dynamic_reader(&path)?;
 
     let (csum, size) = index.compute_csum();
 
     if size != info.size {
@@ -261,12 +272,9 @@
     }
 
     verify_index_chunks(
-        datastore,
+        verify_worker,
         Box::new(index),
-        verified_chunks,
-        corrupt_chunks,
         info.chunk_crypt_mode(),
-        worker,
     )
 }
 
@@ -280,34 +288,28 @@ fn verify_dynamic_index(
 /// - Ok(false) if there were verification errors
 /// - Err(_) if task was aborted
 pub fn verify_backup_dir(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<dyn TaskState + Send + Sync>,
     upid: UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<bool, Error> {
     let snap_lock = lock_dir_noblock_shared(
-        &datastore.snapshot_path(&backup_dir),
+        &verify_worker.datastore.snapshot_path(&backup_dir),
         "snapshot",
         "locked by another operation");
     match snap_lock {
         Ok(snap_lock) => verify_backup_dir_with_lock(
-            datastore,
+            verify_worker,
             backup_dir,
-            verified_chunks,
-            corrupt_chunks,
-            worker,
             upid,
             filter,
             snap_lock
         ),
         Err(err) => {
             task_log!(
-                worker,
+                verify_worker.worker,
                 "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
-                datastore.name(),
+                verify_worker.datastore.name(),
                 backup_dir,
                 err,
             );
@@ -318,22 +320,19 @@
 
 /// See verify_backup_dir
 pub fn verify_backup_dir_with_lock(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<dyn TaskState + Send + Sync>,
     upid: UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
     _snap_lock: Dir,
 ) -> Result<bool, Error> {
-    let manifest = match datastore.load_manifest(&backup_dir) {
+    let manifest = match verify_worker.datastore.load_manifest(&backup_dir) {
         Ok((manifest, _)) => manifest,
         Err(err) => {
             task_log!(
-                worker,
+                verify_worker.worker,
                 "verify {}:{} - manifest load error: {}",
-                datastore.name(),
+                verify_worker.datastore.name(),
                 backup_dir,
                 err,
             );
@@ -344,54 +343,48 @@ pub fn verify_backup_dir_with_lock(
     if let Some(filter) = filter {
         if !filter(&manifest) {
             task_log!(
-                worker,
+                verify_worker.worker,
                 "SKIPPED: verify {}:{} (recently verified)",
-                datastore.name(),
+                verify_worker.datastore.name(),
                 backup_dir,
             );
             return Ok(true);
         }
     }
 
-    task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);
+    task_log!(verify_worker.worker, "verify {}:{}", verify_worker.datastore.name(), backup_dir);
 
     let mut error_count = 0;
 
     let mut verify_result = VerifyState::Ok;
     for info in manifest.files() {
         let result = proxmox::try_block!({
-            task_log!(worker, "  check {}", info.filename);
+            task_log!(verify_worker.worker, "  check {}", info.filename);
             match archive_type(&info.filename)? {
                 ArchiveType::FixedIndex =>
                     verify_fixed_index(
-                        datastore.clone(),
+                        verify_worker,
                         &backup_dir,
                         info,
-                        verified_chunks.clone(),
-                        corrupt_chunks.clone(),
-                        worker.clone(),
                     ),
                 ArchiveType::DynamicIndex =>
                     verify_dynamic_index(
-                        datastore.clone(),
+                        verify_worker,
                         &backup_dir,
                         info,
-                        verified_chunks.clone(),
-                        corrupt_chunks.clone(),
-                        worker.clone(),
                     ),
-                ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
+                ArchiveType::Blob => verify_blob(verify_worker.datastore.clone(), &backup_dir, info),
             }
         });
 
-        worker.check_abort()?;
+        verify_worker.worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
 
         if let Err(err) = result {
            task_log!(
-                worker,
+                verify_worker.worker,
                 "verify {}:{}/{} failed: {}",
-                datastore.name(),
+                verify_worker.datastore.name(),
                 backup_dir,
                 info.filename,
                 err,
@@ -407,7 +400,7 @@
         upid,
     };
     let verify_state = serde_json::to_value(verify_state)?;
-    datastore.update_manifest(&backup_dir, |manifest| {
+    verify_worker.datastore.update_manifest(&backup_dir, |manifest| {
         manifest.unprotected["verify_state"] = verify_state;
     }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
 
@@ -422,24 +415,21 @@
 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
 pub fn verify_backup_group(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     group: &BackupGroup,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     progress: &mut StoreProgress,
-    worker: Arc<dyn TaskState + Send + Sync>,
     upid: &UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<Vec<String>, Error> {
     let mut errors = Vec::new();
 
-    let mut list = match group.list_backups(&datastore.base_path()) {
+    let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
         Ok(list) => list,
         Err(err) => {
             task_log!(
-                worker,
+                verify_worker.worker,
                 "verify group {}:{} - unable to list backups: {}",
-                datastore.name(),
+                verify_worker.datastore.name(),
                 group,
                 err,
             );
@@ -448,18 +438,15 @@
     };
 
     let snapshot_count = list.len();
-    task_log!(worker, "verify group {}:{} ({} snapshots)", datastore.name(), group, snapshot_count);
+    task_log!(verify_worker.worker, "verify group {}:{} ({} snapshots)", verify_worker.datastore.name(), group, snapshot_count);
 
     progress.group_snapshots = snapshot_count as u64;
 
     BackupInfo::sort_list(&mut list, false); // newest first
     for (pos, info) in list.into_iter().enumerate() {
         if !verify_backup_dir(
-            datastore.clone(),
+            verify_worker,
             &info.backup_dir,
-            verified_chunks.clone(),
-            corrupt_chunks.clone(),
-            worker.clone(),
             upid.clone(),
             filter,
         )? {
@@ -467,7 +454,7 @@
         }
         progress.done_snapshots = pos as u64 + 1;
         task_log!(
-            worker,
+            verify_worker.worker,
             "percentage done: {}",
             progress
         );
@@ -484,22 +471,22 @@
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
 pub fn verify_all_backups(
-    datastore: Arc<DataStore>,
-    worker: Arc<dyn TaskState + Send + Sync>,
+    verify_worker: &VerifyWorker,
     upid: &UPID,
     owner: Option<Authid>,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<Vec<String>, Error> {
     let mut errors = Vec::new();
+    let worker = Arc::clone(&verify_worker.worker);
 
-    task_log!(worker, "verify datastore {}", datastore.name());
+    task_log!(worker, "verify datastore {}", verify_worker.datastore.name());
 
     if let Some(owner) = &owner {
         task_log!(worker, "limiting to backups owned by {}", owner);
     }
 
     let filter_by_owner = |group: &BackupGroup| {
-        match (datastore.get_owner(group), &owner) {
+        match (verify_worker.datastore.get_owner(group), &owner) {
             (Ok(ref group_owner), Some(owner)) => {
                 group_owner == owner
                     || (group_owner.is_token()
@@ -527,7 +514,7 @@
         }
     };
 
-    let mut list = match BackupInfo::list_backup_groups(&datastore.base_path()) {
+    let mut list = match BackupInfo::list_backup_groups(&verify_worker.datastore.base_path()) {
         Ok(list) => list
             .into_iter()
             .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
@@ -545,12 +532,6 @@
 
     list.sort_unstable();
 
-    // start with 16384 chunks (up to 65GB)
-    let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
-
-    // start with 64 chunks since we assume there are few corrupt ones
-    let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
-
     let group_count = list.len();
     task_log!(worker, "found {} groups", group_count);
 
@@ -562,12 +543,9 @@
         progress.group_snapshots = 0;
 
         let mut group_errors = verify_backup_group(
-            datastore.clone(),
+            verify_worker,
             &group,
-            verified_chunks.clone(),
-            corrupt_chunks.clone(),
             &mut progress,
-            worker.clone(),
             upid,
             filter,
         )?;
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index ca6eb554..1dd8baa7 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -67,7 +67,8 @@ pub fn do_verification_job(
                 task_log!(worker,"task triggered by schedule '{}'", event_str);
             }
 
-            let result = verify_all_backups(datastore, worker.clone(), worker.upid(), None, Some(&filter));
+            let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+            let result = verify_all_backups(&verify_worker, worker.upid(), None, Some(&filter));
             let job_result = match result {
                 Ok(ref failed_dirs) if failed_dirs.is_empty() => Ok(()),
                 Ok(ref failed_dirs) => {
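
Note on the pattern applied by this patch: state that was previously threaded through every verify function as separate Arc/Arc<Mutex<…>> parameters is bundled into a single context struct (VerifyWorker) that callers construct once and pass by reference. Below is a minimal, self-contained sketch of that pattern using only std types; the names VerifyCtx and check_chunk are illustrative and are not part of proxmox-backup.

// Sketch of "bundle shared state into a context struct" (illustrative names only).
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

struct VerifyCtx {
    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
}

impl VerifyCtx {
    fn new() -> Self {
        Self {
            // pre-sized like the patch: many verified chunks expected, few corrupt ones
            verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
            corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
        }
    }
}

// Before: fn check_chunk(verified: Arc<Mutex<..>>, corrupt: Arc<Mutex<..>>, digest: [u8; 32])
// After:  a single `&VerifyCtx` parameter carries all shared state.
fn check_chunk(ctx: &VerifyCtx, digest: [u8; 32]) -> bool {
    if ctx.corrupt_chunks.lock().unwrap().contains(&digest) {
        return false; // known corrupt, skip
    }
    // insert() returns true only the first time a digest is seen
    ctx.verified_chunks.lock().unwrap().insert(digest)
}

fn main() {
    let ctx = VerifyCtx::new();
    let digest = [0u8; 32];
    assert!(check_chunk(&ctx, digest));  // newly verified
    assert!(!check_chunk(&ctx, digest)); // already verified
}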