use std::fs::File;
use std::path::{Path, PathBuf};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::io::{Write, BufRead, BufReader};
use std::time::SystemTime;
use std::ffi::OsStr;
use std::thread::spawn;

use crossbeam_channel::{bounded, Receiver, TryRecvError};
use anyhow::{format_err, bail, Error};
use nix::fcntl::OFlag;

use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};

use crate::rrd::{DST, CF, RRD, RRA};

const RRD_JOURNAL_NAME: &str = "rrd.journal";

/// RRD cache - keep RRD data in RAM, but write updates to disk
///
/// This cache is designed to run as a single instance (no concurrent
/// access from other processes).
pub struct RRDCache {
    config: Arc<CacheConfig>,
    state: Arc<RwLock<JournalState>>,
    rrd_map: Arc<RwLock<RRDMap>>,
}

struct CacheConfig {
    apply_interval: f64,
    basedir: PathBuf,
    file_options: CreateOptions,
    dir_options: CreateOptions,
}

struct RRDMap {
    config: Arc<CacheConfig>,
    map: HashMap<String, RRD>,
    load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
}

impl RRDMap {

    fn new(
        config: Arc<CacheConfig>,
        load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
    ) -> Self {
        Self {
            config,
            map: HashMap::new(),
            load_rrd_cb,
        }
    }

    fn update(
        &mut self,
        rel_path: &str,
        time: f64,
        value: f64,
        dst: DST,
        new_only: bool,
    ) -> Result<(), Error> {
        if let Some(rrd) = self.map.get_mut(rel_path) {
            if !new_only || time > rrd.last_update() {
                rrd.update(time, value);
            }
        } else {
            let mut path = self.config.basedir.clone();
            path.push(rel_path);
            create_path(
                path.parent().unwrap(),
                Some(self.config.dir_options.clone()),
                Some(self.config.dir_options.clone()),
            )?;

            let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst);

            if !new_only || time > rrd.last_update() {
                rrd.update(time, value);
            }
            self.map.insert(rel_path.to_string(), rrd);
        }
        Ok(())
    }

    fn flush_rrd_files(&self) -> Result<usize, Error> {
        let mut rrd_file_count = 0;
        let mut errors = 0;

        for (rel_path, rrd) in self.map.iter() {
            rrd_file_count += 1;

            let mut path = self.config.basedir.clone();
            path.push(rel_path);

            if let Err(err) = rrd.save(&path, self.config.file_options.clone()) {
                errors += 1;
                log::error!("unable to save {:?}: {}", path, err);
            }
        }

        if errors != 0 {
            bail!("errors during rrd flush - unable to commit rrd journal");
        }

        Ok(rrd_file_count)
    }

    fn extract_cached_data(
        &self,
        base: &str,
        name: &str,
        cf: CF,
        resolution: u64,
        start: Option<u64>,
        end: Option<u64>,
    ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
        match self.map.get(&format!("{}/{}", base, name)) {
            Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
            None => Ok(None),
        }
    }
}

// shared state behind RwLock
struct JournalState {
    config: Arc<CacheConfig>,
    journal: File,
    last_journal_flush: f64,
    journal_applied: bool,
    apply_thread_result: Option<Receiver<Result<(), String>>>,
}

struct JournalEntry {
    time: f64,
    value: f64,
    dst: DST,
    rel_path: String,
}
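
// Journal entries are stored as text lines with four colon-separated
// fields, "<time>:<value>:<dst>:<rel_path>" (see `append_journal_entry`
// and `parse_journal_line`). An entry with illustrative values:
//
//     1632296400.123:1.5:0:host/cpu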

impl JournalState {

    fn new(config: Arc<CacheConfig>) -> Result<Self, Error> {
        let journal = JournalState::open_journal_writer(&config)?;
        Ok(Self {
            config,
            journal,
            last_journal_flush: 0.0,
            journal_applied: false,
            apply_thread_result: None,
        })
    }

    fn open_journal_reader(&self) -> Result<BufReader<File>, Error> {
        // fixme : dup self.journal instead??
        let mut journal_path = self.config.basedir.clone();
        journal_path.push(RRD_JOURNAL_NAME);

        let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
        let journal = atomic_open_or_create_file(
            &journal_path,
            flags,
            &[],
            self.config.file_options.clone(),
        )?;
        Ok(BufReader::new(journal))
    }

    fn open_journal_writer(config: &CacheConfig) -> Result<File, Error> {
        let mut journal_path = config.basedir.clone();
        journal_path.push(RRD_JOURNAL_NAME);

        let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
        let journal = atomic_open_or_create_file(
            &journal_path,
            flags,
            &[],
            config.file_options.clone(),
        )?;
        Ok(journal)
    }

    fn rotate_journal(&mut self) -> Result<(), Error> {
        let mut journal_path = self.config.basedir.clone();
        journal_path.push(RRD_JOURNAL_NAME);

        let mut new_name = journal_path.clone();
        let now = proxmox_time::epoch_i64();
        new_name.set_extension(format!("journal-{:08x}", now));
        std::fs::rename(journal_path, new_name)?;

        self.journal = Self::open_journal_writer(&self.config)?;
        Ok(())
    }

    fn remove_old_journals(&self) -> Result<(), Error> {
        let journal_list = self.list_old_journals()?;

        for (_time, _filename, path) in journal_list {
            std::fs::remove_file(path)?;
        }

        Ok(())
    }

    fn list_old_journals(&self) -> Result<Vec<(u64, String, PathBuf)>, Error> {
        let mut list = Vec::new();
        for entry in std::fs::read_dir(&self.config.basedir)? {
            let entry = entry?;
            let path = entry.path();

            if path.is_file() {
                if let Some(stem) = path.file_stem() {
                    if stem != OsStr::new("rrd") {
                        continue;
                    }
                    if let Some(extension) = path.extension() {
                        if let Some(extension) = extension.to_str() {
                            if let Some(rest) = extension.strip_prefix("journal-") {
                                if let Ok(time) = u64::from_str_radix(rest, 16) {
                                    list.push((time, format!("rrd.{}", extension), path.to_owned()));
                                }
                            }
                        }
                    }
                }
            }
        }
        list.sort_unstable_by_key(|t| t.0);
        Ok(list)
    }
}

impl RRDCache {

    /// Creates a new instance
    ///
    /// `basedir`: All files are stored relative to this path.
    ///
    /// `file_options`: Files are created with these options.
    ///
    /// `dir_options`: Directories are created with these options.
    ///
    /// `apply_interval`: Commit the journal after `apply_interval` seconds.
    ///
    /// `load_rrd_cb`: The callback function is used to load RRD files,
    /// and should return a newly generated RRD if the file does not
    /// exist (or is unreadable). This may generate RRDs with
    /// different configurations (dependent on `rel_path`).
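    ///
    /// # Example
    ///
    /// A minimal usage sketch (the base directory and the callback are
    /// illustrative; this callback ignores the on-disk file and always
    /// creates the default RRD):
    ///
    /// ```ignore
    /// let cache = RRDCache::new(
    ///     "/run/my-daemon/rrdb",
    ///     None,
    ///     None,
    ///     30.0,
    ///     |_path, _rel_path, dst| RRDCache::create_proxmox_backup_default_rrd(dst),
    /// )?;
    /// cache.apply_journal()?; // replay pending journal entries at startup
    /// ```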
    pub fn new<P: AsRef<Path>>(
        basedir: P,
        file_options: Option<CreateOptions>,
        dir_options: Option<CreateOptions>,
        apply_interval: f64,
        load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
    ) -> Result<Self, Error> {
        let basedir = basedir.as_ref().to_owned();

        let file_options = file_options.unwrap_or_else(CreateOptions::new);
        let dir_options = dir_options.unwrap_or_else(CreateOptions::new);

        create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
            .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;

        let config = Arc::new(CacheConfig {
            basedir,
            file_options,
            dir_options,
            apply_interval,
        });

        let state = JournalState::new(Arc::clone(&config))?;
        let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);

        Ok(Self {
            config,
            state: Arc::new(RwLock::new(state)),
            rrd_map: Arc::new(RwLock::new(rrd_map)),
        })
    }

    /// Create a new RRD as used by the proxmox backup server
    ///
    /// It contains the following RRAs:
    ///
    /// * cf=average,r=60,n=1440 => 1day
    /// * cf=maximum,r=60,n=1440 => 1day
    /// * cf=average,r=30*60,n=1440 => 1month
    /// * cf=maximum,r=30*60,n=1440 => 1month
    /// * cf=average,r=6*3600,n=1440 => 1year
    /// * cf=maximum,r=6*3600,n=1440 => 1year
    /// * cf=average,r=7*86400,n=570 => 10years
    /// * cf=maximum,r=7*86400,n=570 => 10years
    ///
    /// The resulting data file size is about 80KB.
    pub fn create_proxmox_backup_default_rrd(dst: DST) -> RRD {

        let mut rra_list = Vec::new();

        // 1min * 1440 => 1day
        rra_list.push(RRA::new(CF::Average, 60, 1440));
        rra_list.push(RRA::new(CF::Maximum, 60, 1440));

        // 30min * 1440 => 30days = 1month
        rra_list.push(RRA::new(CF::Average, 30*60, 1440));
        rra_list.push(RRA::new(CF::Maximum, 30*60, 1440));

        // 6h * 1440 => 360days = 1year
        rra_list.push(RRA::new(CF::Average, 6*3600, 1440));
        rra_list.push(RRA::new(CF::Maximum, 6*3600, 1440));

        // 1week * 570 => 10years
        rra_list.push(RRA::new(CF::Average, 7*86400, 570));
        rra_list.push(RRA::new(CF::Maximum, 7*86400, 570));

        RRD::new(dst, rra_list)
    }

    fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {

        let line = line.trim();

        let parts: Vec<&str> = line.splitn(4, ':').collect();
        if parts.len() != 4 {
            bail!("wrong number of components");
        }

        let time: f64 = parts[0].parse()
            .map_err(|_| format_err!("unable to parse time"))?;
        let value: f64 = parts[1].parse()
            .map_err(|_| format_err!("unable to parse value"))?;
        let dst: u8 = parts[2].parse()
            .map_err(|_| format_err!("unable to parse data source type"))?;

        let dst = match dst {
            0 => DST::Gauge,
            1 => DST::Derive,
            _ => bail!("got strange value for data source type '{}'", dst),
        };

        let rel_path = parts[3].to_string();

        Ok(JournalEntry { time, value, dst, rel_path })
    }

    fn append_journal_entry(
        &self,
        time: f64,
        value: f64,
        dst: DST,
        rel_path: &str,
    ) -> Result<(), Error> {
        let mut state = self.state.write().unwrap(); // block other writers
        let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
        state.journal.write_all(journal_entry.as_bytes())?;
        Ok(())
    }

    /// Apply and commit the journal. Should be used at server startup.
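    ///
    /// Returns whether the journal was already applied when the call
    /// started. The apply/commit work itself runs in a background
    /// thread; repeated calls poll that thread and trigger a new commit
    /// once `apply_interval` seconds have passed since the last flush.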
    pub fn apply_journal(&self) -> Result<bool, Error> {
        let state = Arc::clone(&self.state);
        let rrd_map = Arc::clone(&self.rrd_map);

        let mut state_guard = self.state.write().unwrap();
        let journal_applied = state_guard.journal_applied;

        let now = proxmox_time::epoch_f64();
        let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;

        if journal_applied && !wants_commit {
            return Ok(journal_applied);
        }

        if let Some(ref recv) = state_guard.apply_thread_result {
            match recv.try_recv() {
                Ok(Ok(())) => {
                    // finished without errors, OK
                }
                Ok(Err(err)) => {
                    // finished with errors, log them
                    log::error!("{}", err);
                }
                Err(TryRecvError::Empty) => {
                    // still running
                    return Ok(journal_applied);
                }
                Err(TryRecvError::Disconnected) => {
                    // crashed, start again
                    log::error!("apply journal thread crashed - try again");
                }
            }
        }

        state_guard.last_journal_flush = proxmox_time::epoch_f64();

        let (sender, receiver) = bounded(1);
        state_guard.apply_thread_result = Some(receiver);

        spawn(move || {
            let result = apply_and_commit_journal_thread(state, rrd_map, journal_applied)
                .map_err(|err| err.to_string());
            sender.send(result).unwrap();
        });

        Ok(journal_applied)
    }

    /// Update data in RAM and write file back to disk (journal)
    pub fn update_value(
        &self,
        rel_path: &str,
        time: f64,
        value: f64,
        dst: DST,
    ) -> Result<(), Error> {

        let journal_applied = self.apply_journal()?;

        self.append_journal_entry(time, value, dst, rel_path)?;

        if journal_applied {
            self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
        }

        Ok(())
    }

    /// Extract data from cached RRD
    ///
    /// `start`: Start time. If not specified, we simply extract 10 data points.
    ///
    /// `end`: End time. Default is to use the current time.
    pub fn extract_cached_data(
        &self,
        base: &str,
        name: &str,
        cf: CF,
        resolution: u64,
        start: Option<u64>,
        end: Option<u64>,
    ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
        self.rrd_map.read().unwrap()
            .extract_cached_data(base, name, cf, resolution, start, end)
    }
}

fn apply_and_commit_journal_thread(
    state: Arc<RwLock<JournalState>>,
    rrd_map: Arc<RwLock<RRDMap>>,
    commit_only: bool,
) -> Result<(), Error> {

    if commit_only {
        state.write().unwrap().rotate_journal()?; // start new journal, keep old one
    } else {
        let start_time = SystemTime::now();
        log::debug!("applying rrd journal");

        match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
            Ok(entries) => {
                let elapsed = start_time.elapsed().unwrap().as_secs_f64();
                log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed);
            }
            Err(err) => bail!("apply rrd journal failed - {}", err),
        }
    }

    let start_time = SystemTime::now();
    log::debug!("commit rrd journal");

    match commit_journal_impl(state, rrd_map) {
        Ok(rrd_file_count) => {
            let elapsed = start_time.elapsed().unwrap().as_secs_f64();
            log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
                rrd_file_count, elapsed);
        }
        Err(err) => bail!("rrd journal commit failed: {}", err),
    }

    Ok(())
}
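
// Replay journal entries from `reader` into the RRD map. When
// `lock_read_line` is set, each line is read while holding the state
// read lock, so writers (which append complete lines under the write
// lock) are never observed mid-line.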
fn apply_journal_lines(
    state: Arc<RwLock<JournalState>>,
    rrd_map: Arc<RwLock<RRDMap>>,
    journal_name: &str, // used for logging
    reader: &mut BufReader<File>,
    lock_read_line: bool,
) -> Result<usize, Error> {

    let mut linenr = 0;

    loop {
        linenr += 1;
        let mut line = String::new();
        let len = if lock_read_line {
            let _lock = state.read().unwrap(); // make sure we read entire lines
            reader.read_line(&mut line)?
        } else {
            reader.read_line(&mut line)?
        };

        if len == 0 { break; }

        let entry = match RRDCache::parse_journal_line(&line) {
            Ok(entry) => entry,
            Err(err) => {
                log::warn!(
                    "unable to parse rrd journal '{}' line {} (skip) - {}",
                    journal_name, linenr, err,
                );
                continue; // skip unparsable lines
            }
        };

        rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
    }

    Ok(linenr)
}

fn apply_journal_impl(
    state: Arc<RwLock<JournalState>>,
    rrd_map: Arc<RwLock<RRDMap>>,
) -> Result<usize, Error> {

    let mut lines = 0;

    // Apply old journals first
    let journal_list = state.read().unwrap().list_old_journals()?;

    for (_time, filename, path) in journal_list {
        log::info!("apply old journal log {}", filename);
        let file = std::fs::OpenOptions::new().read(true).open(path)?;
        let mut reader = BufReader::new(file);
        lines += apply_journal_lines(
            Arc::clone(&state),
            Arc::clone(&rrd_map),
            &filename,
            &mut reader,
            false,
        )?;
    }

    let mut journal = state.read().unwrap().open_journal_reader()?;

    lines += apply_journal_lines(
        Arc::clone(&state),
        Arc::clone(&rrd_map),
        "rrd.journal",
        &mut journal,
        true,
    )?;

    {
        let mut state_guard = state.write().unwrap(); // block other writers

        lines += apply_journal_lines(
            Arc::clone(&state),
            Arc::clone(&rrd_map),
            "rrd.journal",
            &mut journal,
            false,
        )?;

        state_guard.rotate_journal()?; // start new journal, keep old one

        // We need to apply the journal only once, because further updates
        // are always directly applied.
        state_guard.journal_applied = true;
    }

    Ok(lines)
}

fn commit_journal_impl(
    state: Arc<RwLock<JournalState>>,
    rrd_map: Arc<RwLock<RRDMap>>,
) -> Result<usize, Error> {

    // save all RRDs - we only need a read lock here
    let rrd_file_count = rrd_map.read().unwrap().flush_rrd_files()?;

    // if everything went ok, remove the old journal files
    state.write().unwrap().remove_old_journals()?;

    Ok(rrd_file_count)
}
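
// A small sanity check for the journal line format. The values below
// are illustrative; they only exercise `parse_journal_line`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_journal_line_works() {
        let entry = RRDCache::parse_journal_line("1632296400:1.5:0:host/cpu").unwrap();
        assert_eq!(entry.time, 1632296400.0);
        assert_eq!(entry.value, 1.5);
        assert!(matches!(entry.dst, DST::Gauge));
        assert_eq!(entry.rel_path, "host/cpu");

        // lines with the wrong number of fields are rejected
        assert!(RRDCache::parse_journal_line("1632296400:1.5:0").is_err());
    }
}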