diff --git a/src/backup/read_chunk.rs b/src/backup/read_chunk.rs index 0a0477ed..49e4737b 100644 --- a/src/backup/read_chunk.rs +++ b/src/backup/read_chunk.rs @@ -1,3 +1,5 @@ +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; use anyhow::Error; @@ -52,3 +54,17 @@ impl ReadChunk for LocalChunkReader { Ok(raw_data) } } + +pub trait AsyncReadChunk { + /// Returns the encoded chunk data + fn read_raw_chunk<'a>( + &'a mut self, + digest: &'a [u8; 32], + ) -> Pin> + Send + 'a>>; + + /// Returns the decoded chunk data + fn read_chunk<'a>( + &'a mut self, + digest: &'a [u8; 32], + ) -> Pin, Error>> + Send + 'a>>; +} diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index a90a6a72..2634a8f5 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -31,8 +31,33 @@ use proxmox::api::api; use proxmox_backup::tools; use proxmox_backup::api2::types::*; use proxmox_backup::client::*; -use proxmox_backup::backup::*; use proxmox_backup::pxar::catalog::*; +use proxmox_backup::backup::{ + archive_type, + encrypt_key_with_passphrase, + load_and_decrypt_key, + store_key_config, + verify_chunk_size, + ArchiveType, + BackupDir, + BackupGroup, + BackupManifest, + BufferedDynamicReader, + CatalogReader, + CatalogWriter, + CATALOG_NAME, + ChunkStream, + CryptConfig, + DataBlob, + DynamicIndexReader, + FixedChunkStream, + FixedIndexReader, + IndexFile, + KeyConfig, + MANIFEST_BLOB_NAME, + ReadChunk, + Shell, +}; const ENV_VAR_PBS_FINGERPRINT: &str = "PBS_FINGERPRINT"; const ENV_VAR_PBS_PASSWORD: &str = "PBS_PASSWORD"; diff --git a/src/client/pull.rs b/src/client/pull.rs index 87904360..ea7099cf 100644 --- a/src/client/pull.rs +++ b/src/client/pull.rs @@ -34,7 +34,7 @@ async fn pull_index_chunks( continue; } //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); - let chunk = chunk_reader.read_raw_chunk(&digest)?; + let chunk = chunk_reader.read_raw_chunk(&digest).await?; target.insert_chunk(&chunk, &digest)?; } diff --git a/src/client/remote_chunk_reader.rs b/src/client/remote_chunk_reader.rs index 2d56543b..be57f0f5 100644 --- a/src/client/remote_chunk_reader.rs +++ b/src/client/remote_chunk_reader.rs @@ -1,10 +1,12 @@ +use std::future::Future; use std::collections::HashMap; +use std::pin::Pin; use std::sync::Arc; use anyhow::Error; use super::BackupReader; -use crate::backup::{CryptConfig, DataBlob, ReadChunk}; +use crate::backup::{AsyncReadChunk, CryptConfig, DataBlob, ReadChunk}; use crate::tools::runtime::block_on; /// Read chunks from remote host using ``BackupReader`` @@ -31,6 +33,19 @@ impl RemoteChunkReader { cache: HashMap::new(), } } + + pub async fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result { + let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024); + + self.client + .download_chunk(&digest, &mut chunk_data) + .await?; + + let chunk = DataBlob::from_raw(chunk_data)?; + chunk.verify_crc()?; + + Ok(chunk) + } } impl ReadChunk for RemoteChunkReader { @@ -57,7 +72,7 @@ impl ReadChunk for RemoteChunkReader { return Ok(raw_data.to_vec()); } - let chunk = self.read_raw_chunk(digest)?; + let chunk = ReadChunk::read_raw_chunk(self, digest)?; let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref))?; @@ -71,3 +86,36 @@ impl ReadChunk for RemoteChunkReader { Ok(raw_data) } } + +impl AsyncReadChunk for RemoteChunkReader { + fn read_raw_chunk<'a>( + &'a mut self, + digest: &'a [u8; 32], + ) -> Pin> + Send + 'a>> { + Box::pin(Self::read_raw_chunk(self, digest)) + } + + fn read_chunk<'a>( + &'a mut self, + digest: &'a [u8; 32], + ) -> Pin, Error>> + Send + 'a>> { + Box::pin(async move { + if let Some(raw_data) = self.cache.get(digest) { + return Ok(raw_data.to_vec()); + } + + let chunk = Self::read_raw_chunk(self, digest).await?; + + let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref))?; + + // fixme: verify digest? + + let use_cache = self.cache_hint.contains_key(digest); + if use_cache { + self.cache.insert(*digest, raw_data.to_vec()); + } + + Ok(raw_data) + }) + } +}