src/rrd/rrd.rs: restructure whole code
This commit is contained in:
parent
daca4f7888
commit
4fb05fde17
@ -606,7 +606,7 @@ async fn generate_host_stats() {
|
||||
|
||||
match read_proc_stat() {
|
||||
Ok(stat) => {
|
||||
if let Err(err) = rrd::update_value("host/cpu", stat.cpu) {
|
||||
if let Err(err) = rrd::update_value("host/cpu", stat.cpu, rrd::DST::Gauge) {
|
||||
eprintln!("rrd::update_value 'host/cpu' failed - {}", err);
|
||||
}
|
||||
}
|
||||
@ -616,10 +616,10 @@ async fn generate_host_stats() {
|
||||
}
|
||||
match read_meminfo() {
|
||||
Ok(meminfo) => {
|
||||
if let Err(err) = rrd::update_value("host/memtotal", meminfo.memtotal as f64) {
|
||||
if let Err(err) = rrd::update_value("host/memtotal", meminfo.memtotal as f64, rrd::DST::Gauge) {
|
||||
eprintln!("rrd::update_value 'host/memtotal' failed - {}", err);
|
||||
}
|
||||
if let Err(err) = rrd::update_value("host/memused", meminfo.memused as f64) {
|
||||
if let Err(err) = rrd::update_value("host/memused", meminfo.memused as f64, rrd::DST::Gauge) {
|
||||
eprintln!("rrd::update_value 'host/memused' failed - {}", err);
|
||||
}
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ fn now() -> Result<u64, Error> {
|
||||
Ok(epoch.as_secs())
|
||||
}
|
||||
|
||||
pub fn update_value(rel_path: &str, value: f64) -> Result<(), Error> {
|
||||
pub fn update_value(rel_path: &str, value: f64, dst: DST) -> Result<(), Error> {
|
||||
|
||||
let mut path = PathBuf::from(PBS_RRD_BASEDIR);
|
||||
path.push(rel_path);
|
||||
@ -56,7 +56,7 @@ pub fn update_value(rel_path: &str, value: f64) -> Result<(), Error> {
|
||||
} else {
|
||||
let mut rrd = match RRD::load(&path) {
|
||||
Ok(rrd) => rrd,
|
||||
Err(_) => RRD::new(),
|
||||
Err(_) => RRD::new(dst),
|
||||
};
|
||||
rrd.update(now, value);
|
||||
rrd.save(&path)?;
|
||||
@ -77,12 +77,13 @@ pub fn extract_data(
|
||||
|
||||
let map = RRD_CACHE.read().unwrap();
|
||||
|
||||
let empty_rrd = RRD::new();
|
||||
|
||||
let mut result = Vec::new();
|
||||
|
||||
for name in items.iter() {
|
||||
let rrd = map.get(&format!("{}/{}", base, name)).unwrap_or(&empty_rrd);
|
||||
let rrd = match map.get(&format!("{}/{}", base, name)) {
|
||||
Some(rrd) => rrd,
|
||||
None => continue,
|
||||
};
|
||||
let (start, reso, list) = rrd.extract_data(now, timeframe, mode);
|
||||
let mut t = start;
|
||||
for index in 0..RRD_DATA_ENTRIES {
|
||||
|
301
src/rrd/rrd.rs
301
src/rrd/rrd.rs
@ -7,50 +7,174 @@ use crate::api2::types::{RRDMode, RRDTimeFrameResolution};
|
||||
|
||||
pub const RRD_DATA_ENTRIES: usize = 70;
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Copy, Clone)]
|
||||
struct RRDEntry {
|
||||
max: f64,
|
||||
average: f64,
|
||||
use bitflags::bitflags;
|
||||
|
||||
bitflags!{
|
||||
pub struct RRAFlags: u64 {
|
||||
// Data Source Types
|
||||
const DST_GAUGE = 1;
|
||||
const DST_DERIVE = 2;
|
||||
const DST_MASK = 255; // first 8 bits
|
||||
|
||||
// Consolidation Functions
|
||||
const CF_AVERAGE = 1 << 8;
|
||||
const CF_MAX = 2 << 8;
|
||||
const CF_MASK = 255 << 8;
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RRDEntry {
|
||||
fn default() -> Self {
|
||||
Self { max: f64::NAN, average: f64::NAN }
|
||||
pub enum DST {
|
||||
Gauge,
|
||||
Derive,
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
struct RRA {
|
||||
flags: RRAFlags,
|
||||
resolution: u64,
|
||||
last_update: u64,
|
||||
last_count: u64,
|
||||
data: [f64; RRD_DATA_ENTRIES],
|
||||
}
|
||||
|
||||
impl RRA {
|
||||
fn new(flags: RRAFlags, resolution: u64) -> Self {
|
||||
Self {
|
||||
flags, resolution,
|
||||
last_update: 0,
|
||||
last_count: 0,
|
||||
data: [f64::NAN; RRD_DATA_ENTRIES],
|
||||
}
|
||||
}
|
||||
|
||||
fn delete_old(&mut self, epoch: u64) {
|
||||
let reso = self.resolution;
|
||||
let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
|
||||
let min_time = (min_time/reso + 1)*reso;
|
||||
let mut t = self.last_update - (RRD_DATA_ENTRIES as u64)*reso;
|
||||
let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
|
||||
for _ in 0..RRD_DATA_ENTRIES {
|
||||
t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
|
||||
if t < min_time {
|
||||
self.data[index] = f64::NAN;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_new_value(&mut self, epoch: u64, value: f64) {
|
||||
let reso = self.resolution;
|
||||
let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
|
||||
let last_index = ((self.last_update/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
|
||||
|
||||
if (epoch - self.last_update) > reso || index != last_index {
|
||||
self.last_count = 0;
|
||||
}
|
||||
|
||||
let last_value = self.data[index];
|
||||
if last_value.is_nan() {
|
||||
self.last_count = 0;
|
||||
}
|
||||
|
||||
let new_count = self.last_count + 1; // fixme: check overflow?
|
||||
if self.last_count == 0 {
|
||||
self.data[index] = value;
|
||||
self.last_count = 1;
|
||||
} else {
|
||||
let new_value = if self.flags.contains(RRAFlags::CF_MAX) {
|
||||
if last_value > value { last_value } else { value }
|
||||
} else if self.flags.contains(RRAFlags::CF_AVERAGE) {
|
||||
(last_value*(self.last_count as f64))/(new_count as f64)
|
||||
+ value/(new_count as f64)
|
||||
} else {
|
||||
eprintln!("rrdb update failed - unknown CF");
|
||||
return;
|
||||
};
|
||||
self.data[index] = new_value;
|
||||
self.last_count = new_count;
|
||||
}
|
||||
self.last_update = epoch;
|
||||
}
|
||||
|
||||
fn update(&mut self, epoch: u64, value: f64) {
|
||||
if epoch < self.last_update {
|
||||
eprintln!("rrdb update failed - time in past ({} < {})", epoch, self.last_update);
|
||||
}
|
||||
if value.is_nan() {
|
||||
eprintln!("rrdb update failed - new value is NAN");
|
||||
return;
|
||||
}
|
||||
|
||||
self.delete_old(epoch);
|
||||
self.compute_new_value(epoch, value);
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
// Note: Avoid alignment problems by using 8byte types only
|
||||
pub struct RRD {
|
||||
last_update: u64,
|
||||
last_hour_count: u64,
|
||||
hour: [RRDEntry; RRD_DATA_ENTRIES],
|
||||
last_day_count: u64,
|
||||
day: [RRDEntry; RRD_DATA_ENTRIES],
|
||||
last_week_count: u64,
|
||||
week: [RRDEntry; RRD_DATA_ENTRIES],
|
||||
last_month_count: u64,
|
||||
month: [RRDEntry; RRD_DATA_ENTRIES],
|
||||
last_year_count: u64,
|
||||
year: [RRDEntry; RRD_DATA_ENTRIES],
|
||||
hour_avg: RRA,
|
||||
hour_max: RRA,
|
||||
day_avg: RRA,
|
||||
day_max: RRA,
|
||||
week_avg: RRA,
|
||||
week_max: RRA,
|
||||
month_avg: RRA,
|
||||
month_max: RRA,
|
||||
year_avg: RRA,
|
||||
year_max: RRA,
|
||||
}
|
||||
|
||||
impl RRD {
|
||||
|
||||
pub fn new() -> Self {
|
||||
pub fn new(dst: DST) -> Self {
|
||||
let flags = match dst {
|
||||
DST::Gauge => RRAFlags::DST_GAUGE,
|
||||
DST::Derive => RRAFlags::DST_DERIVE,
|
||||
};
|
||||
|
||||
Self {
|
||||
last_update: 0,
|
||||
last_hour_count: 0,
|
||||
hour: [RRDEntry::default(); RRD_DATA_ENTRIES],
|
||||
last_day_count: 0,
|
||||
day: [RRDEntry::default(); RRD_DATA_ENTRIES],
|
||||
last_week_count: 0,
|
||||
week: [RRDEntry::default(); RRD_DATA_ENTRIES],
|
||||
last_month_count: 0,
|
||||
month: [RRDEntry::default(); RRD_DATA_ENTRIES],
|
||||
last_year_count: 0,
|
||||
year: [RRDEntry::default(); RRD_DATA_ENTRIES],
|
||||
hour_avg: RRA::new(
|
||||
flags | RRAFlags::CF_AVERAGE,
|
||||
RRDTimeFrameResolution::Hour as u64,
|
||||
),
|
||||
hour_max: RRA::new(
|
||||
flags | RRAFlags::CF_MAX,
|
||||
RRDTimeFrameResolution::Hour as u64,
|
||||
),
|
||||
day_avg: RRA::new(
|
||||
flags | RRAFlags::CF_AVERAGE,
|
||||
RRDTimeFrameResolution::Day as u64,
|
||||
),
|
||||
day_max: RRA::new(
|
||||
flags | RRAFlags::CF_MAX,
|
||||
RRDTimeFrameResolution::Day as u64,
|
||||
),
|
||||
week_avg: RRA::new(
|
||||
flags | RRAFlags::CF_AVERAGE,
|
||||
RRDTimeFrameResolution::Week as u64,
|
||||
),
|
||||
week_max: RRA::new(
|
||||
flags | RRAFlags::CF_MAX,
|
||||
RRDTimeFrameResolution::Week as u64,
|
||||
),
|
||||
month_avg: RRA::new(
|
||||
flags | RRAFlags::CF_AVERAGE,
|
||||
RRDTimeFrameResolution::Month as u64,
|
||||
),
|
||||
month_max: RRA::new(
|
||||
flags | RRAFlags::CF_MAX,
|
||||
RRDTimeFrameResolution::Month as u64,
|
||||
),
|
||||
year_avg: RRA::new(
|
||||
flags | RRAFlags::CF_AVERAGE,
|
||||
RRDTimeFrameResolution::Year as u64,
|
||||
),
|
||||
year_max: RRA::new(
|
||||
flags | RRAFlags::CF_MAX,
|
||||
RRDTimeFrameResolution::Year as u64,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,30 +190,31 @@ impl RRD {
|
||||
let end = reso*(epoch/reso + 1);
|
||||
let start = end - reso*(RRD_DATA_ENTRIES as u64);
|
||||
|
||||
let rrd_end = reso*(self.last_update/reso);
|
||||
let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64);
|
||||
|
||||
let mut list = Vec::new();
|
||||
|
||||
let data = match timeframe {
|
||||
RRDTimeFrameResolution::Hour => &self.hour,
|
||||
RRDTimeFrameResolution::Day => &self.day,
|
||||
RRDTimeFrameResolution::Week => &self.week,
|
||||
RRDTimeFrameResolution::Month => &self.month,
|
||||
RRDTimeFrameResolution::Year => &self.year,
|
||||
let raa = match (mode, timeframe) {
|
||||
(RRDMode::Average, RRDTimeFrameResolution::Hour) => &self.hour_avg,
|
||||
(RRDMode::Max, RRDTimeFrameResolution::Hour) => &self.hour_max,
|
||||
(RRDMode::Average, RRDTimeFrameResolution::Day) => &self.day_avg,
|
||||
(RRDMode::Max, RRDTimeFrameResolution::Day) => &self.day_max,
|
||||
(RRDMode::Average, RRDTimeFrameResolution::Week) => &self.week_avg,
|
||||
(RRDMode::Max, RRDTimeFrameResolution::Week) => &self.week_max,
|
||||
(RRDMode::Average, RRDTimeFrameResolution::Month) => &self.month_avg,
|
||||
(RRDMode::Max, RRDTimeFrameResolution::Month) => &self.month_max,
|
||||
(RRDMode::Average, RRDTimeFrameResolution::Year) => &self.year_avg,
|
||||
(RRDMode::Max, RRDTimeFrameResolution::Year) => &self.year_max,
|
||||
};
|
||||
|
||||
let rrd_end = reso*(raa.last_update/reso);
|
||||
let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64);
|
||||
|
||||
let mut t = start;
|
||||
let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
|
||||
for _ in 0..RRD_DATA_ENTRIES {
|
||||
if t < rrd_start || t > rrd_end {
|
||||
list.push(None);
|
||||
} else {
|
||||
let entry = data[index];
|
||||
let value = match mode {
|
||||
RRDMode::Max => entry.max,
|
||||
RRDMode::Average => entry.average,
|
||||
};
|
||||
let value = raa.data[index];
|
||||
if value.is_nan() {
|
||||
list.push(None);
|
||||
} else {
|
||||
@ -143,87 +268,21 @@ impl RRD {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn compute_new_value(
|
||||
data: &mut [RRDEntry; RRD_DATA_ENTRIES],
|
||||
count: &mut u64,
|
||||
epoch: u64,
|
||||
last: u64,
|
||||
reso: u64,
|
||||
value: f64,
|
||||
) {
|
||||
if value.is_nan() {
|
||||
eprintln!("rrdb update failed - new value is NAN");
|
||||
return;
|
||||
}
|
||||
|
||||
let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
|
||||
let last_index = ((last/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
|
||||
|
||||
if (epoch - last) > reso || index != last_index {
|
||||
*count = 0;
|
||||
}
|
||||
|
||||
let RRDEntry { max, average } = data[index];
|
||||
if max.is_nan() || average.is_nan() {
|
||||
*count = 0;
|
||||
}
|
||||
|
||||
let new_count = *count + 1; // fixme: check overflow?
|
||||
if *count == 0 {
|
||||
data[index] = RRDEntry { max: value, average: value };
|
||||
*count = 1;
|
||||
} else {
|
||||
let new_max = if max > value { max } else { value };
|
||||
// let new_average = (average*(count as f64) + value)/(new_count as f64);
|
||||
// Note: Try to avoid numeric errors
|
||||
let new_average = (average*(*count as f64))/(new_count as f64)
|
||||
+ value/(new_count as f64);
|
||||
data[index] = RRDEntry { max: new_max, average: new_average };
|
||||
*count = new_count;
|
||||
}
|
||||
}
|
||||
|
||||
fn delete_old(data: &mut [RRDEntry], epoch: u64, last: u64, reso: u64) {
|
||||
let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
|
||||
let min_time = (min_time/reso + 1)*reso;
|
||||
let mut t = last - (RRD_DATA_ENTRIES as u64)*reso;
|
||||
let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
|
||||
for _ in 0..RRD_DATA_ENTRIES {
|
||||
t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
|
||||
if t < min_time {
|
||||
data[index] = RRDEntry::default();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update(&mut self, epoch: u64, value: f64) {
|
||||
let last = self.last_update;
|
||||
if epoch < last {
|
||||
eprintln!("rrdb update failed - time in past ({} < {})", epoch, last);
|
||||
}
|
||||
self.hour_avg.update(epoch, value);
|
||||
self.hour_max.update(epoch, value);
|
||||
|
||||
let reso = RRDTimeFrameResolution::Hour as u64;
|
||||
Self::delete_old(&mut self.hour, epoch, last, reso);
|
||||
Self::compute_new_value(&mut self.hour, &mut self.last_hour_count, epoch, last, reso, value);
|
||||
self.day_avg.update(epoch, value);
|
||||
self.day_max.update(epoch, value);
|
||||
|
||||
let reso = RRDTimeFrameResolution::Day as u64;
|
||||
Self::delete_old(&mut self.day, epoch, last, reso);
|
||||
Self::compute_new_value(&mut self.day, &mut self.last_day_count, epoch, last, reso, value);
|
||||
self.week_avg.update(epoch, value);
|
||||
self.week_max.update(epoch, value);
|
||||
|
||||
let reso = RRDTimeFrameResolution::Week as u64;
|
||||
Self::delete_old(&mut self.week, epoch, last, reso);
|
||||
Self::compute_new_value(&mut self.week, &mut self.last_week_count, epoch, last, reso, value);
|
||||
self.month_avg.update(epoch, value);
|
||||
self.month_max.update(epoch, value);
|
||||
|
||||
let reso = RRDTimeFrameResolution::Month as u64;
|
||||
Self::delete_old(&mut self.month, epoch, last, reso);
|
||||
Self::compute_new_value(&mut self.month, &mut self.last_month_count, epoch, last, reso, value);
|
||||
|
||||
let reso = RRDTimeFrameResolution::Year as u64;
|
||||
Self::delete_old(&mut self.year, epoch, last, reso);
|
||||
Self::compute_new_value(&mut self.year, &mut self.last_year_count, epoch, last, reso, value);
|
||||
|
||||
self.last_update = epoch;
|
||||
self.year_avg.update(epoch, value);
|
||||
self.year_max.update(epoch, value);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user