verify: partially rust fmt
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
parent
7f394c807b
commit
c894909e17
|
@ -1,8 +1,8 @@
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::sync::atomic::{Ordering, AtomicUsize};
|
|
||||||
use std::time::Instant;
|
|
||||||
use nix::dir::Dir;
|
use nix::dir::Dir;
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use anyhow::{bail, format_err, Error};
|
use anyhow::{bail, format_err, Error};
|
||||||
|
|
||||||
|
@ -25,8 +25,8 @@ use crate::{
|
||||||
server::UPID,
|
server::UPID,
|
||||||
task::TaskState,
|
task::TaskState,
|
||||||
task_log,
|
task_log,
|
||||||
tools::ParallelHandler,
|
|
||||||
tools::fs::lock_dir_noblock_shared,
|
tools::fs::lock_dir_noblock_shared,
|
||||||
|
tools::ParallelHandler,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
|
/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
|
||||||
|
@ -34,8 +34,8 @@ use crate::{
|
||||||
pub struct VerifyWorker {
|
pub struct VerifyWorker {
|
||||||
worker: Arc<dyn TaskState + Send + Sync>,
|
worker: Arc<dyn TaskState + Send + Sync>,
|
||||||
datastore: Arc<DataStore>,
|
datastore: Arc<DataStore>,
|
||||||
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
||||||
corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl VerifyWorker {
|
impl VerifyWorker {
|
||||||
|
@ -45,15 +45,18 @@ impl VerifyWorker {
|
||||||
worker,
|
worker,
|
||||||
datastore,
|
datastore,
|
||||||
// start with 16k chunks == up to 64G data
|
// start with 16k chunks == up to 64G data
|
||||||
verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16*1024))),
|
verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
|
||||||
// start with 64 chunks since we assume there are few corrupt ones
|
// start with 64 chunks since we assume there are few corrupt ones
|
||||||
corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
|
corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
|
fn verify_blob(
|
||||||
|
datastore: Arc<DataStore>,
|
||||||
|
backup_dir: &BackupDir,
|
||||||
|
info: &FileInfo,
|
||||||
|
) -> Result<(), Error> {
|
||||||
let blob = datastore.load_blob(backup_dir, &info.filename)?;
|
let blob = datastore.load_blob(backup_dir, &info.filename)?;
|
||||||
|
|
||||||
let raw_size = blob.raw_size();
|
let raw_size = blob.raw_size();
|
||||||
|
@ -88,7 +91,11 @@ fn rename_corrupted_chunk(
|
||||||
let mut new_path = path.clone();
|
let mut new_path = path.clone();
|
||||||
loop {
|
loop {
|
||||||
new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
|
new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
|
||||||
if new_path.exists() && counter < 9 { counter += 1; } else { break; }
|
if new_path.exists() && counter < 9 {
|
||||||
|
counter += 1;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match std::fs::rename(&path, &new_path) {
|
match std::fs::rename(&path, &new_path) {
|
||||||
|
@ -109,7 +116,6 @@ fn verify_index_chunks(
|
||||||
index: Box<dyn IndexFile + Send>,
|
index: Box<dyn IndexFile + Send>,
|
||||||
crypt_mode: CryptMode,
|
crypt_mode: CryptMode,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
|
||||||
let errors = Arc::new(AtomicUsize::new(0));
|
let errors = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
let start_time = Instant::now();
|
let start_time = Instant::now();
|
||||||
|
@ -124,8 +130,9 @@ fn verify_index_chunks(
|
||||||
let errors2 = Arc::clone(&errors);
|
let errors2 = Arc::clone(&errors);
|
||||||
|
|
||||||
let decoder_pool = ParallelHandler::new(
|
let decoder_pool = ParallelHandler::new(
|
||||||
"verify chunk decoder", 4,
|
"verify chunk decoder",
|
||||||
move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
|
4,
|
||||||
|
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
|
||||||
let chunk_crypt_mode = match chunk.crypt_mode() {
|
let chunk_crypt_mode = match chunk.crypt_mode() {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
corrupt_chunks2.lock().unwrap().insert(digest);
|
corrupt_chunks2.lock().unwrap().insert(digest);
|
||||||
|
@ -186,7 +193,11 @@ fn verify_index_chunks(
|
||||||
verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
|
verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
|
||||||
task_log!(verify_worker.worker, "can't verify chunk, stat failed - {}", err);
|
task_log!(verify_worker.worker, "can't verify chunk, stat failed - {}", err);
|
||||||
errors.fetch_add(1, Ordering::SeqCst);
|
errors.fetch_add(1, Ordering::SeqCst);
|
||||||
rename_corrupted_chunk(verify_worker.datastore.clone(), &info.digest, &verify_worker.worker);
|
rename_corrupted_chunk(
|
||||||
|
verify_worker.datastore.clone(),
|
||||||
|
&info.digest,
|
||||||
|
&verify_worker.worker,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Ok(metadata) => {
|
Ok(metadata) => {
|
||||||
chunk_list.push((pos, metadata.ino()));
|
chunk_list.push((pos, metadata.ino()));
|
||||||
|
@ -194,9 +205,7 @@ fn verify_index_chunks(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
chunk_list.sort_unstable_by(|(_, ino_a), (_, ino_b)| {
|
chunk_list.sort_unstable_by(|(_, ino_a), (_, ino_b)| ino_a.cmp(&ino_b));
|
||||||
ino_a.cmp(&ino_b)
|
|
||||||
});
|
|
||||||
|
|
||||||
for (pos, _) in chunk_list {
|
for (pos, _) in chunk_list {
|
||||||
verify_worker.worker.check_abort()?;
|
verify_worker.worker.check_abort()?;
|
||||||
|
@ -209,7 +218,11 @@ fn verify_index_chunks(
|
||||||
verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
|
verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
|
||||||
task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
|
task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
|
||||||
errors.fetch_add(1, Ordering::SeqCst);
|
errors.fetch_add(1, Ordering::SeqCst);
|
||||||
rename_corrupted_chunk(verify_worker.datastore.clone(), &info.digest, &verify_worker.worker);
|
rename_corrupted_chunk(
|
||||||
|
verify_worker.datastore.clone(),
|
||||||
|
&info.digest,
|
||||||
|
&verify_worker.worker,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Ok(chunk) => {
|
Ok(chunk) => {
|
||||||
let size = info.size();
|
let size = info.size();
|
||||||
|
@ -224,11 +237,11 @@ fn verify_index_chunks(
|
||||||
|
|
||||||
let elapsed = start_time.elapsed().as_secs_f64();
|
let elapsed = start_time.elapsed().as_secs_f64();
|
||||||
|
|
||||||
let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);
|
let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
|
||||||
let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0);
|
let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
|
||||||
|
|
||||||
let read_speed = read_bytes_mib/elapsed;
|
let read_speed = read_bytes_mib / elapsed;
|
||||||
let decode_speed = decoded_bytes_mib/elapsed;
|
let decode_speed = decoded_bytes_mib / elapsed;
|
||||||
|
|
||||||
let error_count = errors.load(Ordering::SeqCst);
|
let error_count = errors.load(Ordering::SeqCst);
|
||||||
|
|
||||||
|
@ -255,7 +268,6 @@ fn verify_fixed_index(
|
||||||
backup_dir: &BackupDir,
|
backup_dir: &BackupDir,
|
||||||
info: &FileInfo,
|
info: &FileInfo,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
|
||||||
let mut path = backup_dir.relative_path();
|
let mut path = backup_dir.relative_path();
|
||||||
path.push(&info.filename);
|
path.push(&info.filename);
|
||||||
|
|
||||||
|
@ -270,11 +282,7 @@ fn verify_fixed_index(
|
||||||
bail!("wrong index checksum");
|
bail!("wrong index checksum");
|
||||||
}
|
}
|
||||||
|
|
||||||
verify_index_chunks(
|
verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
|
||||||
verify_worker,
|
|
||||||
Box::new(index),
|
|
||||||
info.chunk_crypt_mode(),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_dynamic_index(
|
fn verify_dynamic_index(
|
||||||
|
@ -282,7 +290,6 @@ fn verify_dynamic_index(
|
||||||
backup_dir: &BackupDir,
|
backup_dir: &BackupDir,
|
||||||
info: &FileInfo,
|
info: &FileInfo,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
|
||||||
let mut path = backup_dir.relative_path();
|
let mut path = backup_dir.relative_path();
|
||||||
path.push(&info.filename);
|
path.push(&info.filename);
|
||||||
|
|
||||||
|
@ -297,11 +304,7 @@ fn verify_dynamic_index(
|
||||||
bail!("wrong index checksum");
|
bail!("wrong index checksum");
|
||||||
}
|
}
|
||||||
|
|
||||||
verify_index_chunks(
|
verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
|
||||||
verify_worker,
|
|
||||||
Box::new(index),
|
|
||||||
info.chunk_crypt_mode(),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Verify a single backup snapshot
|
/// Verify a single backup snapshot
|
||||||
|
@ -322,15 +325,12 @@ pub fn verify_backup_dir(
|
||||||
let snap_lock = lock_dir_noblock_shared(
|
let snap_lock = lock_dir_noblock_shared(
|
||||||
&verify_worker.datastore.snapshot_path(&backup_dir),
|
&verify_worker.datastore.snapshot_path(&backup_dir),
|
||||||
"snapshot",
|
"snapshot",
|
||||||
"locked by another operation");
|
"locked by another operation",
|
||||||
|
);
|
||||||
match snap_lock {
|
match snap_lock {
|
||||||
Ok(snap_lock) => verify_backup_dir_with_lock(
|
Ok(snap_lock) => {
|
||||||
verify_worker,
|
verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
|
||||||
backup_dir,
|
}
|
||||||
upid,
|
|
||||||
filter,
|
|
||||||
snap_lock
|
|
||||||
),
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
task_log!(
|
task_log!(
|
||||||
verify_worker.worker,
|
verify_worker.worker,
|
||||||
|
@ -387,19 +387,11 @@ pub fn verify_backup_dir_with_lock(
|
||||||
let result = proxmox::try_block!({
|
let result = proxmox::try_block!({
|
||||||
task_log!(verify_worker.worker, " check {}", info.filename);
|
task_log!(verify_worker.worker, " check {}", info.filename);
|
||||||
match archive_type(&info.filename)? {
|
match archive_type(&info.filename)? {
|
||||||
ArchiveType::FixedIndex =>
|
ArchiveType::FixedIndex => verify_fixed_index(verify_worker, &backup_dir, info),
|
||||||
verify_fixed_index(
|
ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, &backup_dir, info),
|
||||||
verify_worker,
|
ArchiveType::Blob => {
|
||||||
&backup_dir,
|
verify_blob(verify_worker.datastore.clone(), &backup_dir, info)
|
||||||
info,
|
}
|
||||||
),
|
|
||||||
ArchiveType::DynamicIndex =>
|
|
||||||
verify_dynamic_index(
|
|
||||||
verify_worker,
|
|
||||||
&backup_dir,
|
|
||||||
info,
|
|
||||||
),
|
|
||||||
ArchiveType::Blob => verify_blob(verify_worker.datastore.clone(), &backup_dir, info),
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -418,7 +410,6 @@ pub fn verify_backup_dir_with_lock(
|
||||||
error_count += 1;
|
error_count += 1;
|
||||||
verify_result = VerifyState::Failed;
|
verify_result = VerifyState::Failed;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let verify_state = SnapshotVerifyState {
|
let verify_state = SnapshotVerifyState {
|
||||||
|
@ -426,9 +417,12 @@ pub fn verify_backup_dir_with_lock(
|
||||||
upid,
|
upid,
|
||||||
};
|
};
|
||||||
let verify_state = serde_json::to_value(verify_state)?;
|
let verify_state = serde_json::to_value(verify_state)?;
|
||||||
verify_worker.datastore.update_manifest(&backup_dir, |manifest| {
|
verify_worker
|
||||||
manifest.unprotected["verify_state"] = verify_state;
|
.datastore
|
||||||
}).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
|
.update_manifest(&backup_dir, |manifest| {
|
||||||
|
manifest.unprotected["verify_state"] = verify_state;
|
||||||
|
})
|
||||||
|
.map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
|
||||||
|
|
||||||
Ok(error_count == 0)
|
Ok(error_count == 0)
|
||||||
}
|
}
|
||||||
|
@ -447,7 +441,6 @@ pub fn verify_backup_group(
|
||||||
upid: &UPID,
|
upid: &UPID,
|
||||||
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
|
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
|
||||||
) -> Result<Vec<String>, Error> {
|
) -> Result<Vec<String>, Error> {
|
||||||
|
|
||||||
let mut errors = Vec::new();
|
let mut errors = Vec::new();
|
||||||
let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
|
let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
|
||||||
Ok(list) => list,
|
Ok(list) => list,
|
||||||
|
@ -464,26 +457,23 @@ pub fn verify_backup_group(
|
||||||
};
|
};
|
||||||
|
|
||||||
let snapshot_count = list.len();
|
let snapshot_count = list.len();
|
||||||
task_log!(verify_worker.worker, "verify group {}:{} ({} snapshots)", verify_worker.datastore.name(), group, snapshot_count);
|
task_log!(
|
||||||
|
verify_worker.worker,
|
||||||
|
"verify group {}:{} ({} snapshots)",
|
||||||
|
verify_worker.datastore.name(),
|
||||||
|
group,
|
||||||
|
snapshot_count
|
||||||
|
);
|
||||||
|
|
||||||
progress.group_snapshots = snapshot_count as u64;
|
progress.group_snapshots = snapshot_count as u64;
|
||||||
|
|
||||||
BackupInfo::sort_list(&mut list, false); // newest first
|
BackupInfo::sort_list(&mut list, false); // newest first
|
||||||
for (pos, info) in list.into_iter().enumerate() {
|
for (pos, info) in list.into_iter().enumerate() {
|
||||||
if !verify_backup_dir(
|
if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
|
||||||
verify_worker,
|
|
||||||
&info.backup_dir,
|
|
||||||
upid.clone(),
|
|
||||||
filter,
|
|
||||||
)? {
|
|
||||||
errors.push(info.backup_dir.to_string());
|
errors.push(info.backup_dir.to_string());
|
||||||
}
|
}
|
||||||
progress.done_snapshots = pos as u64 + 1;
|
progress.done_snapshots = pos as u64 + 1;
|
||||||
task_log!(
|
task_log!(verify_worker.worker, "percentage done: {}", progress);
|
||||||
verify_worker.worker,
|
|
||||||
"percentage done: {}",
|
|
||||||
progress
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(errors)
|
Ok(errors)
|
||||||
|
@ -547,11 +537,7 @@ pub fn verify_all_backups(
|
||||||
.filter(filter_by_owner)
|
.filter(filter_by_owner)
|
||||||
.collect::<Vec<BackupGroup>>(),
|
.collect::<Vec<BackupGroup>>(),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
task_log!(
|
task_log!(worker, "unable to list backups: {}", err,);
|
||||||
worker,
|
|
||||||
"unable to list backups: {}",
|
|
||||||
err,
|
|
||||||
);
|
|
||||||
return Ok(errors);
|
return Ok(errors);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -568,13 +554,8 @@ pub fn verify_all_backups(
|
||||||
progress.done_snapshots = 0;
|
progress.done_snapshots = 0;
|
||||||
progress.group_snapshots = 0;
|
progress.group_snapshots = 0;
|
||||||
|
|
||||||
let mut group_errors = verify_backup_group(
|
let mut group_errors =
|
||||||
verify_worker,
|
verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
|
||||||
&group,
|
|
||||||
&mut progress,
|
|
||||||
upid,
|
|
||||||
filter,
|
|
||||||
)?;
|
|
||||||
errors.append(&mut group_errors);
|
errors.append(&mut group_errors);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue