src/backup/verify.rs: use ParallelHandler to verify chunks

This commit is contained in:
Dietmar Maurer 2020-09-26 11:14:37 +02:00
parent ee7a308de4
commit f21508b9e1
1 changed files with 69 additions and 90 deletions

View File

@ -8,6 +8,7 @@ use anyhow::{bail, format_err, Error};
use crate::{
server::WorkerTask,
api2::types::*,
tools::ParallelHandler,
backup::{
DataStore,
DataBlob,
@ -74,23 +75,70 @@ fn rename_corrupted_chunk(
};
}
// We use a separate thread to read/load chunks, so that we can do
// load and verify in parallel to increase performance.
fn chunk_reader_thread(
fn verify_index_chunks(
datastore: Arc<DataStore>,
index: Box<dyn IndexFile + Send>,
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
errors: Arc<AtomicUsize>,
crypt_mode: CryptMode,
worker: Arc<WorkerTask>,
) -> std::sync::mpsc::Receiver<(DataBlob, [u8;32], u64)> {
) -> Result<(), Error> {
let (sender, receiver) = std::sync::mpsc::sync_channel(3); // buffer up to 3 chunks
let errors = AtomicUsize::new(0);
let start_time = Instant::now();
let mut read_bytes = 0;
let mut decoded_bytes = 0;
let worker2 = Arc::clone(&worker);
let datastore2 = Arc::clone(&datastore);
let corrupt_chunks2 = Arc::clone(&corrupt_chunks);
let verified_chunks2 = Arc::clone(&verified_chunks);
let errors2 = &errors;
let decoder_pool = ParallelHandler::new(
"verify chunk decoder", 4,
move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
let chunk_crypt_mode = match chunk.crypt_mode() {
Err(err) => {
corrupt_chunks2.lock().unwrap().insert(digest);
worker2.log(format!("can't verify chunk, unknown CryptMode - {}", err));
errors2.fetch_add(1, Ordering::SeqCst);
return Ok(());
},
Ok(mode) => mode,
};
if chunk_crypt_mode != crypt_mode {
worker2.log(format!(
"chunk CryptMode {:?} does not match index CryptMode {:?}",
chunk_crypt_mode,
crypt_mode
));
errors2.fetch_add(1, Ordering::SeqCst);
}
if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
corrupt_chunks2.lock().unwrap().insert(digest);
worker2.log(format!("{}", err));
errors2.fetch_add(1, Ordering::SeqCst);
rename_corrupted_chunk(datastore2.clone(), &digest, worker2.clone());
} else {
verified_chunks2.lock().unwrap().insert(digest);
}
Ok(())
}
);
std::thread::spawn(move|| {
for pos in 0..index.index_count() {
worker.fail_on_abort()?;
crate::tools::fail_on_shutdown()?;
let info = index.chunk_info(pos).unwrap();
let size = info.range.end - info.range.start;
let size = info.size();
if verified_chunks.lock().unwrap().contains(&info.digest) {
continue; // already verified
@ -112,83 +160,14 @@ fn chunk_reader_thread(
continue;
}
Ok(chunk) => {
if sender.send((chunk, info.digest, size)).is_err() {
break; // receiver gone - simply stop
}
}
}
}
});
receiver
}
fn verify_index_chunks(
datastore: Arc<DataStore>,
index: Box<dyn IndexFile + Send>,
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
crypt_mode: CryptMode,
worker: Arc<WorkerTask>,
) -> Result<(), Error> {
let errors = Arc::new(AtomicUsize::new(0));
let start_time = Instant::now();
let chunk_channel = chunk_reader_thread(
datastore.clone(),
index,
verified_chunks.clone(),
corrupt_chunks.clone(),
errors.clone(),
worker.clone(),
);
let mut read_bytes = 0;
let mut decoded_bytes = 0;
loop {
worker.fail_on_abort()?;
crate::tools::fail_on_shutdown()?;
let (chunk, digest, size) = match chunk_channel.recv() {
Ok(tuple) => tuple,
Err(std::sync::mpsc::RecvError) => break,
};
read_bytes += chunk.raw_size();
decoder_pool.send((chunk, info.digest, size))?;
decoded_bytes += size;
let chunk_crypt_mode = match chunk.crypt_mode() {
Err(err) => {
corrupt_chunks.lock().unwrap().insert(digest);
worker.log(format!("can't verify chunk, unknown CryptMode - {}", err));
errors.fetch_add(1, Ordering::SeqCst);
continue;
},
Ok(mode) => mode,
};
if chunk_crypt_mode != crypt_mode {
worker.log(format!(
"chunk CryptMode {:?} does not match index CryptMode {:?}",
chunk_crypt_mode,
crypt_mode
));
errors.fetch_add(1, Ordering::SeqCst);
}
if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
corrupt_chunks.lock().unwrap().insert(digest);
worker.log(format!("{}", err));
errors.fetch_add(1, Ordering::SeqCst);
rename_corrupted_chunk(datastore.clone(), &digest, worker.clone());
} else {
verified_chunks.lock().unwrap().insert(digest);
}
}
}
decoder_pool.complete()?;
let elapsed = start_time.elapsed().as_secs_f64();