From 4fb05fde1765eda5c1e98a0958fb09a384b78013 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Sun, 24 May 2020 16:51:28 +0200 Subject: [PATCH] src/rrd/rrd.rs: restructure whole code --- src/bin/proxmox-backup-proxy.rs | 6 +- src/rrd/cache.rs | 11 +- src/rrd/rrd.rs | 301 +++++++++++++++++++------------- 3 files changed, 189 insertions(+), 129 deletions(-) diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 42249e18..6249366a 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -606,7 +606,7 @@ async fn generate_host_stats() { match read_proc_stat() { Ok(stat) => { - if let Err(err) = rrd::update_value("host/cpu", stat.cpu) { + if let Err(err) = rrd::update_value("host/cpu", stat.cpu, rrd::DST::Gauge) { eprintln!("rrd::update_value 'host/cpu' failed - {}", err); } } @@ -616,10 +616,10 @@ async fn generate_host_stats() { } match read_meminfo() { Ok(meminfo) => { - if let Err(err) = rrd::update_value("host/memtotal", meminfo.memtotal as f64) { + if let Err(err) = rrd::update_value("host/memtotal", meminfo.memtotal as f64, rrd::DST::Gauge) { eprintln!("rrd::update_value 'host/memtotal' failed - {}", err); } - if let Err(err) = rrd::update_value("host/memused", meminfo.memused as f64) { + if let Err(err) = rrd::update_value("host/memused", meminfo.memused as f64, rrd::DST::Gauge) { eprintln!("rrd::update_value 'host/memused' failed - {}", err); } } diff --git a/src/rrd/cache.rs b/src/rrd/cache.rs index e1209aff..4e61c949 100644 --- a/src/rrd/cache.rs +++ b/src/rrd/cache.rs @@ -40,7 +40,7 @@ fn now() -> Result { Ok(epoch.as_secs()) } -pub fn update_value(rel_path: &str, value: f64) -> Result<(), Error> { +pub fn update_value(rel_path: &str, value: f64, dst: DST) -> Result<(), Error> { let mut path = PathBuf::from(PBS_RRD_BASEDIR); path.push(rel_path); @@ -56,7 +56,7 @@ pub fn update_value(rel_path: &str, value: f64) -> Result<(), Error> { } else { let mut rrd = match RRD::load(&path) { Ok(rrd) => rrd, - Err(_) => RRD::new(), + Err(_) => RRD::new(dst), }; rrd.update(now, value); rrd.save(&path)?; @@ -77,12 +77,13 @@ pub fn extract_data( let map = RRD_CACHE.read().unwrap(); - let empty_rrd = RRD::new(); - let mut result = Vec::new(); for name in items.iter() { - let rrd = map.get(&format!("{}/{}", base, name)).unwrap_or(&empty_rrd); + let rrd = match map.get(&format!("{}/{}", base, name)) { + Some(rrd) => rrd, + None => continue, + }; let (start, reso, list) = rrd.extract_data(now, timeframe, mode); let mut t = start; for index in 0..RRD_DATA_ENTRIES { diff --git a/src/rrd/rrd.rs b/src/rrd/rrd.rs index 0edfe1cb..a48b7b04 100644 --- a/src/rrd/rrd.rs +++ b/src/rrd/rrd.rs @@ -7,50 +7,174 @@ use crate::api2::types::{RRDMode, RRDTimeFrameResolution}; pub const RRD_DATA_ENTRIES: usize = 70; -#[repr(C)] -#[derive(Copy, Clone)] -struct RRDEntry { - max: f64, - average: f64, +use bitflags::bitflags; + +bitflags!{ + pub struct RRAFlags: u64 { + // Data Source Types + const DST_GAUGE = 1; + const DST_DERIVE = 2; + const DST_MASK = 255; // first 8 bits + + // Consolidation Functions + const CF_AVERAGE = 1 << 8; + const CF_MAX = 2 << 8; + const CF_MASK = 255 << 8; + } } -impl Default for RRDEntry { - fn default() -> Self { - Self { max: f64::NAN, average: f64::NAN } +pub enum DST { + Gauge, + Derive, +} + +#[repr(C)] +struct RRA { + flags: RRAFlags, + resolution: u64, + last_update: u64, + last_count: u64, + data: [f64; RRD_DATA_ENTRIES], +} + +impl RRA { + fn new(flags: RRAFlags, resolution: u64) -> Self { + Self { + flags, resolution, + last_update: 0, + last_count: 0, + data: [f64::NAN; RRD_DATA_ENTRIES], + } + } + + fn delete_old(&mut self, epoch: u64) { + let reso = self.resolution; + let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; + let min_time = (min_time/reso + 1)*reso; + let mut t = self.last_update - (RRD_DATA_ENTRIES as u64)*reso; + let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + for _ in 0..RRD_DATA_ENTRIES { + t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + if t < min_time { + self.data[index] = f64::NAN; + } else { + break; + } + } + } + + fn compute_new_value(&mut self, epoch: u64, value: f64) { + let reso = self.resolution; + let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + let last_index = ((self.last_update/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + + if (epoch - self.last_update) > reso || index != last_index { + self.last_count = 0; + } + + let last_value = self.data[index]; + if last_value.is_nan() { + self.last_count = 0; + } + + let new_count = self.last_count + 1; // fixme: check overflow? + if self.last_count == 0 { + self.data[index] = value; + self.last_count = 1; + } else { + let new_value = if self.flags.contains(RRAFlags::CF_MAX) { + if last_value > value { last_value } else { value } + } else if self.flags.contains(RRAFlags::CF_AVERAGE) { + (last_value*(self.last_count as f64))/(new_count as f64) + + value/(new_count as f64) + } else { + eprintln!("rrdb update failed - unknown CF"); + return; + }; + self.data[index] = new_value; + self.last_count = new_count; + } + self.last_update = epoch; + } + + fn update(&mut self, epoch: u64, value: f64) { + if epoch < self.last_update { + eprintln!("rrdb update failed - time in past ({} < {})", epoch, self.last_update); + } + if value.is_nan() { + eprintln!("rrdb update failed - new value is NAN"); + return; + } + + self.delete_old(epoch); + self.compute_new_value(epoch, value); } } #[repr(C)] // Note: Avoid alignment problems by using 8byte types only pub struct RRD { - last_update: u64, - last_hour_count: u64, - hour: [RRDEntry; RRD_DATA_ENTRIES], - last_day_count: u64, - day: [RRDEntry; RRD_DATA_ENTRIES], - last_week_count: u64, - week: [RRDEntry; RRD_DATA_ENTRIES], - last_month_count: u64, - month: [RRDEntry; RRD_DATA_ENTRIES], - last_year_count: u64, - year: [RRDEntry; RRD_DATA_ENTRIES], + hour_avg: RRA, + hour_max: RRA, + day_avg: RRA, + day_max: RRA, + week_avg: RRA, + week_max: RRA, + month_avg: RRA, + month_max: RRA, + year_avg: RRA, + year_max: RRA, } impl RRD { - pub fn new() -> Self { + pub fn new(dst: DST) -> Self { + let flags = match dst { + DST::Gauge => RRAFlags::DST_GAUGE, + DST::Derive => RRAFlags::DST_DERIVE, + }; + Self { - last_update: 0, - last_hour_count: 0, - hour: [RRDEntry::default(); RRD_DATA_ENTRIES], - last_day_count: 0, - day: [RRDEntry::default(); RRD_DATA_ENTRIES], - last_week_count: 0, - week: [RRDEntry::default(); RRD_DATA_ENTRIES], - last_month_count: 0, - month: [RRDEntry::default(); RRD_DATA_ENTRIES], - last_year_count: 0, - year: [RRDEntry::default(); RRD_DATA_ENTRIES], + hour_avg: RRA::new( + flags | RRAFlags::CF_AVERAGE, + RRDTimeFrameResolution::Hour as u64, + ), + hour_max: RRA::new( + flags | RRAFlags::CF_MAX, + RRDTimeFrameResolution::Hour as u64, + ), + day_avg: RRA::new( + flags | RRAFlags::CF_AVERAGE, + RRDTimeFrameResolution::Day as u64, + ), + day_max: RRA::new( + flags | RRAFlags::CF_MAX, + RRDTimeFrameResolution::Day as u64, + ), + week_avg: RRA::new( + flags | RRAFlags::CF_AVERAGE, + RRDTimeFrameResolution::Week as u64, + ), + week_max: RRA::new( + flags | RRAFlags::CF_MAX, + RRDTimeFrameResolution::Week as u64, + ), + month_avg: RRA::new( + flags | RRAFlags::CF_AVERAGE, + RRDTimeFrameResolution::Month as u64, + ), + month_max: RRA::new( + flags | RRAFlags::CF_MAX, + RRDTimeFrameResolution::Month as u64, + ), + year_avg: RRA::new( + flags | RRAFlags::CF_AVERAGE, + RRDTimeFrameResolution::Year as u64, + ), + year_max: RRA::new( + flags | RRAFlags::CF_MAX, + RRDTimeFrameResolution::Year as u64, + ), } } @@ -66,30 +190,31 @@ impl RRD { let end = reso*(epoch/reso + 1); let start = end - reso*(RRD_DATA_ENTRIES as u64); - let rrd_end = reso*(self.last_update/reso); - let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64); - let mut list = Vec::new(); - let data = match timeframe { - RRDTimeFrameResolution::Hour => &self.hour, - RRDTimeFrameResolution::Day => &self.day, - RRDTimeFrameResolution::Week => &self.week, - RRDTimeFrameResolution::Month => &self.month, - RRDTimeFrameResolution::Year => &self.year, + let raa = match (mode, timeframe) { + (RRDMode::Average, RRDTimeFrameResolution::Hour) => &self.hour_avg, + (RRDMode::Max, RRDTimeFrameResolution::Hour) => &self.hour_max, + (RRDMode::Average, RRDTimeFrameResolution::Day) => &self.day_avg, + (RRDMode::Max, RRDTimeFrameResolution::Day) => &self.day_max, + (RRDMode::Average, RRDTimeFrameResolution::Week) => &self.week_avg, + (RRDMode::Max, RRDTimeFrameResolution::Week) => &self.week_max, + (RRDMode::Average, RRDTimeFrameResolution::Month) => &self.month_avg, + (RRDMode::Max, RRDTimeFrameResolution::Month) => &self.month_max, + (RRDMode::Average, RRDTimeFrameResolution::Year) => &self.year_avg, + (RRDMode::Max, RRDTimeFrameResolution::Year) => &self.year_max, }; + let rrd_end = reso*(raa.last_update/reso); + let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64); + let mut t = start; let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; for _ in 0..RRD_DATA_ENTRIES { if t < rrd_start || t > rrd_end { list.push(None); } else { - let entry = data[index]; - let value = match mode { - RRDMode::Max => entry.max, - RRDMode::Average => entry.average, - }; + let value = raa.data[index]; if value.is_nan() { list.push(None); } else { @@ -143,87 +268,21 @@ impl RRD { Ok(()) } - fn compute_new_value( - data: &mut [RRDEntry; RRD_DATA_ENTRIES], - count: &mut u64, - epoch: u64, - last: u64, - reso: u64, - value: f64, - ) { - if value.is_nan() { - eprintln!("rrdb update failed - new value is NAN"); - return; - } - - let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; - let last_index = ((last/reso) % (RRD_DATA_ENTRIES as u64)) as usize; - - if (epoch - last) > reso || index != last_index { - *count = 0; - } - - let RRDEntry { max, average } = data[index]; - if max.is_nan() || average.is_nan() { - *count = 0; - } - - let new_count = *count + 1; // fixme: check overflow? - if *count == 0 { - data[index] = RRDEntry { max: value, average: value }; - *count = 1; - } else { - let new_max = if max > value { max } else { value }; - // let new_average = (average*(count as f64) + value)/(new_count as f64); - // Note: Try to avoid numeric errors - let new_average = (average*(*count as f64))/(new_count as f64) - + value/(new_count as f64); - data[index] = RRDEntry { max: new_max, average: new_average }; - *count = new_count; - } - } - - fn delete_old(data: &mut [RRDEntry], epoch: u64, last: u64, reso: u64) { - let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; - let min_time = (min_time/reso + 1)*reso; - let mut t = last - (RRD_DATA_ENTRIES as u64)*reso; - let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; - for _ in 0..RRD_DATA_ENTRIES { - t += reso; index = (index + 1) % RRD_DATA_ENTRIES; - if t < min_time { - data[index] = RRDEntry::default(); - } else { - break; - } - } - } pub fn update(&mut self, epoch: u64, value: f64) { - let last = self.last_update; - if epoch < last { - eprintln!("rrdb update failed - time in past ({} < {})", epoch, last); - } + self.hour_avg.update(epoch, value); + self.hour_max.update(epoch, value); - let reso = RRDTimeFrameResolution::Hour as u64; - Self::delete_old(&mut self.hour, epoch, last, reso); - Self::compute_new_value(&mut self.hour, &mut self.last_hour_count, epoch, last, reso, value); + self.day_avg.update(epoch, value); + self.day_max.update(epoch, value); - let reso = RRDTimeFrameResolution::Day as u64; - Self::delete_old(&mut self.day, epoch, last, reso); - Self::compute_new_value(&mut self.day, &mut self.last_day_count, epoch, last, reso, value); + self.week_avg.update(epoch, value); + self.week_max.update(epoch, value); - let reso = RRDTimeFrameResolution::Week as u64; - Self::delete_old(&mut self.week, epoch, last, reso); - Self::compute_new_value(&mut self.week, &mut self.last_week_count, epoch, last, reso, value); + self.month_avg.update(epoch, value); + self.month_max.update(epoch, value); - let reso = RRDTimeFrameResolution::Month as u64; - Self::delete_old(&mut self.month, epoch, last, reso); - Self::compute_new_value(&mut self.month, &mut self.last_month_count, epoch, last, reso, value); - - let reso = RRDTimeFrameResolution::Year as u64; - Self::delete_old(&mut self.year, epoch, last, reso); - Self::compute_new_value(&mut self.year, &mut self.last_year_count, epoch, last, reso, value); - - self.last_update = epoch; + self.year_avg.update(epoch, value); + self.year_max.update(epoch, value); } }