api: add get_active_operations endpoint

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
This commit is contained in:
Hannes Laimer
2022-04-12 05:26:00 +00:00
committed by Thomas Lamprecht
parent 758c6ed588
commit 5fd823c3f2
2 changed files with 75 additions and 17 deletions

View File

@ -1,6 +1,7 @@
use anyhow::Error;
use libc::pid_t;
use nix::unistd::Pid;
use std::iter::Sum;
use std::path::PathBuf;
use pbs_api_types::Operation;
@ -8,12 +9,46 @@ use proxmox_sys::fs::{file_read_optional_string, open_file_locked, replace_file,
use proxmox_sys::linux::procfs;
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize, Clone, Copy, Default)]
pub struct ActiveOperationStats {
pub read: i64,
pub write: i64,
}
impl Sum<Self> for ActiveOperationStats {
fn sum<I>(iter: I) -> Self
where
I: Iterator<Item = Self>,
{
iter.fold(Self::default(), |a, b| Self {
read: a.read + b.read,
write: a.write + b.write,
})
}
}
#[derive(Deserialize, Serialize, Clone)]
struct TaskOperations {
pid: u32,
starttime: u64,
reading_operations: i64,
writing_operations: i64,
active_operations: ActiveOperationStats,
}
pub fn get_active_operations(name: &str) -> Result<ActiveOperationStats, Error> {
let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name));
Ok(match file_read_optional_string(&path)? {
Some(data) => serde_json::from_str::<Vec<TaskOperations>>(&data)?
.iter()
.filter_map(
|task| match procfs::check_process_running(task.pid as pid_t) {
Some(stat) if task.starttime == stat.starttime => Some(task.active_operations),
_ => None,
},
)
.sum(),
None => ActiveOperationStats::default(),
})
}
pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
@ -43,8 +78,8 @@ pub fn update_active_operations(name: &str, operation: Operation, count: i64) ->
if pid == task.pid {
updated = true;
match operation {
Operation::Read => task.reading_operations += count,
Operation::Write => task.writing_operations += count,
Operation::Read => task.active_operations.read += count,
Operation::Write => task.active_operations.write += count,
};
}
Some(task.clone())
@ -57,18 +92,12 @@ pub fn update_active_operations(name: &str, operation: Operation, count: i64) ->
};
if !updated {
updated_tasks.push(match operation {
Operation::Read => TaskOperations {
pid,
starttime,
reading_operations: 1,
writing_operations: 0,
},
Operation::Write => TaskOperations {
pid,
starttime,
reading_operations: 0,
writing_operations: 1,
updated_tasks.push(TaskOperations {
pid,
starttime,
active_operations: match operation {
Operation::Read => ActiveOperationStats { read: 1, write: 0 },
Operation::Write => ActiveOperationStats { read: 0, write: 1 },
},
})
}