proxmox-rrd: use fsync instead of syncfs
syncfs can sync unrelated data, and we do not want that. Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
This commit is contained in:
parent
f5f9ec81d2
commit
d6473f5359
@ -4,6 +4,9 @@ use std::sync::{Arc, RwLock};
|
|||||||
use std::io::{BufRead, BufReader};
|
use std::io::{BufRead, BufReader};
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use std::thread::spawn;
|
use std::thread::spawn;
|
||||||
|
use std::os::unix::io::AsRawFd;
|
||||||
|
use std::collections::BTreeSet;
|
||||||
|
|
||||||
use crossbeam_channel::{bounded, TryRecvError};
|
use crossbeam_channel::{bounded, TryRecvError};
|
||||||
use anyhow::{format_err, bail, Error};
|
use anyhow::{format_err, bail, Error};
|
||||||
|
|
||||||
@ -127,6 +130,7 @@ impl RRDCache {
|
|||||||
|
|
||||||
/// Apply and commit the journal. Should be used at server startup.
|
/// Apply and commit the journal. Should be used at server startup.
|
||||||
pub fn apply_journal(&self) -> Result<bool, Error> {
|
pub fn apply_journal(&self) -> Result<bool, Error> {
|
||||||
|
let config = Arc::clone(&self.config);
|
||||||
let state = Arc::clone(&self.state);
|
let state = Arc::clone(&self.state);
|
||||||
let rrd_map = Arc::clone(&self.rrd_map);
|
let rrd_map = Arc::clone(&self.rrd_map);
|
||||||
|
|
||||||
@ -168,7 +172,7 @@ impl RRDCache {
|
|||||||
state_guard.apply_thread_result = Some(receiver);
|
state_guard.apply_thread_result = Some(receiver);
|
||||||
|
|
||||||
spawn(move || {
|
spawn(move || {
|
||||||
let result = apply_and_commit_journal_thread(state, rrd_map, journal_applied)
|
let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied)
|
||||||
.map_err(|err| err.to_string());
|
.map_err(|err| err.to_string());
|
||||||
sender.send(result).unwrap();
|
sender.send(result).unwrap();
|
||||||
});
|
});
|
||||||
@ -219,6 +223,7 @@ impl RRDCache {
|
|||||||
|
|
||||||
|
|
||||||
fn apply_and_commit_journal_thread(
|
fn apply_and_commit_journal_thread(
|
||||||
|
config: Arc<CacheConfig>,
|
||||||
state: Arc<RwLock<JournalState>>,
|
state: Arc<RwLock<JournalState>>,
|
||||||
rrd_map: Arc<RwLock<RRDMap>>,
|
rrd_map: Arc<RwLock<RRDMap>>,
|
||||||
commit_only: bool,
|
commit_only: bool,
|
||||||
@ -242,7 +247,7 @@ fn apply_and_commit_journal_thread(
|
|||||||
let start_time = SystemTime::now();
|
let start_time = SystemTime::now();
|
||||||
log::debug!("commit rrd journal");
|
log::debug!("commit rrd journal");
|
||||||
|
|
||||||
match commit_journal_impl(state, rrd_map) {
|
match commit_journal_impl(config, state, rrd_map) {
|
||||||
Ok(rrd_file_count) => {
|
Ok(rrd_file_count) => {
|
||||||
let elapsed = start_time.elapsed().unwrap().as_secs_f64();
|
let elapsed = start_time.elapsed().unwrap().as_secs_f64();
|
||||||
log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
|
log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
|
||||||
@ -346,7 +351,32 @@ fn apply_journal_impl(
|
|||||||
Ok(lines)
|
Ok(lines)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn fsync_file_or_dir(path: &Path) -> Result<(), Error> {
|
||||||
|
let file = std::fs::File::open(path)?;
|
||||||
|
nix::unistd::fsync(file.as_raw_fd())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate)fn fsync_file_and_parent(path: &Path) -> Result<(), Error> {
|
||||||
|
let file = std::fs::File::open(path)?;
|
||||||
|
nix::unistd::fsync(file.as_raw_fd())?;
|
||||||
|
if let Some(parent) = path.parent() {
|
||||||
|
fsync_file_or_dir(parent)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
|
||||||
|
let mut path = basedir.to_owned();
|
||||||
|
let rel_path = Path::new(rel_path);
|
||||||
|
if let Some(parent) = rel_path.parent() {
|
||||||
|
path.push(parent);
|
||||||
|
}
|
||||||
|
path
|
||||||
|
}
|
||||||
|
|
||||||
fn commit_journal_impl(
|
fn commit_journal_impl(
|
||||||
|
config: Arc<CacheConfig>,
|
||||||
state: Arc<RwLock<JournalState>>,
|
state: Arc<RwLock<JournalState>>,
|
||||||
rrd_map: Arc<RwLock<RRDMap>>,
|
rrd_map: Arc<RwLock<RRDMap>>,
|
||||||
) -> Result<usize, Error> {
|
) -> Result<usize, Error> {
|
||||||
@ -356,8 +386,15 @@ fn commit_journal_impl(
|
|||||||
let mut rrd_file_count = 0;
|
let mut rrd_file_count = 0;
|
||||||
let mut errors = 0;
|
let mut errors = 0;
|
||||||
|
|
||||||
|
let mut dir_set = BTreeSet::new();
|
||||||
|
|
||||||
|
log::info!("write rrd data back to disk");
|
||||||
|
|
||||||
// save all RRDs - we only need a read lock here
|
// save all RRDs - we only need a read lock here
|
||||||
|
// Note: no fsync here (we do it afterwards)
|
||||||
for rel_path in files.iter() {
|
for rel_path in files.iter() {
|
||||||
|
let parent_dir = rrd_parent_dir(&config.basedir, &rel_path);
|
||||||
|
dir_set.insert(parent_dir);
|
||||||
rrd_file_count += 1;
|
rrd_file_count += 1;
|
||||||
if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(&rel_path) {
|
if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(&rel_path) {
|
||||||
errors += 1;
|
errors += 1;
|
||||||
@ -365,12 +402,29 @@ fn commit_journal_impl(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
state.read().unwrap().syncfs()?;
|
|
||||||
|
|
||||||
if errors != 0 {
|
if errors != 0 {
|
||||||
bail!("errors during rrd flush - unable to commit rrd journal");
|
bail!("errors during rrd flush - unable to commit rrd journal");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Important: We fsync files after writing all data! This increase
|
||||||
|
// the likelihood that files are already synced, so this is
|
||||||
|
// much faster (although we need to re-open the files).
|
||||||
|
|
||||||
|
log::info!("starting rrd data sync");
|
||||||
|
|
||||||
|
for rel_path in files.iter() {
|
||||||
|
let mut path = config.basedir.clone();
|
||||||
|
path.push(&rel_path);
|
||||||
|
fsync_file_or_dir(&path)
|
||||||
|
.map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// also fsync directories
|
||||||
|
for dir_path in dir_set {
|
||||||
|
fsync_file_or_dir(&dir_path)
|
||||||
|
.map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
|
||||||
|
}
|
||||||
|
|
||||||
// if everything went ok, remove the old journal files
|
// if everything went ok, remove the old journal files
|
||||||
state.write().unwrap().remove_old_journals()?;
|
state.write().unwrap().remove_old_journals()?;
|
||||||
|
|
||||||
|
11
proxmox-rrd/src/cache/journal.rs
vendored
11
proxmox-rrd/src/cache/journal.rs
vendored
@ -88,11 +88,6 @@ impl JournalState {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn syncfs(&self) -> Result<(), nix::Error> {
|
|
||||||
let res = unsafe { libc::syncfs(self.journal.as_raw_fd()) };
|
|
||||||
nix::errno::Errno::result(res).map(drop)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn append_journal_entry(
|
pub fn append_journal_entry(
|
||||||
&mut self,
|
&mut self,
|
||||||
time: f64,
|
time: f64,
|
||||||
@ -143,9 +138,13 @@ impl JournalState {
|
|||||||
let mut new_name = journal_path.clone();
|
let mut new_name = journal_path.clone();
|
||||||
let now = proxmox_time::epoch_i64();
|
let now = proxmox_time::epoch_i64();
|
||||||
new_name.set_extension(format!("journal-{:08x}", now));
|
new_name.set_extension(format!("journal-{:08x}", now));
|
||||||
std::fs::rename(journal_path, new_name)?;
|
std::fs::rename(journal_path, &new_name)?;
|
||||||
|
|
||||||
self.journal = Self::open_journal_writer(&self.config)?;
|
self.journal = Self::open_journal_writer(&self.config)?;
|
||||||
|
|
||||||
|
// make sure the old journal data landed on the disk
|
||||||
|
super::fsync_file_and_parent(&new_name)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user