From c894909e171d3634daef9e16f8a35a71592a9b89 Mon Sep 17 00:00:00 2001
From: Thomas Lamprecht <t.lamprecht@proxmox.com>
Date: Wed, 14 Apr 2021 12:53:19 +0200
Subject: [PATCH] verify: partially rust fmt

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
---
 src/backup/verify.rs | 151 +++++++++++++++++++------------------------
 1 file changed, 66 insertions(+), 85 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 9173bd9d..e5a2a4de 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -1,8 +1,8 @@
-use std::collections::HashSet;
-use std::sync::{Arc, Mutex};
-use std::sync::atomic::{Ordering, AtomicUsize};
-use std::time::Instant;
 use nix::dir::Dir;
+use std::collections::HashSet;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Instant;
 
 use anyhow::{bail, format_err, Error};
 
@@ -25,8 +25,8 @@ use crate::{
     server::UPID,
     task::TaskState,
     task_log,
-    tools::ParallelHandler,
     tools::fs::lock_dir_noblock_shared,
+    tools::ParallelHandler,
 };
 
 /// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
@@ -34,8 +34,8 @@
 pub struct VerifyWorker {
     worker: Arc<dyn TaskState + Send + Sync>,
     datastore: Arc<DataStore>,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 }
 
 impl VerifyWorker {
@@ -45,15 +45,18 @@ impl VerifyWorker {
             worker,
             datastore,
             // start with 16k chunks == up to 64G data
-            verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16*1024))),
+            verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
             // start with 64 chunks since we assume there are few corrupt ones
             corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
         }
     }
 }
 
-fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
-
+fn verify_blob(
+    datastore: Arc<DataStore>,
+    backup_dir: &BackupDir,
+    info: &FileInfo,
+) -> Result<(), Error> {
     let blob = datastore.load_blob(backup_dir, &info.filename)?;
 
     let raw_size = blob.raw_size();
@@ -88,7 +91,11 @@ fn rename_corrupted_chunk(
     let mut new_path = path.clone();
     loop {
         new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
-        if new_path.exists() && counter < 9 { counter += 1; } else { break; }
+        if new_path.exists() && counter < 9 {
+            counter += 1;
+        } else {
+            break;
+        }
     }
 
     match std::fs::rename(&path, &new_path) {
@@ -109,7 +116,6 @@ fn verify_index_chunks(
     index: Box<dyn IndexFile + Send>,
     crypt_mode: CryptMode,
 ) -> Result<(), Error> {
-
     let errors = Arc::new(AtomicUsize::new(0));
 
     let start_time = Instant::now();
@@ -124,8 +130,9 @@ fn verify_index_chunks(
     let errors2 = Arc::clone(&errors);
 
     let decoder_pool = ParallelHandler::new(
-        "verify chunk decoder", 4,
-        move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
+        "verify chunk decoder",
+        4,
+        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
             let chunk_crypt_mode = match chunk.crypt_mode() {
                 Err(err) => {
                     corrupt_chunks2.lock().unwrap().insert(digest);
@@ -186,7 +193,11 @@ fn verify_index_chunks(
                 verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
                 task_log!(verify_worker.worker, "can't verify chunk, stat failed - {}", err);
                 errors.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(verify_worker.datastore.clone(), &info.digest, &verify_worker.worker);
+                rename_corrupted_chunk(
+                    verify_worker.datastore.clone(),
+                    &info.digest,
+                    &verify_worker.worker,
+                );
             }
             Ok(metadata) => {
                 chunk_list.push((pos, metadata.ino()));
@@ -194,9 +205,7 @@ fn verify_index_chunks(
         }
     }
 
-    chunk_list.sort_unstable_by(|(_, ino_a), (_, ino_b)| {
-        ino_a.cmp(&ino_b)
-    });
+    chunk_list.sort_unstable_by(|(_, ino_a), (_, ino_b)| ino_a.cmp(&ino_b));
 
     for (pos, _) in chunk_list {
         verify_worker.worker.check_abort()?;
@@ -209,7 +218,11 @@ fn verify_index_chunks(
                 verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
                 task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
                 errors.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(verify_worker.datastore.clone(), &info.digest, &verify_worker.worker);
+                rename_corrupted_chunk(
+                    verify_worker.datastore.clone(),
+                    &info.digest,
+                    &verify_worker.worker,
+                );
             }
             Ok(chunk) => {
                 let size = info.size();
@@ -224,11 +237,11 @@ fn verify_index_chunks(
 
     let elapsed = start_time.elapsed().as_secs_f64();
 
-    let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);
-    let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0);
+    let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
+    let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
 
-    let read_speed = read_bytes_mib/elapsed;
-    let decode_speed = decoded_bytes_mib/elapsed;
+    let read_speed = read_bytes_mib / elapsed;
+    let decode_speed = decoded_bytes_mib / elapsed;
 
     let error_count = errors.load(Ordering::SeqCst);
 
@@ -255,7 +268,6 @@ fn verify_fixed_index(
     backup_dir: &BackupDir,
     info: &FileInfo,
 ) -> Result<(), Error> {
-
     let mut path = backup_dir.relative_path();
     path.push(&info.filename);
 
@@ -270,11 +282,7 @@ fn verify_fixed_index(
         bail!("wrong index checksum");
     }
 
-    verify_index_chunks(
-        verify_worker,
-        Box::new(index),
-        info.chunk_crypt_mode(),
-    )
+    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
 }
 
 fn verify_dynamic_index(
@@ -282,7 +290,6 @@ fn verify_dynamic_index(
     backup_dir: &BackupDir,
     info: &FileInfo,
 ) -> Result<(), Error> {
-
     let mut path = backup_dir.relative_path();
     path.push(&info.filename);
 
@@ -297,11 +304,7 @@ fn verify_dynamic_index(
         bail!("wrong index checksum");
    }
 
-    verify_index_chunks(
-        verify_worker,
-        Box::new(index),
-        info.chunk_crypt_mode(),
-    )
+    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
 }
 
 /// Verify a single backup snapshot
@@ -322,15 +325,12 @@ pub fn verify_backup_dir(
     let snap_lock = lock_dir_noblock_shared(
         &verify_worker.datastore.snapshot_path(&backup_dir),
         "snapshot",
-        "locked by another operation");
+        "locked by another operation",
+    );
     match snap_lock {
-        Ok(snap_lock) => verify_backup_dir_with_lock(
-            verify_worker,
-            backup_dir,
-            upid,
-            filter,
-            snap_lock
-        ),
+        Ok(snap_lock) => {
+            verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
+        }
         Err(err) => {
             task_log!(
                 verify_worker.worker,
@@ -387,19 +387,11 @@ pub fn verify_backup_dir_with_lock(
         let result = proxmox::try_block!({
             task_log!(verify_worker.worker, "  check {}", info.filename);
             match archive_type(&info.filename)? {
-                ArchiveType::FixedIndex =>
-                    verify_fixed_index(
-                        verify_worker,
-                        &backup_dir,
-                        info,
-                    ),
-                ArchiveType::DynamicIndex =>
-                    verify_dynamic_index(
-                        verify_worker,
-                        &backup_dir,
-                        info,
-                    ),
-                ArchiveType::Blob => verify_blob(verify_worker.datastore.clone(), &backup_dir, info),
+                ArchiveType::FixedIndex => verify_fixed_index(verify_worker, &backup_dir, info),
+                ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, &backup_dir, info),
+                ArchiveType::Blob => {
+                    verify_blob(verify_worker.datastore.clone(), &backup_dir, info)
+                }
             }
         });
 
@@ -418,7 +410,6 @@ pub fn verify_backup_dir_with_lock(
             error_count += 1;
             verify_result = VerifyState::Failed;
         }
-
     }
 
     let verify_state = SnapshotVerifyState {
@@ -426,9 +417,12 @@ pub fn verify_backup_dir_with_lock(
         upid,
     };
     let verify_state = serde_json::to_value(verify_state)?;
-    verify_worker.datastore.update_manifest(&backup_dir, |manifest| {
-        manifest.unprotected["verify_state"] = verify_state;
-    }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
+    verify_worker
+        .datastore
+        .update_manifest(&backup_dir, |manifest| {
+            manifest.unprotected["verify_state"] = verify_state;
+        })
+        .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
 
     Ok(error_count == 0)
 }
@@ -447,7 +441,6 @@ pub fn verify_backup_group(
     upid: &UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<Vec<String>, Error> {
-
     let mut errors = Vec::new();
     let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
         Ok(list) => list,
@@ -464,26 +457,23 @@ pub fn verify_backup_group(
     };
 
     let snapshot_count = list.len();
-    task_log!(verify_worker.worker, "verify group {}:{} ({} snapshots)", verify_worker.datastore.name(), group, snapshot_count);
+    task_log!(
+        verify_worker.worker,
+        "verify group {}:{} ({} snapshots)",
+        verify_worker.datastore.name(),
+        group,
+        snapshot_count
+    );
 
     progress.group_snapshots = snapshot_count as u64;
     BackupInfo::sort_list(&mut list, false); // newest first
     for (pos, info) in list.into_iter().enumerate() {
-        if !verify_backup_dir(
-            verify_worker,
-            &info.backup_dir,
-            upid.clone(),
-            filter,
-        )? {
+        if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
             errors.push(info.backup_dir.to_string());
         }
 
         progress.done_snapshots = pos as u64 + 1;
-        task_log!(
-            verify_worker.worker,
-            "percentage done: {}",
-            progress
-        );
+        task_log!(verify_worker.worker, "percentage done: {}", progress);
     }
 
     Ok(errors)
@@ -547,11 +537,7 @@ pub fn verify_all_backups(
             .filter(filter_by_owner)
            .collect::<Vec<BackupGroup>>(),
         Err(err) => {
-            task_log!(
-                worker,
-                "unable to list backups: {}",
-                err,
-            );
+            task_log!(worker, "unable to list backups: {}", err,);
             return Ok(errors);
         }
     };
@@ -568,13 +554,8 @@ pub fn verify_all_backups(
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
 
-        let mut group_errors = verify_backup_group(
-            verify_worker,
-            &group,
-            &mut progress,
-            upid,
-            filter,
-        )?;
+        let mut group_errors =
+            verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
         errors.append(&mut group_errors);
     }