add simple rrd implementation
This commit is contained in:
		| @ -21,3 +21,5 @@ pub mod client; | ||||
| pub mod auth_helpers; | ||||
|  | ||||
| pub mod auth; | ||||
|  | ||||
| pub mod rrd; | ||||
|  | ||||
							
								
								
									
										82
									
								
								src/rrd/cache.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										82
									
								
								src/rrd/cache.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,82 @@ | ||||
| use std::time::{SystemTime, UNIX_EPOCH}; | ||||
| use std::path::PathBuf; | ||||
| use std::collections::HashMap; | ||||
| use std::sync::{RwLock}; | ||||
|  | ||||
| use anyhow::{format_err, Error}; | ||||
| use lazy_static::lazy_static; | ||||
| use serde_json::Value; | ||||
|  | ||||
| use proxmox::tools::fs::{create_path, CreateOptions}; | ||||
|  | ||||
| use super::*; | ||||
|  | ||||
| const PBS_RRD_BASEDIR: &str = "/var/lib/proxmox-backup/rrdb"; | ||||
|  | ||||
| lazy_static!{ | ||||
|     static ref RRD_CACHE: RwLock<HashMap<String, RRD>> = { | ||||
|         RwLock::new(HashMap::new()) | ||||
|     }; | ||||
| } | ||||
|  | ||||
| /// Create rrdd stat dir with correct permission | ||||
| pub fn create_rrdb_dir() -> Result<(), Error> { | ||||
|  | ||||
|     let backup_user = crate::backup::backup_user()?; | ||||
|     let opts = CreateOptions::new() | ||||
|         .owner(backup_user.uid) | ||||
|         .group(backup_user.gid); | ||||
|  | ||||
|     create_path(PBS_RRD_BASEDIR, None, Some(opts)) | ||||
|         .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?; | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| fn now() -> Result<u64, Error> { | ||||
|     let epoch = SystemTime::now().duration_since(UNIX_EPOCH)?; | ||||
|     Ok(epoch.as_secs()) | ||||
| } | ||||
|  | ||||
| pub fn update_value(rel_path: &str, value: f64) -> Result<(), Error> { | ||||
|  | ||||
|     let mut path = PathBuf::from(PBS_RRD_BASEDIR); | ||||
|     path.push(rel_path); | ||||
|  | ||||
|     std::fs::create_dir_all(path.parent().unwrap())?; | ||||
|  | ||||
|     let mut map = RRD_CACHE.write().unwrap(); | ||||
|     let now = now()?; | ||||
|      | ||||
|     if let Some(rrd) = map.get_mut(rel_path) { | ||||
|         rrd.update(now, value); | ||||
|         rrd.save(&path)?; | ||||
|     } else { | ||||
|         let mut rrd = match RRD::load(&path) { | ||||
|             Ok(rrd) => rrd, | ||||
|             Err(_) => RRD::new(), | ||||
|         }; | ||||
|         rrd.update(now, value); | ||||
|         rrd.save(&path)?; | ||||
|         map.insert(rel_path.into(), rrd); | ||||
|     } | ||||
|     | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| pub fn extract_data( | ||||
|     rel_path: &str, | ||||
|     timeframe: RRDTimeFrameResolution, | ||||
|     mode: RRDMode, | ||||
| ) -> Result<Value, Error> { | ||||
|  | ||||
|     let now = now()?; | ||||
|  | ||||
|     let map = RRD_CACHE.read().unwrap(); | ||||
|      | ||||
|     if let Some(rrd) = map.get(rel_path) { | ||||
|         Ok(rrd.extract_data(now, timeframe, mode)) | ||||
|     } else { | ||||
|         Ok(RRD::new().extract_data(now, timeframe, mode)) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										4
									
								
								src/rrd/mod.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								src/rrd/mod.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,4 @@ | ||||
| mod rrd; | ||||
| pub use rrd::*; | ||||
| mod cache; | ||||
| pub use cache::*; | ||||
							
								
								
									
										224
									
								
								src/rrd/rrd.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										224
									
								
								src/rrd/rrd.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,224 @@ | ||||
| use std::io::Read; | ||||
| use std::path::Path; | ||||
|  | ||||
| use anyhow::{bail, Error}; | ||||
| use serde_json::{json, Value}; | ||||
|  | ||||
| const RRD_DATA_ENTRIES: usize = 70; | ||||
|  | ||||
| #[derive(Copy, Clone)] | ||||
| pub enum RRDMode { | ||||
|     Max, | ||||
|     Average, | ||||
| } | ||||
|  | ||||
| #[repr(u64)] | ||||
| #[derive(Copy, Clone)] | ||||
| pub enum RRDTimeFrameResolution { | ||||
|     Hour = 60,       // 1 min => last 70 minutes | ||||
|     Day = 60*30,     // 30 min => last 35 hours | ||||
|     Week = 60*180,   // 3 hours => about 8 days | ||||
|     Month = 60*720,  // 12 hours => last 35 days | ||||
|     Year = 60*10080, // 1 week => last 490 days | ||||
| } | ||||
|  | ||||
| #[repr(C)] | ||||
| #[derive(Default, Copy, Clone)] | ||||
| struct RRDEntry { | ||||
|     max: f64, | ||||
|     average: f64, | ||||
|     count: u64, | ||||
| } | ||||
|  | ||||
| #[repr(C)] | ||||
| // Note: Avoid alignment problems by using 8byte types only | ||||
| pub struct RRD { | ||||
|     last_update: u64, | ||||
|     hour: [RRDEntry; RRD_DATA_ENTRIES], | ||||
|     day: [RRDEntry; RRD_DATA_ENTRIES], | ||||
|     week: [RRDEntry; RRD_DATA_ENTRIES], | ||||
|     month: [RRDEntry; RRD_DATA_ENTRIES], | ||||
|     year: [RRDEntry; RRD_DATA_ENTRIES], | ||||
| } | ||||
|  | ||||
| impl RRD { | ||||
|  | ||||
|     pub fn new() -> Self { | ||||
|         Self { | ||||
|             last_update: 0, | ||||
|             hour: [RRDEntry::default(); RRD_DATA_ENTRIES], | ||||
|             day: [RRDEntry::default(); RRD_DATA_ENTRIES], | ||||
|             week: [RRDEntry::default(); RRD_DATA_ENTRIES], | ||||
|             month: [RRDEntry::default(); RRD_DATA_ENTRIES], | ||||
|             year: [RRDEntry::default(); RRD_DATA_ENTRIES], | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn extract_data( | ||||
|         &self, | ||||
|         epoch: u64, | ||||
|         timeframe: RRDTimeFrameResolution, | ||||
|         mode: RRDMode, | ||||
|     ) -> Value { | ||||
|  | ||||
|         let reso = timeframe as u64; | ||||
|  | ||||
|         let end = reso*(epoch/reso); | ||||
|         let start = end - reso*(RRD_DATA_ENTRIES as u64); | ||||
|  | ||||
|         let rrd_end = reso*(self.last_update/reso); | ||||
|         let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64); | ||||
|  | ||||
|         let mut list = Vec::new(); | ||||
|  | ||||
|         let data = match timeframe { | ||||
|             RRDTimeFrameResolution::Hour => &self.hour, | ||||
|             RRDTimeFrameResolution::Day => &self.day, | ||||
|             RRDTimeFrameResolution::Week => &self.week, | ||||
|             RRDTimeFrameResolution::Month => &self.month, | ||||
|             RRDTimeFrameResolution::Year => &self.year, | ||||
|         }; | ||||
|          | ||||
|         let mut t = start; | ||||
|         let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         for _ in 0..RRD_DATA_ENTRIES { | ||||
|             if t < rrd_start || t > rrd_end { | ||||
|                 list.push(json!({ "time": t })); | ||||
|             } else { | ||||
|                 let entry = data[index]; | ||||
|                 if entry.count == 0 { | ||||
|                     list.push(json!({ "time": t })); | ||||
|                 } else { | ||||
|                     let value = match mode { | ||||
|                         RRDMode::Max => entry.max, | ||||
|                         RRDMode::Average => entry.average, | ||||
|                     }; | ||||
|                     list.push(json!({ "time": t, "value": value })); | ||||
|                 } | ||||
|             } | ||||
|             t += reso; index = (index + 1) % RRD_DATA_ENTRIES; | ||||
|         } | ||||
|  | ||||
|         list.into() | ||||
|     } | ||||
|  | ||||
|     pub fn from_raw(mut raw: &[u8]) -> Result<Self, Error> { | ||||
|         let expected_len = std::mem::size_of::<RRD>(); | ||||
|         if raw.len() != expected_len { | ||||
|             bail!("RRD::from_raw failed - wrong data size ({} != {})", raw.len(), expected_len); | ||||
|         } | ||||
|          | ||||
|         let mut rrd: RRD = unsafe { std::mem::zeroed() }; | ||||
|         unsafe { | ||||
|             let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len); | ||||
|             raw.read_exact(rrd_slice)?; | ||||
|         } | ||||
|  | ||||
|         Ok(rrd) | ||||
|     } | ||||
|  | ||||
|     pub fn load(filename: &Path) -> Result<Self, Error> { | ||||
|         let raw = proxmox::tools::fs::file_get_contents(filename)?; | ||||
|         Self::from_raw(&raw) | ||||
|     } | ||||
|      | ||||
|     pub fn save(&self, filename: &Path) -> Result<(), Error> { | ||||
|         use proxmox::tools::{fs::replace_file, fs::CreateOptions}; | ||||
|  | ||||
|         let rrd_slice = unsafe { | ||||
|             std::slice::from_raw_parts(self as *const _ as *const u8, std::mem::size_of::<RRD>()) | ||||
|         }; | ||||
|  | ||||
|         let backup_user = crate::backup::backup_user()?; | ||||
|         let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644); | ||||
|         // set the correct owner/group/permissions while saving file | ||||
|         // owner(rw) = backup, group(r)= backup | ||||
|         let options = CreateOptions::new() | ||||
|             .perm(mode) | ||||
|             .owner(backup_user.uid) | ||||
|             .group(backup_user.gid); | ||||
|  | ||||
|         replace_file(filename, rrd_slice, options)?; | ||||
|          | ||||
|         Ok(()) | ||||
|     } | ||||
|      | ||||
|     fn compute_new_value( | ||||
|         data: &[RRDEntry; RRD_DATA_ENTRIES], | ||||
|         index: usize, | ||||
|         value: f64, | ||||
|     ) -> RRDEntry {         | ||||
|         let RRDEntry { max, average, count } = data[index]; | ||||
|         let new_count = count + 1; // fixme: check overflow? | ||||
|         if count == 0 { | ||||
|             RRDEntry { max: value, average: value,  count: 1 } | ||||
|         } else { | ||||
|             let new_max = if max > value { max } else { value }; | ||||
|             let new_average = (average*(count as f64) + value)/(new_count as f64); | ||||
|             RRDEntry { max: new_max, average: new_average, count: new_count } | ||||
|         } | ||||
|     } | ||||
|      | ||||
|     pub fn update(&mut self, epoch: u64, value: f64) { | ||||
|         // fixme: check time progress (epoch  last) | ||||
|         let last = self.last_update; | ||||
|  | ||||
|         let reso = RRDTimeFrameResolution::Hour as u64; | ||||
|  | ||||
|         let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; | ||||
|         let mut t = last; | ||||
|         let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         for _ in 0..RRD_DATA_ENTRIES { | ||||
|             if t < min_time { self.hour[index] = RRDEntry::default(); } | ||||
|             t += reso; index = (index + 1) % RRD_DATA_ENTRIES; | ||||
|         } | ||||
|         let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         self.hour[index] = Self::compute_new_value(&self.hour, index, value); | ||||
|  | ||||
|         let reso = RRDTimeFrameResolution::Day as u64;  | ||||
|         let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; | ||||
|         let mut t = last; | ||||
|         let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         for _ in 0..RRD_DATA_ENTRIES { | ||||
|             if t < min_time { self.day[index] = RRDEntry::default(); } | ||||
|             t += reso; index = (index + 1) % RRD_DATA_ENTRIES; | ||||
|         } | ||||
|         let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         self.day[index] = Self::compute_new_value(&self.day, index, value); | ||||
|   | ||||
|         let reso = RRDTimeFrameResolution::Week as u64;  | ||||
|         let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; | ||||
|         let mut t = last; | ||||
|         let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         for _ in 0..RRD_DATA_ENTRIES { | ||||
|             if t < min_time { self.week[index] = RRDEntry::default(); } | ||||
|             t += reso; index = (index + 1) % RRD_DATA_ENTRIES; | ||||
|         } | ||||
|         let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         self.week[index] = Self::compute_new_value(&self.week, index, value); | ||||
|  | ||||
|         let reso = RRDTimeFrameResolution::Month as u64;  | ||||
|         let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; | ||||
|         let mut t = last; | ||||
|         let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         for _ in 0..RRD_DATA_ENTRIES { | ||||
|             if t < min_time { self.month[index] = RRDEntry::default(); } | ||||
|             t += reso; index = (index + 1) % RRD_DATA_ENTRIES; | ||||
|         } | ||||
|         let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         self.month[index] = Self::compute_new_value(&self.month, index, value); | ||||
|          | ||||
|         let reso = RRDTimeFrameResolution::Year as u64;  | ||||
|         let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; | ||||
|         let mut t = last; | ||||
|         let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         for _ in 0..RRD_DATA_ENTRIES { | ||||
|             if t < min_time { self.year[index] = RRDEntry::default(); } | ||||
|             t += reso; index = (index + 1) % RRD_DATA_ENTRIES; | ||||
|         } | ||||
|         let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; | ||||
|         self.year[index] = Self::compute_new_value(&self.year, index, value); | ||||
|  | ||||
|         self.last_update = epoch; | ||||
|     } | ||||
| } | ||||
		Reference in New Issue
	
	Block a user