make ReadChunk not require mutable self.
That way we can reduce lock contentions because we lock for much shorter times.
This commit is contained in:
parent
26f499b17b
commit
e9764238df
|
@ -45,7 +45,7 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, I> AsyncRead for AsyncIndexReader<S, I> where
|
impl<S, I> AsyncRead for AsyncIndexReader<S, I> where
|
||||||
S: AsyncReadChunk + Unpin + 'static,
|
S: AsyncReadChunk + Unpin + Sync + 'static,
|
||||||
I: IndexFile + Unpin
|
I: IndexFile + Unpin
|
||||||
{
|
{
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
|
@ -74,7 +74,7 @@ I: IndexFile + Unpin
|
||||||
|
|
||||||
this.current_chunk_digest = digest;
|
this.current_chunk_digest = digest;
|
||||||
|
|
||||||
let mut store = match this.store.take() {
|
let store = match this.store.take() {
|
||||||
Some(store) => store,
|
Some(store) => store,
|
||||||
None => {
|
None => {
|
||||||
return Poll::Ready(Err(io_format_err!("could not find store")));
|
return Poll::Ready(Err(io_format_err!("could not find store")));
|
||||||
|
|
|
@ -11,10 +11,10 @@ use super::datastore::DataStore;
|
||||||
/// The ReadChunk trait allows reading backup data chunks (local or remote)
|
/// The ReadChunk trait allows reading backup data chunks (local or remote)
|
||||||
pub trait ReadChunk {
|
pub trait ReadChunk {
|
||||||
/// Returns the encoded chunk data
|
/// Returns the encoded chunk data
|
||||||
fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error>;
|
fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error>;
|
||||||
|
|
||||||
/// Returns the decoded chunk data
|
/// Returns the decoded chunk data
|
||||||
fn read_chunk(&mut self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
|
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -33,7 +33,7 @@ impl LocalChunkReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ReadChunk for LocalChunkReader {
|
impl ReadChunk for LocalChunkReader {
|
||||||
fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
|
fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
|
||||||
let (path, _) = self.store.chunk_path(digest);
|
let (path, _) = self.store.chunk_path(digest);
|
||||||
let raw_data = proxmox::tools::fs::file_get_contents(&path)?;
|
let raw_data = proxmox::tools::fs::file_get_contents(&path)?;
|
||||||
let chunk = DataBlob::from_raw(raw_data)?;
|
let chunk = DataBlob::from_raw(raw_data)?;
|
||||||
|
@ -42,7 +42,7 @@ impl ReadChunk for LocalChunkReader {
|
||||||
Ok(chunk)
|
Ok(chunk)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_chunk(&mut self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
|
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
|
||||||
let chunk = ReadChunk::read_raw_chunk(self, digest)?;
|
let chunk = ReadChunk::read_raw_chunk(self, digest)?;
|
||||||
|
|
||||||
let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
|
let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
|
||||||
|
@ -56,20 +56,20 @@ impl ReadChunk for LocalChunkReader {
|
||||||
pub trait AsyncReadChunk: Send {
|
pub trait AsyncReadChunk: Send {
|
||||||
/// Returns the encoded chunk data
|
/// Returns the encoded chunk data
|
||||||
fn read_raw_chunk<'a>(
|
fn read_raw_chunk<'a>(
|
||||||
&'a mut self,
|
&'a self,
|
||||||
digest: &'a [u8; 32],
|
digest: &'a [u8; 32],
|
||||||
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>>;
|
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>>;
|
||||||
|
|
||||||
/// Returns the decoded chunk data
|
/// Returns the decoded chunk data
|
||||||
fn read_chunk<'a>(
|
fn read_chunk<'a>(
|
||||||
&'a mut self,
|
&'a self,
|
||||||
digest: &'a [u8; 32],
|
digest: &'a [u8; 32],
|
||||||
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>>;
|
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncReadChunk for LocalChunkReader {
|
impl AsyncReadChunk for LocalChunkReader {
|
||||||
fn read_raw_chunk<'a>(
|
fn read_raw_chunk<'a>(
|
||||||
&'a mut self,
|
&'a self,
|
||||||
digest: &'a [u8; 32],
|
digest: &'a [u8; 32],
|
||||||
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
|
||||||
Box::pin(async move{
|
Box::pin(async move{
|
||||||
|
@ -84,7 +84,7 @@ impl AsyncReadChunk for LocalChunkReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_chunk<'a>(
|
fn read_chunk<'a>(
|
||||||
&'a mut self,
|
&'a self,
|
||||||
digest: &'a [u8; 32],
|
digest: &'a [u8; 32],
|
||||||
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -1119,7 +1119,7 @@ async fn dump_image<W: Write>(
|
||||||
|
|
||||||
let most_used = index.find_most_used_chunks(8);
|
let most_used = index.find_most_used_chunks(8);
|
||||||
|
|
||||||
let mut chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
|
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
|
||||||
|
|
||||||
// Note: we avoid using BufferedFixedReader, because that add an additional buffer/copy
|
// Note: we avoid using BufferedFixedReader, because that add an additional buffer/copy
|
||||||
// and thus slows down reading. Instead, directly use RemoteChunkReader
|
// and thus slows down reading. Instead, directly use RemoteChunkReader
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use anyhow::Error;
|
use anyhow::Error;
|
||||||
|
|
||||||
|
@ -14,7 +14,7 @@ pub struct RemoteChunkReader {
|
||||||
client: Arc<BackupReader>,
|
client: Arc<BackupReader>,
|
||||||
crypt_config: Option<Arc<CryptConfig>>,
|
crypt_config: Option<Arc<CryptConfig>>,
|
||||||
cache_hint: HashMap<[u8; 32], usize>,
|
cache_hint: HashMap<[u8; 32], usize>,
|
||||||
cache: HashMap<[u8; 32], Vec<u8>>,
|
cache: Mutex<HashMap<[u8; 32], Vec<u8>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RemoteChunkReader {
|
impl RemoteChunkReader {
|
||||||
|
@ -30,11 +30,11 @@ impl RemoteChunkReader {
|
||||||
client,
|
client,
|
||||||
crypt_config,
|
crypt_config,
|
||||||
cache_hint,
|
cache_hint,
|
||||||
cache: HashMap::new(),
|
cache: Mutex::new(HashMap::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
|
pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
|
||||||
let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024);
|
let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024);
|
||||||
|
|
||||||
self.client
|
self.client
|
||||||
|
@ -49,12 +49,12 @@ impl RemoteChunkReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ReadChunk for RemoteChunkReader {
|
impl ReadChunk for RemoteChunkReader {
|
||||||
fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
|
fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
|
||||||
block_on(Self::read_raw_chunk(self, digest))
|
block_on(Self::read_raw_chunk(self, digest))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_chunk(&mut self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
|
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
|
||||||
if let Some(raw_data) = self.cache.get(digest) {
|
if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
|
||||||
return Ok(raw_data.to_vec());
|
return Ok(raw_data.to_vec());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,7 +66,7 @@ impl ReadChunk for RemoteChunkReader {
|
||||||
|
|
||||||
let use_cache = self.cache_hint.contains_key(digest);
|
let use_cache = self.cache_hint.contains_key(digest);
|
||||||
if use_cache {
|
if use_cache {
|
||||||
self.cache.insert(*digest, raw_data.to_vec());
|
(*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(raw_data)
|
Ok(raw_data)
|
||||||
|
@ -75,18 +75,18 @@ impl ReadChunk for RemoteChunkReader {
|
||||||
|
|
||||||
impl AsyncReadChunk for RemoteChunkReader {
|
impl AsyncReadChunk for RemoteChunkReader {
|
||||||
fn read_raw_chunk<'a>(
|
fn read_raw_chunk<'a>(
|
||||||
&'a mut self,
|
&'a self,
|
||||||
digest: &'a [u8; 32],
|
digest: &'a [u8; 32],
|
||||||
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
|
||||||
Box::pin(Self::read_raw_chunk(self, digest))
|
Box::pin(Self::read_raw_chunk(self, digest))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_chunk<'a>(
|
fn read_chunk<'a>(
|
||||||
&'a mut self,
|
&'a self,
|
||||||
digest: &'a [u8; 32],
|
digest: &'a [u8; 32],
|
||||||
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
if let Some(raw_data) = self.cache.get(digest) {
|
if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
|
||||||
return Ok(raw_data.to_vec());
|
return Ok(raw_data.to_vec());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ impl AsyncReadChunk for RemoteChunkReader {
|
||||||
|
|
||||||
let use_cache = self.cache_hint.contains_key(digest);
|
let use_cache = self.cache_hint.contains_key(digest);
|
||||||
if use_cache {
|
if use_cache {
|
||||||
self.cache.insert(*digest, raw_data.to_vec());
|
(*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(raw_data)
|
Ok(raw_data)
|
||||||
|
|
Loading…
Reference in New Issue