diff --git a/src/backup/datastore.rs b/src/backup/datastore.rs
index ebfa1bea..fa52cd82 100644
--- a/src/backup/datastore.rs
+++ b/src/backup/datastore.rs
@@ -397,6 +397,10 @@ impl DataStore {
         self.last_gc_status.lock().unwrap().clone()
     }
 
+    pub fn garbage_collection_running(&self) -> bool {
+        if let Ok(_) = self.gc_mutex.try_lock() { false } else { true }
+    }
+
     pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {
 
         if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index ef6e6703..2d306f32 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -107,6 +107,8 @@ async fn run() -> Result<(), Error> {
         bail!("unable to start daemon - {}", err);
     }
 
+    start_task_scheduler();
+
     server.await?;
     log::info!("server shutting down, waiting for active workers to complete");
     proxmox_backup::server::last_worker_future().await?;
@@ -114,3 +116,184 @@ async fn run() -> Result<(), Error> {
 
     Ok(())
 }
+
+fn start_task_scheduler() {
+    let abort_future = server::shutdown_future();
+    let future = Box::pin(run_task_scheduler());
+    let task = futures::future::select(future, abort_future);
+    tokio::spawn(task.map(|_| ()));
+}
+
+use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
+
+fn next_minute() -> Result<Instant, Error> {
+    let epoch_now = SystemTime::now().duration_since(UNIX_EPOCH)?;
+    let epoch_next = Duration::from_secs((epoch_now.as_secs()/60 + 1)*60);
+    Ok(Instant::now() + epoch_next - epoch_now)
+}
+
+async fn run_task_scheduler() {
+
+    let mut count: usize = 0;
+
+    loop {
+        count += 1;
+
+        let delay_target = match next_minute() { // try to run every minute
+            Ok(d) => d,
+            Err(err) => {
+                eprintln!("task scheduler: compute next minute failed - {}", err);
+                tokio::time::delay_until(tokio::time::Instant::from_std(Instant::now() + Duration::from_secs(60))).await;
+                continue;
+            }
+        };
+
+        if count > 2 { // wait 1..2 minutes before starting
+            match schedule_tasks().catch_unwind().await {
+                Err(panic) => {
+                    match panic.downcast::<&str>() {
+                        Ok(msg) => {
+                            eprintln!("task scheduler panic: {}", msg);
+                        }
+                        Err(_) => {
+                            eprintln!("task scheduler panic - unknown type");
+                        }
+                    }
+                }
+                Ok(Err(err)) => {
+                    eprintln!("task scheduler failed - {:?}", err);
+                }
+                Ok(Ok(_)) => {}
+            }
+        }
+
+        tokio::time::delay_until(tokio::time::Instant::from_std(delay_target)).await;
+    }
+}
+
+async fn schedule_tasks() -> Result<(), Error> {
+
+    schedule_datastore_garbage_collection().await;
+
+    Ok(())
+}
+
+fn lookup_last_worker_start(worker_type: &str, worker_id: &str) -> Result<i64, Error> {
+
+    let list = proxmox_backup::server::read_task_list()?;
+
+    for entry in list {
+        if entry.upid.worker_type == worker_type {
+            if let Some(id) = entry.upid.worker_id {
+                if id == worker_id {
+                    return Ok(entry.upid.starttime);
+                }
+            }
+        }
+    }
+
+    Ok(0)
+}
+
+
+async fn schedule_datastore_garbage_collection() {
+
+    use proxmox_backup::backup::DataStore;
+    use proxmox_backup::server::{UPID, WorkerTask};
+    use proxmox_backup::tools::systemd::time::{
+        parse_calendar_event, compute_next_event};
+
+    let config = match proxmox_backup::config::datastore::config() {
+        Err(err) => {
+            eprintln!("unable to read datastore config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+
+    for (store, (_, store_config)) in config.sections {
+        let datastore = match DataStore::lookup_datastore(&store) {
+            Ok(datastore) => datastore,
+            Err(err) => {
+                eprintln!("lookup_datastore failed - {}", err);
+                continue;
+            }
+        };
+
+        let store_config: proxmox_backup::config::datastore::DataStoreConfig = match serde_json::from_value(store_config) {
+            Ok(c) => c,
+            Err(err) => {
+                eprintln!("datastore config from_value failed - {}", err);
+                continue;
+            }
+        };
+
+        let event_str = match store_config.gc_schedule {
+            Some(event_str) => event_str,
+            None => continue,
+        };
+
+        let event = match parse_calendar_event(&event_str) {
+            Ok(event) => event,
+            Err(err) => {
+                eprintln!("unable to parse schedule '{}' - {}", event_str, err);
+                continue;
+            }
+        };
+
+        if datastore.garbage_collection_running() { continue; }
+
+        let worker_type = "garbage_collection";
+
+        let stat = datastore.last_gc_status();
+        let last = if let Some(upid_str) = stat.upid {
+            match upid_str.parse::<UPID>() {
+                Ok(upid) => upid.starttime,
+                Err(err) => {
+                    eprintln!("unable to parse upid '{}' - {}", upid_str, err);
+                    continue;
+                }
+            }
+        } else {
+            match lookup_last_worker_start(worker_type, &store) {
+                Ok(t) => t,
+                Err(err) => {
+                    eprintln!("lookup_last_job_start failed: {}", err);
+                    continue;
+                }
+            }
+        };
+
+        let next = match compute_next_event(&event, last, false) {
+            Ok(next) => next,
+            Err(err) => {
+                eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
+                continue;
+            }
+        };
+        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
+            Ok(epoch_now) => epoch_now.as_secs() as i64,
+            Err(err) => {
+                eprintln!("query system time failed - {}", err);
+                continue;
+            }
+        };
+        if next > now { continue; }
+
+        let store2 = store.clone();
+
+        if let Err(err) = WorkerTask::new_thread(
+            worker_type,
+            Some(store.clone()),
+            "root@pam",
+            false,
+            move |worker| {
+                worker.log(format!("starting garbage collection on store {}", store));
+                worker.log(format!("task triggered by schedule '{}'", event_str));
+                datastore.garbage_collection(&worker)
+            }
+        ) {
+            eprintln!("unable to start garbage collection on store {} - {}", store2, err);
+        }
+    }
+}
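
Note: the new garbage_collection_running() helper simply probes the GC mutex with try_lock(). Below is a minimal, self-contained sketch of that pattern, assuming a plain std::sync::Mutex<()> stands in for the datastore's gc_mutex; the FakeStore type is hypothetical and only for illustration:

    use std::sync::{Arc, Mutex};
    use std::thread;

    struct FakeStore {
        gc_mutex: Mutex<()>,
    }

    impl FakeStore {
        // A failed try_lock() means another caller currently holds the mutex,
        // i.e. a garbage collection is in progress. On success the guard is
        // dropped immediately, so the probe never keeps the lock.
        fn garbage_collection_running(&self) -> bool {
            self.gc_mutex.try_lock().is_err()
        }
    }

    fn main() {
        let store = Arc::new(FakeStore { gc_mutex: Mutex::new(()) });
        assert!(!store.garbage_collection_running());

        // Simulate a running GC by holding the mutex, then probe from another thread.
        let _guard = store.gc_mutex.lock().unwrap();
        let probe = {
            let store = Arc::clone(&store);
            thread::spawn(move || store.garbage_collection_running())
        };
        assert!(probe.join().unwrap());
    }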
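
Note: the scheduler loop wakes on full-minute boundaries via next_minute(). A standalone sketch of that rounding, using only std types and std::time::SystemTimeError in place of the crate's Error type: it rounds the epoch seconds up to the next multiple of 60 and maps the remaining wait onto the monotonic clock.

    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

    // Instant corresponding to the next full wall-clock minute (hh:mm:00).
    fn next_minute() -> Result<Instant, std::time::SystemTimeError> {
        let epoch_now = SystemTime::now().duration_since(UNIX_EPOCH)?;
        // Round the epoch seconds up to the next multiple of 60 ...
        let epoch_next = Duration::from_secs((epoch_now.as_secs() / 60 + 1) * 60);
        // ... and express the remaining wait relative to the monotonic clock.
        Ok(Instant::now() + (epoch_next - epoch_now))
    }

    fn main() -> Result<(), std::time::SystemTimeError> {
        let target = next_minute()?;
        println!("next tick in {:?}", target.saturating_duration_since(Instant::now()));
        Ok(())
    }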