pbs-datastore: add active operations tracking
Saves the currently active read/write operation counts in a file. The file is updated whenever a reference returned by lookup_datastore is dropped and whenever a reference is returned by lookup_datastore. The files are locked before every access, there is one file per datastore. Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
This commit is contained in:
parent
e9d2fc9362
commit
4bc84a6549
|
@ -35,5 +35,6 @@ proxmox-uuid = "1"
|
||||||
proxmox-sys = "0.2"
|
proxmox-sys = "0.2"
|
||||||
|
|
||||||
pbs-api-types = { path = "../pbs-api-types" }
|
pbs-api-types = { path = "../pbs-api-types" }
|
||||||
|
pbs-buildcfg = { path = "../pbs-buildcfg" }
|
||||||
pbs-tools = { path = "../pbs-tools" }
|
pbs-tools = { path = "../pbs-tools" }
|
||||||
pbs-config = { path = "../pbs-config" }
|
pbs-config = { path = "../pbs-config" }
|
||||||
|
|
|
@ -34,9 +34,10 @@ use crate::manifest::{
|
||||||
ArchiveType, BackupManifest,
|
ArchiveType, BackupManifest,
|
||||||
archive_type,
|
archive_type,
|
||||||
};
|
};
|
||||||
|
use crate::task_tracking::update_active_operations;
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
|
static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> = Mutex::new(HashMap::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// checks if auth_id is owner, or, if owner is a token, if
|
/// checks if auth_id is owner, or, if owner is a token, if
|
||||||
|
@ -57,7 +58,7 @@ pub fn check_backup_owner(
|
||||||
///
|
///
|
||||||
/// A Datastore can store severals backups, and provides the
|
/// A Datastore can store severals backups, and provides the
|
||||||
/// management interface for backup.
|
/// management interface for backup.
|
||||||
pub struct DataStore {
|
pub struct DataStoreImpl {
|
||||||
chunk_store: Arc<ChunkStore>,
|
chunk_store: Arc<ChunkStore>,
|
||||||
gc_mutex: Mutex<()>,
|
gc_mutex: Mutex<()>,
|
||||||
last_gc_status: Mutex<GarbageCollectionStatus>,
|
last_gc_status: Mutex<GarbageCollectionStatus>,
|
||||||
|
@ -67,6 +68,38 @@ pub struct DataStore {
|
||||||
last_update: i64,
|
last_update: i64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct DataStore {
|
||||||
|
inner: Arc<DataStoreImpl>,
|
||||||
|
operation: Option<Operation>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for DataStore {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
let mut new_operation = self.operation;
|
||||||
|
if let Some(operation) = self.operation {
|
||||||
|
if let Err(e) = update_active_operations(self.name(), operation, 1) {
|
||||||
|
log::error!("could not update active operations - {}", e);
|
||||||
|
new_operation = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
DataStore {
|
||||||
|
inner: self.inner.clone(),
|
||||||
|
operation: new_operation,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for DataStore {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(operation) = self.operation {
|
||||||
|
if let Err(e) = update_active_operations(self.name(), operation, -1) {
|
||||||
|
log::error!("could not update active operations - {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl DataStore {
|
impl DataStore {
|
||||||
pub fn lookup_datastore(
|
pub fn lookup_datastore(
|
||||||
name: &str,
|
name: &str,
|
||||||
|
@ -86,12 +119,19 @@ impl DataStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(operation) = operation {
|
||||||
|
update_active_operations(name, operation, 1)?;
|
||||||
|
}
|
||||||
|
|
||||||
let mut map = DATASTORE_MAP.lock().unwrap();
|
let mut map = DATASTORE_MAP.lock().unwrap();
|
||||||
let entry = map.get(name);
|
let entry = map.get(name);
|
||||||
|
|
||||||
if let Some(datastore) = &entry {
|
if let Some(datastore) = &entry {
|
||||||
if datastore.last_generation == generation && now < (datastore.last_update + 60) {
|
if datastore.last_generation == generation && now < (datastore.last_update + 60) {
|
||||||
return Ok(Arc::clone(datastore));
|
return Ok(Arc::new(Self {
|
||||||
|
inner: Arc::clone(datastore),
|
||||||
|
operation,
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,7 +140,10 @@ impl DataStore {
|
||||||
let datastore = Arc::new(datastore);
|
let datastore = Arc::new(datastore);
|
||||||
map.insert(name.to_string(), datastore.clone());
|
map.insert(name.to_string(), datastore.clone());
|
||||||
|
|
||||||
Ok(datastore)
|
Ok(Arc::new(Self {
|
||||||
|
inner: datastore,
|
||||||
|
operation,
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// removes all datastores that are not configured anymore
|
/// removes all datastores that are not configured anymore
|
||||||
|
@ -121,7 +164,7 @@ impl DataStore {
|
||||||
config: DataStoreConfig,
|
config: DataStoreConfig,
|
||||||
last_generation: usize,
|
last_generation: usize,
|
||||||
last_update: i64,
|
last_update: i64,
|
||||||
) -> Result<Self, Error> {
|
) -> Result<DataStoreImpl, Error> {
|
||||||
let chunk_store = ChunkStore::open(store_name, path)?;
|
let chunk_store = ChunkStore::open(store_name, path)?;
|
||||||
|
|
||||||
let mut gc_status_path = chunk_store.base_path();
|
let mut gc_status_path = chunk_store.base_path();
|
||||||
|
@ -144,7 +187,7 @@ impl DataStore {
|
||||||
)?;
|
)?;
|
||||||
let chunk_order = tuning.chunk_order.unwrap_or(ChunkOrder::Inode);
|
let chunk_order = tuning.chunk_order.unwrap_or(ChunkOrder::Inode);
|
||||||
|
|
||||||
Ok(Self {
|
Ok(DataStoreImpl {
|
||||||
chunk_store: Arc::new(chunk_store),
|
chunk_store: Arc::new(chunk_store),
|
||||||
gc_mutex: Mutex::new(()),
|
gc_mutex: Mutex::new(()),
|
||||||
last_gc_status: Mutex::new(gc_status),
|
last_gc_status: Mutex::new(gc_status),
|
||||||
|
@ -161,19 +204,19 @@ impl DataStore {
|
||||||
impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
|
impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
|
||||||
Error
|
Error
|
||||||
> {
|
> {
|
||||||
self.chunk_store.get_chunk_iterator()
|
self.inner.chunk_store.get_chunk_iterator()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
|
pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
|
||||||
|
|
||||||
let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
|
let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
|
||||||
|
|
||||||
Ok(index)
|
Ok(index)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
|
pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
|
||||||
|
|
||||||
let full_path = self.chunk_store.relative_path(filename.as_ref());
|
let full_path = self.inner.chunk_store.relative_path(filename.as_ref());
|
||||||
|
|
||||||
let index = FixedIndexReader::open(&full_path)?;
|
let index = FixedIndexReader::open(&full_path)?;
|
||||||
|
|
||||||
|
@ -185,14 +228,14 @@ impl DataStore {
|
||||||
) -> Result<DynamicIndexWriter, Error> {
|
) -> Result<DynamicIndexWriter, Error> {
|
||||||
|
|
||||||
let index = DynamicIndexWriter::create(
|
let index = DynamicIndexWriter::create(
|
||||||
self.chunk_store.clone(), filename.as_ref())?;
|
self.inner.chunk_store.clone(), filename.as_ref())?;
|
||||||
|
|
||||||
Ok(index)
|
Ok(index)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
|
pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
|
||||||
|
|
||||||
let full_path = self.chunk_store.relative_path(filename.as_ref());
|
let full_path = self.inner.chunk_store.relative_path(filename.as_ref());
|
||||||
|
|
||||||
let index = DynamicIndexReader::open(&full_path)?;
|
let index = DynamicIndexReader::open(&full_path)?;
|
||||||
|
|
||||||
|
@ -242,11 +285,11 @@ impl DataStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn name(&self) -> &str {
|
pub fn name(&self) -> &str {
|
||||||
self.chunk_store.name()
|
self.inner.chunk_store.name()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn base_path(&self) -> PathBuf {
|
pub fn base_path(&self) -> PathBuf {
|
||||||
self.chunk_store.base_path()
|
self.inner.chunk_store.base_path()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cleanup a backup directory
|
/// Cleanup a backup directory
|
||||||
|
@ -549,7 +592,7 @@ impl DataStore {
|
||||||
worker.check_abort()?;
|
worker.check_abort()?;
|
||||||
worker.fail_on_shutdown()?;
|
worker.fail_on_shutdown()?;
|
||||||
let digest = index.index_digest(pos).unwrap();
|
let digest = index.index_digest(pos).unwrap();
|
||||||
if !self.chunk_store.cond_touch_chunk(digest, false)? {
|
if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
|
||||||
task_warn!(
|
task_warn!(
|
||||||
worker,
|
worker,
|
||||||
"warning: unable to access non-existent chunk {}, required by {:?}",
|
"warning: unable to access non-existent chunk {}, required by {:?}",
|
||||||
|
@ -565,7 +608,7 @@ impl DataStore {
|
||||||
let mut bad_path = PathBuf::new();
|
let mut bad_path = PathBuf::new();
|
||||||
bad_path.push(self.chunk_path(digest).0);
|
bad_path.push(self.chunk_path(digest).0);
|
||||||
bad_path.set_extension(bad_ext);
|
bad_path.set_extension(bad_ext);
|
||||||
self.chunk_store.cond_touch_path(&bad_path, false)?;
|
self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -645,24 +688,24 @@ impl DataStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn last_gc_status(&self) -> GarbageCollectionStatus {
|
pub fn last_gc_status(&self) -> GarbageCollectionStatus {
|
||||||
self.last_gc_status.lock().unwrap().clone()
|
self.inner.last_gc_status.lock().unwrap().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn garbage_collection_running(&self) -> bool {
|
pub fn garbage_collection_running(&self) -> bool {
|
||||||
!matches!(self.gc_mutex.try_lock(), Ok(_))
|
!matches!(self.inner.gc_mutex.try_lock(), Ok(_))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> {
|
pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> {
|
||||||
|
|
||||||
if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
|
if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() {
|
||||||
|
|
||||||
// avoids that we run GC if an old daemon process has still a
|
// avoids that we run GC if an old daemon process has still a
|
||||||
// running backup writer, which is not save as we have no "oldest
|
// running backup writer, which is not save as we have no "oldest
|
||||||
// writer" information and thus no safe atime cutoff
|
// writer" information and thus no safe atime cutoff
|
||||||
let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;
|
let _exclusive_lock = self.inner.chunk_store.try_exclusive_lock()?;
|
||||||
|
|
||||||
let phase1_start_time = proxmox_time::epoch_i64();
|
let phase1_start_time = proxmox_time::epoch_i64();
|
||||||
let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
|
let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
|
||||||
|
|
||||||
let mut gc_status = GarbageCollectionStatus::default();
|
let mut gc_status = GarbageCollectionStatus::default();
|
||||||
gc_status.upid = Some(upid.to_string());
|
gc_status.upid = Some(upid.to_string());
|
||||||
|
@ -672,7 +715,7 @@ impl DataStore {
|
||||||
self.mark_used_chunks(&mut gc_status, worker)?;
|
self.mark_used_chunks(&mut gc_status, worker)?;
|
||||||
|
|
||||||
task_log!(worker, "Start GC phase2 (sweep unused chunks)");
|
task_log!(worker, "Start GC phase2 (sweep unused chunks)");
|
||||||
self.chunk_store.sweep_unused_chunks(
|
self.inner.chunk_store.sweep_unused_chunks(
|
||||||
oldest_writer,
|
oldest_writer,
|
||||||
phase1_start_time,
|
phase1_start_time,
|
||||||
&mut gc_status,
|
&mut gc_status,
|
||||||
|
@ -749,7 +792,7 @@ impl DataStore {
|
||||||
let _ = replace_file(path, serialized.as_bytes(), options, false);
|
let _ = replace_file(path, serialized.as_bytes(), options, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
*self.last_gc_status.lock().unwrap() = gc_status;
|
*self.inner.last_gc_status.lock().unwrap() = gc_status;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
bail!("Start GC failed - (already running/locked)");
|
bail!("Start GC failed - (already running/locked)");
|
||||||
|
@ -759,15 +802,15 @@ impl DataStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
|
pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
|
||||||
self.chunk_store.try_shared_lock()
|
self.inner.chunk_store.try_shared_lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
|
pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
|
||||||
self.chunk_store.chunk_path(digest)
|
self.inner.chunk_store.chunk_path(digest)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
|
pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
|
||||||
self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
|
self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert_chunk(
|
pub fn insert_chunk(
|
||||||
|
@ -775,7 +818,7 @@ impl DataStore {
|
||||||
chunk: &DataBlob,
|
chunk: &DataBlob,
|
||||||
digest: &[u8; 32],
|
digest: &[u8; 32],
|
||||||
) -> Result<(bool, u64), Error> {
|
) -> Result<(bool, u64), Error> {
|
||||||
self.chunk_store.insert_chunk(chunk, digest)
|
self.inner.chunk_store.insert_chunk(chunk, digest)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
|
pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
|
||||||
|
@ -791,13 +834,13 @@ impl DataStore {
|
||||||
|
|
||||||
|
|
||||||
pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
|
pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
|
||||||
let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest);
|
let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
|
||||||
std::fs::metadata(chunk_path).map_err(Error::from)
|
std::fs::metadata(chunk_path).map_err(Error::from)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
|
pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
|
||||||
|
|
||||||
let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);
|
let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
|
||||||
|
|
||||||
proxmox_lang::try_block!({
|
proxmox_lang::try_block!({
|
||||||
let mut file = std::fs::File::open(&chunk_path)?;
|
let mut file = std::fs::File::open(&chunk_path)?;
|
||||||
|
@ -911,7 +954,7 @@ impl DataStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn verify_new(&self) -> bool {
|
pub fn verify_new(&self) -> bool {
|
||||||
self.verify_new
|
self.inner.verify_new
|
||||||
}
|
}
|
||||||
|
|
||||||
/// returns a list of chunks sorted by their inode number on disk
|
/// returns a list of chunks sorted by their inode number on disk
|
||||||
|
@ -938,7 +981,7 @@ impl DataStore {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let ino = match self.chunk_order {
|
let ino = match self.inner.chunk_order {
|
||||||
ChunkOrder::Inode => {
|
ChunkOrder::Inode => {
|
||||||
match self.stat_chunk(&info.digest) {
|
match self.stat_chunk(&info.digest) {
|
||||||
Err(_) => u64::MAX, // could not stat, move to end of list
|
Err(_) => u64::MAX, // could not stat, move to end of list
|
||||||
|
@ -951,7 +994,7 @@ impl DataStore {
|
||||||
chunk_list.push((pos, ino));
|
chunk_list.push((pos, ino));
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.chunk_order {
|
match self.inner.chunk_order {
|
||||||
// sorting by inode improves data locality, which makes it lots faster on spinners
|
// sorting by inode improves data locality, which makes it lots faster on spinners
|
||||||
ChunkOrder::Inode => {
|
ChunkOrder::Inode => {
|
||||||
chunk_list.sort_unstable_by(|(_, ino_a), (_, ino_b)| ino_a.cmp(ino_b))
|
chunk_list.sort_unstable_by(|(_, ino_a), (_, ino_b)| ino_a.cmp(ino_b))
|
||||||
|
|
|
@ -145,6 +145,9 @@
|
||||||
// Note: .pcat1 => Proxmox Catalog Format version 1
|
// Note: .pcat1 => Proxmox Catalog Format version 1
|
||||||
pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
|
pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
|
||||||
|
|
||||||
|
/// Directory path where active operations counters are saved.
|
||||||
|
pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations");
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
|
macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
|
||||||
() => {
|
() => {
|
||||||
|
@ -179,6 +182,7 @@ pub mod paperkey;
|
||||||
pub mod prune;
|
pub mod prune;
|
||||||
pub mod read_chunk;
|
pub mod read_chunk;
|
||||||
pub mod store_progress;
|
pub mod store_progress;
|
||||||
|
pub mod task_tracking;
|
||||||
|
|
||||||
pub mod dynamic_index;
|
pub mod dynamic_index;
|
||||||
pub mod fixed_index;
|
pub mod fixed_index;
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
use anyhow::Error;
|
||||||
|
use libc::pid_t;
|
||||||
|
use nix::unistd::Pid;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use pbs_api_types::Operation;
|
||||||
|
use proxmox_sys::fs::{file_read_optional_string, open_file_locked, replace_file, CreateOptions};
|
||||||
|
use proxmox_sys::linux::procfs;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize, Clone)]
|
||||||
|
struct TaskOperations {
|
||||||
|
pid: u32,
|
||||||
|
starttime: u64,
|
||||||
|
reading_operations: i64,
|
||||||
|
writing_operations: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
|
||||||
|
let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name));
|
||||||
|
let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name));
|
||||||
|
|
||||||
|
let user = pbs_config::backup_user()?;
|
||||||
|
let options = CreateOptions::new()
|
||||||
|
.group(user.gid)
|
||||||
|
.owner(user.uid)
|
||||||
|
.perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
|
||||||
|
|
||||||
|
let timeout = std::time::Duration::new(10, 0);
|
||||||
|
open_file_locked(&lock_path, timeout, true, options.clone())?;
|
||||||
|
|
||||||
|
let pid = std::process::id();
|
||||||
|
let starttime = procfs::PidStat::read_from_pid(Pid::from_raw(pid as pid_t))?.starttime;
|
||||||
|
let mut updated = false;
|
||||||
|
|
||||||
|
let mut updated_tasks: Vec<TaskOperations> = match file_read_optional_string(&path)? {
|
||||||
|
Some(data) => serde_json::from_str::<Vec<TaskOperations>>(&data)?
|
||||||
|
.iter_mut()
|
||||||
|
.filter_map(
|
||||||
|
|task| match procfs::check_process_running(task.pid as pid_t) {
|
||||||
|
Some(stat) if pid == task.pid && stat.starttime != task.starttime => None,
|
||||||
|
Some(_) => {
|
||||||
|
if pid == task.pid {
|
||||||
|
updated = true;
|
||||||
|
match operation {
|
||||||
|
Operation::Read => task.reading_operations += count,
|
||||||
|
Operation::Write => task.writing_operations += count,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Some(task.clone())
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.collect(),
|
||||||
|
None => Vec::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
if !updated {
|
||||||
|
updated_tasks.push(match operation {
|
||||||
|
Operation::Read => TaskOperations {
|
||||||
|
pid,
|
||||||
|
starttime,
|
||||||
|
reading_operations: 1,
|
||||||
|
writing_operations: 0,
|
||||||
|
},
|
||||||
|
Operation::Write => TaskOperations {
|
||||||
|
pid,
|
||||||
|
starttime,
|
||||||
|
reading_operations: 0,
|
||||||
|
writing_operations: 1,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
replace_file(
|
||||||
|
&path,
|
||||||
|
serde_json::to_string(&updated_tasks)?.as_bytes(),
|
||||||
|
options,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
}
|
|
@ -75,6 +75,7 @@ async fn run() -> Result<(), Error> {
|
||||||
|
|
||||||
proxmox_backup::server::create_run_dir()?;
|
proxmox_backup::server::create_run_dir()?;
|
||||||
proxmox_backup::server::create_state_dir()?;
|
proxmox_backup::server::create_state_dir()?;
|
||||||
|
proxmox_backup::server::create_active_operations_dir()?;
|
||||||
proxmox_backup::server::jobstate::create_jobstate_dir()?;
|
proxmox_backup::server::jobstate::create_jobstate_dir()?;
|
||||||
proxmox_backup::tape::create_tape_status_dir()?;
|
proxmox_backup::tape::create_tape_status_dir()?;
|
||||||
proxmox_backup::tape::create_drive_state_dir()?;
|
proxmox_backup::tape::create_drive_state_dir()?;
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
//! services. We want async IO, so this is built on top of
|
//! services. We want async IO, so this is built on top of
|
||||||
//! tokio/hyper.
|
//! tokio/hyper.
|
||||||
|
|
||||||
use anyhow::Error;
|
use anyhow::{format_err, Error};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use proxmox_sys::fs::{create_path, CreateOptions};
|
use proxmox_sys::fs::{create_path, CreateOptions};
|
||||||
|
@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> {
|
||||||
create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?;
|
create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create active operations dir with correct permission.
|
||||||
|
pub fn create_active_operations_dir() -> Result<(), Error> {
|
||||||
|
let backup_user = pbs_config::backup_user()?;
|
||||||
|
let mode = nix::sys::stat::Mode::from_bits_truncate(0o0750);
|
||||||
|
let options = CreateOptions::new()
|
||||||
|
.perm(mode)
|
||||||
|
.owner(backup_user.uid)
|
||||||
|
.group(backup_user.gid);
|
||||||
|
|
||||||
|
create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options))
|
||||||
|
.map_err(|err: Error| format_err!("unable to create active operations dir - {}", err))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue