don't require WorkerTask in backup/

To untangle the server code from the actual backup
implementation. It would be ideal if the whole backup/ dir
could become its own crate with minimal dependencies, certainly
without depending on the actual API server. That would also
make it easier to build forensic tools for all the data file
types we have in the backup repositories.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
Author: Wolfgang Bumiller
Date:   2020-10-12 11:46:34 +02:00
parent  d1993187b6
commit  f6b1d1cc66
5 changed files with 195 additions and 73 deletions
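
The trait and macros this patch switches to (crate::task::TaskState, crate::task_log! and crate::task_warn!) are not defined in this diff. The following is a rough, non-authoritative sketch of what such an abstraction could look like, inferred only from how the hunks below use it: a check_abort() method replacing fail_on_abort(), plus format_args!-based logging. All names, paths and signatures in the sketch are assumptions, not the actual definitions shipped with this commit.

    // Assumed sketch of a task-state module; NOT the actual code referenced by
    // this commit, only a mirror of how the diff below calls into it.
    use anyhow::Error;

    /// What backup/ code needs from a running task: abort checking and logging,
    /// without knowing anything about the api server's WorkerTask.
    pub trait TaskState {
        /// Return Err(...) if the task was requested to abort.
        fn check_abort(&self) -> Result<(), Error>;

        /// Log sink; the macros below feed it pre-formatted arguments.
        /// (Uses the `log` crate's Level enum.)
        fn log(&self, level: log::Level, message: &std::fmt::Arguments);
    }

    /// Assumed macro: log at Info level through a TaskState.
    #[macro_export]
    macro_rules! task_log {
        ($task:expr, $($fmt:tt)+) => {
            $crate::task::TaskState::log(&*$task, ::log::Level::Info, &format_args!($($fmt)+))
        };
    }

    /// Assumed macro: log at Warn level through a TaskState.
    #[macro_export]
    macro_rules! task_warn {
        ($task:expr, $($fmt:tt)+) => {
            $crate::task::TaskState::log(&*$task, ::log::Level::Warn, &format_args!($($fmt)+))
        };
    }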

========== changed file 1/5 ==========

@@ -518,7 +518,14 @@ pub fn verify(
             let failed_dirs = if let Some(backup_dir) = backup_dir {
                 let mut res = Vec::new();
-                if !verify_backup_dir(datastore, &backup_dir, verified_chunks, corrupt_chunks, worker.clone())? {
+                if !verify_backup_dir(
+                    datastore,
+                    &backup_dir,
+                    verified_chunks,
+                    corrupt_chunks,
+                    worker.clone(),
+                    worker.upid().clone(),
+                )? {
                     res.push(backup_dir.to_string());
                 }
                 res
@@ -530,10 +537,11 @@ pub fn verify(
                     corrupt_chunks,
                     None,
                     worker.clone(),
+                    worker.upid(),
                 )?;
                 failed_dirs
             } else {
-                verify_all_backups(datastore, worker.clone())?
+                verify_all_backups(datastore, worker.clone(), worker.upid())?
             };
             if failed_dirs.len() > 0 {
                 worker.log("Failed to verify following snapshots:");
@@ -770,7 +778,7 @@ fn start_garbage_collection(
         to_stdout,
         move |worker| {
             worker.log(format!("starting garbage collection on store {}", store));
-            datastore.garbage_collection(&worker)
+            datastore.garbage_collection(&*worker, worker.upid())
         },
     )?;
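
The call sites keep their WorkerTask and hand it to the backup code behind the new abstraction: &*worker passes it as &dyn TaskState, worker.clone() as Arc<dyn TaskState + Send + Sync>, and the UPID now travels separately via worker.upid(). A minimal, hypothetical call-site sketch, assuming WorkerTask implements TaskState (that impl is not shown in this diff):

    use std::sync::Arc;
    use anyhow::{bail, Error};

    // Hypothetical helper, for illustration only; it mirrors the call patterns
    // used in the hunks above.
    fn run_maintenance(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<(), Error> {
        // Arc<WorkerTask> derefs to &WorkerTask, which coerces to &dyn TaskState
        // (assuming impl TaskState for WorkerTask); the UPID is passed explicitly.
        datastore.garbage_collection(&*worker, worker.upid())?;

        // The same Arc coerces to Arc<dyn TaskState + Send + Sync> at the call site.
        let failed_dirs = verify_all_backups(datastore.clone(), worker.clone(), worker.upid())?;
        if !failed_dirs.is_empty() {
            bail!("verification failed for {} snapshot(s)", failed_dirs.len());
        }
        Ok(())
    }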

========== changed file 2/5 ==========

@@ -11,7 +11,7 @@ use crate::tools;
 use crate::api2::types::GarbageCollectionStatus;
 use super::DataBlob;
-use crate::server::WorkerTask;
+use crate::task::TaskState;
 /// File system based chunk store
 pub struct ChunkStore {
@@ -278,7 +278,7 @@ impl ChunkStore {
         oldest_writer: i64,
         phase1_start_time: i64,
         status: &mut GarbageCollectionStatus,
-        worker: &WorkerTask,
+        worker: &dyn TaskState,
     ) -> Result<(), Error> {
         use nix::sys::stat::fstatat;
         use nix::unistd::{unlinkat, UnlinkatFlags};
@@ -297,10 +297,15 @@ impl ChunkStore {
         for (entry, percentage, bad) in self.get_chunk_iterator()? {
             if last_percentage != percentage {
                 last_percentage = percentage;
-                worker.log(format!("percentage done: phase2 {}% (processed {} chunks)", percentage, chunk_count));
+                crate::task_log!(
+                    worker,
+                    "percentage done: phase2 {}% (processed {} chunks)",
+                    percentage,
+                    chunk_count,
+                );
             }
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
             let (dirfd, entry) = match entry {
@@ -334,12 +339,13 @@ impl ChunkStore {
                     Ok(_) => {
                         match unlinkat(Some(dirfd), filename, UnlinkatFlags::NoRemoveDir) {
                             Err(err) =>
-                                worker.warn(format!(
+                                crate::task_warn!(
+                                    worker,
                                     "unlinking corrupt chunk {:?} failed on store '{}' - {}",
                                     filename,
                                     self.name,
                                     err,
-                                )),
+                                ),
                             Ok(_) => {
                                 status.removed_bad += 1;
                                 status.removed_bytes += stat.st_size as u64;
@@ -351,11 +357,12 @@ impl ChunkStore {
                     },
                     Err(err) => {
                         // some other error, warn user and keep .bad file around too
-                        worker.warn(format!(
+                        crate::task_warn!(
+                            worker,
                             "error during stat on '{:?}' - {}",
                             orig_filename,
                             err,
-                        ));
+                        );
                     }
                 }
             } else if stat.st_atime < min_atime {

========== changed file 3/5 ==========

@@ -18,11 +18,12 @@ use super::manifest::{MANIFEST_BLOB_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest};
 use super::index::*;
 use super::{DataBlob, ArchiveType, archive_type};
 use crate::config::datastore;
-use crate::server::WorkerTask;
+use crate::task::TaskState;
 use crate::tools;
 use crate::tools::format::HumanByte;
 use crate::tools::fs::{lock_dir_noblock, DirLockGuard};
 use crate::api2::types::{GarbageCollectionStatus, Userid};
+use crate::server::UPID;
 lazy_static! {
     static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
@@ -411,25 +412,34 @@ impl DataStore {
         index: I,
         file_name: &Path, // only used for error reporting
         status: &mut GarbageCollectionStatus,
-        worker: &WorkerTask,
+        worker: &dyn TaskState,
     ) -> Result<(), Error> {
         status.index_file_count += 1;
         status.index_data_bytes += index.index_bytes();
         for pos in 0..index.index_count() {
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
             let digest = index.index_digest(pos).unwrap();
             if let Err(err) = self.chunk_store.touch_chunk(digest) {
-                worker.warn(&format!("warning: unable to access chunk {}, required by {:?} - {}",
-                    proxmox::tools::digest_to_hex(digest), file_name, err));
+                crate::task_warn!(
+                    worker,
+                    "warning: unable to access chunk {}, required by {:?} - {}",
+                    proxmox::tools::digest_to_hex(digest),
+                    file_name,
+                    err,
+                );
             }
         }
         Ok(())
     }
-    fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus, worker: &WorkerTask) -> Result<(), Error> {
+    fn mark_used_chunks(
+        &self,
+        status: &mut GarbageCollectionStatus,
+        worker: &dyn TaskState,
+    ) -> Result<(), Error> {
         let image_list = self.list_images()?;
@@ -441,7 +451,7 @@ impl DataStore {
         for path in image_list {
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
             if let Ok(archive_type) = archive_type(&path) {
@@ -457,8 +467,13 @@ impl DataStore {
             let percentage = done*100/image_count;
             if percentage > last_percentage {
-                worker.log(format!("percentage done: phase1 {}% ({} of {} index files)",
-                    percentage, done, image_count));
+                crate::task_log!(
+                    worker,
+                    "percentage done: phase1 {}% ({} of {} index files)",
+                    percentage,
+                    done,
+                    image_count,
+                );
                 last_percentage = percentage;
             }
         }
@@ -474,7 +489,7 @@ impl DataStore {
         if let Ok(_) = self.gc_mutex.try_lock() { false } else { true }
     }
-    pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {
+    pub fn garbage_collection(&self, worker: &dyn TaskState, upid: &UPID) -> Result<(), Error> {
         if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
@@ -487,36 +502,59 @@ impl DataStore {
             let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
             let mut gc_status = GarbageCollectionStatus::default();
-            gc_status.upid = Some(worker.to_string());
+            gc_status.upid = Some(upid.to_string());
-            worker.log("Start GC phase1 (mark used chunks)");
+            crate::task_log!(worker, "Start GC phase1 (mark used chunks)");
-            self.mark_used_chunks(&mut gc_status, &worker)?;
+            self.mark_used_chunks(&mut gc_status, worker)?;
-            worker.log("Start GC phase2 (sweep unused chunks)");
-            self.chunk_store.sweep_unused_chunks(oldest_writer, phase1_start_time, &mut gc_status, &worker)?;
+            crate::task_log!(worker, "Start GC phase2 (sweep unused chunks)");
+            self.chunk_store.sweep_unused_chunks(
+                oldest_writer,
+                phase1_start_time,
+                &mut gc_status,
+                worker,
+            )?;
-            worker.log(&format!("Removed garbage: {}", HumanByte::from(gc_status.removed_bytes)));
-            worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks));
+            crate::task_log!(
+                worker,
+                "Removed garbage: {}",
+                HumanByte::from(gc_status.removed_bytes),
+            );
+            crate::task_log!(worker, "Removed chunks: {}", gc_status.removed_chunks);
             if gc_status.pending_bytes > 0 {
-                worker.log(&format!("Pending removals: {} (in {} chunks)", HumanByte::from(gc_status.pending_bytes), gc_status.pending_chunks));
+                crate::task_log!(
+                    worker,
+                    "Pending removals: {} (in {} chunks)",
+                    HumanByte::from(gc_status.pending_bytes),
+                    gc_status.pending_chunks,
+                );
             }
             if gc_status.removed_bad > 0 {
-                worker.log(&format!("Removed bad files: {}", gc_status.removed_bad));
+                crate::task_log!(worker, "Removed bad files: {}", gc_status.removed_bad);
             }
-            worker.log(&format!("Original data usage: {}", HumanByte::from(gc_status.index_data_bytes)));
+            crate::task_log!(
+                worker,
+                "Original data usage: {}",
+                HumanByte::from(gc_status.index_data_bytes),
+            );
             if gc_status.index_data_bytes > 0 {
                 let comp_per = (gc_status.disk_bytes as f64 * 100.)/gc_status.index_data_bytes as f64;
-                worker.log(&format!("On-Disk usage: {} ({:.2}%)", HumanByte::from(gc_status.disk_bytes), comp_per));
+                crate::task_log!(
+                    worker,
+                    "On-Disk usage: {} ({:.2}%)",
+                    HumanByte::from(gc_status.disk_bytes),
+                    comp_per,
+                );
             }
-            worker.log(&format!("On-Disk chunks: {}", gc_status.disk_chunks));
+            crate::task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks);
             if gc_status.disk_chunks > 0 {
                 let avg_chunk = gc_status.disk_bytes/(gc_status.disk_chunks as u64);
-                worker.log(&format!("Average chunk size: {}", HumanByte::from(avg_chunk)));
+                crate::task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk));
             }
             *self.last_gc_status.lock().unwrap() = gc_status;

========== changed file 4/5 ==========

@@ -6,9 +6,7 @@ use std::time::Instant;
 use anyhow::{bail, format_err, Error};
 use crate::{
-    server::WorkerTask,
     api2::types::*,
-    tools::ParallelHandler,
     backup::{
         DataStore,
         DataBlob,
@@ -21,6 +19,10 @@ use crate::{
         ArchiveType,
         archive_type,
     },
+    server::UPID,
+    task::TaskState,
+    task_log,
+    tools::ParallelHandler,
 };
 fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
@@ -51,7 +53,7 @@ fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInf
 fn rename_corrupted_chunk(
     datastore: Arc<DataStore>,
     digest: &[u8;32],
-    worker: Arc<WorkerTask>,
+    worker: &dyn TaskState,
 ) {
     let (path, digest_str) = datastore.chunk_path(digest);
@@ -64,12 +66,12 @@ fn rename_corrupted_chunk(
     match std::fs::rename(&path, &new_path) {
         Ok(_) => {
-            worker.log(format!("corrupted chunk renamed to {:?}", &new_path));
+            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
         },
         Err(err) => {
             match err.kind() {
                 std::io::ErrorKind::NotFound => { /* ignored */ },
-                _ => worker.log(format!("could not rename corrupted chunk {:?} - {}", &path, err))
+                _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
             }
         }
     };
@@ -81,7 +83,7 @@ fn verify_index_chunks(
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     crypt_mode: CryptMode,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
     let errors = Arc::new(AtomicUsize::new(0));
@@ -103,7 +105,7 @@ fn verify_index_chunks(
             let chunk_crypt_mode = match chunk.crypt_mode() {
                 Err(err) => {
                     corrupt_chunks2.lock().unwrap().insert(digest);
-                    worker2.log(format!("can't verify chunk, unknown CryptMode - {}", err));
+                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                     errors2.fetch_add(1, Ordering::SeqCst);
                     return Ok(());
                 },
@@ -111,19 +113,20 @@ fn verify_index_chunks(
             };
             if chunk_crypt_mode != crypt_mode {
-                worker2.log(format!(
+                task_log!(
+                    worker2,
                     "chunk CryptMode {:?} does not match index CryptMode {:?}",
                     chunk_crypt_mode,
                     crypt_mode
-                ));
+                );
                 errors2.fetch_add(1, Ordering::SeqCst);
             }
             if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                 corrupt_chunks2.lock().unwrap().insert(digest);
-                worker2.log(format!("{}", err));
+                task_log!(worker2, "{}", err);
                 errors2.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(datastore2.clone(), &digest, worker2.clone());
+                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
             } else {
                 verified_chunks2.lock().unwrap().insert(digest);
             }
@@ -134,7 +137,7 @@ fn verify_index_chunks(
     for pos in 0..index.index_count() {
-        worker.fail_on_abort()?;
+        worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
         let info = index.chunk_info(pos).unwrap();
@@ -146,7 +149,7 @@ fn verify_index_chunks(
         if corrupt_chunks.lock().unwrap().contains(&info.digest) {
             let digest_str = proxmox::tools::digest_to_hex(&info.digest);
-            worker.log(format!("chunk {} was marked as corrupt", digest_str));
+            task_log!(worker, "chunk {} was marked as corrupt", digest_str);
             errors.fetch_add(1, Ordering::SeqCst);
             continue;
         }
@@ -154,9 +157,9 @@ fn verify_index_chunks(
         match datastore.load_chunk(&info.digest) {
             Err(err) => {
                 corrupt_chunks.lock().unwrap().insert(info.digest);
-                worker.log(format!("can't verify chunk, load failed - {}", err));
+                task_log!(worker, "can't verify chunk, load failed - {}", err);
                 errors.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone());
+                rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
                 continue;
             }
             Ok(chunk) => {
@@ -179,8 +182,16 @@ fn verify_index_chunks(
     let error_count = errors.load(Ordering::SeqCst);
-    worker.log(format!(" verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
-        read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));
+    task_log!(
+        worker,
+        " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
+        read_bytes_mib,
+        decoded_bytes_mib,
+        elapsed,
+        read_speed,
+        decode_speed,
+        error_count,
+    );
     if errors.load(Ordering::SeqCst) > 0 {
         bail!("chunks could not be verified");
@@ -195,7 +206,7 @@ fn verify_fixed_index(
     info: &FileInfo,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
     let mut path = backup_dir.relative_path();
@@ -212,7 +223,14 @@ fn verify_fixed_index(
         bail!("wrong index checksum");
     }
-    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+    verify_index_chunks(
+        datastore,
+        Box::new(index),
+        verified_chunks,
+        corrupt_chunks,
+        info.chunk_crypt_mode(),
+        worker,
+    )
 }
 fn verify_dynamic_index(
@@ -221,7 +239,7 @@ fn verify_dynamic_index(
     info: &FileInfo,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
     let mut path = backup_dir.relative_path();
@@ -238,7 +256,14 @@ fn verify_dynamic_index(
         bail!("wrong index checksum");
     }
-    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+    verify_index_chunks(
+        datastore,
+        Box::new(index),
+        verified_chunks,
+        corrupt_chunks,
+        info.chunk_crypt_mode(),
+        worker,
+    )
 }
 /// Verify a single backup snapshot
@@ -255,25 +280,32 @@ pub fn verify_backup_dir(
     backup_dir: &BackupDir,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: UPID,
 ) -> Result<bool, Error> {
     let mut manifest = match datastore.load_manifest(&backup_dir) {
         Ok((manifest, _)) => manifest,
         Err(err) => {
-            worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
+            task_log!(
+                worker,
+                "verify {}:{} - manifest load error: {}",
+                datastore.name(),
+                backup_dir,
+                err,
+            );
             return Ok(false);
         }
     };
-    worker.log(format!("verify {}:{}", datastore.name(), backup_dir));
+    task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);
     let mut error_count = 0;
     let mut verify_result = VerifyState::Ok;
     for info in manifest.files() {
         let result = proxmox::try_block!({
-            worker.log(format!(" check {}", info.filename));
+            task_log!(worker, " check {}", info.filename);
             match archive_type(&info.filename)? {
                 ArchiveType::FixedIndex =>
                     verify_fixed_index(
@@ -297,11 +329,18 @@ pub fn verify_backup_dir(
             }
         });
-        worker.fail_on_abort()?;
+        worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
         if let Err(err) = result {
-            worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
+            task_log!(
+                worker,
+                "verify {}:{}/{} failed: {}",
+                datastore.name(),
+                backup_dir,
+                info.filename,
+                err,
+            );
             error_count += 1;
             verify_result = VerifyState::Failed;
         }
@@ -310,7 +349,7 @@ pub fn verify_backup_dir(
     let verify_state = SnapshotVerifyState {
         state: verify_result,
-        upid: worker.upid().clone(),
+        upid,
     };
     manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
     datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
@@ -332,19 +371,26 @@ pub fn verify_backup_group(
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     progress: Option<(usize, usize)>, // (done, snapshot_count)
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: &UPID,
 ) -> Result<(usize, Vec<String>), Error> {
     let mut errors = Vec::new();
     let mut list = match group.list_backups(&datastore.base_path()) {
         Ok(list) => list,
         Err(err) => {
-            worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
+            task_log!(
+                worker,
+                "verify group {}:{} - unable to list backups: {}",
+                datastore.name(),
+                group,
+                err,
+            );
            return Ok((0, errors));
         }
     };
-    worker.log(format!("verify group {}:{}", datastore.name(), group));
+    task_log!(worker, "verify group {}:{}", datastore.name(), group);
     let (done, snapshot_count) = progress.unwrap_or((0, list.len()));
@@ -352,13 +398,26 @@ pub fn verify_backup_group(
     BackupInfo::sort_list(&mut list, false); // newest first
     for info in list {
         count += 1;
-        if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{
+        if !verify_backup_dir(
+            datastore.clone(),
+            &info.backup_dir,
+            verified_chunks.clone(),
+            corrupt_chunks.clone(),
+            worker.clone(),
+            upid.clone(),
+        )? {
             errors.push(info.backup_dir.to_string());
         }
         if snapshot_count != 0 {
             let pos = done + count;
             let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64);
-            worker.log(format!("percentage done: {:.2}% ({} of {} snapshots)", percentage, pos, snapshot_count));
+            task_log!(
+                worker,
+                "percentage done: {:.2}% ({} of {} snapshots)",
+                percentage,
+                pos,
+                snapshot_count,
+            );
         }
     }
@@ -372,8 +431,11 @@ pub fn verify_backup_group(
 /// Returns
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
+pub fn verify_all_backups(
+    datastore: Arc<DataStore>,
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: &UPID,
+) -> Result<Vec<String>, Error> {
     let mut errors = Vec::new();
     let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
@@ -382,7 +444,12 @@ pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) ->
             .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
             .collect::<Vec<BackupGroup>>(),
         Err(err) => {
-            worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
+            task_log!(
+                worker,
+                "verify datastore {} - unable to list backups: {}",
+                datastore.name(),
+                err,
+            );
             return Ok(errors);
         }
     };
@@ -400,7 +467,7 @@ pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) ->
     // start with 64 chunks since we assume there are few corrupt ones
     let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
-    worker.log(format!("verify datastore {} ({} snapshots)", datastore.name(), snapshot_count));
+    task_log!(worker, "verify datastore {} ({} snapshots)", datastore.name(), snapshot_count);
     let mut done = 0;
     for group in list {
@@ -411,6 +478,7 @@ pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) ->
             corrupt_chunks.clone(),
             Some((done, snapshot_count)),
             worker.clone(),
+            upid,
         )?;
         errors.append(&mut group_errors);

========== changed file 5/5 ==========

@@ -306,7 +306,7 @@ async fn schedule_datastore_garbage_collection() {
             worker.log(format!("starting garbage collection on store {}", store));
             worker.log(format!("task triggered by schedule '{}'", event_str));
-            let result = datastore.garbage_collection(&worker);
+            let result = datastore.garbage_collection(&*worker, worker.upid());
             let status = worker.create_state(&result);
@@ -557,7 +557,8 @@ async fn schedule_datastore_verification() {
             worker.log(format!("starting verification on store {}", store2));
             worker.log(format!("task triggered by schedule '{}'", event_str));
             let result = try_block!({
-                let failed_dirs = verify_all_backups(datastore, worker.clone())?;
+                let failed_dirs =
+                    verify_all_backups(datastore, worker.clone(), worker.upid())?;
                 if failed_dirs.len() > 0 {
                     worker.log("Failed to verify following snapshots:");
                     for dir in failed_dirs {