proxmox-backup/src/server/jobstate.rs

263 lines
8.2 KiB
Rust
Raw Normal View History

//! Generic JobState handling
//!
//! A 'Job' can have 3 states
//! - Created, when a schedule was created but never executed
//! - Started, when a job is running right now
//! - Finished, when a job was running in the past
//!
//! and is identified by 2 values: jobtype and jobname (e.g. 'syncjob' and 'myfirstsyncjob')
//!
//! This module provides 2 helper types to handle those conditions:
//! - 'Job', which handles locking and writing to a file
//! - 'JobState', which is the actual state
//!
//! an example usage would be
//! ```no_run
//! # use anyhow::{bail, Error};
//! # use proxmox_backup::server::TaskState;
//! # use proxmox_backup::server::jobstate::*;
//! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
//! # fn code() -> Result<(), Error> {
//! // locks the correct file under /var/lib
//! // or fails if someone else holds the lock
//! let mut job = match Job::new("jobtype", "jobname") {
//! Ok(job) => job,
//! Err(err) => bail!("could not lock jobstate"),
//! };
//!
//! // job holds the lock, we can start it
//! job.start("someupid")?;
//! // do something
//! let task_state = some_code();
//! job.finish(task_state)?;
//!
//! // release the lock
//! drop(job);
//! # Ok(())
//! # }
//!
//! ```
use std::fs::File;
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{bail, format_err, Error};
use proxmox::tools::fs::{
create_path, file_read_optional_string, open_file_locked, replace_file, CreateOptions,
};
use serde::{Deserialize, Serialize};
use crate::server::{upid_read_status, worker_is_active_local, TaskState, UPID};
/// Represents the State of a specific Job
///
/// (De)serialized through serde; the variant names are written in kebab-case.
// NOTE: the derive must come before the `serde` helper attribute it introduces,
// otherwise rustc reports the `legacy_derive_helpers` future-compatibility lint.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum JobState {
    /// A job was created at 'time', but never started/finished
    Created { time: i64 },
    /// The Job was last started in 'upid',
    Started { upid: String },
    /// The Job was last started in 'upid', which finished with 'state'
    Finished { upid: String, state: TaskState },
}
/// Represents a Job and holds the correct lock
///
/// A job is identified by its type and name. The handle of the lock file is
/// stored in the private `_lock` field, so it lives as long as the `Job` value.
pub struct Job {
/// Type of the job, first part of the statefile name
jobtype: String,
/// Name of the job, second part of the statefile name
jobname: String,
/// The State of the job
pub state: JobState,
/// Open lock file handle; never read, only kept so it is dropped with the job
_lock: File,
}
/// Directory that contains the statefiles (and their lock files) of all jobs
const JOB_STATE_BASEDIR: &str = "/var/lib/proxmox-backup/jobstates";
/// Create jobstate stat dir with correct permission
pub fn create_jobstate_dir() -> Result<(), Error> {
let backup_user = crate::backup::backup_user()?;
let opts = CreateOptions::new()
.owner(backup_user.uid)
.group(backup_user.gid);
create_path(JOB_STATE_BASEDIR, None, Some(opts))
.map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
Ok(())
}
fn get_path(jobtype: &str, jobname: &str) -> PathBuf {
let mut path = PathBuf::from(JOB_STATE_BASEDIR);
path.push(format!("{}-{}.json", jobtype, jobname));
path
}
fn get_lock<P>(path: P) -> Result<File, Error>
where
P: AsRef<Path>,
{
let mut path = path.as_ref().to_path_buf();
path.set_extension("lck");
2020-09-28 08:50:44 +00:00
let lock = open_file_locked(&path, Duration::new(10, 0), true)?;
let backup_user = crate::backup::backup_user()?;
nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
Ok(lock)
}
/// Removes the statefile of a job, this is useful if we delete a job
pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
let mut path = get_path(jobtype, jobname);
let _lock = get_lock(&path)?;
std::fs::remove_file(&path).map_err(|err| {
format_err!(
"cannot remove statefile for {} - {}: {}",
jobtype,
jobname,
err
)
})?;
path.set_extension("lck");
// ignore errors
let _ = std::fs::remove_file(&path).map_err(|err| {
format_err!(
"cannot remove lockfile for {} - {}: {}",
jobtype,
jobname,
err
)
});
Ok(())
}
/// Writes a fresh statefile in state 'Created' for the given job.
///
/// The job lock is taken for the duration of the write. An already existing
/// statefile is replaced.
pub fn create_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
    // a new `Job` always starts out in state 'Created'
    Job::new(jobtype, jobname)?.write_state()
}
/// Returns the last run time of a job by reading the statefile
/// Note that this is not locked
pub fn last_run_time(jobtype: &str, jobname: &str) -> Result<i64, Error> {
match JobState::load(jobtype, jobname)? {
JobState::Created { time } => Ok(time),
JobState::Started { upid } | JobState::Finished { upid, .. } => {
let upid: UPID = upid
.parse()
.map_err(|err| format_err!("could not parse upid from state: {}", err))?;
Ok(upid.starttime)
}
}
}
impl JobState {
    /// Reads and deserializes the state of the job given by type and name.
    ///
    /// If the stored state is 'Started' but the worker of that UPID is no
    /// longer active locally, the task status is read from the UPID log and
    /// 'Finished' is returned instead. If there is no statefile, a 'Created'
    /// state is returned.
    ///
    /// The statefile itself is never modified by this function.
    pub fn load(jobtype: &str, jobname: &str) -> Result<Self, Error> {
        let raw = match file_read_optional_string(get_path(jobtype, jobname))? {
            Some(raw) => raw,
            None => {
                // NOTE(review): the creation time is set 30 seconds into the
                // past; the reason is not visible here - confirm with callers
                return Ok(JobState::Created {
                    time: proxmox::tools::time::epoch_i64() - 30,
                });
            }
        };

        let loaded: JobState = serde_json::from_str(&raw)?;

        // only a 'Started' state needs to be checked against the running workers
        let upid = match loaded {
            JobState::Started { upid } => upid,
            other => return Ok(other),
        };

        let parsed: UPID = upid
            .parse()
            .map_err(|err| format_err!("error parsing upid: {}", err))?;

        if worker_is_active_local(&parsed) {
            return Ok(JobState::Started { upid });
        }

        let state = upid_read_status(&parsed)
            .map_err(|err| format_err!("error reading upid log status: {}", err))?;

        Ok(JobState::Finished { upid, state })
    }
}
impl Job {
    /// Takes the lock of the given job and returns a handle for it.
    ///
    /// The lock is held until the returned `Job` is dropped again.
    ///
    /// An existing statefile is not read here: the in-memory state always
    /// starts out as 'Created' with the current time.
    pub fn new(jobtype: &str, jobname: &str) -> Result<Self, Error> {
        let _lock = get_lock(get_path(jobtype, jobname))?;

        let state = JobState::Created {
            time: proxmox::tools::time::epoch_i64(),
        };

        Ok(Self {
            jobtype: jobtype.to_owned(),
            jobname: jobname.to_owned(),
            state,
            _lock,
        })
    }

    /// Marks the job as started by the task `upid` and writes the statefile.
    ///
    /// Fails if the job is already in state 'Started'.
    pub fn start(&mut self, upid: &str) -> Result<(), Error> {
        if let JobState::Started { .. } = self.state {
            bail!("cannot start job that is started!");
        }

        self.state = JobState::Started {
            upid: upid.to_owned(),
        };

        self.write_state()
    }

    /// Marks the job as finished with the given task state and writes the statefile.
    ///
    /// The UPID is taken over from the current state, so this fails if the
    /// job is still in state 'Created'. Finishing an already finished job
    /// keeps its UPID and replaces the task state.
    pub fn finish(&mut self, state: TaskState) -> Result<(), Error> {
        let upid = match &self.state {
            JobState::Started { upid } | JobState::Finished { upid, .. } => upid.clone(),
            JobState::Created { .. } => bail!("cannot finish when not started"),
        };

        self.state = JobState::Finished { upid, state };

        self.write_state()
    }

    /// Returns the type of this job.
    pub fn jobtype(&self) -> &str {
        self.jobtype.as_str()
    }

    /// Returns the name of this job.
    pub fn jobname(&self) -> &str {
        self.jobname.as_str()
    }

    /// Serializes the current state to JSON and replaces the statefile with it.
    ///
    /// The file is written with mode 0644 and is owned by the backup user
    /// and group.
    fn write_state(&mut self) -> Result<(), Error> {
        let json = serde_json::to_string(&self.state)?;
        let target = get_path(&self.jobtype, &self.jobname);

        let owner = crate::backup::backup_user()?;

        // owner/group/permissions are applied while the file is saved
        let file_opts = CreateOptions::new()
            .perm(nix::sys::stat::Mode::from_bits_truncate(0o0644))
            .owner(owner.uid)
            .group(owner.gid);

        replace_file(target, json.as_bytes(), file_opts)
    }
}