api2/pull: extend do_sync_job to also handle schedule and jobstate
so that we can log if triggered by a schedule, and writing to a jobstatefile also correctly polls now the abort_future of the worker, so that users can stop a sync Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
parent
42b68f72e6
commit
02543a5c7f
|
@ -9,7 +9,7 @@ use crate::api2::types::*;
|
|||
use crate::api2::pull::do_sync_job;
|
||||
use crate::config::sync::{self, SyncJobStatus, SyncJobConfig};
|
||||
use crate::server::UPID;
|
||||
use crate::config::jobstate::JobState;
|
||||
use crate::config::jobstate::{Job, JobState};
|
||||
use crate::tools::systemd::time::{
|
||||
parse_calendar_event, compute_next_event};
|
||||
|
||||
|
@ -87,7 +87,10 @@ fn run_sync_job(
|
|||
|
||||
let userid: Userid = rpcenv.get_user().unwrap().parse()?;
|
||||
|
||||
let upid_str = do_sync_job(&id, sync_job, &userid)?;
|
||||
let mut job = Job::new("syncjob", &id)?;
|
||||
job.load()?;
|
||||
|
||||
let upid_str = do_sync_job(&id, sync_job, &userid, None, job)?;
|
||||
|
||||
Ok(upid_str)
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
use std::sync::{Arc};
|
||||
|
||||
use anyhow::{format_err, Error};
|
||||
use futures::{select, future::FutureExt};
|
||||
|
||||
use proxmox::api::api;
|
||||
use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission};
|
||||
|
@ -13,6 +14,7 @@ use crate::api2::types::*;
|
|||
use crate::config::{
|
||||
remote,
|
||||
sync::SyncJobConfig,
|
||||
jobstate::Job,
|
||||
acl::{PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ},
|
||||
cached_user_info::CachedUserInfo,
|
||||
};
|
||||
|
@ -67,28 +69,60 @@ pub fn do_sync_job(
|
|||
id: &str,
|
||||
sync_job: SyncJobConfig,
|
||||
userid: &Userid,
|
||||
schedule: Option<String>,
|
||||
mut job: Job,
|
||||
) -> Result<String, Error> {
|
||||
|
||||
let job_id = id.to_string();
|
||||
let worker_type = "syncjob";
|
||||
|
||||
let upid_str = WorkerTask::spawn(
|
||||
worker_type,
|
||||
Some(id.to_string()),
|
||||
userid.clone(),
|
||||
false,
|
||||
move |worker| async move {
|
||||
|
||||
job.start(&worker.upid().to_string())?;
|
||||
|
||||
let worker2 = worker.clone();
|
||||
|
||||
let worker_future = async move {
|
||||
|
||||
let upid_str = WorkerTask::spawn("syncjob", Some(id.to_string()), userid.clone(), false, move |worker| async move {
|
||||
let delete = sync_job.remove_vanished.unwrap_or(true);
|
||||
let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?;
|
||||
|
||||
worker.log(format!("sync job '{}' start", &job_id));
|
||||
worker.log(format!("Starting datastore sync job '{}'", job_id));
|
||||
if let Some(event_str) = schedule {
|
||||
worker.log(format!("task triggered by schedule '{}'", event_str));
|
||||
}
|
||||
worker.log(format!("Sync datastore '{}' from '{}/{}'",
|
||||
sync_job.store, sync_job.remote, sync_job.remote_store));
|
||||
|
||||
crate::client::pull::pull_store(
|
||||
&worker,
|
||||
&client,
|
||||
&src_repo,
|
||||
tgt_store.clone(),
|
||||
delete,
|
||||
Userid::backup_userid().clone(),
|
||||
).await?;
|
||||
crate::client::pull::pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, Userid::backup_userid().clone()).await?;
|
||||
|
||||
worker.log(format!("sync job '{}' end", &job_id));
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
let mut abort_future = worker2.abort_future().map(|_| Err(format_err!("sync aborted")));
|
||||
|
||||
let res = select!{
|
||||
worker = worker_future.fuse() => worker,
|
||||
abort = abort_future => abort,
|
||||
};
|
||||
|
||||
let status = worker2.create_state(&res);
|
||||
|
||||
match job.finish(status) {
|
||||
Ok(_) => {},
|
||||
Err(err) => {
|
||||
eprintln!("could not finish job state: {}", err);
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
})?;
|
||||
|
||||
Ok(upid_str)
|
||||
|
|
Loading…
Reference in New Issue