verify: rename corrupted chunks with .bad extension

This ensures that subsequent backups will always upload the chunk again,
thereby replacing it with a correct version.

The rename format is <digest>.<counter>.bad, where <counter> is incremented
if a chunk is found to be bad again before garbage collection cleans it up.
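
For illustration only, a minimal sketch of how names following that scheme
could be recognized; the helper is_bad_chunk_name is hypothetical and not
part of this commit:

    // Hypothetical helper, not part of this commit: checks whether a file name
    // follows the "<digest>.<counter>.bad" scheme described above, i.e. a
    // 64-character hex SHA-256 digest, a numeric counter and the "bad" suffix.
    fn is_bad_chunk_name(file_name: &str) -> bool {
        let mut parts = file_name.splitn(3, '.');
        let digest = parts.next().unwrap_or("");
        let counter = parts.next().unwrap_or("");
        let suffix = parts.next().unwrap_or("");

        digest.len() == 64
            && digest.chars().all(|c| c.is_ascii_hexdigit())
            && counter.parse::<u32>().is_ok()
            && suffix == "bad"
    }

Since the patch below caps <counter> at 9, a digest can accumulate at most ten
.bad copies (.0.bad through .9.bad); after that, the .9.bad entry is overwritten.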

Care has been taken to rename a chunk only in conditions where the error is
guaranteed to be in the chunk itself. Otherwise, a broken index file could
lead to an unwanted mass-rename of chunks.
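
As a rough, hypothetical sketch of that distinction (this is not the code added
below; the function is made up, it depends on the sha2 crate, and it ignores
that real chunks are stored as blobs that may be compressed or encrypted): only
evidence found in the chunk file itself counts as corruption, while a digest
that merely cannot be found is left alone.

    use sha2::{Digest, Sha256};

    // Made-up illustration of "the error is guaranteed to be in the chunk
    // itself": rename only if the stored bytes are unreadable or no longer
    // hash to the digest the chunk is filed under.
    fn chunk_itself_is_bad(path: &std::path::Path, digest: &[u8; 32]) -> bool {
        let bytes = match std::fs::read(path) {
            Ok(bytes) => bytes,
            // A missing file may just mean a stale or broken index entry,
            // so it is not taken as proof of a corrupted chunk.
            Err(ref err) if err.kind() == std::io::ErrorKind::NotFound => return false,
            // Any other I/O error happens on the chunk file itself.
            Err(_) => return true,
        };
        // Stored data that no longer matches its own digest is definitely bad.
        let hash = Sha256::digest(&bytes);
        hash.as_slice() != &digest[..]
    }

The same reasoning is why the rename in the patch below silently ignores
ErrorKind::NotFound: a chunk that is not there cannot be quarantined, and its
absence alone does not prove corruption.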

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
Stefan Reiter, 2020-09-07 17:30:33 +02:00 (committed by Dietmar Maurer)
parent 7c77e2f94a
commit 0f3b7efa84
1 changed file with 31 additions and 1 deletion

@@ -39,6 +39,34 @@ fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInf
     }
 }
 
+fn rename_corrupted_chunk(
+    datastore: Arc<DataStore>,
+    digest: &[u8;32],
+    worker: Arc<WorkerTask>,
+) {
+    let (path, digest_str) = datastore.chunk_path(digest);
+
+    let mut counter = 0;
+    let mut new_path = path.clone();
+    new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
+    while new_path.exists() && counter < 9 {
+        counter += 1;
+        new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
+    }
+
+    match std::fs::rename(&path, &new_path) {
+        Ok(_) => {
+            worker.log(format!("corrupted chunk renamed to {:?}", &new_path));
+        },
+        Err(err) => {
+            match err.kind() {
+                std::io::ErrorKind::NotFound => { /* ignored */ },
+                _ => worker.log(format!("could not rename corrupted chunk {:?} - {}", &path, err))
+            }
+        }
+    };
+}
+
 // We use a separate thread to read/load chunks, so that we can do
 // load and verify in parallel to increase performance.
 fn chunk_reader_thread(
@@ -73,6 +101,7 @@ fn chunk_reader_thread(
                     corrupt_chunks.lock().unwrap().insert(info.digest);
                     worker.log(format!("can't verify chunk, load failed - {}", err));
                     errors.fetch_add(1, Ordering::SeqCst);
+                    rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone());
                     continue;
                 }
                 Ok(chunk) => {
@@ -101,7 +130,7 @@ fn verify_index_chunks(
     let start_time = Instant::now();
 
     let chunk_channel = chunk_reader_thread(
-        datastore,
+        datastore.clone(),
         index,
         verified_chunks.clone(),
         corrupt_chunks.clone(),
@@ -148,6 +177,7 @@ fn verify_index_chunks(
             corrupt_chunks.lock().unwrap().insert(digest);
             worker.log(format!("{}", err));
             errors.fetch_add(1, Ordering::SeqCst);
+            rename_corrupted_chunk(datastore.clone(), &digest, worker.clone());
         } else {
             verified_chunks.lock().unwrap().insert(digest);
         }