proxmox-rrd: move JournalState into extra file
This commit is contained in:
parent
0ca41155b2
commit
f84304235b
|
@ -2,20 +2,18 @@ use std::fs::File;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::io::Write;
|
|
||||||
use std::io::{BufRead, BufReader};
|
use std::io::{BufRead, BufReader};
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use std::ffi::OsStr;
|
|
||||||
use std::thread::spawn;
|
use std::thread::spawn;
|
||||||
use crossbeam_channel::{bounded, Receiver, TryRecvError};
|
use crossbeam_channel::{bounded, TryRecvError};
|
||||||
use anyhow::{format_err, bail, Error};
|
use anyhow::{format_err, bail, Error};
|
||||||
use nix::fcntl::OFlag;
|
|
||||||
|
|
||||||
use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};
|
use proxmox::tools::fs::{create_path, CreateOptions};
|
||||||
|
|
||||||
use crate::rrd::{DST, CF, RRD, RRA};
|
use crate::rrd::{DST, CF, RRD, RRA};
|
||||||
|
|
||||||
const RRD_JOURNAL_NAME: &str = "rrd.journal";
|
mod journal;
|
||||||
|
use journal::*;
|
||||||
|
|
||||||
/// RRD cache - keep RRD data in RAM, but write updates to disk
|
/// RRD cache - keep RRD data in RAM, but write updates to disk
|
||||||
///
|
///
|
||||||
|
@ -27,7 +25,7 @@ pub struct RRDCache {
|
||||||
rrd_map: Arc<RwLock<RRDMap>>,
|
rrd_map: Arc<RwLock<RRDMap>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct CacheConfig {
|
pub(crate) struct CacheConfig {
|
||||||
apply_interval: f64,
|
apply_interval: f64,
|
||||||
basedir: PathBuf,
|
basedir: PathBuf,
|
||||||
file_options: CreateOptions,
|
file_options: CreateOptions,
|
||||||
|
@ -123,113 +121,6 @@ impl RRDMap {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// shared state behind RwLock
|
|
||||||
struct JournalState {
|
|
||||||
config: Arc<CacheConfig>,
|
|
||||||
journal: File,
|
|
||||||
last_journal_flush: f64,
|
|
||||||
journal_applied: bool,
|
|
||||||
apply_thread_result: Option<Receiver<Result<(), String>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct JournalEntry {
|
|
||||||
time: f64,
|
|
||||||
value: f64,
|
|
||||||
dst: DST,
|
|
||||||
rel_path: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl JournalState {
|
|
||||||
|
|
||||||
fn new(config: Arc<CacheConfig>) -> Result<Self, Error> {
|
|
||||||
let journal = JournalState::open_journal_writer(&config)?;
|
|
||||||
Ok(Self {
|
|
||||||
config,
|
|
||||||
journal,
|
|
||||||
last_journal_flush: 0.0,
|
|
||||||
journal_applied: false,
|
|
||||||
apply_thread_result: None,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_journal_reader(&self) -> Result<BufReader<File>, Error> {
|
|
||||||
|
|
||||||
// fixme : dup self.journal instead??
|
|
||||||
let mut journal_path = self.config.basedir.clone();
|
|
||||||
journal_path.push(RRD_JOURNAL_NAME);
|
|
||||||
|
|
||||||
let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
|
|
||||||
let journal = atomic_open_or_create_file(
|
|
||||||
&journal_path,
|
|
||||||
flags,
|
|
||||||
&[],
|
|
||||||
self.config.file_options.clone(),
|
|
||||||
)?;
|
|
||||||
Ok(BufReader::new(journal))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_journal_writer(config: &CacheConfig) -> Result<File, Error> {
|
|
||||||
let mut journal_path = config.basedir.clone();
|
|
||||||
journal_path.push(RRD_JOURNAL_NAME);
|
|
||||||
|
|
||||||
let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
|
|
||||||
let journal = atomic_open_or_create_file(
|
|
||||||
&journal_path,
|
|
||||||
flags,
|
|
||||||
&[],
|
|
||||||
config.file_options.clone(),
|
|
||||||
)?;
|
|
||||||
Ok(journal)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn rotate_journal(&mut self) -> Result<(), Error> {
|
|
||||||
let mut journal_path = self.config.basedir.clone();
|
|
||||||
journal_path.push(RRD_JOURNAL_NAME);
|
|
||||||
|
|
||||||
let mut new_name = journal_path.clone();
|
|
||||||
let now = proxmox_time::epoch_i64();
|
|
||||||
new_name.set_extension(format!("journal-{:08x}", now));
|
|
||||||
std::fs::rename(journal_path, new_name)?;
|
|
||||||
|
|
||||||
self.journal = Self::open_journal_writer(&self.config)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_old_journals(&self) -> Result<(), Error> {
|
|
||||||
|
|
||||||
let journal_list = self.list_old_journals()?;
|
|
||||||
|
|
||||||
for (_time, _filename, path) in journal_list {
|
|
||||||
std::fs::remove_file(path)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn list_old_journals(&self) -> Result<Vec<(u64, String, PathBuf)>, Error> {
|
|
||||||
let mut list = Vec::new();
|
|
||||||
for entry in std::fs::read_dir(&self.config.basedir)? {
|
|
||||||
let entry = entry?;
|
|
||||||
let path = entry.path();
|
|
||||||
if path.is_file() {
|
|
||||||
if let Some(stem) = path.file_stem() {
|
|
||||||
if stem != OsStr::new("rrd") { continue; }
|
|
||||||
if let Some(extension) = path.extension() {
|
|
||||||
if let Some(extension) = extension.to_str() {
|
|
||||||
if let Some(rest) = extension.strip_prefix("journal-") {
|
|
||||||
if let Ok(time) = u64::from_str_radix(rest, 16) {
|
|
||||||
list.push((time, format!("rrd.{}", extension), path.to_owned()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
list.sort_unstable_by_key(|t| t.0);
|
|
||||||
Ok(list)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RRDCache {
|
impl RRDCache {
|
||||||
|
|
||||||
|
@ -343,19 +234,6 @@ impl RRDCache {
|
||||||
Ok(JournalEntry { time, value, dst, rel_path })
|
Ok(JournalEntry { time, value, dst, rel_path })
|
||||||
}
|
}
|
||||||
|
|
||||||
fn append_journal_entry(
|
|
||||||
&self,
|
|
||||||
time: f64,
|
|
||||||
value: f64,
|
|
||||||
dst: DST,
|
|
||||||
rel_path: &str,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let mut state = self.state.write().unwrap(); // block other writers
|
|
||||||
let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
|
|
||||||
state.journal.write_all(journal_entry.as_bytes())?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Apply and commit the journal. Should be used at server startup.
|
/// Apply and commit the journal. Should be used at server startup.
|
||||||
pub fn apply_journal(&self) -> Result<bool, Error> {
|
pub fn apply_journal(&self) -> Result<bool, Error> {
|
||||||
let state = Arc::clone(&self.state);
|
let state = Arc::clone(&self.state);
|
||||||
|
@ -414,7 +292,8 @@ impl RRDCache {
|
||||||
|
|
||||||
let journal_applied = self.apply_journal()?;
|
let journal_applied = self.apply_journal()?;
|
||||||
|
|
||||||
self.append_journal_entry(time, value, dst, rel_path)?;
|
self.state.write().unwrap()
|
||||||
|
.append_journal_entry(time, value, dst, rel_path)?;
|
||||||
|
|
||||||
if journal_applied {
|
if journal_applied {
|
||||||
self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
|
self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
|
||||||
|
|
|
@ -0,0 +1,137 @@
|
||||||
|
use std::fs::File;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::io::{Write, BufReader};
|
||||||
|
use std::ffi::OsStr;
|
||||||
|
|
||||||
|
use anyhow::Error;
|
||||||
|
use nix::fcntl::OFlag;
|
||||||
|
use crossbeam_channel::Receiver;
|
||||||
|
|
||||||
|
use proxmox::tools::fs::atomic_open_or_create_file;
|
||||||
|
|
||||||
|
const RRD_JOURNAL_NAME: &str = "rrd.journal";
|
||||||
|
|
||||||
|
use crate::rrd::DST;
|
||||||
|
use crate::cache::CacheConfig;
|
||||||
|
|
||||||
|
// shared state behind RwLock
|
||||||
|
pub struct JournalState {
|
||||||
|
config: Arc<CacheConfig>,
|
||||||
|
journal: File,
|
||||||
|
pub last_journal_flush: f64,
|
||||||
|
pub journal_applied: bool,
|
||||||
|
pub apply_thread_result: Option<Receiver<Result<(), String>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct JournalEntry {
|
||||||
|
pub time: f64,
|
||||||
|
pub value: f64,
|
||||||
|
pub dst: DST,
|
||||||
|
pub rel_path: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JournalState {
|
||||||
|
|
||||||
|
pub(crate) fn new(config: Arc<CacheConfig>) -> Result<Self, Error> {
|
||||||
|
let journal = JournalState::open_journal_writer(&config)?;
|
||||||
|
Ok(Self {
|
||||||
|
config,
|
||||||
|
journal,
|
||||||
|
last_journal_flush: 0.0,
|
||||||
|
journal_applied: false,
|
||||||
|
apply_thread_result: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn append_journal_entry(
|
||||||
|
&mut self,
|
||||||
|
time: f64,
|
||||||
|
value: f64,
|
||||||
|
dst: DST,
|
||||||
|
rel_path: &str,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let journal_entry = format!(
|
||||||
|
"{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
|
||||||
|
self.journal.write_all(journal_entry.as_bytes())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn open_journal_reader(&self) -> Result<BufReader<File>, Error> {
|
||||||
|
|
||||||
|
// fixme : dup self.journal instead??
|
||||||
|
let mut journal_path = self.config.basedir.clone();
|
||||||
|
journal_path.push(RRD_JOURNAL_NAME);
|
||||||
|
|
||||||
|
let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
|
||||||
|
let journal = atomic_open_or_create_file(
|
||||||
|
&journal_path,
|
||||||
|
flags,
|
||||||
|
&[],
|
||||||
|
self.config.file_options.clone(),
|
||||||
|
)?;
|
||||||
|
Ok(BufReader::new(journal))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open_journal_writer(config: &CacheConfig) -> Result<File, Error> {
|
||||||
|
let mut journal_path = config.basedir.clone();
|
||||||
|
journal_path.push(RRD_JOURNAL_NAME);
|
||||||
|
|
||||||
|
let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
|
||||||
|
let journal = atomic_open_or_create_file(
|
||||||
|
&journal_path,
|
||||||
|
flags,
|
||||||
|
&[],
|
||||||
|
config.file_options.clone(),
|
||||||
|
)?;
|
||||||
|
Ok(journal)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rotate_journal(&mut self) -> Result<(), Error> {
|
||||||
|
let mut journal_path = self.config.basedir.clone();
|
||||||
|
journal_path.push(RRD_JOURNAL_NAME);
|
||||||
|
|
||||||
|
let mut new_name = journal_path.clone();
|
||||||
|
let now = proxmox_time::epoch_i64();
|
||||||
|
new_name.set_extension(format!("journal-{:08x}", now));
|
||||||
|
std::fs::rename(journal_path, new_name)?;
|
||||||
|
|
||||||
|
self.journal = Self::open_journal_writer(&self.config)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_old_journals(&self) -> Result<(), Error> {
|
||||||
|
|
||||||
|
let journal_list = self.list_old_journals()?;
|
||||||
|
|
||||||
|
for (_time, _filename, path) in journal_list {
|
||||||
|
std::fs::remove_file(path)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn list_old_journals(&self) -> Result<Vec<(u64, String, PathBuf)>, Error> {
|
||||||
|
let mut list = Vec::new();
|
||||||
|
for entry in std::fs::read_dir(&self.config.basedir)? {
|
||||||
|
let entry = entry?;
|
||||||
|
let path = entry.path();
|
||||||
|
if path.is_file() {
|
||||||
|
if let Some(stem) = path.file_stem() {
|
||||||
|
if stem != OsStr::new("rrd") { continue; }
|
||||||
|
if let Some(extension) = path.extension() {
|
||||||
|
if let Some(extension) = extension.to_str() {
|
||||||
|
if let Some(rest) = extension.strip_prefix("journal-") {
|
||||||
|
if let Ok(time) = u64::from_str_radix(rest, 16) {
|
||||||
|
list.push((time, format!("rrd.{}", extension), path.to_owned()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
list.sort_unstable_by_key(|t| t.0);
|
||||||
|
Ok(list)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue