garbage_collect: call fail_on_abort to abort GC when requested.

This commit is contained in:
Dietmar Maurer 2020-05-05 09:06:34 +02:00
parent 74f7240b8d
commit 99641a6bbb
4 changed files with 13 additions and 9 deletions

View File

@ -630,7 +630,7 @@ fn start_garbage_collection(
"garbage_collection", Some(store.clone()), "root@pam", to_stdout, move |worker| "garbage_collection", Some(store.clone()), "root@pam", to_stdout, move |worker|
{ {
worker.log(format!("starting garbage collection on store {}", store)); worker.log(format!("starting garbage collection on store {}", store));
datastore.garbage_collection(worker) datastore.garbage_collection(&worker)
})?; })?;
Ok(json!(upid_str)) Ok(json!(upid_str))

View File

@ -291,7 +291,7 @@ impl ChunkStore {
&self, &self,
oldest_writer: i64, oldest_writer: i64,
status: &mut GarbageCollectionStatus, status: &mut GarbageCollectionStatus,
worker: Arc<WorkerTask>, worker: &WorkerTask,
) -> Result<(), Error> { ) -> Result<(), Error> {
use nix::sys::stat::fstatat; use nix::sys::stat::fstatat;
@ -314,6 +314,7 @@ impl ChunkStore {
worker.log(format!("percentage done: {}, chunk count: {}", percentage, chunk_count)); worker.log(format!("percentage done: {}, chunk count: {}", percentage, chunk_count));
} }
worker.fail_on_abort()?;
tools::fail_on_shutdown()?; tools::fail_on_shutdown()?;
let (dirfd, entry) = match entry { let (dirfd, entry) = match entry {

View File

@ -352,12 +352,14 @@ impl DataStore {
index: I, index: I,
file_name: &Path, // only used for error reporting file_name: &Path, // only used for error reporting
status: &mut GarbageCollectionStatus, status: &mut GarbageCollectionStatus,
worker: &WorkerTask,
) -> Result<(), Error> { ) -> Result<(), Error> {
status.index_file_count += 1; status.index_file_count += 1;
status.index_data_bytes += index.index_bytes(); status.index_data_bytes += index.index_bytes();
for pos in 0..index.index_count() { for pos in 0..index.index_count() {
worker.fail_on_abort()?;
tools::fail_on_shutdown()?; tools::fail_on_shutdown()?;
let digest = index.index_digest(pos).unwrap(); let digest = index.index_digest(pos).unwrap();
if let Err(err) = self.chunk_store.touch_chunk(digest) { if let Err(err) = self.chunk_store.touch_chunk(digest) {
@ -368,21 +370,22 @@ impl DataStore {
Ok(()) Ok(())
} }
fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus) -> Result<(), Error> { fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus, worker: &WorkerTask) -> Result<(), Error> {
let image_list = self.list_images()?; let image_list = self.list_images()?;
for path in image_list { for path in image_list {
worker.fail_on_abort()?;
tools::fail_on_shutdown()?; tools::fail_on_shutdown()?;
if let Ok(archive_type) = archive_type(&path) { if let Ok(archive_type) = archive_type(&path) {
if archive_type == ArchiveType::FixedIndex { if archive_type == ArchiveType::FixedIndex {
let index = self.open_fixed_reader(&path)?; let index = self.open_fixed_reader(&path)?;
self.index_mark_used_chunks(index, &path, status)?; self.index_mark_used_chunks(index, &path, status, worker)?;
} else if archive_type == ArchiveType::DynamicIndex { } else if archive_type == ArchiveType::DynamicIndex {
let index = self.open_dynamic_reader(&path)?; let index = self.open_dynamic_reader(&path)?;
self.index_mark_used_chunks(index, &path, status)?; self.index_mark_used_chunks(index, &path, status, worker)?;
} }
} }
} }
@ -394,7 +397,7 @@ impl DataStore {
self.last_gc_status.lock().unwrap().clone() self.last_gc_status.lock().unwrap().clone()
} }
pub fn garbage_collection(&self, worker: Arc<WorkerTask>) -> Result<(), Error> { pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {
if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() { if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
@ -409,10 +412,10 @@ impl DataStore {
worker.log("Start GC phase1 (mark used chunks)"); worker.log("Start GC phase1 (mark used chunks)");
self.mark_used_chunks(&mut gc_status)?; self.mark_used_chunks(&mut gc_status, &worker)?;
worker.log("Start GC phase2 (sweep unused chunks)"); worker.log("Start GC phase2 (sweep unused chunks)");
self.chunk_store.sweep_unused_chunks(oldest_writer, &mut gc_status, worker.clone())?; self.chunk_store.sweep_unused_chunks(oldest_writer, &mut gc_status, &worker)?;
worker.log(&format!("Removed bytes: {}", gc_status.removed_bytes)); worker.log(&format!("Removed bytes: {}", gc_status.removed_bytes));
worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks)); worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks));

View File

@ -514,7 +514,7 @@ impl WorkerTask {
/// Fail if abort was requested. /// Fail if abort was requested.
pub fn fail_on_abort(&self) -> Result<(), Error> { pub fn fail_on_abort(&self) -> Result<(), Error> {
if self.abort_requested() { if self.abort_requested() {
bail!("task '{}': abort requested - aborting task", self.upid); bail!("abort requested - aborting task");
} }
Ok(()) Ok(())
} }