diff --git a/src/client/pull.rs b/src/client/pull.rs index f671c003..f6f77c3d 100644 --- a/src/client/pull.rs +++ b/src/client/pull.rs @@ -3,8 +3,8 @@ use anyhow::{bail, format_err, Error}; use serde_json::json; use std::convert::TryFrom; -use std::sync::Arc; -use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::collections::{HashSet, HashMap}; use std::io::{Seek, SeekFrom}; use proxmox::api::error::{StatusCode, HttpError}; @@ -26,11 +26,25 @@ async fn pull_index_chunks( chunk_reader: RemoteChunkReader, target: Arc, index: I, + downloaded_chunks: Arc>>, ) -> Result<(), Error> { use futures::stream::{self, StreamExt, TryStreamExt}; - let stream = stream::iter((0..index.index_count()).map(|pos| index.chunk_info(pos).unwrap())); + let stream = stream::iter( + (0..index.index_count()) + .map(|pos| index.chunk_info(pos).unwrap()) + .filter(|info| { + let mut guard = downloaded_chunks.lock().unwrap(); + let done = guard.contains(&info.digest); + if !done { + // Note: We mark a chunk as downloaded before its actually downloaded + // to avoid duplicate downloads. + guard.insert(info.digest); + } + !done + }) + ); stream .map(|info| { @@ -103,6 +117,7 @@ async fn pull_single_archive( tgt_store: Arc, snapshot: &BackupDir, archive_info: &FileInfo, + downloaded_chunks: Arc>>, ) -> Result<(), Error> { let archive_name = &archive_info.filename; @@ -129,7 +144,7 @@ async fn pull_single_archive( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?; + pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?; } ArchiveType::FixedIndex => { let index = FixedIndexReader::new(tmpfile) @@ -137,7 +152,7 @@ async fn pull_single_archive( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?; + pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?; } ArchiveType::Blob => { let (csum, size) = compute_file_csum(&mut tmpfile)?; @@ -183,6 +198,7 @@ async fn pull_snapshot( reader: Arc, tgt_store: Arc, snapshot: &BackupDir, + downloaded_chunks: Arc>>, ) -> Result<(), Error> { let mut manifest_name = tgt_store.base_path(); @@ -292,6 +308,7 @@ async fn pull_snapshot( tgt_store.clone(), snapshot, &item, + downloaded_chunks.clone(), ).await?; } @@ -314,6 +331,7 @@ pub async fn pull_snapshot_from( reader: Arc, tgt_store: Arc, snapshot: &BackupDir, + downloaded_chunks: Arc>>, ) -> Result<(), Error> { let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; @@ -321,7 +339,7 @@ pub async fn pull_snapshot_from( if is_new { worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); - if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await { + if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await { if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { worker.log(format!("cleanup error - {}", cleanup_err)); } @@ -330,7 +348,7 @@ pub async fn pull_snapshot_from( worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); } else { worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); - pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?; + pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?; worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path())); } @@ -365,6 +383,9 @@ pub async fn pull_group( let mut remote_snapshots = std::collections::HashSet::new(); + // start with 16384 chunks (up to 65GB) + let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64))); + for item in list { let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?; @@ -398,7 +419,7 @@ pub async fn pull_group( true, ).await?; - pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?; + pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await?; } if delete {