use std::fs::File; use std::path::PathBuf; use std::sync::Arc; use std::io::{Write, BufReader}; use std::ffi::OsStr; use anyhow::Error; use nix::fcntl::OFlag; use crossbeam_channel::Receiver; use proxmox::tools::fs::atomic_open_or_create_file; const RRD_JOURNAL_NAME: &str = "rrd.journal"; use crate::rrd::DST; use crate::cache::CacheConfig; // shared state behind RwLock pub struct JournalState { config: Arc, journal: File, pub last_journal_flush: f64, pub journal_applied: bool, pub apply_thread_result: Option>>, } pub struct JournalEntry { pub time: f64, pub value: f64, pub dst: DST, pub rel_path: String, } impl JournalState { pub(crate) fn new(config: Arc) -> Result { let journal = JournalState::open_journal_writer(&config)?; Ok(Self { config, journal, last_journal_flush: 0.0, journal_applied: false, apply_thread_result: None, }) } pub fn append_journal_entry( &mut self, time: f64, value: f64, dst: DST, rel_path: &str, ) -> Result<(), Error> { let journal_entry = format!( "{}:{}:{}:{}\n", time, value, dst as u8, rel_path); self.journal.write_all(journal_entry.as_bytes())?; Ok(()) } pub fn open_journal_reader(&self) -> Result, Error> { // fixme : dup self.journal instead?? let mut journal_path = self.config.basedir.clone(); journal_path.push(RRD_JOURNAL_NAME); let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY; let journal = atomic_open_or_create_file( &journal_path, flags, &[], self.config.file_options.clone(), )?; Ok(BufReader::new(journal)) } fn open_journal_writer(config: &CacheConfig) -> Result { let mut journal_path = config.basedir.clone(); journal_path.push(RRD_JOURNAL_NAME); let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND; let journal = atomic_open_or_create_file( &journal_path, flags, &[], config.file_options.clone(), )?; Ok(journal) } pub fn rotate_journal(&mut self) -> Result<(), Error> { let mut journal_path = self.config.basedir.clone(); journal_path.push(RRD_JOURNAL_NAME); let mut new_name = journal_path.clone(); let now = proxmox_time::epoch_i64(); new_name.set_extension(format!("journal-{:08x}", now)); std::fs::rename(journal_path, new_name)?; self.journal = Self::open_journal_writer(&self.config)?; Ok(()) } pub fn remove_old_journals(&self) -> Result<(), Error> { let journal_list = self.list_old_journals()?; for (_time, _filename, path) in journal_list { std::fs::remove_file(path)?; } Ok(()) } pub fn list_old_journals(&self) -> Result, Error> { let mut list = Vec::new(); for entry in std::fs::read_dir(&self.config.basedir)? { let entry = entry?; let path = entry.path(); if path.is_file() { if let Some(stem) = path.file_stem() { if stem != OsStr::new("rrd") { continue; } if let Some(extension) = path.extension() { if let Some(extension) = extension.to_str() { if let Some(rest) = extension.strip_prefix("journal-") { if let Ok(time) = u64::from_str_radix(rest, 16) { list.push((time, format!("rrd.{}", extension), path.to_owned())); } } } } } } } list.sort_unstable_by_key(|t| t.0); Ok(list) } }