From f21508b9e1cead5a790570cfa22f5af270c94511 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Sat, 26 Sep 2020 11:14:37 +0200 Subject: [PATCH] src/backup/verify.rs: use ParallelHandler to verify chunks --- src/backup/verify.rs | 159 +++++++++++++++++++------------------------ 1 file changed, 69 insertions(+), 90 deletions(-) diff --git a/src/backup/verify.rs b/src/backup/verify.rs index e132135b..fd48d907 100644 --- a/src/backup/verify.rs +++ b/src/backup/verify.rs @@ -8,6 +8,7 @@ use anyhow::{bail, format_err, Error}; use crate::{ server::WorkerTask, api2::types::*, + tools::ParallelHandler, backup::{ DataStore, DataBlob, @@ -74,55 +75,6 @@ fn rename_corrupted_chunk( }; } -// We use a separate thread to read/load chunks, so that we can do -// load and verify in parallel to increase performance. -fn chunk_reader_thread( - datastore: Arc, - index: Box, - verified_chunks: Arc>>, - corrupt_chunks: Arc>>, - errors: Arc, - worker: Arc, -) -> std::sync::mpsc::Receiver<(DataBlob, [u8;32], u64)> { - - let (sender, receiver) = std::sync::mpsc::sync_channel(3); // buffer up to 3 chunks - - std::thread::spawn(move|| { - for pos in 0..index.index_count() { - let info = index.chunk_info(pos).unwrap(); - let size = info.range.end - info.range.start; - - if verified_chunks.lock().unwrap().contains(&info.digest) { - continue; // already verified - } - - if corrupt_chunks.lock().unwrap().contains(&info.digest) { - let digest_str = proxmox::tools::digest_to_hex(&info.digest); - worker.log(format!("chunk {} was marked as corrupt", digest_str)); - errors.fetch_add(1, Ordering::SeqCst); - continue; - } - - match datastore.load_chunk(&info.digest) { - Err(err) => { - corrupt_chunks.lock().unwrap().insert(info.digest); - worker.log(format!("can't verify chunk, load failed - {}", err)); - errors.fetch_add(1, Ordering::SeqCst); - rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone()); - continue; - } - Ok(chunk) => { - if sender.send((chunk, info.digest, size)).is_err() { - break; // receiver gone - simply stop - } - } - } - } - }); - - receiver -} - fn verify_index_chunks( datastore: Arc, index: Box, @@ -132,64 +84,91 @@ fn verify_index_chunks( worker: Arc, ) -> Result<(), Error> { - let errors = Arc::new(AtomicUsize::new(0)); + let errors = AtomicUsize::new(0); let start_time = Instant::now(); - let chunk_channel = chunk_reader_thread( - datastore.clone(), - index, - verified_chunks.clone(), - corrupt_chunks.clone(), - errors.clone(), - worker.clone(), - ); - let mut read_bytes = 0; let mut decoded_bytes = 0; - loop { + let worker2 = Arc::clone(&worker); + let datastore2 = Arc::clone(&datastore); + let corrupt_chunks2 = Arc::clone(&corrupt_chunks); + let verified_chunks2 = Arc::clone(&verified_chunks); + let errors2 = &errors; + + let decoder_pool = ParallelHandler::new( + "verify chunk decoder", 4, + move |(chunk, digest, size): (DataBlob, [u8;32], u64)| { + let chunk_crypt_mode = match chunk.crypt_mode() { + Err(err) => { + corrupt_chunks2.lock().unwrap().insert(digest); + worker2.log(format!("can't verify chunk, unknown CryptMode - {}", err)); + errors2.fetch_add(1, Ordering::SeqCst); + return Ok(()); + }, + Ok(mode) => mode, + }; + + if chunk_crypt_mode != crypt_mode { + worker2.log(format!( + "chunk CryptMode {:?} does not match index CryptMode {:?}", + chunk_crypt_mode, + crypt_mode + )); + errors2.fetch_add(1, Ordering::SeqCst); + } + + if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) { + corrupt_chunks2.lock().unwrap().insert(digest); + worker2.log(format!("{}", err)); + errors2.fetch_add(1, Ordering::SeqCst); + rename_corrupted_chunk(datastore2.clone(), &digest, worker2.clone()); + } else { + verified_chunks2.lock().unwrap().insert(digest); + } + + Ok(()) + } + ); + + for pos in 0..index.index_count() { worker.fail_on_abort()?; crate::tools::fail_on_shutdown()?; - let (chunk, digest, size) = match chunk_channel.recv() { - Ok(tuple) => tuple, - Err(std::sync::mpsc::RecvError) => break, - }; + let info = index.chunk_info(pos).unwrap(); + let size = info.size(); - read_bytes += chunk.raw_size(); - decoded_bytes += size; - - let chunk_crypt_mode = match chunk.crypt_mode() { - Err(err) => { - corrupt_chunks.lock().unwrap().insert(digest); - worker.log(format!("can't verify chunk, unknown CryptMode - {}", err)); - errors.fetch_add(1, Ordering::SeqCst); - continue; - }, - Ok(mode) => mode, - }; - - if chunk_crypt_mode != crypt_mode { - worker.log(format!( - "chunk CryptMode {:?} does not match index CryptMode {:?}", - chunk_crypt_mode, - crypt_mode - )); - errors.fetch_add(1, Ordering::SeqCst); + if verified_chunks.lock().unwrap().contains(&info.digest) { + continue; // already verified } - if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) { - corrupt_chunks.lock().unwrap().insert(digest); - worker.log(format!("{}", err)); + if corrupt_chunks.lock().unwrap().contains(&info.digest) { + let digest_str = proxmox::tools::digest_to_hex(&info.digest); + worker.log(format!("chunk {} was marked as corrupt", digest_str)); errors.fetch_add(1, Ordering::SeqCst); - rename_corrupted_chunk(datastore.clone(), &digest, worker.clone()); - } else { - verified_chunks.lock().unwrap().insert(digest); + continue; + } + + match datastore.load_chunk(&info.digest) { + Err(err) => { + corrupt_chunks.lock().unwrap().insert(info.digest); + worker.log(format!("can't verify chunk, load failed - {}", err)); + errors.fetch_add(1, Ordering::SeqCst); + rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone()); + continue; + } + Ok(chunk) => { + read_bytes += chunk.raw_size(); + decoder_pool.send((chunk, info.digest, size))?; + decoded_bytes += size; + } } } + decoder_pool.complete()?; + let elapsed = start_time.elapsed().as_secs_f64(); let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);