Compare commits

..

7 Commits

Author SHA1 Message Date
a329324139 bump version to 0.8.19-1 2020-09-22 13:30:52 +02:00
a83e2ffeab src/api2/reader.rs: use std::fs::read instead of tokio::fs::read
Because it is about 10%& faster this way.
2020-09-22 13:27:23 +02:00
5d7449a121 bump version to 0.8.18-1 2020-09-22 12:39:47 +02:00
ebbe4958c6 src/client/pull.rs: avoid duplicate downloads using in memory HashSet 2020-09-22 12:34:06 +02:00
73b2cc4977 src/client/pull.rs: allow up to 20 concurrent download streams 2020-09-22 11:39:31 +02:00
7ecfde8150 remote_chunk_reader.rs: use Arc for cache_hint to make clone faster 2020-09-22 11:39:31 +02:00
796480a38b docs: add version and date to HTML index
Similar to the PDF output or the Proxmox VE docs.

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
2020-09-22 09:00:12 +02:00
7 changed files with 76 additions and 30 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "proxmox-backup"
version = "0.8.17"
version = "0.8.19"
authors = ["Dietmar Maurer <dietmar@proxmox.com>"]
edition = "2018"
license = "AGPL-3"

14
debian/changelog vendored
View File

@ -1,3 +1,17 @@
rust-proxmox-backup (0.8.19-1) unstable; urgency=medium
* src/api2/reader.rs: use std::fs::read instead of tokio::fs::read
-- Proxmox Support Team <support@proxmox.com> Tue, 22 Sep 2020 13:30:27 +0200
rust-proxmox-backup (0.8.18-1) unstable; urgency=medium
* src/client/pull.rs: allow up to 20 concurrent download streams
* docs: add version and date to HTML index
-- Proxmox Support Team <support@proxmox.com> Tue, 22 Sep 2020 12:39:26 +0200
rust-proxmox-backup (0.8.17-1) unstable; urgency=medium
* src/client/pull.rs: open temporary manifest with truncate(true)

View File

@ -97,12 +97,10 @@ language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#
# today = ''
#
# Else, today_fmt is used as the format for a strftime call.
#
# today_fmt = '%B %d, %Y'
today_fmt = '%A, %d %B %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.

View File

@ -2,8 +2,8 @@
Welcome to the Proxmox Backup documentation!
============================================
Copyright (C) 2019-2020 Proxmox Server Solutions GmbH
| Copyright (C) 2019-2020 Proxmox Server Solutions GmbH
| Version |version| -- |today|
Permission is granted to copy, distribute and/or modify this document under the
terms of the GNU Free Documentation License, Version 1.3 or any later version

View File

@ -229,8 +229,7 @@ fn download_chunk(
env.debug(format!("download chunk {:?}", path));
let data = tokio::fs::read(path)
.await
let data = tools::runtime::block_in_place(|| std::fs::read(path))
.map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
let body = Body::from(data);

View File

@ -3,8 +3,8 @@
use anyhow::{bail, format_err, Error};
use serde_json::json;
use std::convert::TryFrom;
use std::sync::Arc;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::collections::{HashSet, HashMap};
use std::io::{Seek, SeekFrom};
use proxmox::api::error::{StatusCode, HttpError};
@ -23,26 +23,54 @@ use crate::{
async fn pull_index_chunks<I: IndexFile>(
_worker: &WorkerTask,
chunk_reader: &mut RemoteChunkReader,
chunk_reader: RemoteChunkReader,
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
for pos in 0..index.index_count() {
let info = index.chunk_info(pos).unwrap();
let chunk_exists = target.cond_touch_chunk(&info.digest, false)?;
if chunk_exists {
//worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
continue;
}
//worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
let stream = stream::iter(
(0..index.index_count())
.map(|pos| index.chunk_info(pos).unwrap())
.filter(|info| {
let mut guard = downloaded_chunks.lock().unwrap();
let done = guard.contains(&info.digest);
if !done {
// Note: We mark a chunk as downloaded before its actually downloaded
// to avoid duplicate downloads.
guard.insert(info.digest);
}
!done
})
);
chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
stream
.map(|info| {
target.insert_chunk(&chunk, &info.digest)?;
}
let target = Arc::clone(&target);
let chunk_reader = chunk_reader.clone();
Ok::<_, Error>(async move {
let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
if chunk_exists {
//worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
return Ok::<_, Error>(());
}
//worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
crate::tools::runtime::block_in_place(|| {
chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
target.insert_chunk(&chunk, &info.digest)?;
Ok(())
})
})
})
.try_buffer_unordered(20)
.try_for_each(|_res| futures::future::ok(()))
.await?;
Ok(())
}
@ -89,6 +117,7 @@ async fn pull_single_archive(
tgt_store: Arc<DataStore>,
snapshot: &BackupDir,
archive_info: &FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {
let archive_name = &archive_info.filename;
@ -115,7 +144,7 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile)
@ -123,7 +152,7 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
}
ArchiveType::Blob => {
let (csum, size) = compute_file_csum(&mut tmpfile)?;
@ -169,6 +198,7 @@ async fn pull_snapshot(
reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>,
snapshot: &BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {
let mut manifest_name = tgt_store.base_path();
@ -278,6 +308,7 @@ async fn pull_snapshot(
tgt_store.clone(),
snapshot,
&item,
downloaded_chunks.clone(),
).await?;
}
@ -300,6 +331,7 @@ pub async fn pull_snapshot_from(
reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>,
snapshot: &BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {
let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
@ -307,7 +339,7 @@ pub async fn pull_snapshot_from(
if is_new {
worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
worker.log(format!("cleanup error - {}", cleanup_err));
}
@ -316,7 +348,7 @@ pub async fn pull_snapshot_from(
worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
} else {
worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?;
pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
}
@ -351,6 +383,9 @@ pub async fn pull_group(
let mut remote_snapshots = std::collections::HashSet::new();
// start with 16384 chunks (up to 65GB)
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));
for item in list {
let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
@ -384,7 +419,7 @@ pub async fn pull_group(
true,
).await?;
pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await?;
}
if delete {

View File

@ -15,7 +15,7 @@ pub struct RemoteChunkReader {
client: Arc<BackupReader>,
crypt_config: Option<Arc<CryptConfig>>,
crypt_mode: CryptMode,
cache_hint: HashMap<[u8; 32], usize>,
cache_hint: Arc<HashMap<[u8; 32], usize>>,
cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
}
@ -33,7 +33,7 @@ impl RemoteChunkReader {
client,
crypt_config,
crypt_mode,
cache_hint,
cache_hint: Arc::new(cache_hint),
cache: Arc::new(Mutex::new(HashMap::new())),
}
}