cleanup: merge endtime into TaskState

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
Dominik Csapak 2020-08-13 14:30:17 +02:00 committed by Dietmar Maurer
parent 97af919530
commit 77bd2a469c
5 changed files with 81 additions and 64 deletions

View File

@ -42,9 +42,9 @@ pub fn list_sync_jobs(
let parsed_upid: UPID = upid.parse()?; let parsed_upid: UPID = upid.parse()?;
(Some(upid), None, None, parsed_upid.starttime) (Some(upid), None, None, parsed_upid.starttime)
}, },
JobState::Finished { upid, endtime, state } => { JobState::Finished { upid, state } => {
let parsed_upid: UPID = upid.parse()?; let parsed_upid: UPID = upid.parse()?;
(Some(upid), Some(endtime), Some(state.to_string()), parsed_upid.starttime) (Some(upid), Some(state.endtime()), Some(state.to_string()), parsed_upid.starttime)
}, },
}; };

View File

@ -105,7 +105,7 @@ async fn get_task_status(
if crate::server::worker_is_active(&upid).await? { if crate::server::worker_is_active(&upid).await? {
result["status"] = Value::from("running"); result["status"] = Value::from("running");
} else { } else {
let (_, exitstatus) = crate::server::upid_read_status(&upid).unwrap_or((0, TaskState::Unknown)); let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
result["status"] = Value::from("stopped"); result["status"] = Value::from("stopped");
result["exitstatus"] = Value::from(exitstatus.to_string()); result["exitstatus"] = Value::from(exitstatus.to_string());
}; };
@ -352,8 +352,9 @@ pub fn list_tasks(
if let Some(ref state) = info.state { if let Some(ref state) = info.state {
if running { continue; } if running { continue; }
if errors && state.1 == crate::server::TaskState::OK { match state {
continue; crate::server::TaskState::OK { .. } if errors => continue,
_ => {},
} }
} }

View File

@ -595,7 +595,7 @@ impl From<crate::server::TaskListInfo> for TaskListItem {
fn from(info: crate::server::TaskListInfo) -> Self { fn from(info: crate::server::TaskListInfo) -> Self {
let (endtime, status) = info let (endtime, status) = info
.state .state
.map_or_else(|| (None, None), |(a,b)| (Some(a), Some(b.to_string()))); .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
TaskListItem { TaskListItem {
upid: info.upid_str, upid: info.upid_str,

View File

@ -16,7 +16,7 @@
//! # use anyhow::{bail, Error}; //! # use anyhow::{bail, Error};
//! # use proxmox_backup::server::TaskState; //! # use proxmox_backup::server::TaskState;
//! # use proxmox_backup::config::jobstate::*; //! # use proxmox_backup::config::jobstate::*;
//! # fn some_code() -> TaskState { TaskState::OK } //! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
//! # fn code() -> Result<(), Error> { //! # fn code() -> Result<(), Error> {
//! // locks the correct file under /var/lib //! // locks the correct file under /var/lib
//! // or fails if someone else holds the lock //! // or fails if someone else holds the lock
@ -62,8 +62,8 @@ pub enum JobState {
Created { time: i64 }, Created { time: i64 },
/// The Job was last started in 'upid', /// The Job was last started in 'upid',
Started { upid: String }, Started { upid: String },
/// The Job was last started in 'upid', which finished with 'state' at 'endtime' /// The Job was last started in 'upid', which finished with 'state'
Finished { upid: String, endtime: i64, state: TaskState } Finished { upid: String, state: TaskState }
} }
/// Represents a Job and holds the correct lock /// Represents a Job and holds the correct lock
@ -143,12 +143,11 @@ impl JobState {
.map_err(|err| format_err!("error parsing upid: {}", err))?; .map_err(|err| format_err!("error parsing upid: {}", err))?;
if !worker_is_active_local(&parsed) { if !worker_is_active_local(&parsed) {
let (endtime, state) = upid_read_status(&parsed) let state = upid_read_status(&parsed)
.map_err(|err| format_err!("error reading upid log status: {}", err))?; .map_err(|err| format_err!("error reading upid log status: {}", err))?;
Ok(JobState::Finished { Ok(JobState::Finished {
upid, upid,
endtime,
state state
}) })
} else { } else {
@ -225,11 +224,8 @@ impl Job {
JobState::Finished { upid, .. } => upid, JobState::Finished { upid, .. } => upid,
}.to_string(); }.to_string();
let endtime: i64 = epoch_now_u64()? as i64;
self.state = JobState::Finished { self.state = JobState::Finished {
upid, upid,
endtime,
state, state,
}; };

View File

@ -156,7 +156,7 @@ pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
super::send_command(socketname, cmd).map_ok(|_| ()).await super::send_command(socketname, cmd).map_ok(|_| ()).await
} }
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
let data = line.splitn(3, ' ').collect::<Vec<&str>>(); let data = line.splitn(3, ' ').collect::<Vec<&str>>();
@ -166,7 +166,8 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, St
1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
3 => { 3 => {
let endtime = i64::from_str_radix(data[1], 16)?; let endtime = i64::from_str_radix(data[1], 16)?;
Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some((endtime, data[2].to_owned())))) let state = TaskState::from_endtime_and_message(endtime, data[2])?;
Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
} }
_ => bail!("wrong number of components"), _ => bail!("wrong number of components"),
} }
@ -193,9 +194,9 @@ pub fn create_task_log_dirs() -> Result<(), Error> {
/// Read endtime (time of last log line) and exitstatus from task log file /// Read endtime (time of last log line) and exitstatus from task log file
/// If there is not a single line with at valid datetime, we assume the /// If there is not a single line with at valid datetime, we assume the
/// starttime to be the endtime /// starttime to be the endtime
pub fn upid_read_status(upid: &UPID) -> Result<(i64, TaskState), Error> { pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
let mut status = TaskState::Unknown; let mut endtime = upid.starttime;
let mut time = upid.starttime; let mut status = TaskState::Unknown { endtime };
let path = upid.log_path(); let path = upid.log_path();
@ -213,7 +214,7 @@ pub fn upid_read_status(upid: &UPID) -> Result<(i64, TaskState), Error> {
let mut iter = line.splitn(2, ": "); let mut iter = line.splitn(2, ": ");
if let Some(time_str) = iter.next() { if let Some(time_str) = iter.next() {
time = chrono::DateTime::parse_from_rfc3339(time_str) endtime = chrono::DateTime::parse_from_rfc3339(time_str)
.map_err(|err| format_err!("cannot parse '{}': {}", time_str, err))? .map_err(|err| format_err!("cannot parse '{}': {}", time_str, err))?
.timestamp(); .timestamp();
} else { } else {
@ -222,65 +223,82 @@ pub fn upid_read_status(upid: &UPID) -> Result<(i64, TaskState), Error> {
match iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { match iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
None => continue, None => continue,
Some(rest) => { Some(rest) => {
if let Ok(state) = rest.parse() { if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
status = state; status = state;
} }
} }
} }
} }
Ok((time, status)) Ok(status)
} }
/// Task State /// Task State
#[derive(Debug, PartialEq, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState { pub enum TaskState {
/// The Task ended with an undefined state /// The Task ended with an undefined state
Unknown, Unknown { endtime: i64 },
/// The Task ended and there were no errors or warnings /// The Task ended and there were no errors or warnings
OK, OK { endtime: i64 },
/// The Task had 'count' amount of warnings and no errors /// The Task had 'count' amount of warnings and no errors
Warning { count: u64 }, Warning { count: u64, endtime: i64 },
/// The Task ended with the error described in 'message' /// The Task ended with the error described in 'message'
Error { message: String }, Error { message: String, endtime: i64 },
} }
impl TaskState { impl TaskState {
pub fn endtime(&self) -> i64 {
match *self {
TaskState::Unknown { endtime } => endtime,
TaskState::OK { endtime } => endtime,
TaskState::Warning { endtime, .. } => endtime,
TaskState::Error { endtime, .. } => endtime,
}
}
fn result_text(&self) -> String { fn result_text(&self) -> String {
match self { match self {
TaskState::Error { message } => format!("TASK ERROR: {}", message), TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
other => format!("TASK {}", other), other => format!("TASK {}", other),
} }
} }
fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
if s == "unknown" {
Ok(TaskState::Unknown { endtime })
} else if s == "OK" {
Ok(TaskState::OK { endtime })
} else if s.starts_with("WARNINGS: ") {
let count: u64 = s[10..].parse()?;
Ok(TaskState::Warning{ count, endtime })
} else if s.len() > 0 {
let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string();
Ok(TaskState::Error{ message, endtime })
} else {
bail!("unable to parse Task Status '{}'", s);
}
}
}
impl std::cmp::PartialOrd for TaskState {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.endtime().cmp(&other.endtime()))
}
}
impl std::cmp::Ord for TaskState {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.endtime().cmp(&other.endtime())
}
} }
impl std::fmt::Display for TaskState { impl std::fmt::Display for TaskState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
TaskState::Unknown => write!(f, "unknown"), TaskState::Unknown { .. } => write!(f, "unknown"),
TaskState::OK => write!(f, "OK"), TaskState::OK { .. }=> write!(f, "OK"),
TaskState::Warning { count } => write!(f, "WARNINGS: {}", count), TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
TaskState::Error { message } => write!(f, "{}", message), TaskState::Error { message, .. } => write!(f, "{}", message),
}
}
}
impl std::str::FromStr for TaskState {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s == "unknown" {
Ok(TaskState::Unknown)
} else if s == "OK" {
Ok(TaskState::OK)
} else if s.starts_with("WARNINGS: ") {
let count: u64 = s[10..].parse()?;
Ok(TaskState::Warning{ count })
} else if s.len() > 0 {
let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string();
Ok(TaskState::Error{ message })
} else {
bail!("unable to parse Task Status '{}'", s);
} }
} }
} }
@ -295,7 +313,7 @@ pub struct TaskListInfo {
/// UPID string representation /// UPID string representation
pub upid_str: String, pub upid_str: String,
/// Task `(endtime, status)` if already finished /// Task `(endtime, status)` if already finished
pub state: Option<(i64, TaskState)>, // endtime, status pub state: Option<TaskState>, // endtime, status
} }
// atomically read/update the task list, update status of finished tasks // atomically read/update the task list, update status of finished tasks
@ -334,15 +352,15 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
}, },
None => { None => {
println!("Detected stopped UPID {}", upid_str); println!("Detected stopped UPID {}", upid_str);
let (time, status) = upid_read_status(&upid) let status = upid_read_status(&upid)
.unwrap_or_else(|_| (Local::now().timestamp(), TaskState::Unknown)); .unwrap_or_else(|_| TaskState::Unknown { endtime: Local::now().timestamp() });
finish_list.push(TaskListInfo { finish_list.push(TaskListInfo {
upid, upid_str, state: Some((time, status)) upid, upid_str, state: Some(status)
}); });
}, },
Some((endtime, status)) => { Some(status) => {
finish_list.push(TaskListInfo { finish_list.push(TaskListInfo {
upid, upid_str, state: Some((endtime, status.parse()?)) upid, upid_str, state: Some(status)
}) })
} }
} }
@ -378,7 +396,7 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
task_list.sort_unstable_by(|b, a| { // lastest on top task_list.sort_unstable_by(|b, a| { // lastest on top
match (&a.state, &b.state) { match (&a.state, &b.state) {
(Some(s1), Some(s2)) => s1.0.cmp(&s2.0), (Some(s1), Some(s2)) => s1.cmp(&s2),
(Some(_), None) => std::cmp::Ordering::Less, (Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater, (None, Some(_)) => std::cmp::Ordering::Greater,
_ => a.upid.starttime.cmp(&b.upid.starttime), _ => a.upid.starttime.cmp(&b.upid.starttime),
@ -387,8 +405,8 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
let mut raw = String::new(); let mut raw = String::new();
for info in &task_list { for info in &task_list {
if let Some((endtime, status)) = &info.state { if let Some(status) = &info.state {
raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, endtime, status)); raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
} else { } else {
raw.push_str(&info.upid_str); raw.push_str(&info.upid_str);
raw.push('\n'); raw.push('\n');
@ -559,12 +577,14 @@ impl WorkerTask {
pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
let warn_count = self.data.lock().unwrap().warn_count; let warn_count = self.data.lock().unwrap().warn_count;
let endtime = Local::now().timestamp();
if let Err(err) = result { if let Err(err) = result {
TaskState::Error { message: err.to_string() } TaskState::Error { message: err.to_string(), endtime }
} else if warn_count > 0 { } else if warn_count > 0 {
TaskState::Warning { count: warn_count } TaskState::Warning { count: warn_count, endtime }
} else { } else {
TaskState::OK TaskState::OK { endtime }
} }
} }