proxmox-rrd: log journal apply/flush times, split apply and flush
We need to apply the journal only once.
This commit is contained in:
		| @ -5,6 +5,7 @@ use std::sync::RwLock; | ||||
| use std::io::Write; | ||||
| use std::io::{BufRead, BufReader}; | ||||
| use std::os::unix::io::AsRawFd; | ||||
| use std::time::SystemTime; | ||||
|  | ||||
| use anyhow::{format_err, bail, Error}; | ||||
| use nix::fcntl::OFlag; | ||||
| @ -33,6 +34,7 @@ struct RRDCacheState { | ||||
|     rrd_map: HashMap<String, RRD>, | ||||
|     journal: File, | ||||
|     last_journal_flush: f64, | ||||
|     journal_applied: bool, | ||||
| } | ||||
|  | ||||
| struct JournalEntry { | ||||
| @ -83,6 +85,7 @@ impl RRDCache { | ||||
|             journal, | ||||
|             rrd_map: HashMap::new(), | ||||
|             last_journal_flush: 0.0, | ||||
|             journal_applied: false, | ||||
|         }; | ||||
|  | ||||
|         Ok(Self { | ||||
| @ -171,18 +174,46 @@ impl RRDCache { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     /// Apply journal. Should be used at server startup. | ||||
|     /// Apply and commit the journal. Should be used at server startup. | ||||
|     pub fn apply_journal(&self) -> Result<(), Error> { | ||||
|         let mut state = self.state.write().unwrap(); // block writers | ||||
|         self.apply_journal_locked(&mut state) | ||||
|         self.apply_and_commit_journal_locked(&mut state) | ||||
|     } | ||||
|  | ||||
|     fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> { | ||||
|  | ||||
|         log::info!("applying rrd journal"); | ||||
|     fn apply_and_commit_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> { | ||||
|  | ||||
|         state.last_journal_flush = proxmox_time::epoch_f64(); | ||||
|  | ||||
|         if !state.journal_applied { | ||||
|             let start_time = SystemTime::now(); | ||||
|             log::debug!("applying rrd journal"); | ||||
|  | ||||
|             match self.apply_journal_locked(state) { | ||||
|                 Ok(entries) => { | ||||
|                     let elapsed = start_time.elapsed()?.as_secs_f64(); | ||||
|                     log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed); | ||||
|                 } | ||||
|                 Err(err) => bail!("apply rrd journal failed - {}", err), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         let start_time = SystemTime::now(); | ||||
|         log::debug!("commit rrd journal"); | ||||
|  | ||||
|         match self.commit_journal_locked(state) { | ||||
|             Ok(rrd_file_count) => { | ||||
|                 let elapsed = start_time.elapsed()?.as_secs_f64(); | ||||
|                 log::info!("rrd journal successfully committed ({} files in {:.3} seconds)", | ||||
|                            rrd_file_count, elapsed); | ||||
|             } | ||||
|             Err(err) => bail!("rrd journal commit failed: {}", err), | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<usize, Error> { | ||||
|  | ||||
|         let mut journal_path = self.basedir.clone(); | ||||
|         journal_path.push(RRD_JOURNAL_NAME); | ||||
|  | ||||
| @ -234,10 +265,22 @@ impl RRDCache { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|  | ||||
|         // We need to apply the journal only once, because further updates | ||||
|         // are always directly applied. | ||||
|         state.journal_applied = true; | ||||
|  | ||||
|         Ok(linenr) | ||||
|     } | ||||
|  | ||||
|     fn commit_journal_locked(&self, state: &mut RRDCacheState) -> Result<usize, Error> { | ||||
|  | ||||
|         // save all RRDs | ||||
|         let mut rrd_file_count = 0; | ||||
|  | ||||
|         let mut errors = 0; | ||||
|         for (rel_path, rrd) in state.rrd_map.iter() { | ||||
|             rrd_file_count += 1; | ||||
|             let mut path = self.basedir.clone(); | ||||
|             path.push(&rel_path); | ||||
|             if let Err(err) = rrd.save(&path, self.file_options.clone()) { | ||||
| @ -246,17 +289,16 @@ impl RRDCache { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // if everything went ok, commit the journal | ||||
|  | ||||
|         if errors == 0 { | ||||
|             nix::unistd::ftruncate(state.journal.as_raw_fd(), 0) | ||||
|                 .map_err(|err| format_err!("unable to truncate journal - {}", err))?; | ||||
|             log::info!("rrd journal successfully committed"); | ||||
|         } else { | ||||
|             log::error!("errors during rrd flush - unable to commit rrd journal"); | ||||
|        if errors != 0 { | ||||
|             bail!("errors during rrd flush - unable to commit rrd journal"); | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|         // if everything went ok, commit the journal | ||||
|  | ||||
|         nix::unistd::ftruncate(state.journal.as_raw_fd(), 0) | ||||
|             .map_err(|err| format_err!("unable to truncate journal - {}", err))?; | ||||
|  | ||||
|         Ok(rrd_file_count) | ||||
|     } | ||||
|  | ||||
|     /// Update data in RAM and write file back to disk (journal) | ||||
| @ -270,10 +312,8 @@ impl RRDCache { | ||||
|  | ||||
|         let mut state = self.state.write().unwrap(); // block other writers | ||||
|  | ||||
|         if (time - state.last_journal_flush) > self.apply_interval { | ||||
|             if let Err(err) = self.apply_journal_locked(&mut state) { | ||||
|                 log::error!("apply journal failed: {}", err); | ||||
|             } | ||||
|         if !state.journal_applied || (time - state.last_journal_flush) > self.apply_interval { | ||||
|             self.apply_and_commit_journal_locked(&mut state)?; | ||||
|         } | ||||
|  | ||||
|         Self::append_journal_entry(&mut state, time, value, dst, rel_path)?; | ||||
|  | ||||
		Reference in New Issue
	
	Block a user