src/bin/proxmox-backup-proxy.rs: add simple task scheduler for garbage collection

commit 8545480a31
parent d6c28ddf84
Author: Dietmar Maurer
Date:   2020-05-20 08:59:45 +02:00
2 changed files with 187 additions and 0 deletions

src/backup/datastore.rs

@@ -397,6 +397,10 @@ impl DataStore {
self.last_gc_status.lock().unwrap().clone()
}
pub fn garbage_collection_running(&self) -> bool {
if let Ok(_) = self.gc_mutex.try_lock() { false } else { true }
}
pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {
if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {

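The new garbage_collection_running() helper answers "is a collection in progress?" without blocking: std::sync::Mutex::try_lock() fails immediately when the lock is already held. A minimal standalone sketch of the same pattern (not part of this commit; DataStoreLike is a made-up stand-in, and note that try_lock() also fails if the mutex is poisoned):

use std::sync::Mutex;

struct DataStoreLike {
    gc_mutex: Mutex<()>,
}

impl DataStoreLike {
    // Non-blocking check: Err means another thread currently holds the
    // lock, i.e. garbage collection is running (or the mutex is poisoned).
    fn garbage_collection_running(&self) -> bool {
        self.gc_mutex.try_lock().is_err()
    }
}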
src/bin/proxmox-backup-proxy.rs

@@ -107,6 +107,8 @@ async fn run() -> Result<(), Error> {
bail!("unable to start daemon - {}", err);
}
start_task_scheduler();
server.await?;
log::info!("server shutting down, waiting for active workers to complete");
proxmox_backup::server::last_worker_future().await?;
@@ -114,3 +116,184 @@ async fn run() -> Result<(), Error> {
Ok(())
}
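// Spawn the scheduler loop and tie its lifetime to server shutdown:
// futures::future::select() resolves as soon as either the scheduler
// future or the shutdown future completes.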
fn start_task_scheduler() {
let abort_future = server::shutdown_future();
let future = Box::pin(run_task_scheduler());
let task = futures::future::select(future, abort_future);
tokio::spawn(task.map(|_| ()));
}
use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
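// Compute the Instant of the next full-minute boundary by rounding the
// Unix epoch time up to the next multiple of 60 seconds. Example: at
// epoch second 1000 the next boundary is 1020, so the result is
// Instant::now() + 20s.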
fn next_minute() -> Result<Instant, Error> {
let epoch_now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let epoch_next = Duration::from_secs((epoch_now.as_secs()/60 + 1)*60);
Ok(Instant::now() + epoch_next - epoch_now)
}
async fn run_task_scheduler() {
let mut count: usize = 0;
loop {
count += 1;
let delay_target = match next_minute() { // try to run every minute
Ok(d) => d,
Err(err) => {
eprintln!("task scheduler: compute next minute failed - {}", err);
tokio::time::delay_until(tokio::time::Instant::from_std(Instant::now() + Duration::from_secs(60))).await;
continue;
}
};
if count > 2 { // wait 1..2 minutes before starting
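// catch_unwind() turns a panic inside schedule_tasks() into an Err
// value, so a misbehaving job cannot kill the scheduler loop.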
match schedule_tasks().catch_unwind().await {
Err(panic) => {
match panic.downcast::<&str>() {
Ok(msg) => {
eprintln!("task scheduler panic: {}", msg);
}
Err(_) => {
eprintln!("task scheduler panic - unknown type");
}
}
}
Ok(Err(err)) => {
eprintln!("task scheduler failed - {:?}", err);
}
Ok(Ok(_)) => {}
}
}
tokio::time::delay_until(tokio::time::Instant::from_std(delay_target)).await;
}
}
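// Dispatch all known scheduled job types; garbage collection is the
// only one so far.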
async fn schedule_tasks() -> Result<(), Error> {
schedule_datastore_garbage_collection().await;
Ok(())
}
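// Return the start time of the first task-list entry matching the given
// worker type and id, or 0 if no previous run is recorded.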
fn lookup_last_worker_start(worker_type: &str, worker_id: &str) -> Result<i64, Error> {
let list = proxmox_backup::server::read_task_list()?;
for entry in list {
if entry.upid.worker_type == worker_type {
if let Some(id) = entry.upid.worker_id {
if id == worker_id {
return Ok(entry.upid.starttime);
}
}
}
}
Ok(0)
}
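// For each configured datastore: skip it unless a gc-schedule calendar
// event is set and no collection is already running, work out the last
// GC start time (from the stored GC status, falling back to the task
// list), and start a new GC worker if the next computed event is due.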
async fn schedule_datastore_garbage_collection() {
use proxmox_backup::backup::DataStore;
use proxmox_backup::server::{UPID, WorkerTask};
use proxmox_backup::tools::systemd::time::{
parse_calendar_event, compute_next_event};
let config = match proxmox_backup::config::datastore::config() {
Err(err) => {
eprintln!("unable to read datastore config - {}", err);
return;
}
Ok((config, _digest)) => config,
};
for (store, (_, store_config)) in config.sections {
let datastore = match DataStore::lookup_datastore(&store) {
Ok(datastore) => datastore,
Err(err) => {
eprintln!("lookup_datastore failed - {}", err);
continue;
}
};
let store_config: proxmox_backup::config::datastore::DataStoreConfig = match serde_json::from_value(store_config) {
Ok(c) => c,
Err(err) => {
eprintln!("datastore config from_value failed - {}", err);
continue;
}
};
let event_str = match store_config.gc_schedule {
Some(event_str) => event_str,
None => continue,
};
let event = match parse_calendar_event(&event_str) {
Ok(event) => event,
Err(err) => {
eprintln!("unable to parse schedule '{}' - {}", event_str, err);
continue;
}
};
if datastore.garbage_collection_running() { continue; }
let worker_type = "garbage_collection";
let stat = datastore.last_gc_status();
let last = if let Some(upid_str) = stat.upid {
match upid_str.parse::<UPID>() {
Ok(upid) => upid.starttime,
Err(err) => {
eprintln!("unable to parse upid '{}' - {}", upid_str, err);
continue;
}
}
} else {
match lookup_last_worker_start(worker_type, &store) {
Ok(t) => t,
Err(err) => {
eprintln!("lookup_last_job_start failed: {}", err);
continue;
}
}
};
let next = match compute_next_event(&event, last, false) {
Ok(next) => next,
Err(err) => {
eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
continue;
}
};
let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(epoch_now) => epoch_now.as_secs() as i64,
Err(err) => {
eprintln!("query system time failed - {}", err);
continue;
}
};
if next > now { continue; }
let store2 = store.clone();
if let Err(err) = WorkerTask::new_thread(
worker_type,
Some(store.clone()),
"root@pam",
false,
move |worker| {
worker.log(format!("starting garbage collection on store {}", store));
worker.log(format!("task triggered by schedule '{}'", event_str));
datastore.garbage_collection(&worker)
}
) {
eprintln!("unable to start garbage collection on store {} - {}", store2, err);
}
}
}
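For reference, the panic-isolation pattern used in run_task_scheduler() can be exercised standalone. A minimal sketch, assuming the futures and tokio crates (flaky() is a made-up stand-in for schedule_tasks(); AssertUnwindSafe is used because the compiler cannot prove an arbitrary future unwind-safe):

use std::panic::AssertUnwindSafe;
use futures::FutureExt;

async fn flaky() -> Result<(), String> {
    panic!("boom");
}

#[tokio::main]
async fn main() {
    // catch_unwind() converts the panic into Err(Box<dyn Any + Send>)
    // instead of letting it unwind through the executor.
    match AssertUnwindSafe(flaky()).catch_unwind().await {
        Ok(Ok(())) => println!("task succeeded"),
        Ok(Err(err)) => println!("task failed - {}", err),
        Err(panic) => match panic.downcast::<&str>() {
            Ok(msg) => println!("task panicked: {}", msg),
            Err(_) => println!("task panicked - unknown payload"),
        },
    }
}

The same three-way match appears in the scheduler above: a panic, an ordinary error, and success are each handled without leaving the loop.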