rrd: rust fmt

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
Thomas Lamprecht 2022-04-06 16:56:33 +02:00
parent 41583796b1
commit aaaa10894d
7 changed files with 226 additions and 209 deletions


@ -3,24 +3,22 @@
 use std::path::PathBuf;
 use anyhow::{bail, Error};
-use serde::{Serialize, Deserialize};
+use serde::{Deserialize, Serialize};
 use serde_json::json;
+use proxmox_router::cli::{
+    complete_file_name, run_cli_command, CliCommand, CliCommandMap, CliEnvironment,
+};
 use proxmox_router::RpcEnvironment;
-use proxmox_router::cli::{run_cli_command, complete_file_name, CliCommand, CliCommandMap, CliEnvironment};
 use proxmox_schema::{api, ApiStringFormat, ApiType, IntegerSchema, Schema, StringSchema};
 use proxmox_sys::fs::CreateOptions;
 use proxmox_rrd::rrd::{CF, DST, RRA, RRD};
-pub const RRA_INDEX_SCHEMA: Schema = IntegerSchema::new(
-    "Index of the RRA.")
-    .minimum(0)
-    .schema();
+pub const RRA_INDEX_SCHEMA: Schema = IntegerSchema::new("Index of the RRA.").minimum(0).schema();
-pub const RRA_CONFIG_STRING_SCHEMA: Schema = StringSchema::new(
-    "RRA configuration")
+pub const RRA_CONFIG_STRING_SCHEMA: Schema = StringSchema::new("RRA configuration")
     .format(&ApiStringFormat::PropertyString(&RRAConfig::API_SCHEMA))
     .schema();
@ -49,7 +47,6 @@ pub struct RRAConfig {
 )]
 /// Dump the RRD file in JSON format
 pub fn dump_rrd(path: String) -> Result<(), Error> {
-
     let rrd = RRD::load(&PathBuf::from(path), false)?;
     serde_json::to_writer_pretty(std::io::stdout(), &rrd)?;
     println!();
@ -67,14 +64,19 @@ pub fn dump_rrd(path: String) -> Result<(), Error> {
 )]
 /// RRD file information
 pub fn rrd_info(path: String) -> Result<(), Error> {
-
     let rrd = RRD::load(&PathBuf::from(path), false)?;
     println!("DST: {:?}", rrd.source.dst);
     for (i, rra) in rrd.rra_list.iter().enumerate() {
         // use RRAConfig property string format
-        println!("RRA[{}]: {:?},r={},n={}", i, rra.cf, rra.resolution, rra.data.len());
+        println!(
+            "RRA[{}]: {:?},r={},n={}",
+            i,
+            rra.cf,
+            rra.resolution,
+            rra.data.len()
+        );
     }
     Ok(())
@ -97,15 +99,11 @@ pub fn rrd_info(path: String) -> Result<(), Error> {
     },
 )]
 /// Update the RRD database
-pub fn update_rrd(
-    path: String,
-    time: Option<u64>,
-    value: f64,
-) -> Result<(), Error> {
-
+pub fn update_rrd(path: String, time: Option<u64>, value: f64) -> Result<(), Error> {
     let path = PathBuf::from(path);
-    let time = time.map(|v| v as f64)
+    let time = time
+        .map(|v| v as f64)
         .unwrap_or_else(proxmox_time::epoch_f64);
     let mut rrd = RRD::load(&path, false)?;
@ -147,7 +145,6 @@ pub fn fetch_rrd(
     start: Option<u64>,
     end: Option<u64>,
 ) -> Result<(), Error> {
-
     let rrd = RRD::load(&PathBuf::from(path), false)?;
     let data = rrd.extract_data(cf, resolution, start, end)?;
@ -171,11 +168,7 @@ pub fn fetch_rrd(
 )]
 /// Return the Unix timestamp of the first time slot inside the
 /// specified RRA (slot start time)
-pub fn first_update_time(
-    path: String,
-    rra_index: usize,
-) -> Result<(), Error> {
-
+pub fn first_update_time(path: String, rra_index: usize) -> Result<(), Error> {
     let rrd = RRD::load(&PathBuf::from(path), false)?;
     if rra_index >= rrd.rra_list.len() {
@ -200,7 +193,6 @@ pub fn first_update_time(
 )]
 /// Return the Unix timestamp of the last update
 pub fn last_update_time(path: String) -> Result<(), Error> {
-
     let rrd = RRD::load(&PathBuf::from(path), false)?;
     println!("{}", rrd.source.last_update);
@ -218,7 +210,6 @@ pub fn last_update_time(path: String) -> Result<(), Error> {
 )]
 /// Return the time and value from the last update
 pub fn last_update(path: String) -> Result<(), Error> {
-
     let rrd = RRD::load(&PathBuf::from(path), false)?;
     let result = json!({
@ -251,18 +242,12 @@ pub fn last_update(path: String) -> Result<(), Error> {
     },
 )]
 /// Create a new RRD file
-pub fn create_rrd(
-    dst: DST,
-    path: String,
-    rra: Vec<String>,
-) -> Result<(), Error> {
-
+pub fn create_rrd(dst: DST, path: String, rra: Vec<String>) -> Result<(), Error> {
     let mut rra_list = Vec::new();
     for item in rra.iter() {
-        let rra: RRAConfig = serde_json::from_value(
-            RRAConfig::API_SCHEMA.parse_property_string(item)?
-        )?;
+        let rra: RRAConfig =
+            serde_json::from_value(RRAConfig::API_SCHEMA.parse_property_string(item)?)?;
         println!("GOT {:?}", rra);
         rra_list.push(RRA::new(rra.cf, rra.r, rra.n as usize));
     }
@ -293,12 +278,7 @@ pub fn create_rrd(
     },
 )]
 /// Resize. Change the number of data slots for the specified RRA.
-pub fn resize_rrd(
-    path: String,
-    rra_index: usize,
-    slots: i64,
-) -> Result<(), Error> {
-
+pub fn resize_rrd(path: String, rra_index: usize, slots: i64) -> Result<(), Error> {
     let path = PathBuf::from(&path);
     let mut rrd = RRD::load(&path, false)?;
@ -336,7 +316,6 @@ pub fn resize_rrd(
 }
 
 fn main() -> Result<(), Error> {
-
     let uid = nix::unistd::Uid::current();
     let username = match nix::unistd::User::from_uid(uid)? {
@ -349,57 +328,56 @@ fn main() -> Result<(), Error> {
"create", "create",
CliCommand::new(&API_METHOD_CREATE_RRD) CliCommand::new(&API_METHOD_CREATE_RRD)
.arg_param(&["path"]) .arg_param(&["path"])
.completion_cb("path", complete_file_name) .completion_cb("path", complete_file_name),
) )
.insert( .insert(
"dump", "dump",
CliCommand::new(&API_METHOD_DUMP_RRD) CliCommand::new(&API_METHOD_DUMP_RRD)
.arg_param(&["path"]) .arg_param(&["path"])
.completion_cb("path", complete_file_name) .completion_cb("path", complete_file_name),
) )
.insert( .insert(
"fetch", "fetch",
CliCommand::new(&API_METHOD_FETCH_RRD) CliCommand::new(&API_METHOD_FETCH_RRD)
.arg_param(&["path"]) .arg_param(&["path"])
.completion_cb("path", complete_file_name) .completion_cb("path", complete_file_name),
) )
.insert( .insert(
"first", "first",
CliCommand::new(&API_METHOD_FIRST_UPDATE_TIME) CliCommand::new(&API_METHOD_FIRST_UPDATE_TIME)
.arg_param(&["path"]) .arg_param(&["path"])
.completion_cb("path", complete_file_name) .completion_cb("path", complete_file_name),
) )
.insert( .insert(
"info", "info",
CliCommand::new(&API_METHOD_RRD_INFO) CliCommand::new(&API_METHOD_RRD_INFO)
.arg_param(&["path"]) .arg_param(&["path"])
.completion_cb("path", complete_file_name) .completion_cb("path", complete_file_name),
) )
.insert( .insert(
"last", "last",
CliCommand::new(&API_METHOD_LAST_UPDATE_TIME) CliCommand::new(&API_METHOD_LAST_UPDATE_TIME)
.arg_param(&["path"]) .arg_param(&["path"])
.completion_cb("path", complete_file_name) .completion_cb("path", complete_file_name),
) )
.insert( .insert(
"lastupdate", "lastupdate",
CliCommand::new(&API_METHOD_LAST_UPDATE) CliCommand::new(&API_METHOD_LAST_UPDATE)
.arg_param(&["path"]) .arg_param(&["path"])
.completion_cb("path", complete_file_name) .completion_cb("path", complete_file_name),
) )
.insert( .insert(
"resize", "resize",
CliCommand::new(&API_METHOD_RESIZE_RRD) CliCommand::new(&API_METHOD_RESIZE_RRD)
.arg_param(&["path"]) .arg_param(&["path"])
.completion_cb("path", complete_file_name) .completion_cb("path", complete_file_name),
) )
.insert( .insert(
"update", "update",
CliCommand::new(&API_METHOD_UPDATE_RRD) CliCommand::new(&API_METHOD_UPDATE_RRD)
.arg_param(&["path"]) .arg_param(&["path"])
.completion_cb("path", complete_file_name) .completion_cb("path", complete_file_name),
) );
;
let mut rpcenv = CliEnvironment::new(); let mut rpcenv = CliEnvironment::new();
rpcenv.set_auth_id(Some(format!("{}@pam", username))); rpcenv.set_auth_id(Some(format!("{}@pam", username)));
@ -407,5 +385,4 @@ fn main() -> Result<(), Error> {
     run_cli_command(cmd_def, rpcenv, None);
-
     Ok(())
 }


@ -1,18 +1,18 @@
+use std::collections::BTreeSet;
 use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::os::unix::io::AsRawFd;
 use std::path::{Path, PathBuf};
 use std::sync::{Arc, RwLock};
-use std::io::{BufRead, BufReader};
-use std::time::SystemTime;
 use std::thread::spawn;
-use std::os::unix::io::AsRawFd;
-use std::collections::BTreeSet;
+use std::time::SystemTime;
 
+use anyhow::{bail, format_err, Error};
 use crossbeam_channel::{bounded, TryRecvError};
-use anyhow::{format_err, bail, Error};
 
 use proxmox_sys::fs::{create_path, CreateOptions};
 
-use crate::rrd::{DST, CF, RRD, RRA};
+use crate::rrd::{CF, DST, RRA, RRD};
 
 mod journal;
 use journal::*;
@ -37,9 +37,7 @@ pub(crate) struct CacheConfig {
     dir_options: CreateOptions,
 }
 
-
 impl RRDCache {
-
     /// Creates a new instance
     ///
     /// `basedir`: All files are stored relative to this path.
@ -66,7 +64,11 @@ impl RRDCache {
         let file_options = file_options.unwrap_or_else(CreateOptions::new);
         let dir_options = dir_options.unwrap_or_else(CreateOptions::new);
 
-        create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
+        create_path(
+            &basedir,
+            Some(dir_options.clone()),
+            Some(dir_options.clone()),
+        )
             .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
 
         let config = Arc::new(CacheConfig {
@ -130,7 +132,6 @@ impl RRDCache {
         let state = Arc::clone(&self.state);
         let rrd_map = Arc::clone(&self.rrd_map);
 
-
         let mut state_guard = self.state.write().unwrap();
 
         let journal_applied = state_guard.journal_applied;
@ -160,7 +161,9 @@ impl RRDCache {
         let now = proxmox_time::epoch_f64();
         let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
 
-        if journal_applied && !wants_commit { return Ok(journal_applied); }
+        if journal_applied && !wants_commit {
+            return Ok(journal_applied);
+        }
 
         state_guard.last_journal_flush = proxmox_time::epoch_f64();
@ -176,7 +179,6 @@ impl RRDCache {
         Ok(journal_applied)
     }
 
-
     /// Update data in RAM and write file back to disk (journal)
     pub fn update_value(
         &self,
@ -185,14 +187,18 @@ impl RRDCache {
         value: f64,
         dst: DST,
     ) -> Result<(), Error> {
-
         let journal_applied = self.apply_journal()?;
 
-        self.state.write().unwrap()
+        self.state
+            .write()
+            .unwrap()
             .append_journal_entry(time, value, dst, rel_path)?;
 
         if journal_applied {
-            self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
+            self.rrd_map
+                .write()
+                .unwrap()
+                .update(rel_path, time, value, dst, false)?;
         }
 
         Ok(())
@ -212,19 +218,19 @@ impl RRDCache {
         start: Option<u64>,
         end: Option<u64>,
     ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
-
-        self.rrd_map.read().unwrap()
+        self.rrd_map
+            .read()
+            .unwrap()
             .extract_cached_data(base, name, cf, resolution, start, end)
     }
 }
 
 fn apply_and_commit_journal_thread(
     config: Arc<CacheConfig>,
     state: Arc<RwLock<JournalState>>,
     rrd_map: Arc<RwLock<RRDMap>>,
     commit_only: bool,
 ) -> Result<(), Error> {
-
     if commit_only {
         state.write().unwrap().rotate_journal()?; // start new journal, keep old one
     } else {
@ -234,7 +240,11 @@ fn apply_and_commit_journal_thread(
         match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
             Ok(entries) => {
                 let elapsed = start_time.elapsed().unwrap().as_secs_f64();
-                log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed);
+                log::info!(
+                    "applied rrd journal ({} entries in {:.3} seconds)",
+                    entries,
+                    elapsed
+                );
             }
             Err(err) => bail!("apply rrd journal failed - {}", err),
         }
@ -246,8 +256,11 @@ fn apply_and_commit_journal_thread(
     match commit_journal_impl(config, state, rrd_map) {
         Ok(rrd_file_count) => {
             let elapsed = start_time.elapsed().unwrap().as_secs_f64();
-            log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
-                rrd_file_count, elapsed);
+            log::info!(
+                "rrd journal successfully committed ({} files in {:.3} seconds)",
+                rrd_file_count,
+                elapsed
+            );
         }
         Err(err) => bail!("rrd journal commit failed: {}", err),
     }
@ -261,7 +274,6 @@ fn apply_journal_lines(
     reader: &mut BufReader<File>,
     lock_read_line: bool,
 ) -> Result<usize, Error> {
-
     let mut linenr = 0;
 
     loop {
@ -274,20 +286,30 @@ fn apply_journal_lines(
             reader.read_line(&mut line)?
         };
 
-        if len == 0 { break; }
+        if len == 0 {
+            break;
+        }
 
         let entry: JournalEntry = match line.parse() {
             Ok(entry) => entry,
             Err(err) => {
                 log::warn!(
                     "unable to parse rrd journal '{}' line {} (skip) - {}",
-                    journal_name, linenr, err,
+                    journal_name,
+                    linenr,
+                    err,
                 );
                 continue; // skip unparsable lines
             }
         };
 
-        rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
+        rrd_map.write().unwrap().update(
+            &entry.rel_path,
+            entry.time,
+            entry.value,
+            entry.dst,
+            true,
+        )?;
     }
 
     Ok(linenr)
 }
@ -296,7 +318,6 @@ fn apply_journal_impl(
     state: Arc<RwLock<JournalState>>,
     rrd_map: Arc<RwLock<RRDMap>>,
 ) -> Result<usize, Error> {
-
     let mut lines = 0;
 
     // Apply old journals first
@ -343,7 +364,6 @@ fn apply_journal_impl(
         state_guard.journal_applied = true;
     }
 
-
     Ok(lines)
 }
@ -376,7 +396,6 @@ fn commit_journal_impl(
     state: Arc<RwLock<JournalState>>,
     rrd_map: Arc<RwLock<RRDMap>>,
 ) -> Result<usize, Error> {
-
     let files = rrd_map.read().unwrap().file_list();
 
     let mut rrd_file_count = 0;


@ -1,21 +1,21 @@
-use std::fs::File;
-use std::path::PathBuf;
-use std::sync::Arc;
-use std::io::{Write, BufReader};
 use std::ffi::OsStr;
+use std::fs::File;
+use std::io::{BufReader, Write};
 use std::os::unix::io::AsRawFd;
+use std::path::PathBuf;
 use std::str::FromStr;
+use std::sync::Arc;
 
 use anyhow::{bail, format_err, Error};
-use nix::fcntl::OFlag;
 use crossbeam_channel::Receiver;
+use nix::fcntl::OFlag;
 
 use proxmox_sys::fs::atomic_open_or_create_file;
 
 const RRD_JOURNAL_NAME: &str = "rrd.journal";
 
-use crate::rrd::DST;
 use crate::cache::CacheConfig;
+use crate::rrd::DST;
 
 // shared state behind RwLock
 pub struct JournalState {
@ -37,7 +37,6 @@ impl FromStr for JournalEntry {
     type Err = Error;
 
     fn from_str(line: &str) -> Result<Self, Self::Err> {
-
         let line = line.trim();
 
         let parts: Vec<&str> = line.splitn(4, ':').collect();
@ -45,11 +44,14 @@ impl FromStr for JournalEntry {
bail!("wrong numper of components"); bail!("wrong numper of components");
} }
let time: f64 = parts[0].parse() let time: f64 = parts[0]
.parse()
.map_err(|_| format_err!("unable to parse time"))?; .map_err(|_| format_err!("unable to parse time"))?;
let value: f64 = parts[1].parse() let value: f64 = parts[1]
.parse()
.map_err(|_| format_err!("unable to parse value"))?; .map_err(|_| format_err!("unable to parse value"))?;
let dst: u8 = parts[2].parse() let dst: u8 = parts[2]
.parse()
.map_err(|_| format_err!("unable to parse data source type"))?; .map_err(|_| format_err!("unable to parse data source type"))?;
let dst = match dst { let dst = match dst {
@ -60,7 +62,12 @@ impl FromStr for JournalEntry {
         let rel_path = parts[3].to_string();
 
-        Ok(JournalEntry { time, value, dst, rel_path })
+        Ok(JournalEntry {
+            time,
+            value,
+            dst,
+            rel_path,
+        })
     }
 }
@ -71,7 +78,6 @@ pub struct JournalFileInfo {
 }
 
 impl JournalState {
-
     pub(crate) fn new(config: Arc<CacheConfig>) -> Result<Self, Error> {
         let journal = JournalState::open_journal_writer(&config)?;
         Ok(Self {
@ -95,14 +101,12 @@ impl JournalState {
         dst: DST,
         rel_path: &str,
     ) -> Result<(), Error> {
-
-        let journal_entry = format!(
-            "{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
+        let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
         self.journal.write_all(journal_entry.as_bytes())?;
         Ok(())
     }
 
     pub fn open_journal_reader(&self) -> Result<BufReader<File>, Error> {
         // fixme : dup self.journal instead??
         let mut journal_path = self.config.basedir.clone();
         journal_path.push(RRD_JOURNAL_NAME);
@ -151,7 +155,6 @@ impl JournalState {
     }
 
     pub fn remove_old_journals(&self) -> Result<(), Error> {
-
         let journal_list = self.list_old_journals()?;
 
         for entry in journal_list {
@ -167,7 +170,9 @@ impl JournalState {
             let entry = entry?;
             let path = entry.path();
 
-            if !path.is_file() { continue; }
+            if !path.is_file() {
+                continue;
+            }
 
             match path.file_stem() {
                 None => continue,


@ -1,6 +1,6 @@
+use std::collections::HashMap;
 use std::path::Path;
 use std::sync::Arc;
-use std::collections::HashMap;
 
 use anyhow::{bail, Error};
@ -17,7 +17,6 @@ pub struct RRDMap {
 }
 
 impl RRDMap {
-
     pub(crate) fn new(
         config: Arc<CacheConfig>,
         load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,


@ -11,15 +11,15 @@
 //! * Plattform independent (big endian f64, hopefully a standard format?)
 //! * Arbitrary number of RRAs (dynamically changeable)
 
-use std::path::Path;
 use std::io::{Read, Write};
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
+use std::path::Path;
 
 use anyhow::{bail, format_err, Error};
-use serde::{Serialize, Deserialize};
+use serde::{Deserialize, Serialize};
 
-use proxmox_sys::fs::{make_tmp_file, CreateOptions};
 use proxmox_schema::api;
+use proxmox_sys::fs::{make_tmp_file, CreateOptions};
 
 use crate::rrd_v1;
@ -69,7 +69,6 @@ pub struct DataSource {
 }
 
 impl DataSource {
-
     /// Create a new Instance
     pub fn new(dst: DST) -> Self {
         Self {
@ -118,8 +117,6 @@ impl DataSource {
         Ok(value)
     }
 }
 
-
-
 #[derive(Serialize, Deserialize)]
@ -136,7 +133,6 @@ pub struct RRA {
 }
 
 impl RRA {
-
     /// Creates a new instance
     pub fn new(cf: CF, resolution: u64, points: usize) -> Self {
         Self {
@ -181,7 +177,10 @@ impl RRA {
             if let Some(v) = item {
                 self.data[index] = v;
             }
-            index += 1; if index >= self.data.len() { index = 0; }
+            index += 1;
+            if index >= self.data.len() {
+                index = 0;
+            }
         }
         Ok(())
     }
@ -200,7 +199,10 @@ impl RRA {
         for _ in 0..num_entries {
             t += reso;
-            index += 1; if index >= self.data.len() { index = 0; }
+            index += 1;
+            if index >= self.data.len() {
+                index = 0;
+            }
             if t < min_time {
                 self.data[index] = f64::NAN;
             } else {
@ -233,8 +235,20 @@ impl RRA {
             self.last_count = 1;
         } else {
             let new_value = match self.cf {
-                CF::Maximum => if last_value > value { last_value } else { value },
-                CF::Minimum => if last_value < value { last_value } else { value },
+                CF::Maximum => {
+                    if last_value > value {
+                        last_value
+                    } else {
+                        value
+                    }
+                }
+                CF::Minimum => {
+                    if last_value < value {
+                        last_value
+                    } else {
+                        value
+                    }
+                }
                 CF::Last => value,
                 CF::Average => {
                     (last_value * (self.last_count as f64)) / (new_count as f64)
@ -269,7 +283,9 @@ impl RRA {
         let mut t = start;
         let mut index = self.slot(t);
         for _ in 0..num_entries {
-            if t > end { break; };
+            if t > end {
+                break;
+            };
             if t < rrd_start || t >= rrd_end {
                 list.push(None);
             } else {
@ -281,7 +297,10 @@ impl RRA {
                 }
             }
             t += reso;
-            index += 1; if index >= self.data.len() { index = 0; }
+            index += 1;
+            if index >= self.data.len() {
+                index = 0;
+            }
         }
 
         (start, reso, list)
@ -298,17 +317,11 @@ pub struct RRD {
 }
 
-
 impl RRD {
-
     /// Creates a new Instance
     pub fn new(dst: DST, rra_list: Vec<RRA>) -> RRD {
-
         let source = DataSource::new(dst);
 
-        RRD {
-            source,
-            rra_list,
-        }
+        RRD { source, rra_list }
     }
 
     fn from_raw(raw: &[u8]) -> Result<Self, Error> {
@ -340,7 +353,6 @@ impl RRD {
     /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
     /// the linux page cache.
     pub fn load(path: &Path, avoid_page_cache: bool) -> Result<Self, std::io::Error> {
-
         let mut file = std::fs::File::open(path)?;
         let buffer_size = file.metadata().map(|m| m.len() as usize + 1).unwrap_or(0);
         let mut raw = Vec::with_capacity(buffer_size);
@ -352,12 +364,16 @@ impl RRD {
                 0,
                 buffer_size as i64,
                 nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
-            ).map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
+            )
+            .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
         }
 
         match Self::from_raw(&raw) {
             Ok(rrd) => Ok(rrd),
-            Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string())),
+            Err(err) => Err(std::io::Error::new(
+                std::io::ErrorKind::Other,
+                err.to_string(),
+            )),
         }
     }
@ -372,7 +388,6 @@ impl RRD {
         options: CreateOptions,
         avoid_page_cache: bool,
     ) -> Result<(), Error> {
-
         let (fd, tmp_path) = make_tmp_file(&path, options)?;
         let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
@ -419,7 +434,6 @@ impl RRD {
     ///
     /// Note: This does not call [Self::save].
     pub fn update(&mut self, time: f64, value: f64) {
-
         let value = match self.source.compute_new_value(time, value) {
             Ok(value) => value,
             Err(err) => {
@ -451,11 +465,14 @@ impl RRD {
         start: Option<u64>,
         end: Option<u64>,
     ) -> Result<(u64, u64, Vec<Option<f64>>), Error> {
-
         let mut rra: Option<&RRA> = None;
         for item in self.rra_list.iter() {
-            if item.cf != cf { continue; }
-            if item.resolution > resolution { continue; }
+            if item.cf != cf {
+                continue;
+            }
+            if item.resolution > resolution {
+                continue;
+            }
 
             if let Some(current) = rra {
                 if item.resolution > current.resolution {
@ -475,10 +492,8 @@ impl RRD {
             None => bail!("unable to find RRA suitable ({:?}:{})", cf, resolution),
         }
     }
-
 }
 
-
 #[cfg(test)]
 mod tests {
     use super::*;
@ -526,7 +541,11 @@ mod tests {
             rrd.update((i as f64) * 30.0, i as f64);
         }
 
-        assert!(rrd.extract_data(CF::Average, 60, Some(0), Some(5*60)).is_err(), "CF::Average should not exist");
+        assert!(
+            rrd.extract_data(CF::Average, 60, Some(0), Some(5 * 60))
+                .is_err(),
+            "CF::Average should not exist"
+        );
 
         let (start, reso, data) = rrd.extract_data(CF::Last, 60, Some(0), Some(20 * 60))?;
         assert_eq!(start, 0);
@ -582,10 +601,12 @@ mod tests {
         assert_eq!(data, [Some(6.5), Some(8.5), Some(10.5), Some(12.5), None]);
 
         // add much newer vaule (should delete all previous/outdated value)
-        let i = 100; rrd.update((i as f64)*30.0, i as f64);
+        let i = 100;
+        rrd.update((i as f64) * 30.0, i as f64);
         println!("TEST {:?}", serde_json::to_string_pretty(&rrd));
 
-        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(100*30), Some(100*30 + 5*60))?;
+        let (start, reso, data) =
+            rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(100 * 30 + 5 * 60))?;
         assert_eq!(start, 100 * 30);
         assert_eq!(reso, 60);
         assert_eq!(data, [Some(100.0), None, None, None, None]);

View File

@ -10,7 +10,7 @@ pub const RRD_DATA_ENTRIES: usize = 70;
 // openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8];
 pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] = [206, 46, 26, 212, 172, 158, 5, 186];
 
-use crate::rrd::{RRD, RRA, CF, DST, DataSource};
+use crate::rrd::{DataSource, CF, DST, RRA, RRD};
 
 bitflags! {
     /// Flags to specify the data soure type and consolidation function
@ -49,10 +49,7 @@ pub struct RRAv1 {
 }
 
 impl RRAv1 {
-
-    fn extract_data(
-        &self,
-    ) -> (u64, u64, Vec<Option<f64>>) {
+    fn extract_data(&self) -> (u64, u64, Vec<Option<f64>>) {
         let reso = self.resolution;
 
         let mut list = Vec::new();
@ -70,7 +67,8 @@ impl RRAv1 {
                 list.push(Some(value));
             }
 
-            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+            t += reso;
+            index = (index + 1) % RRD_DATA_ENTRIES;
         }
 
         (rra_start, reso, list)
@ -106,9 +104,7 @@ pub struct RRDv1 {
 }
 
 impl RRDv1 {
-
     pub fn from_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
-
         let expected_len = std::mem::size_of::<RRDv1>();
 
         if raw.len() != expected_len {
@ -118,7 +114,8 @@ impl RRDv1 {
         let mut rrd: RRDv1 = unsafe { std::mem::zeroed() };
         unsafe {
-            let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
+            let rrd_slice =
+                std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
             raw.read_exact(rrd_slice)?;
         }
@ -131,7 +128,6 @@ impl RRDv1 {
     }
 
     pub fn to_rrd_v2(&self) -> Result<RRD, Error> {
-
         let mut rra_list = Vec::new();
 
         // old format v1:
@ -150,13 +146,19 @@ impl RRDv1 {
         // decade 1 week, 570 points
 
         // Linear extrapolation
-        fn extrapolate_data(start: u64, reso: u64, factor: u64, data: Vec<Option<f64>>) -> (u64, u64, Vec<Option<f64>>) {
-
+        fn extrapolate_data(
+            start: u64,
+            reso: u64,
+            factor: u64,
+            data: Vec<Option<f64>>,
+        ) -> (u64, u64, Vec<Option<f64>>) {
             let mut new = Vec::new();
             for i in 0..data.len() {
                 let mut next = i + 1;
-                if next >= data.len() { next = 0 };
+                if next >= data.len() {
+                    next = 0
+                };
                 let v = data[i];
                 let v1 = data[next];
                 match (v, v1) {
@ -288,9 +290,6 @@ impl RRDv1 {
             last_value: f64::NAN,
             last_update: self.hour_avg.last_update, // IMPORTANT!
         };
 
-        Ok(RRD {
-            source,
-            rra_list,
-        })
+        Ok(RRD { source, rra_list })
     }
 }


@ -7,7 +7,6 @@ use proxmox_rrd::rrd::RRD;
 use proxmox_sys::fs::CreateOptions;
 
 fn compare_file(fn1: &str, fn2: &str) -> Result<(), Error> {
-
     let status = Command::new("/usr/bin/cmp")
         .arg(fn1)
         .arg(fn2)
@ -27,7 +26,6 @@ const RRD_V2_FN: &str = "./tests/testdata/cpu.rrd_v2";
 // make sure we can load and convert RRD v1
 #[test]
 fn upgrade_from_rrd_v1() -> Result<(), Error> {
-
     let rrd = RRD::load(Path::new(RRD_V1_FN), true)?;
 
     const RRD_V2_NEW_FN: &str = "./tests/testdata/cpu.rrd_v2.upgraded";
@ -44,7 +42,6 @@ fn upgrade_from_rrd_v1() -> Result<(), Error> {
 // make sure we can load and save RRD v2
 #[test]
 fn load_and_save_rrd_v2() -> Result<(), Error> {
-
     let rrd = RRD::load(Path::new(RRD_V2_FN), true)?;
 
     const RRD_V2_NEW_FN: &str = "./tests/testdata/cpu.rrd_v2.saved";