Compare commits

7 Commits

| SHA1 |
|---|
| a329324139 |
| a83e2ffeab |
| 5d7449a121 |
| ebbe4958c6 |
| 73b2cc4977 |
| 7ecfde8150 |
| 796480a38b |
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "proxmox-backup"
-version = "0.8.17"
+version = "0.8.19"
 authors = ["Dietmar Maurer <dietmar@proxmox.com>"]
 edition = "2018"
 license = "AGPL-3"
debian/changelog (14 additions)
@@ -1,3 +1,17 @@
+rust-proxmox-backup (0.8.19-1) unstable; urgency=medium
+
+  * src/api2/reader.rs: use std::fs::read instead of tokio::fs::read
+
+ -- Proxmox Support Team <support@proxmox.com>  Tue, 22 Sep 2020 13:30:27 +0200
+
+rust-proxmox-backup (0.8.18-1) unstable; urgency=medium
+
+  * src/client/pull.rs: allow up to 20 concurrent download streams
+
+  * docs: add version and date to HTML index
+
+ -- Proxmox Support Team <support@proxmox.com>  Tue, 22 Sep 2020 12:39:26 +0200
+
 rust-proxmox-backup (0.8.17-1) unstable; urgency=medium

   * src/client/pull.rs: open temporary manifest with truncate(true)
docs/conf.py

@@ -97,12 +97,10 @@ language = None

 # There are two options for replacing |today|: either, you set today to some
 # non-false value, then it is used:
-#
 # today = ''
 #
 # Else, today_fmt is used as the format for a strftime call.
-#
-# today_fmt = '%B %d, %Y'
+today_fmt = '%A, %d %B %Y'

 # List of patterns, relative to source directory, that match files and
 # directories to ignore when looking for source files.
docs/index.rst

@@ -2,8 +2,8 @@

 Welcome to the Proxmox Backup documentation!
 ============================================
-
-Copyright (C) 2019-2020 Proxmox Server Solutions GmbH
+| Copyright (C) 2019-2020 Proxmox Server Solutions GmbH
+| Version |version| -- |today|

 Permission is granted to copy, distribute and/or modify this document under the
 terms of the GNU Free Documentation License, Version 1.3 or any later version
src/api2/reader.rs

@@ -229,8 +229,7 @@ fn download_chunk(

     env.debug(format!("download chunk {:?}", path));

-    let data = tokio::fs::read(path)
-        .await
+    let data = tools::runtime::block_in_place(|| std::fs::read(path))
         .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;

     let body = Body::from(data);
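The change above swaps an async `tokio::fs::read` for a plain `std::fs::read` wrapped in the crate's `tools::runtime::block_in_place` helper, so the synchronous read no longer stalls other tasks on the same executor thread. A minimal sketch of the same idea using tokio's own `tokio::task::block_in_place`; the file path and runtime flavor are illustrative assumptions, not proxmox-backup code:

```rust
use std::path::Path;

async fn read_chunk_file(path: &Path) -> std::io::Result<Vec<u8>> {
    // Tell the runtime this worker thread is about to block; other tasks
    // are moved to different worker threads while the sync read runs.
    tokio::task::block_in_place(|| std::fs::read(path))
}

#[tokio::main(flavor = "multi_thread")] // block_in_place needs the threaded runtime
async fn main() -> std::io::Result<()> {
    let data = read_chunk_file(Path::new("/tmp/example.chunk")).await?;
    println!("read {} bytes", data.len());
    Ok(())
}
```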
src/client/pull.rs

@@ -3,8 +3,8 @@
 use anyhow::{bail, format_err, Error};
 use serde_json::json;
 use std::convert::TryFrom;
-use std::sync::Arc;
-use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+use std::collections::{HashSet, HashMap};
 use std::io::{Seek, SeekFrom};

 use proxmox::api::error::{StatusCode, HttpError};
@@ -23,26 +23,54 @@ use crate::{

 async fn pull_index_chunks<I: IndexFile>(
     _worker: &WorkerTask,
-    chunk_reader: &mut RemoteChunkReader,
+    chunk_reader: RemoteChunkReader,
     target: Arc<DataStore>,
     index: I,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
 ) -> Result<(), Error> {

-    for pos in 0..index.index_count() {
-        let info = index.chunk_info(pos).unwrap();
-        let chunk_exists = target.cond_touch_chunk(&info.digest, false)?;
-        if chunk_exists {
-            //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
-            continue;
-        }
-        //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
-        let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
-
-        chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
-
-        target.insert_chunk(&chunk, &info.digest)?;
-    }
+    use futures::stream::{self, StreamExt, TryStreamExt};
+
+    let stream = stream::iter(
+        (0..index.index_count())
+            .map(|pos| index.chunk_info(pos).unwrap())
+            .filter(|info| {
+                let mut guard = downloaded_chunks.lock().unwrap();
+                let done = guard.contains(&info.digest);
+                if !done {
+                    // Note: We mark a chunk as downloaded before its actually downloaded
+                    // to avoid duplicate downloads.
+                    guard.insert(info.digest);
+                }
+                !done
+            })
+    );
+
+    stream
+        .map(|info| {
+
+            let target = Arc::clone(&target);
+            let chunk_reader = chunk_reader.clone();
+
+            Ok::<_, Error>(async move {
+                let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
+                if chunk_exists {
+                    //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
+                    return Ok::<_, Error>(());
+                }
+                //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
+                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+
+                crate::tools::runtime::block_in_place(|| {
+                    chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
+                    target.insert_chunk(&chunk, &info.digest)?;
+                    Ok(())
+                })
+            })
+        })
+        .try_buffer_unordered(20)
+        .try_for_each(|_res| futures::future::ok(()))
+        .await?;

     Ok(())
 }
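The rewritten `pull_index_chunks` turns the chunk list into a stream of download futures and polls at most 20 of them at a time via `try_buffer_unordered(20)`, matching the changelog entry about 20 concurrent download streams. A minimal, self-contained sketch of that pattern using only the `futures` and `anyhow` crates; `fetch_one` and the digest list are illustrative stand-ins, not proxmox-backup APIs:

```rust
use anyhow::Error;
use futures::stream::{self, StreamExt, TryStreamExt};

// Stand-in for "download, verify and store one chunk".
async fn fetch_one(digest: [u8; 32]) -> Result<(), Error> {
    let _ = digest;
    Ok(())
}

async fn fetch_all(digests: Vec<[u8; 32]>) -> Result<(), Error> {
    stream::iter(digests)
        // wrap each item in Ok(..) so the stream becomes a TryStream of futures
        .map(|digest| Ok::<_, Error>(fetch_one(digest)))
        // poll at most 20 of those futures concurrently
        .try_buffer_unordered(20)
        // stop at the first error, otherwise drain the stream
        .try_for_each(|_| futures::future::ok(()))
        .await
}

fn main() -> Result<(), Error> {
    futures::executor::block_on(fetch_all(vec![[0u8; 32], [1u8; 32]]))
}
```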
@@ -89,6 +117,7 @@ async fn pull_single_archive(
     tgt_store: Arc<DataStore>,
     snapshot: &BackupDir,
     archive_info: &FileInfo,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
 ) -> Result<(), Error> {

     let archive_name = &archive_info.filename;
@@ -115,7 +144,7 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;

-            pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
+            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile)
@@ -123,7 +152,7 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;

-            pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
+            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
         }
         ArchiveType::Blob => {
             let (csum, size) = compute_file_csum(&mut tmpfile)?;
@@ -169,6 +198,7 @@ async fn pull_snapshot(
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
     snapshot: &BackupDir,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
 ) -> Result<(), Error> {

     let mut manifest_name = tgt_store.base_path();
@@ -278,6 +308,7 @@ async fn pull_snapshot(
             tgt_store.clone(),
             snapshot,
             &item,
+            downloaded_chunks.clone(),
         ).await?;
     }

@@ -300,6 +331,7 @@ pub async fn pull_snapshot_from(
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
     snapshot: &BackupDir,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
 ) -> Result<(), Error> {

     let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
@@ -307,7 +339,7 @@ pub async fn pull_snapshot_from(
     if is_new {
         worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));

-        if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
+        if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
             if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
                 worker.log(format!("cleanup error - {}", cleanup_err));
             }
@@ -316,7 +348,7 @@ pub async fn pull_snapshot_from(
         worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
     } else {
         worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
-        pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?;
+        pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
         worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
     }

@@ -351,6 +383,9 @@ pub async fn pull_group(

     let mut remote_snapshots = std::collections::HashSet::new();

+    // start with 16384 chunks (up to 65GB)
+    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));
+
     for item in list {
         let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;

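`pull_group` now creates one `Arc<Mutex<HashSet<[u8;32]>>>` per group and threads it through every snapshot pull, so a chunk digest is claimed before its download starts and later snapshots in the same group skip it. A minimal sketch of that claim-before-download idea; `claim_chunk` is an illustrative helper, not part of the source:

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Returns true if the caller should download this chunk, false if another
// task has already claimed (or finished) it.
fn claim_chunk(downloaded: &Arc<Mutex<HashSet<[u8; 32]>>>, digest: [u8; 32]) -> bool {
    let mut guard = downloaded.lock().unwrap();
    // HashSet::insert returns false when the digest was already present
    guard.insert(digest)
}

fn main() {
    let downloaded = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
    let digest = [0u8; 32];
    assert!(claim_chunk(&downloaded, digest)); // first claim wins
    assert!(!claim_chunk(&downloaded, digest)); // duplicate is skipped
}
```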
@@ -384,7 +419,7 @@ pub async fn pull_group(
             true,
         ).await?;

-        pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
+        pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await?;
     }

     if delete {
src/client/remote_chunk_reader.rs

@@ -15,7 +15,7 @@ pub struct RemoteChunkReader {
     client: Arc<BackupReader>,
     crypt_config: Option<Arc<CryptConfig>>,
     crypt_mode: CryptMode,
-    cache_hint: HashMap<[u8; 32], usize>,
+    cache_hint: Arc<HashMap<[u8; 32], usize>>,
     cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
 }

@@ -33,7 +33,7 @@ impl RemoteChunkReader {
             client,
             crypt_config,
             crypt_mode,
-            cache_hint,
+            cache_hint: Arc::new(cache_hint),
             cache: Arc::new(Mutex::new(HashMap::new())),
         }
     }
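Wrapping `cache_hint` in an `Arc` makes `RemoteChunkReader::clone` a cheap pointer copy, which the concurrent pull code relies on when it hands a clone of the reader to each download future. A minimal sketch of that pattern; the struct and field names below are illustrative, not the real reader:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct ChunkReader {
    // read-only after construction: Arc makes Clone a pointer copy
    cache_hint: Arc<HashMap<[u8; 32], usize>>,
    // shared and mutated by all clones: the Mutex guards the actual cache
    cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
}

impl ChunkReader {
    fn new(cache_hint: HashMap<[u8; 32], usize>) -> Self {
        Self {
            cache_hint: Arc::new(cache_hint),
            cache: Arc::new(Mutex::new(HashMap::new())),
        }
    }
}

fn main() {
    let reader = ChunkReader::new(HashMap::new());
    // each concurrent task gets its own handle; the underlying maps are shared
    let per_task = reader.clone();
    assert_eq!(Arc::strong_count(&per_task.cache), 2);
}
```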