datastore: use new ProcessLocker

To make sure only one process runs garbage collection while having active writers.
This commit is contained in:
Dietmar Maurer 2019-03-22 09:42:15 +01:00
parent abfc001f25
commit 43b1303398
4 changed files with 25 additions and 10 deletions

View File

@ -1,13 +1,11 @@
use failure::*; use failure::*;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::time::Duration; use std::sync::{Arc, Mutex};
use std::os::unix::io::AsRawFd;
use openssl::sha; use openssl::sha;
use std::sync::Mutex;
use std::fs::File;
use std::os::unix::io::AsRawFd;
use crate::tools; use crate::tools;
@ -35,7 +33,7 @@ pub struct ChunkStore {
pub (crate) base: PathBuf, pub (crate) base: PathBuf,
chunk_dir: PathBuf, chunk_dir: PathBuf,
mutex: Mutex<bool>, mutex: Mutex<bool>,
_lockfile: File, locker: Arc<Mutex<tools::ProcessLocker>>,
} }
// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ? // TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
@ -131,15 +129,13 @@ impl ChunkStore {
let mut lockfile_path = base.clone(); let mut lockfile_path = base.clone();
lockfile_path.push(".lock"); lockfile_path.push(".lock");
// make sure only one process/thread/task can use it let locker = tools::ProcessLocker::new(&lockfile_path)?;
let lockfile = tools::open_file_locked(
lockfile_path, Duration::from_secs(10))?;
Ok(ChunkStore { Ok(ChunkStore {
name: name.to_owned(), name: name.to_owned(),
base, base,
chunk_dir, chunk_dir,
_lockfile: lockfile, locker,
mutex: Mutex::new(false) mutex: Mutex::new(false)
}) })
} }
@ -369,6 +365,14 @@ impl ChunkStore {
pub fn base_path(&self) -> PathBuf { pub fn base_path(&self) -> PathBuf {
self.base.clone() self.base.clone()
} }
pub fn try_shared_lock(&self) -> Result<tools::ProcessLockSharedGuard, Error> {
tools::ProcessLocker::try_shared_lock(self.locker.clone())
}
pub fn try_exclusive_lock(&self) -> Result<tools::ProcessLockExclusiveGuard, Error> {
tools::ProcessLocker::try_exclusive_lock(self.locker.clone())
}
} }

View File

@ -227,6 +227,8 @@ impl DataStore {
if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() { if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;
let mut gc_status = GarbageCollectionStatus::default(); let mut gc_status = GarbageCollectionStatus::default();
gc_status.used_bytes = 0; gc_status.used_bytes = 0;

View File

@ -340,6 +340,8 @@ impl std::io::Seek for BufferedDynamicReader {
pub struct DynamicIndexWriter { pub struct DynamicIndexWriter {
store: Arc<ChunkStore>, store: Arc<ChunkStore>,
_lock: tools::ProcessLockSharedGuard,
chunker: Chunker, chunker: Chunker,
writer: BufWriter<File>, writer: BufWriter<File>,
closed: bool, closed: bool,
@ -366,6 +368,8 @@ impl DynamicIndexWriter {
pub fn create(store: Arc<ChunkStore>, path: &Path, chunk_size: usize) -> Result<Self, Error> { pub fn create(store: Arc<ChunkStore>, path: &Path, chunk_size: usize) -> Result<Self, Error> {
let shared_lock = store.try_shared_lock()?;
let full_path = store.relative_path(path); let full_path = store.relative_path(path);
let mut tmp_path = full_path.clone(); let mut tmp_path = full_path.clone();
tmp_path.set_extension("tmp_didx"); tmp_path.set_extension("tmp_didx");
@ -400,6 +404,7 @@ impl DynamicIndexWriter {
Ok(Self { Ok(Self {
store, store,
_lock: shared_lock,
chunker: Chunker::new(chunk_size), chunker: Chunker::new(chunk_size),
writer: writer, writer: writer,
closed: false, closed: false,

View File

@ -175,6 +175,7 @@ impl IndexFile for FixedIndexReader {
pub struct FixedIndexWriter { pub struct FixedIndexWriter {
store: Arc<ChunkStore>, store: Arc<ChunkStore>,
_lock: tools::ProcessLockSharedGuard,
filename: PathBuf, filename: PathBuf,
tmp_filename: PathBuf, tmp_filename: PathBuf,
chunk_size: usize, chunk_size: usize,
@ -204,6 +205,8 @@ impl FixedIndexWriter {
pub fn create(store: Arc<ChunkStore>, path: &Path, size: usize, chunk_size: usize) -> Result<Self, Error> { pub fn create(store: Arc<ChunkStore>, path: &Path, size: usize, chunk_size: usize) -> Result<Self, Error> {
let shared_lock = store.try_shared_lock()?;
let full_path = store.relative_path(path); let full_path = store.relative_path(path);
let mut tmp_path = full_path.clone(); let mut tmp_path = full_path.clone();
tmp_path.set_extension("tmp_fidx"); tmp_path.set_extension("tmp_fidx");
@ -250,6 +253,7 @@ impl FixedIndexWriter {
Ok(Self { Ok(Self {
store, store,
_lock: shared_lock,
filename: full_path, filename: full_path,
tmp_filename: tmp_path, tmp_filename: tmp_path,
chunk_size, chunk_size,