Compare commits

...

7 Commits

Author SHA1 Message Date
a329324139 bump version to 0.8.19-1 2020-09-22 13:30:52 +02:00
a83e2ffeab src/api2/reader.rs: use std::fs::read instead of tokio::fs::read
Because it is about 10% faster this way.
2020-09-22 13:27:23 +02:00
5d7449a121 bump version to 0.8.18-1 2020-09-22 12:39:47 +02:00
ebbe4958c6 src/client/pull.rs: avoid duplicate downloads using in memory HashSet 2020-09-22 12:34:06 +02:00
73b2cc4977 src/client/pull.rs: allow up to 20 concurrent download streams 2020-09-22 11:39:31 +02:00
7ecfde8150 remote_chunk_reader.rs: use Arc for cache_hint to make clone faster 2020-09-22 11:39:31 +02:00
796480a38b docs: add version and date to HTML index
Similar to the PDF output or the Proxmox VE docs.

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
2020-09-22 09:00:12 +02:00
7 changed files with 76 additions and 30 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "proxmox-backup" name = "proxmox-backup"
version = "0.8.17" version = "0.8.19"
authors = ["Dietmar Maurer <dietmar@proxmox.com>"] authors = ["Dietmar Maurer <dietmar@proxmox.com>"]
edition = "2018" edition = "2018"
license = "AGPL-3" license = "AGPL-3"

14
debian/changelog vendored
View File

@ -1,3 +1,17 @@
rust-proxmox-backup (0.8.19-1) unstable; urgency=medium
* src/api2/reader.rs: use std::fs::read instead of tokio::fs::read
-- Proxmox Support Team <support@proxmox.com> Tue, 22 Sep 2020 13:30:27 +0200
rust-proxmox-backup (0.8.18-1) unstable; urgency=medium
* src/client/pull.rs: allow up to 20 concurrent download streams
* docs: add version and date to HTML index
-- Proxmox Support Team <support@proxmox.com> Tue, 22 Sep 2020 12:39:26 +0200
rust-proxmox-backup (0.8.17-1) unstable; urgency=medium rust-proxmox-backup (0.8.17-1) unstable; urgency=medium
* src/client/pull.rs: open temporary manifest with truncate(true) * src/client/pull.rs: open temporary manifest with truncate(true)

View File

@ -97,12 +97,10 @@ language = None
# There are two options for replacing |today|: either, you set today to some # There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used: # non-false value, then it is used:
#
# today = '' # today = ''
# #
# Else, today_fmt is used as the format for a strftime call. # Else, today_fmt is used as the format for a strftime call.
# today_fmt = '%A, %d %B %Y'
# today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and # List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files. # directories to ignore when looking for source files.

View File

@ -2,8 +2,8 @@
Welcome to the Proxmox Backup documentation! Welcome to the Proxmox Backup documentation!
============================================ ============================================
| Copyright (C) 2019-2020 Proxmox Server Solutions GmbH
Copyright (C) 2019-2020 Proxmox Server Solutions GmbH | Version |version| -- |today|
Permission is granted to copy, distribute and/or modify this document under the Permission is granted to copy, distribute and/or modify this document under the
terms of the GNU Free Documentation License, Version 1.3 or any later version terms of the GNU Free Documentation License, Version 1.3 or any later version

View File

@ -229,8 +229,7 @@ fn download_chunk(
env.debug(format!("download chunk {:?}", path)); env.debug(format!("download chunk {:?}", path));
let data = tokio::fs::read(path) let data = tools::runtime::block_in_place(|| std::fs::read(path))
.await
.map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?; .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
let body = Body::from(data); let body = Body::from(data);

View File

@ -3,8 +3,8 @@
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use serde_json::json; use serde_json::json;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::collections::HashMap; use std::collections::{HashSet, HashMap};
use std::io::{Seek, SeekFrom}; use std::io::{Seek, SeekFrom};
use proxmox::api::error::{StatusCode, HttpError}; use proxmox::api::error::{StatusCode, HttpError};
@ -23,26 +23,54 @@ use crate::{
async fn pull_index_chunks<I: IndexFile>( async fn pull_index_chunks<I: IndexFile>(
_worker: &WorkerTask, _worker: &WorkerTask,
chunk_reader: &mut RemoteChunkReader, chunk_reader: RemoteChunkReader,
target: Arc<DataStore>, target: Arc<DataStore>,
index: I, index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
for pos in 0..index.index_count() { let stream = stream::iter(
let info = index.chunk_info(pos).unwrap(); (0..index.index_count())
let chunk_exists = target.cond_touch_chunk(&info.digest, false)?; .map(|pos| index.chunk_info(pos).unwrap())
if chunk_exists { .filter(|info| {
//worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); let mut guard = downloaded_chunks.lock().unwrap();
continue; let done = guard.contains(&info.digest);
} if !done {
//worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); // Note: We mark a chunk as downloaded before it's actually downloaded
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; // to avoid duplicate downloads.
guard.insert(info.digest);
}
!done
})
);
chunk.verify_unencrypted(info.size() as usize, &info.digest)?; stream
.map(|info| {
target.insert_chunk(&chunk, &info.digest)?; let target = Arc::clone(&target);
} let chunk_reader = chunk_reader.clone();
Ok::<_, Error>(async move {
let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
if chunk_exists {
//worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
return Ok::<_, Error>(());
}
//worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
crate::tools::runtime::block_in_place(|| {
chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
target.insert_chunk(&chunk, &info.digest)?;
Ok(())
})
})
})
.try_buffer_unordered(20)
.try_for_each(|_res| futures::future::ok(()))
.await?;
Ok(()) Ok(())
} }
@ -89,6 +117,7 @@ async fn pull_single_archive(
tgt_store: Arc<DataStore>, tgt_store: Arc<DataStore>,
snapshot: &BackupDir, snapshot: &BackupDir,
archive_info: &FileInfo, archive_info: &FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let archive_name = &archive_info.filename; let archive_name = &archive_info.filename;
@ -115,7 +144,7 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum(); let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?; verify_archive(archive_info, &csum, size)?;
pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?; pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
} }
ArchiveType::FixedIndex => { ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile) let index = FixedIndexReader::new(tmpfile)
@ -123,7 +152,7 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum(); let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?; verify_archive(archive_info, &csum, size)?;
pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?; pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
} }
ArchiveType::Blob => { ArchiveType::Blob => {
let (csum, size) = compute_file_csum(&mut tmpfile)?; let (csum, size) = compute_file_csum(&mut tmpfile)?;
@ -169,6 +198,7 @@ async fn pull_snapshot(
reader: Arc<BackupReader>, reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>, tgt_store: Arc<DataStore>,
snapshot: &BackupDir, snapshot: &BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut manifest_name = tgt_store.base_path(); let mut manifest_name = tgt_store.base_path();
@ -278,6 +308,7 @@ async fn pull_snapshot(
tgt_store.clone(), tgt_store.clone(),
snapshot, snapshot,
&item, &item,
downloaded_chunks.clone(),
).await?; ).await?;
} }
@ -300,6 +331,7 @@ pub async fn pull_snapshot_from(
reader: Arc<BackupReader>, reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>, tgt_store: Arc<DataStore>,
snapshot: &BackupDir, snapshot: &BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
@ -307,7 +339,7 @@ pub async fn pull_snapshot_from(
if is_new { if is_new {
worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await { if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
worker.log(format!("cleanup error - {}", cleanup_err)); worker.log(format!("cleanup error - {}", cleanup_err));
} }
@ -316,7 +348,7 @@ pub async fn pull_snapshot_from(
worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
} else { } else {
worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?; pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path())); worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
} }
@ -351,6 +383,9 @@ pub async fn pull_group(
let mut remote_snapshots = std::collections::HashSet::new(); let mut remote_snapshots = std::collections::HashSet::new();
// start with 65536 chunks (1024*64, matching the with_capacity call below)
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));
for item in list { for item in list {
let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?; let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
@ -384,7 +419,7 @@ pub async fn pull_group(
true, true,
).await?; ).await?;
pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?; pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await?;
} }
if delete { if delete {

View File

@ -15,7 +15,7 @@ pub struct RemoteChunkReader {
client: Arc<BackupReader>, client: Arc<BackupReader>,
crypt_config: Option<Arc<CryptConfig>>, crypt_config: Option<Arc<CryptConfig>>,
crypt_mode: CryptMode, crypt_mode: CryptMode,
cache_hint: HashMap<[u8; 32], usize>, cache_hint: Arc<HashMap<[u8; 32], usize>>,
cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>, cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
} }
@ -33,7 +33,7 @@ impl RemoteChunkReader {
client, client,
crypt_config, crypt_config,
crypt_mode, crypt_mode,
cache_hint, cache_hint: Arc::new(cache_hint),
cache: Arc::new(Mutex::new(HashMap::new())), cache: Arc::new(Mutex::new(HashMap::new())),
} }
} }