src/client/pull.rs: avoid duplicate downloads using in-memory HashSet

This commit is contained in:
Dietmar Maurer 2020-09-22 12:34:06 +02:00
parent 73b2cc4977
commit ebbe4958c6
1 changed files with 29 additions and 8 deletions

View File

@ -3,8 +3,8 @@
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use serde_json::json; use serde_json::json;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::collections::HashMap; use std::collections::{HashSet, HashMap};
use std::io::{Seek, SeekFrom}; use std::io::{Seek, SeekFrom};
use proxmox::api::error::{StatusCode, HttpError}; use proxmox::api::error::{StatusCode, HttpError};
@ -26,11 +26,25 @@ async fn pull_index_chunks<I: IndexFile>(
chunk_reader: RemoteChunkReader, chunk_reader: RemoteChunkReader,
target: Arc<DataStore>, target: Arc<DataStore>,
index: I, index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
use futures::stream::{self, StreamExt, TryStreamExt}; use futures::stream::{self, StreamExt, TryStreamExt};
let stream = stream::iter((0..index.index_count()).map(|pos| index.chunk_info(pos).unwrap())); let stream = stream::iter(
(0..index.index_count())
.map(|pos| index.chunk_info(pos).unwrap())
.filter(|info| {
let mut guard = downloaded_chunks.lock().unwrap();
let done = guard.contains(&info.digest);
if !done {
// Note: We mark a chunk as downloaded before it's actually downloaded
// to avoid duplicate downloads.
guard.insert(info.digest);
}
!done
})
);
stream stream
.map(|info| { .map(|info| {
@ -103,6 +117,7 @@ async fn pull_single_archive(
tgt_store: Arc<DataStore>, tgt_store: Arc<DataStore>,
snapshot: &BackupDir, snapshot: &BackupDir,
archive_info: &FileInfo, archive_info: &FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let archive_name = &archive_info.filename; let archive_name = &archive_info.filename;
@ -129,7 +144,7 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum(); let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?; verify_archive(archive_info, &csum, size)?;
pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?; pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
} }
ArchiveType::FixedIndex => { ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile) let index = FixedIndexReader::new(tmpfile)
@ -137,7 +152,7 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum(); let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?; verify_archive(archive_info, &csum, size)?;
pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?; pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
} }
ArchiveType::Blob => { ArchiveType::Blob => {
let (csum, size) = compute_file_csum(&mut tmpfile)?; let (csum, size) = compute_file_csum(&mut tmpfile)?;
@ -183,6 +198,7 @@ async fn pull_snapshot(
reader: Arc<BackupReader>, reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>, tgt_store: Arc<DataStore>,
snapshot: &BackupDir, snapshot: &BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut manifest_name = tgt_store.base_path(); let mut manifest_name = tgt_store.base_path();
@ -292,6 +308,7 @@ async fn pull_snapshot(
tgt_store.clone(), tgt_store.clone(),
snapshot, snapshot,
&item, &item,
downloaded_chunks.clone(),
).await?; ).await?;
} }
@ -314,6 +331,7 @@ pub async fn pull_snapshot_from(
reader: Arc<BackupReader>, reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>, tgt_store: Arc<DataStore>,
snapshot: &BackupDir, snapshot: &BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
@ -321,7 +339,7 @@ pub async fn pull_snapshot_from(
if is_new { if is_new {
worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await { if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
worker.log(format!("cleanup error - {}", cleanup_err)); worker.log(format!("cleanup error - {}", cleanup_err));
} }
@ -330,7 +348,7 @@ pub async fn pull_snapshot_from(
worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
} else { } else {
worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?; pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path())); worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
} }
@ -365,6 +383,9 @@ pub async fn pull_group(
let mut remote_snapshots = std::collections::HashSet::new(); let mut remote_snapshots = std::collections::HashSet::new();
// start with 65536 chunks (up to 256 GiB)
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));
for item in list { for item in list {
let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?; let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
@ -398,7 +419,7 @@ pub async fn pull_group(
true, true,
).await?; ).await?;
pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?; pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await?;
} }
if delete { if delete {