src/backup/verify.rs: use separate thread to load data
This commit is contained in:
parent
afe08d2755
commit
6b809ff59b
|
@ -1,6 +1,7 @@
|
||||||
use std::collections::{HashSet, HashMap};
|
use std::collections::{HashSet, HashMap};
|
||||||
use std::ffi::OsStr;
|
use std::ffi::OsStr;
|
||||||
use std::os::unix::ffi::OsStrExt;
|
use std::os::unix::ffi::OsStrExt;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use anyhow::{bail, format_err, Error};
|
use anyhow::{bail, format_err, Error};
|
||||||
use futures::*;
|
use futures::*;
|
||||||
|
@ -513,17 +514,17 @@ pub fn verify(
|
||||||
to_stdout,
|
to_stdout,
|
||||||
move |worker| {
|
move |worker| {
|
||||||
let failed_dirs = if let Some(backup_dir) = backup_dir {
|
let failed_dirs = if let Some(backup_dir) = backup_dir {
|
||||||
let mut verified_chunks = HashSet::with_capacity(1024*16);
|
let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
|
||||||
let mut corrupt_chunks = HashSet::with_capacity(64);
|
let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
|
||||||
let mut res = Vec::new();
|
let mut res = Vec::new();
|
||||||
if !verify_backup_dir(&datastore, &backup_dir, &mut verified_chunks, &mut corrupt_chunks, &worker)? {
|
if !verify_backup_dir(datastore, &backup_dir, verified_chunks, corrupt_chunks, worker.clone())? {
|
||||||
res.push(backup_dir.to_string());
|
res.push(backup_dir.to_string());
|
||||||
}
|
}
|
||||||
res
|
res
|
||||||
} else if let Some(backup_group) = backup_group {
|
} else if let Some(backup_group) = backup_group {
|
||||||
verify_backup_group(&datastore, &backup_group, &worker)?
|
verify_backup_group(datastore, &backup_group, worker.clone())?
|
||||||
} else {
|
} else {
|
||||||
verify_all_backups(&datastore, &worker)?
|
verify_all_backups(datastore, worker.clone())?
|
||||||
};
|
};
|
||||||
if failed_dirs.len() > 0 {
|
if failed_dirs.len() > 0 {
|
||||||
worker.log("Failed to verify following snapshots:");
|
worker.log("Failed to verify following snapshots:");
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::sync::atomic::{Ordering, AtomicUsize};
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use anyhow::{bail, format_err, Error};
|
use anyhow::{bail, format_err, Error};
|
||||||
|
|
||||||
|
@ -6,12 +9,12 @@ use crate::server::WorkerTask;
|
||||||
use crate::api2::types::*;
|
use crate::api2::types::*;
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
DataStore, BackupGroup, BackupDir, BackupInfo, IndexFile,
|
DataStore, DataBlob, BackupGroup, BackupDir, BackupInfo, IndexFile,
|
||||||
CryptMode,
|
CryptMode,
|
||||||
FileInfo, ArchiveType, archive_type,
|
FileInfo, ArchiveType, archive_type,
|
||||||
};
|
};
|
||||||
|
|
||||||
fn verify_blob(datastore: &DataStore, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
|
fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
|
||||||
|
|
||||||
let blob = datastore.load_blob(backup_dir, &info.filename)?;
|
let blob = datastore.load_blob(backup_dir, &info.filename)?;
|
||||||
|
|
||||||
|
@ -36,48 +39,96 @@ fn verify_blob(datastore: &DataStore, backup_dir: &BackupDir, info: &FileInfo) -
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We use a separate thread to read/load chunks, so that we can do
|
||||||
|
// load and verify in parallel to increase performance.
|
||||||
|
// Spawns a thread that walks `index` and loads each chunk from `datastore`,
// handing the results to the caller over a bounded channel.
//
// For every index position the thread:
// - skips digests already present in `verified_chunks`;
// - logs and counts (in `errors`) digests already present in `corrupt_chunks`;
// - otherwise calls `datastore.load_chunk()`; a load failure is logged,
//   counted in `errors`, and the digest is added to `corrupt_chunks`.
//
// Returns the receiving end of the channel, which yields
// `(chunk, digest, size)` for each successfully loaded chunk, where `size`
// is the length of the chunk's range in the index. The sender is owned by
// the thread, so the channel closes once the thread has finished.
fn chunk_reader_thread(
|
||||||
|
datastore: Arc<DataStore>,
|
||||||
|
index: Box<dyn IndexFile + Send>,
|
||||||
|
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
|
corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
|
errors: Arc<AtomicUsize>,
|
||||||
|
worker: Arc<WorkerTask>,
|
||||||
|
) -> std::sync::mpsc::Receiver<(DataBlob, [u8;32], u64)> {
|
||||||
|
|
||||||
|
let (sender, receiver) = std::sync::mpsc::sync_channel(3); // buffer up to 3 chunks
|
||||||
|
|
||||||
|
// The JoinHandle returned by spawn() is discarded, so the thread is never
// joined; it ends on its own when the loop finishes or the receiver is dropped.
std::thread::spawn(move|| {
|
||||||
|
for pos in 0..index.index_count() {
|
||||||
|
// NOTE(review): unwrap() panics inside the reader thread if chunk_info()
// yields nothing for `pos` - presumably impossible for
// pos < index_count(); confirm against the IndexFile implementations.
let info = index.chunk_info(pos).unwrap();
|
||||||
|
let size = info.range.end - info.range.start;
|
||||||
|
|
||||||
|
// Each lock guard below is a temporary, so the mutex is released at the
// end of the statement and is not held while loading or sending a chunk.
if verified_chunks.lock().unwrap().contains(&info.digest) {
|
||||||
|
continue; // already verified
|
||||||
|
}
|
||||||
|
|
||||||
|
// A digest already known to be corrupt is not loaded again, but it still
// counts as an error for this index.
if corrupt_chunks.lock().unwrap().contains(&info.digest) {
|
||||||
|
let digest_str = proxmox::tools::digest_to_hex(&info.digest);
|
||||||
|
worker.log(format!("chunk {} was marked as corrupt", digest_str));
|
||||||
|
errors.fetch_add(1, Ordering::SeqCst);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match datastore.load_chunk(&info.digest) {
|
||||||
|
Err(err) => {
|
||||||
|
// Remember the digest so later occurrences are reported without
// attempting another load.
corrupt_chunks.lock().unwrap().insert(info.digest);
|
||||||
|
worker.log(format!("can't verify chunk, load failed - {}", err));
|
||||||
|
errors.fetch_add(1, Ordering::SeqCst);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Ok(chunk) => {
|
||||||
|
// send() on a sync_channel blocks while the buffer is full, so the
// reader stays only a few chunks ahead of the consumer.
if sender.send((chunk, info.digest, size)).is_err() {
|
||||||
|
break; // receiver gone - simply stop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
receiver
|
||||||
|
}
|
||||||
|
|
||||||
fn verify_index_chunks(
|
fn verify_index_chunks(
|
||||||
datastore: &DataStore,
|
datastore: Arc<DataStore>,
|
||||||
index: Box<dyn IndexFile>,
|
index: Box<dyn IndexFile + Send>,
|
||||||
verified_chunks: &mut HashSet<[u8;32]>,
|
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
corrupt_chunks: &mut HashSet<[u8; 32]>,
|
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
||||||
crypt_mode: CryptMode,
|
crypt_mode: CryptMode,
|
||||||
worker: &WorkerTask,
|
worker: Arc<WorkerTask>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
|
||||||
let mut errors = 0;
|
let errors = Arc::new(AtomicUsize::new(0));
|
||||||
for pos in 0..index.index_count() {
|
|
||||||
|
let start_time = Instant::now();
|
||||||
|
|
||||||
|
let chunk_channel = chunk_reader_thread(
|
||||||
|
datastore,
|
||||||
|
index,
|
||||||
|
verified_chunks.clone(),
|
||||||
|
corrupt_chunks.clone(),
|
||||||
|
errors.clone(),
|
||||||
|
worker.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut read_bytes = 0;
|
||||||
|
let mut decoded_bytes = 0;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
|
||||||
worker.fail_on_abort()?;
|
worker.fail_on_abort()?;
|
||||||
|
|
||||||
let info = index.chunk_info(pos).unwrap();
|
let (chunk, digest, size) = match chunk_channel.recv() {
|
||||||
|
Ok(tuple) => tuple,
|
||||||
if verified_chunks.contains(&info.digest) {
|
Err(std::sync::mpsc::RecvError) => break,
|
||||||
continue; // already verified
|
|
||||||
}
|
|
||||||
|
|
||||||
if corrupt_chunks.contains(&info.digest) {
|
|
||||||
let digest_str = proxmox::tools::digest_to_hex(&info.digest);
|
|
||||||
worker.log(format!("chunk {} was marked as corrupt", digest_str));
|
|
||||||
errors += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let chunk = match datastore.load_chunk(&info.digest) {
|
|
||||||
Err(err) => {
|
|
||||||
corrupt_chunks.insert(info.digest);
|
|
||||||
worker.log(format!("can't verify chunk, load failed - {}", err));
|
|
||||||
errors += 1;
|
|
||||||
continue;
|
|
||||||
},
|
|
||||||
Ok(chunk) => chunk,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
read_bytes += chunk.raw_size();
|
||||||
|
decoded_bytes += size;
|
||||||
|
|
||||||
let chunk_crypt_mode = match chunk.crypt_mode() {
|
let chunk_crypt_mode = match chunk.crypt_mode() {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
corrupt_chunks.insert(info.digest);
|
corrupt_chunks.lock().unwrap().insert(digest);
|
||||||
worker.log(format!("can't verify chunk, unknown CryptMode - {}", err));
|
worker.log(format!("can't verify chunk, unknown CryptMode - {}", err));
|
||||||
errors += 1;
|
errors.fetch_add(1, Ordering::SeqCst);
|
||||||
continue;
|
continue;
|
||||||
},
|
},
|
||||||
Ok(mode) => mode,
|
Ok(mode) => mode,
|
||||||
|
@ -89,21 +140,32 @@ fn verify_index_chunks(
|
||||||
chunk_crypt_mode,
|
chunk_crypt_mode,
|
||||||
crypt_mode
|
crypt_mode
|
||||||
));
|
));
|
||||||
errors += 1;
|
errors.fetch_add(1, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
|
|
||||||
let size = info.range.end - info.range.start;
|
if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
|
||||||
|
corrupt_chunks.lock().unwrap().insert(digest);
|
||||||
if let Err(err) = chunk.verify_unencrypted(size as usize, &info.digest) {
|
|
||||||
corrupt_chunks.insert(info.digest);
|
|
||||||
worker.log(format!("{}", err));
|
worker.log(format!("{}", err));
|
||||||
errors += 1;
|
errors.fetch_add(1, Ordering::SeqCst);
|
||||||
} else {
|
} else {
|
||||||
verified_chunks.insert(info.digest);
|
verified_chunks.lock().unwrap().insert(digest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if errors > 0 {
|
let elapsed = start_time.elapsed().as_secs_f64();
|
||||||
|
|
||||||
|
let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);
|
||||||
|
let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0);
|
||||||
|
|
||||||
|
let read_speed = read_bytes_mib/elapsed;
|
||||||
|
let decode_speed = decoded_bytes_mib/elapsed;
|
||||||
|
|
||||||
|
let error_count = errors.load(Ordering::SeqCst);
|
||||||
|
|
||||||
|
worker.log(format!(" verified {:.2}/{:.2} Mib in {:.2} seconds, speed {:.2}/{:.2} Mib/s ({} errors)",
|
||||||
|
read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));
|
||||||
|
|
||||||
|
if errors.load(Ordering::SeqCst) > 0 {
|
||||||
bail!("chunks could not be verified");
|
bail!("chunks could not be verified");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,12 +173,12 @@ fn verify_index_chunks(
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_fixed_index(
|
fn verify_fixed_index(
|
||||||
datastore: &DataStore,
|
datastore: Arc<DataStore>,
|
||||||
backup_dir: &BackupDir,
|
backup_dir: &BackupDir,
|
||||||
info: &FileInfo,
|
info: &FileInfo,
|
||||||
verified_chunks: &mut HashSet<[u8;32]>,
|
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
corrupt_chunks: &mut HashSet<[u8;32]>,
|
corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
worker: &WorkerTask,
|
worker: Arc<WorkerTask>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
|
||||||
let mut path = backup_dir.relative_path();
|
let mut path = backup_dir.relative_path();
|
||||||
|
@ -137,12 +199,12 @@ fn verify_fixed_index(
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_dynamic_index(
|
fn verify_dynamic_index(
|
||||||
datastore: &DataStore,
|
datastore: Arc<DataStore>,
|
||||||
backup_dir: &BackupDir,
|
backup_dir: &BackupDir,
|
||||||
info: &FileInfo,
|
info: &FileInfo,
|
||||||
verified_chunks: &mut HashSet<[u8;32]>,
|
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
corrupt_chunks: &mut HashSet<[u8;32]>,
|
corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
worker: &WorkerTask,
|
worker: Arc<WorkerTask>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
|
||||||
let mut path = backup_dir.relative_path();
|
let mut path = backup_dir.relative_path();
|
||||||
|
@ -172,11 +234,11 @@ fn verify_dynamic_index(
|
||||||
/// - Ok(false) if there were verification errors
|
/// - Ok(false) if there were verification errors
|
||||||
/// - Err(_) if task was aborted
|
/// - Err(_) if task was aborted
|
||||||
pub fn verify_backup_dir(
|
pub fn verify_backup_dir(
|
||||||
datastore: &DataStore,
|
datastore: Arc<DataStore>,
|
||||||
backup_dir: &BackupDir,
|
backup_dir: &BackupDir,
|
||||||
verified_chunks: &mut HashSet<[u8;32]>,
|
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
corrupt_chunks: &mut HashSet<[u8;32]>,
|
corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
worker: &WorkerTask
|
worker: Arc<WorkerTask>
|
||||||
) -> Result<bool, Error> {
|
) -> Result<bool, Error> {
|
||||||
|
|
||||||
let mut manifest = match datastore.load_manifest(&backup_dir) {
|
let mut manifest = match datastore.load_manifest(&backup_dir) {
|
||||||
|
@ -198,23 +260,23 @@ pub fn verify_backup_dir(
|
||||||
match archive_type(&info.filename)? {
|
match archive_type(&info.filename)? {
|
||||||
ArchiveType::FixedIndex =>
|
ArchiveType::FixedIndex =>
|
||||||
verify_fixed_index(
|
verify_fixed_index(
|
||||||
&datastore,
|
datastore.clone(),
|
||||||
&backup_dir,
|
&backup_dir,
|
||||||
info,
|
info,
|
||||||
verified_chunks,
|
verified_chunks.clone(),
|
||||||
corrupt_chunks,
|
corrupt_chunks.clone(),
|
||||||
worker
|
worker.clone(),
|
||||||
),
|
),
|
||||||
ArchiveType::DynamicIndex =>
|
ArchiveType::DynamicIndex =>
|
||||||
verify_dynamic_index(
|
verify_dynamic_index(
|
||||||
&datastore,
|
datastore.clone(),
|
||||||
&backup_dir,
|
&backup_dir,
|
||||||
info,
|
info,
|
||||||
verified_chunks,
|
verified_chunks.clone(),
|
||||||
corrupt_chunks,
|
corrupt_chunks.clone(),
|
||||||
worker
|
worker.clone(),
|
||||||
),
|
),
|
||||||
ArchiveType::Blob => verify_blob(&datastore, &backup_dir, info),
|
ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -247,7 +309,7 @@ pub fn verify_backup_dir(
|
||||||
/// Returns
|
/// Returns
|
||||||
/// - Ok(failed_dirs) where failed_dirs had verification errors
|
/// - Ok(failed_dirs) where failed_dirs had verification errors
|
||||||
/// - Err(_) if task was aborted
|
/// - Err(_) if task was aborted
|
||||||
pub fn verify_backup_group(datastore: &DataStore, group: &BackupGroup, worker: &WorkerTask) -> Result<Vec<String>, Error> {
|
pub fn verify_backup_group(datastore: Arc<DataStore>, group: &BackupGroup, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
|
||||||
|
|
||||||
let mut errors = Vec::new();
|
let mut errors = Vec::new();
|
||||||
let mut list = match group.list_backups(&datastore.base_path()) {
|
let mut list = match group.list_backups(&datastore.base_path()) {
|
||||||
|
@ -260,12 +322,15 @@ pub fn verify_backup_group(datastore: &DataStore, group: &BackupGroup, worker: &
|
||||||
|
|
||||||
worker.log(format!("verify group {}:{}", datastore.name(), group));
|
worker.log(format!("verify group {}:{}", datastore.name(), group));
|
||||||
|
|
||||||
let mut verified_chunks = HashSet::with_capacity(1024*16); // start with 16384 chunks (up to 65GB)
|
// start with 16384 chunks (up to 65GB)
|
||||||
let mut corrupt_chunks = HashSet::with_capacity(64); // start with 64 chunks since we assume there are few corrupt ones
|
let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
|
||||||
|
|
||||||
|
// start with 64 chunks since we assume there are few corrupt ones
|
||||||
|
let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
|
||||||
|
|
||||||
BackupInfo::sort_list(&mut list, false); // newest first
|
BackupInfo::sort_list(&mut list, false); // newest first
|
||||||
for info in list {
|
for info in list {
|
||||||
if !verify_backup_dir(datastore, &info.backup_dir, &mut verified_chunks, &mut corrupt_chunks, worker)?{
|
if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{
|
||||||
errors.push(info.backup_dir.to_string());
|
errors.push(info.backup_dir.to_string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -280,7 +345,7 @@ pub fn verify_backup_group(datastore: &DataStore, group: &BackupGroup, worker: &
|
||||||
/// Returns
|
/// Returns
|
||||||
/// - Ok(failed_dirs) where failed_dirs had verification errors
|
/// - Ok(failed_dirs) where failed_dirs had verification errors
|
||||||
/// - Err(_) if task was aborted
|
/// - Err(_) if task was aborted
|
||||||
pub fn verify_all_backups(datastore: &DataStore, worker: &WorkerTask) -> Result<Vec<String>, Error> {
|
pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
|
||||||
|
|
||||||
let mut errors = Vec::new();
|
let mut errors = Vec::new();
|
||||||
|
|
||||||
|
@ -297,7 +362,7 @@ pub fn verify_all_backups(datastore: &DataStore, worker: &WorkerTask) -> Result<
|
||||||
worker.log(format!("verify datastore {}", datastore.name()));
|
worker.log(format!("verify datastore {}", datastore.name()));
|
||||||
|
|
||||||
for group in list {
|
for group in list {
|
||||||
let mut group_errors = verify_backup_group(datastore, &group, worker)?;
|
let mut group_errors = verify_backup_group(datastore.clone(), &group, worker.clone())?;
|
||||||
errors.append(&mut group_errors);
|
errors.append(&mut group_errors);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue