src/backup/chunk_store.rs: fix GC

Added option to get oldest_writer timestamp from ProcessLocker.
This commit is contained in:
Dietmar Maurer 2019-03-31 17:21:36 +02:00
parent d85987aeeb
commit 11861a482d
3 changed files with 59 additions and 4 deletions

View File

@ -247,11 +247,29 @@ impl ChunkStore {
})) }))
} }
pub fn sweep_unused_chunks(&self, status: &mut GarbageCollectionStatus) -> Result<(), Error> { pub fn oldest_writer(&self) -> Option<i64> {
tools::ProcessLocker::oldest_shared_lock(self.locker.clone())
}
pub fn sweep_unused_chunks(
&self,
oldest_writer: Option<i64>,
status: &mut GarbageCollectionStatus
) -> Result<(), Error> {
use nix::sys::stat::fstatat; use nix::sys::stat::fstatat;
let now = unsafe { libc::time(std::ptr::null_mut()) }; let now = unsafe { libc::time(std::ptr::null_mut()) };
let mut min_atime = now - 3600*24; // at least 24h (see mount option relatime)
if let Some(stamp) = oldest_writer {
if stamp < min_atime {
min_atime = stamp;
}
}
min_atime -= 300; // add 5 mins gap for safety
for entry in self.get_chunk_iterator(true)? { for entry in self.get_chunk_iterator(true)? {
let (dirfd, entry) = match entry { let (dirfd, entry) = match entry {
Ok(entry) => (entry.parent_fd(), entry), Ok(entry) => (entry.parent_fd(), entry),
@ -273,7 +291,7 @@ impl ChunkStore {
if let Ok(stat) = fstatat(dirfd, filename, nix::fcntl::AtFlags::AT_SYMLINK_NOFOLLOW) { if let Ok(stat) = fstatat(dirfd, filename, nix::fcntl::AtFlags::AT_SYMLINK_NOFOLLOW) {
let age = now - stat.st_atime; let age = now - stat.st_atime;
//println!("FOUND {} {:?}", age/(3600*24), filename); //println!("FOUND {} {:?}", age/(3600*24), filename);
if age/(3600*24) >= 2 { if stat.st_atime < min_atime {
println!("UNLINK {} {:?}", age/(3600*24), filename); println!("UNLINK {} {:?}", age/(3600*24), filename);
let res = unsafe { libc::unlinkat(dirfd, filename.as_ptr(), 0) }; let res = unsafe { libc::unlinkat(dirfd, filename.as_ptr(), 0) };
if res != 0 { if res != 0 {

View File

@ -229,6 +229,8 @@ impl DataStore {
let _exclusive_lock = self.chunk_store.try_exclusive_lock()?; let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;
let oldest_writer = self.chunk_store.oldest_writer();
let mut gc_status = GarbageCollectionStatus::default(); let mut gc_status = GarbageCollectionStatus::default();
gc_status.used_bytes = 0; gc_status.used_bytes = 0;
@ -237,7 +239,7 @@ impl DataStore {
self.mark_used_chunks(&mut gc_status)?; self.mark_used_chunks(&mut gc_status)?;
println!("Start GC phase2 (sweep unused chunks)"); println!("Start GC phase2 (sweep unused chunks)");
self.chunk_store.sweep_unused_chunks(&mut gc_status)?; self.chunk_store.sweep_unused_chunks(oldest_writer, &mut gc_status)?;
println!("Used bytes: {}", gc_status.used_bytes); println!("Used bytes: {}", gc_status.used_bytes);
println!("Used chunks: {}", gc_status.used_chunks); println!("Used chunks: {}", gc_status.used_chunks);

View File

@ -2,11 +2,16 @@
//! //!
//! This implemenation uses fcntl record locks with non-blocking //! This implemenation uses fcntl record locks with non-blocking
//! F_SETLK command (never blocks). //! F_SETLK command (never blocks).
//!
//! We maintain a map of shared locks with time stamps, so you can get
//! the timestamp for the oldest open lock with
//! `oldest_shared_lock()`.
use failure::*; use failure::*;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use std::collections::HashMap;
// fixme: use F_OFD_ locks when implemented with nix::fcntl // fixme: use F_OFD_ locks when implemented with nix::fcntl
@ -17,12 +22,15 @@ pub struct ProcessLocker {
file: std::fs::File, file: std::fs::File,
exclusive: bool, exclusive: bool,
writers: usize, writers: usize,
next_guard_id: u64,
shared_guard_list: HashMap<u64, i64>, // guard_id => timestamp
} }
/// Lock guard for shared locks /// Lock guard for shared locks
/// ///
/// Release the lock when it goes out of scope. /// Release the lock when it goes out of scope.
pub struct ProcessLockSharedGuard { pub struct ProcessLockSharedGuard {
guard_id: u64,
locker: Arc<Mutex<ProcessLocker>>, locker: Arc<Mutex<ProcessLocker>>,
} }
@ -32,6 +40,8 @@ impl Drop for ProcessLockSharedGuard {
if data.writers == 0 { panic!("unexpected ProcessLocker state"); } if data.writers == 0 { panic!("unexpected ProcessLocker state"); }
data.shared_guard_list.remove(&self.guard_id);
if data.writers == 1 && !data.exclusive { if data.writers == 1 && !data.exclusive {
let op = libc::flock { let op = libc::flock {
@ -97,6 +107,8 @@ impl ProcessLocker {
file: file, file: file,
exclusive: false, exclusive: false,
writers: 0, writers: 0,
next_guard_id: 0,
shared_guard_list: HashMap::new(),
}))) })))
} }
@ -130,7 +142,30 @@ impl ProcessLocker {
data.writers += 1; data.writers += 1;
Ok(ProcessLockSharedGuard { locker: locker.clone() }) let guard = ProcessLockSharedGuard { locker: locker.clone(), guard_id: data.next_guard_id };
data.next_guard_id += 1;
let now = unsafe { libc::time(std::ptr::null_mut()) };
data.shared_guard_list.insert(guard.guard_id, now);
Ok(guard)
}
/// Get oldest shared lock timestamp
pub fn oldest_shared_lock(locker: Arc<Mutex<Self>>) -> Option<i64> {
let mut result = None;
let data = locker.lock().unwrap();
for (_k, v) in &data.shared_guard_list {
result = match result {
None => Some(*v),
Some(x) => if x < *v { Some(x) } else { Some(*v) },
};
}
result
} }
/// Try to aquire a exclusive lock /// Try to aquire a exclusive lock