WorkerTaskContext: add shutdown_requested() and fail_on_shutdown()

This commit is contained in:
Dietmar Maurer 2021-09-24 11:56:53 +02:00
parent 619cd5cbcb
commit 0fd55b08d9
6 changed files with 37 additions and 11 deletions

View File

@ -280,13 +280,12 @@ impl ChunkStore {
ProcessLocker::oldest_shared_lock(self.locker.clone()) ProcessLocker::oldest_shared_lock(self.locker.clone())
} }
pub fn sweep_unused_chunks<F: Fn() -> Result<(), Error>>( pub fn sweep_unused_chunks(
&self, &self,
oldest_writer: i64, oldest_writer: i64,
phase1_start_time: i64, phase1_start_time: i64,
status: &mut GarbageCollectionStatus, status: &mut GarbageCollectionStatus,
worker: &dyn WorkerTaskContext, worker: &dyn WorkerTaskContext,
fail_on_shutdown: F,
) -> Result<(), Error> { ) -> Result<(), Error> {
use nix::sys::stat::fstatat; use nix::sys::stat::fstatat;
use nix::unistd::{unlinkat, UnlinkatFlags}; use nix::unistd::{unlinkat, UnlinkatFlags};
@ -314,7 +313,7 @@ impl ChunkStore {
} }
worker.check_abort()?; worker.check_abort()?;
fail_on_shutdown()?; worker.fail_on_shutdown()?;
let (dirfd, entry) = match entry { let (dirfd, entry) = match entry {
Ok(entry) => (entry.parent_fd(), entry), Ok(entry) => (entry.parent_fd(), entry),

View File

@ -16,6 +16,19 @@ pub trait WorkerTaskContext {
Ok(()) Ok(())
} }
/// Test if there was a request to shutdown the server.
fn shutdown_requested(&self) -> bool;
/// This should fail with a reasonable error message if there was
/// a request to shutdown the server.
fn fail_on_shutdown(&self) -> Result<(), Error> {
if self.shutdown_requested() {
bail!("Server shutdown requested - aborting task");
}
Ok(())
}
/// Create a log message for this task. /// Create a log message for this task.
fn log(&self, level: log::Level, message: &std::fmt::Arguments); fn log(&self, level: log::Level, message: &std::fmt::Arguments);
} }
@ -30,6 +43,14 @@ impl<T: WorkerTaskContext + ?Sized> WorkerTaskContext for std::sync::Arc<T> {
<T as WorkerTaskContext>::check_abort(&*self) <T as WorkerTaskContext>::check_abort(&*self)
} }
fn shutdown_requested(&self) -> bool {
<T as WorkerTaskContext>::shutdown_requested(&*self)
}
fn fail_on_shutdown(&self) -> Result<(), Error> {
<T as WorkerTaskContext>::fail_on_shutdown(&*self)
}
fn log(&self, level: log::Level, message: &std::fmt::Arguments) { fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
<T as WorkerTaskContext>::log(&*self, level, message) <T as WorkerTaskContext>::log(&*self, level, message)
} }

View File

@ -853,6 +853,14 @@ impl WorkerTaskContext for WorkerTask {
self.abort_requested.load(Ordering::SeqCst) self.abort_requested.load(Ordering::SeqCst)
} }
fn shutdown_requested(&self) -> bool {
crate::shutdown_requested()
}
fn fail_on_shutdown(&self) -> Result<(), Error> {
crate::fail_on_shutdown()
}
fn log(&self, level: log::Level, message: &std::fmt::Arguments) { fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
match level { match level {
log::Level::Error => self.log_warning(&message.to_string()), log::Level::Error => self.log_warning(&message.to_string()),

View File

@ -28,7 +28,6 @@ use pbs_tools::fs::{lock_dir_noblock, DirLockGuard};
use pbs_tools::process_locker::ProcessLockSharedGuard; use pbs_tools::process_locker::ProcessLockSharedGuard;
use pbs_tools::{task_log, task_warn, task::WorkerTaskContext}; use pbs_tools::{task_log, task_warn, task::WorkerTaskContext};
use pbs_config::{open_backup_lockfile, BackupLockGuard}; use pbs_config::{open_backup_lockfile, BackupLockGuard};
use proxmox_rest_server::fail_on_shutdown;
lazy_static! { lazy_static! {
static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new()); static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
@ -506,7 +505,7 @@ impl DataStore {
for pos in 0..index.index_count() { for pos in 0..index.index_count() {
worker.check_abort()?; worker.check_abort()?;
fail_on_shutdown()?; worker.fail_on_shutdown()?;
let digest = index.index_digest(pos).unwrap(); let digest = index.index_digest(pos).unwrap();
if !self.chunk_store.cond_touch_chunk(digest, false)? { if !self.chunk_store.cond_touch_chunk(digest, false)? {
task_warn!( task_warn!(
@ -547,7 +546,7 @@ impl DataStore {
for (i, img) in image_list.into_iter().enumerate() { for (i, img) in image_list.into_iter().enumerate() {
worker.check_abort()?; worker.check_abort()?;
fail_on_shutdown()?; worker.fail_on_shutdown()?;
if let Some(backup_dir_path) = img.parent() { if let Some(backup_dir_path) = img.parent() {
let backup_dir_path = backup_dir_path.strip_prefix(self.base_path())?; let backup_dir_path = backup_dir_path.strip_prefix(self.base_path())?;
@ -636,7 +635,6 @@ impl DataStore {
phase1_start_time, phase1_start_time,
&mut gc_status, &mut gc_status,
worker, worker,
fail_on_shutdown,
)?; )?;
task_log!( task_log!(

View File

@ -172,7 +172,7 @@ fn verify_index_chunks(
let check_abort = |pos: usize| -> Result<(), Error> { let check_abort = |pos: usize| -> Result<(), Error> {
if pos & 1023 == 0 { if pos & 1023 == 0 {
verify_worker.worker.check_abort()?; verify_worker.worker.check_abort()?;
proxmox_rest_server::fail_on_shutdown()?; verify_worker.worker.fail_on_shutdown()?;
} }
Ok(()) Ok(())
}; };
@ -184,7 +184,7 @@ fn verify_index_chunks(
for (pos, _) in chunk_list { for (pos, _) in chunk_list {
verify_worker.worker.check_abort()?; verify_worker.worker.check_abort()?;
proxmox_rest_server::fail_on_shutdown()?; verify_worker.worker.fail_on_shutdown()?;
let info = index.chunk_info(pos).unwrap(); let info = index.chunk_info(pos).unwrap();
@ -376,7 +376,7 @@ pub fn verify_backup_dir_with_lock(
}); });
verify_worker.worker.check_abort()?; verify_worker.worker.check_abort()?;
proxmox_rest_server::fail_on_shutdown()?; verify_worker.worker.fail_on_shutdown()?;
if let Err(err) = result { if let Err(err) = result {
task_log!( task_log!(

View File

@ -9,7 +9,7 @@ use proxmox::try_block;
use proxmox::tools::fs::CreateOptions; use proxmox::tools::fs::CreateOptions;
use pbs_api_types::{Authid, UPID}; use pbs_api_types::{Authid, UPID};
use pbs_tools::task_log; use pbs_tools::{task_log, task::WorkerTaskContext};
use proxmox_rest_server::{CommandoSocket, WorkerTask}; use proxmox_rest_server::{CommandoSocket, WorkerTask};