src/client/pull.rs: allow up to 20 concurrent download streams

This commit is contained in:
Dietmar Maurer 2020-09-22 09:52:14 +02:00
parent 7ecfde8150
commit 73b2cc4977

View File

@ -23,26 +23,40 @@ use crate::{
async fn pull_index_chunks<I: IndexFile>( async fn pull_index_chunks<I: IndexFile>(
_worker: &WorkerTask, _worker: &WorkerTask,
chunk_reader: &mut RemoteChunkReader, chunk_reader: RemoteChunkReader,
target: Arc<DataStore>, target: Arc<DataStore>,
index: I, index: I,
) -> Result<(), Error> { ) -> Result<(), Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
for pos in 0..index.index_count() { let stream = stream::iter((0..index.index_count()).map(|pos| index.chunk_info(pos).unwrap()));
let info = index.chunk_info(pos).unwrap();
let chunk_exists = target.cond_touch_chunk(&info.digest, false)?;
if chunk_exists {
//worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
continue;
}
//worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
chunk.verify_unencrypted(info.size() as usize, &info.digest)?; stream
.map(|info| {
target.insert_chunk(&chunk, &info.digest)?; let target = Arc::clone(&target);
} let chunk_reader = chunk_reader.clone();
Ok::<_, Error>(async move {
let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
if chunk_exists {
//worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
return Ok::<_, Error>(());
}
//worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
crate::tools::runtime::block_in_place(|| {
chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
target.insert_chunk(&chunk, &info.digest)?;
Ok(())
})
})
})
.try_buffer_unordered(20)
.try_for_each(|_res| futures::future::ok(()))
.await?;
Ok(()) Ok(())
} }
@ -115,7 +129,7 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum(); let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?; verify_archive(archive_info, &csum, size)?;
pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?; pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?;
} }
ArchiveType::FixedIndex => { ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile) let index = FixedIndexReader::new(tmpfile)
@ -123,7 +137,7 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum(); let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?; verify_archive(archive_info, &csum, size)?;
pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?; pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?;
} }
ArchiveType::Blob => { ArchiveType::Blob => {
let (csum, size) = compute_file_csum(&mut tmpfile)?; let (csum, size) = compute_file_csum(&mut tmpfile)?;