proxmox-rrd: use a journal to reduce the number of bytes written

Append pending changes in a simple text-based format that allows for
lockless appends as long as we stay below 4 KiB of data per write.
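
Each journal entry is a single text line of the form
time:value:datasource-type:relative-path, where the data source type
is encoded as 0 = Gauge and 1 = Derive. For example (made-up value
and path):

    1634112000.123:0.75:0:host/cpu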

Apply the journal every 30 minutes and on daemon startup.

Note that we do not ensure that the journal is synced. This is a
performance optimization we can make because the kernel defaults to
writing back in-flight data every 30s (sysctl
vm/dirty_expire_centisecs) anyway, so we lose at most half a minute of
data on a crash. Keep in mind that we normally expose 1 minute as the
finest granularity anyway, so not much is really lost.

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
Dietmar Maurer
2021-10-13 10:24:38 +02:00
committed by Thomas Lamprecht
parent 890b88cbef
commit 1d44f175c6
7 changed files with 262 additions and 61 deletions


@@ -1,24 +1,46 @@
use std::fs::File;
use std::path::{Path, PathBuf};
use std::collections::HashMap;
use std::sync::{RwLock};
use std::sync::RwLock;
use std::io::Write;
use std::io::{BufRead, BufReader};
use std::os::unix::io::AsRawFd;
use anyhow::{format_err, Error};
use anyhow::{format_err, bail, Error};
use nix::fcntl::OFlag;
use proxmox::tools::fs::{create_path, CreateOptions};
use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};
use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution};
use crate::{DST, rrd::RRD};
const RRD_JOURNAL_NAME: &str = "rrd.journal";
/// RRD cache - keep RRD data in RAM, but write updates to disk
///
/// This cache is designed to run as single instance (no concurrent
/// access from other processes).
pub struct RRDCache {
apply_interval: f64,
basedir: PathBuf,
file_options: CreateOptions,
dir_options: CreateOptions,
cache: RwLock<HashMap<String, RRD>>,
state: RwLock<RRDCacheState>,
}
// shared state behind RwLock
struct RRDCacheState {
rrd_map: HashMap<String, RRD>,
journal: File,
last_journal_flush: f64,
}
struct JournalEntry {
time: f64,
value: f64,
dst: DST,
rel_path: String,
}
impl RRDCache {
@@ -28,21 +50,166 @@ impl RRDCache {
basedir: P,
file_options: Option<CreateOptions>,
dir_options: Option<CreateOptions>,
) -> Self {
apply_interval: f64,
) -> Result<Self, Error> {
let basedir = basedir.as_ref().to_owned();
Self {
let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());
create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
.map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
let mut journal_path = basedir.clone();
journal_path.push(RRD_JOURNAL_NAME);
let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
let journal = atomic_open_or_create_file(&journal_path, flags, &[], file_options.clone())?;
let state = RRDCacheState {
journal,
rrd_map: HashMap::new(),
last_journal_flush: 0.0,
};
Ok(Self {
basedir,
file_options: file_options.unwrap_or_else(|| CreateOptions::new()),
dir_options: dir_options.unwrap_or_else(|| CreateOptions::new()),
cache: RwLock::new(HashMap::new()),
}
file_options,
dir_options,
apply_interval,
state: RwLock::new(state),
})
}
/// Create rrdd stat dir with correct permission
pub fn create_rrdb_dir(&self) -> Result<(), Error> {
fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
create_path(&self.basedir, Some(self.dir_options.clone()), Some(self.dir_options.clone()))
.map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
let line = line.trim();
let parts: Vec<&str> = line.splitn(4, ':').collect();
if parts.len() != 4 {
bail!("wrong numper of components");
}
let time: f64 = parts[0].parse()
.map_err(|_| format_err!("unable to parse time"))?;
let value: f64 = parts[1].parse()
.map_err(|_| format_err!("unable to parse value"))?;
let dst: u8 = parts[2].parse()
.map_err(|_| format_err!("unable to parse data source type"))?;
let dst = match dst {
0 => DST::Gauge,
1 => DST::Derive,
_ => bail!("got strange value for data source type '{}'", dst),
};
let rel_path = parts[3].to_string();
Ok(JournalEntry { time, value, dst, rel_path })
}
fn append_journal_entry(
state: &mut RRDCacheState,
time: f64,
value: f64,
dst: DST,
rel_path: &str,
) -> Result<(), Error> {
let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
state.journal.write_all(journal_entry.as_bytes())?;
Ok(())
}
pub fn apply_journal(&self) -> Result<(), Error> {
let mut state = self.state.write().unwrap(); // block writers
self.apply_journal_locked(&mut state)
}
fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> {
log::info!("applying rrd journal");
state.last_journal_flush = proxmox_time::epoch_f64();
let mut journal_path = self.basedir.clone();
journal_path.push(RRD_JOURNAL_NAME);
let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?;
let mut journal = BufReader::new(journal);
let mut last_update_map = HashMap::new();
let mut get_last_update = |rel_path: &str, rrd: &RRD| {
if let Some(time) = last_update_map.get(rel_path) {
return *time;
}
let last_update = rrd.last_update();
last_update_map.insert(rel_path.to_string(), last_update);
last_update
};
let mut linenr = 0;
loop {
linenr += 1;
let mut line = String::new();
let len = journal.read_line(&mut line)?;
if len == 0 { break; }
let entry = match Self::parse_journal_line(&line) {
Ok(entry) => entry,
Err(err) => {
log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
continue; // skip unparsable lines
}
};
if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) {
if entry.time > get_last_update(&entry.rel_path, &rrd) {
rrd.update(entry.time, entry.value);
}
} else {
let mut path = self.basedir.clone();
path.push(&entry.rel_path);
create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
let mut rrd = match RRD::load(&path) {
Ok(rrd) => rrd,
Err(err) => {
if err.kind() != std::io::ErrorKind::NotFound {
log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
}
RRD::new(entry.dst)
},
};
if entry.time > get_last_update(&entry.rel_path, &rrd) {
rrd.update(entry.time, entry.value);
}
state.rrd_map.insert(entry.rel_path.clone(), rrd);
}
}
// save all RRDs
let mut errors = 0;
for (rel_path, rrd) in state.rrd_map.iter() {
let mut path = self.basedir.clone();
path.push(&rel_path);
if let Err(err) = rrd.save(&path, self.file_options.clone()) {
errors += 1;
log::error!("unable to save {:?}: {}", path, err);
}
}
// if everything went ok, commit the journal
if errors == 0 {
nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
.map_err(|err| format_err!("unable to truncate journal - {}", err))?;
log::info!("rrd journal successfully committed");
} else {
log::error!("errors during rrd flush - unable to commit rrd journal");
}
Ok(())
}
@@ -53,21 +220,26 @@ impl RRDCache {
rel_path: &str,
value: f64,
dst: DST,
save: bool,
) -> Result<(), Error> {
let mut path = self.basedir.clone();
path.push(rel_path);
let mut state = self.state.write().unwrap(); // block other writers
create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.file_options.clone()))?;
let mut map = self.cache.write().unwrap();
let now = proxmox_time::epoch_f64();
if let Some(rrd) = map.get_mut(rel_path) {
if (now - state.last_journal_flush) > self.apply_interval {
if let Err(err) = self.apply_journal_locked(&mut state) {
log::error!("apply journal failed: {}", err);
}
}
Self::append_journal_entry(&mut state, now, value, dst, rel_path)?;
if let Some(rrd) = state.rrd_map.get_mut(rel_path) {
rrd.update(now, value);
if save { rrd.save(&path, self.file_options.clone())?; }
} else {
let mut path = self.basedir.clone();
path.push(rel_path);
create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
let mut rrd = match RRD::load(&path) {
Ok(rrd) => rrd,
Err(err) => {
@@ -78,10 +250,7 @@ impl RRDCache {
},
};
rrd.update(now, value);
if save {
rrd.save(&path, self.file_options.clone())?;
}
map.insert(rel_path.into(), rrd);
state.rrd_map.insert(rel_path.into(), rrd);
}
Ok(())
@@ -97,9 +266,9 @@ impl RRDCache {
mode: RRDMode,
) -> Option<(u64, u64, Vec<Option<f64>>)> {
let map = self.cache.read().unwrap();
let state = self.state.read().unwrap();
match map.get(&format!("{}/{}", base, name)) {
match state.rrd_map.get(&format!("{}/{}", base, name)) {
Some(rrd) => Some(rrd.extract_data(now, timeframe, mode)),
None => None,
}
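
A minimal usage sketch of the reworked cache API (assuming the crate
is imported as proxmox_rrd; the base directory, apply interval, and
values below are made up, and the example is not part of this commit):

    // Hypothetical usage example only.
    use anyhow::Error;
    use proxmox_rrd::{RRDCache, DST};

    fn run() -> Result<(), Error> {
        // apply the journal every 30 minutes (interval given in seconds)
        let cache = RRDCache::new("/run/example/rrdcache", None, None, 30.0 * 60.0)?;

        // replay any pending journal entries once on daemon startup
        cache.apply_journal()?;

        // appended to the journal and applied to the in-memory RRD map
        cache.update_value("host/cpu", 0.75, DST::Gauge)?;

        Ok(())
    }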


@@ -13,9 +13,11 @@ mod cache;
pub use cache::*;
/// RRD data source type
#[repr(u8)]
#[derive(Copy, Clone)]
pub enum DST {
/// Gauge values are stored unmodified.
Gauge,
Gauge = 0,
/// Stores the difference to the previous value.
Derive,
Derive = 1,
}


@@ -336,6 +336,36 @@ impl RRD {
replace_file(filename, rrd_slice, options)
}
pub fn last_update(&self) -> f64 {
let mut last_update = 0.0;
{
let mut check_last_update = |rra: &RRA| {
if rra.last_update > last_update {
last_update = rra.last_update;
}
};
check_last_update(&self.hour_avg);
check_last_update(&self.hour_max);
check_last_update(&self.day_avg);
check_last_update(&self.day_max);
check_last_update(&self.week_avg);
check_last_update(&self.week_max);
check_last_update(&self.month_avg);
check_last_update(&self.month_max);
check_last_update(&self.year_avg);
check_last_update(&self.year_max);
}
last_update
}
/// Update the value (in memory)
///
/// Note: This does not call [Self::save].