src/client/remote_chunk_reader.rs: implement simple caching
This commit is contained in:
parent
afb4cd28be
commit
f4bf7dfcc7
@ -1,6 +1,7 @@
|
|||||||
use failure::*;
|
use failure::*;
|
||||||
use futures::*;
|
use futures::*;
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
/// Trait to get digest list from index files
|
/// Trait to get digest list from index files
|
||||||
///
|
///
|
||||||
@ -9,6 +10,38 @@ pub trait IndexFile: Send {
|
|||||||
fn index_count(&self) -> usize;
|
fn index_count(&self) -> usize;
|
||||||
fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;
|
fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;
|
||||||
fn index_bytes(&self) -> u64;
|
fn index_bytes(&self) -> u64;
|
||||||
|
|
||||||
|
/// Returns most often used chunks
|
||||||
|
fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> {
|
||||||
|
let mut map = HashMap::new();
|
||||||
|
|
||||||
|
for pos in 0..self.index_count() {
|
||||||
|
let digest = self.index_digest(pos).unwrap();
|
||||||
|
|
||||||
|
let count = map.entry(*digest).or_insert(0);
|
||||||
|
*count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut most_used = Vec::new();
|
||||||
|
|
||||||
|
for (digest, count) in map {
|
||||||
|
if count <= 1 { continue; }
|
||||||
|
match most_used.binary_search_by_key(&count, |&(_digest, count)| count) {
|
||||||
|
Ok(p) => most_used.insert(p, (digest, count)),
|
||||||
|
Err(p) => most_used.insert(p, (digest, count)),
|
||||||
|
}
|
||||||
|
|
||||||
|
if most_used.len() > max { let _ = most_used.pop(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut map = HashMap::new();
|
||||||
|
|
||||||
|
for data in most_used {
|
||||||
|
map.insert(data.0, data.1);
|
||||||
|
}
|
||||||
|
|
||||||
|
map
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Encode digest list from an `IndexFile` into a binary stream
|
/// Encode digest list from an `IndexFile` into a binary stream
|
||||||
|
@ -635,7 +635,6 @@ fn restore(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let client = client.start_backup_reader(repo.store(), &backup_type, &backup_id, backup_time, true).wait()?;
|
let client = client.start_backup_reader(repo.store(), &backup_type, &backup_id, backup_time, true).wait()?;
|
||||||
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config);
|
|
||||||
|
|
||||||
use std::os::unix::fs::OpenOptionsExt;
|
use std::os::unix::fs::OpenOptionsExt;
|
||||||
|
|
||||||
@ -651,6 +650,10 @@ fn restore(
|
|||||||
let index = DynamicIndexReader::new(tmpfile)
|
let index = DynamicIndexReader::new(tmpfile)
|
||||||
.map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?;
|
.map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?;
|
||||||
|
|
||||||
|
let most_used = index.find_most_used_chunks(8);
|
||||||
|
|
||||||
|
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
|
||||||
|
|
||||||
let mut reader = BufferedDynamicReader::new(index, chunk_reader);
|
let mut reader = BufferedDynamicReader::new(index, chunk_reader);
|
||||||
|
|
||||||
let feature_flags = pxar::CA_FORMAT_DEFAULT;
|
let feature_flags = pxar::CA_FORMAT_DEFAULT;
|
||||||
@ -669,6 +672,10 @@ fn restore(
|
|||||||
let index = FixedIndexReader::new(tmpfile)
|
let index = FixedIndexReader::new(tmpfile)
|
||||||
.map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
|
.map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
|
||||||
|
|
||||||
|
let most_used = index.find_most_used_chunks(8);
|
||||||
|
|
||||||
|
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
|
||||||
|
|
||||||
let mut reader = BufferedFixedReader::new(index, chunk_reader);
|
let mut reader = BufferedFixedReader::new(index, chunk_reader);
|
||||||
|
|
||||||
let mut writer = std::fs::OpenOptions::new()
|
let mut writer = std::fs::OpenOptions::new()
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
use failure::*;
|
use failure::*;
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use super::BackupReader;
|
use super::BackupReader;
|
||||||
use crate::backup::{ReadChunk, DataChunk, CryptConfig};
|
use crate::backup::{ReadChunk, DataChunk, CryptConfig};
|
||||||
@ -9,12 +10,22 @@ use crate::backup::{ReadChunk, DataChunk, CryptConfig};
|
|||||||
pub struct RemoteChunkReader {
|
pub struct RemoteChunkReader {
|
||||||
client: Arc<BackupReader>,
|
client: Arc<BackupReader>,
|
||||||
crypt_config: Option<Arc<CryptConfig>>,
|
crypt_config: Option<Arc<CryptConfig>>,
|
||||||
|
cache_hint: HashMap<[u8; 32], usize>,
|
||||||
|
cache: HashMap<[u8; 32], Vec<u8>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RemoteChunkReader {
|
impl RemoteChunkReader {
|
||||||
|
|
||||||
pub fn new(client: Arc<BackupReader>, crypt_config: Option<Arc<CryptConfig>>) -> Self {
|
/// Create a new instance.
|
||||||
Self { client, crypt_config }
|
///
|
||||||
|
/// Chunks listed in ``cache_hint`` are cached and kept in RAM.
|
||||||
|
pub fn new(
|
||||||
|
client: Arc<BackupReader>,
|
||||||
|
crypt_config: Option<Arc<CryptConfig>>,
|
||||||
|
cache_hint: HashMap<[u8; 32], usize>,
|
||||||
|
) -> Self {
|
||||||
|
|
||||||
|
Self { client, crypt_config, cache_hint, cache: HashMap::new() }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -24,6 +35,12 @@ impl ReadChunk for RemoteChunkReader {
|
|||||||
|
|
||||||
let writer = Vec::with_capacity(4*1024*1024);
|
let writer = Vec::with_capacity(4*1024*1024);
|
||||||
|
|
||||||
|
if let Some(raw_data) = self.cache.get(digest) {
|
||||||
|
return Ok(raw_data.to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
|
let use_cache = self.cache_hint.contains_key(digest);
|
||||||
|
|
||||||
let chunk_data = self.client.download_chunk(&digest, writer).wait()?;
|
let chunk_data = self.client.download_chunk(&digest, writer).wait()?;
|
||||||
|
|
||||||
let chunk = DataChunk::from_raw(chunk_data, *digest)?;
|
let chunk = DataChunk::from_raw(chunk_data, *digest)?;
|
||||||
@ -34,6 +51,10 @@ impl ReadChunk for RemoteChunkReader {
|
|||||||
None => chunk.decode(None)?,
|
None => chunk.decode(None)?,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if use_cache {
|
||||||
|
self.cache.insert(*digest, raw_data.to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
Ok(raw_data)
|
Ok(raw_data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user