pull: rustfmt

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
This commit is contained in:
Fabian Grünbichler 2021-01-15 11:48:53 +01:00 committed by Thomas Lamprecht
parent b22b6c2299
commit e2956c605d

View File

@ -2,22 +2,21 @@
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use serde_json::json; use serde_json::json;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::sync::{Arc, Mutex};
use std::collections::{HashSet, HashMap};
use std::io::{Seek, SeekFrom}; use std::io::{Seek, SeekFrom};
use std::time::SystemTime;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use proxmox::api::error::{StatusCode, HttpError};
use crate::{ use crate::{
tools::{ParallelHandler, compute_file_csum},
server::WorkerTask,
backup::*,
api2::types::*, api2::types::*,
backup::*,
client::*, client::*,
server::WorkerTask,
tools::{compute_file_csum, ParallelHandler},
}; };
use proxmox::api::error::{HttpError, StatusCode};
// fixme: implement filters // fixme: implement filters
// fixme: delete vanished groups // fixme: delete vanished groups
@ -28,9 +27,8 @@ async fn pull_index_chunks<I: IndexFile>(
chunk_reader: RemoteChunkReader, chunk_reader: RemoteChunkReader,
target: Arc<DataStore>, target: Arc<DataStore>,
index: I, index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>, downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
use futures::stream::{self, StreamExt, TryStreamExt}; use futures::stream::{self, StreamExt, TryStreamExt};
let start_time = SystemTime::now(); let start_time = SystemTime::now();
@ -47,18 +45,19 @@ async fn pull_index_chunks<I: IndexFile>(
guard.insert(info.digest); guard.insert(info.digest);
} }
!done !done
}) }),
); );
let target2 = target.clone(); let target2 = target.clone();
let verify_pool = ParallelHandler::new( let verify_pool = ParallelHandler::new(
"sync chunk writer", 4, "sync chunk writer",
move |(chunk, digest, size): (DataBlob, [u8;32], u64)| { 4,
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
// println!("verify and write {}", proxmox::tools::digest_to_hex(&digest)); // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
chunk.verify_unencrypted(size as usize, &digest)?; chunk.verify_unencrypted(size as usize, &digest)?;
target2.insert_chunk(&chunk, &digest)?; target2.insert_chunk(&chunk, &digest)?;
Ok(()) Ok(())
} },
); );
let verify_and_write_channel = verify_pool.channel(); let verify_and_write_channel = verify_pool.channel();
@ -67,14 +66,15 @@ async fn pull_index_chunks<I: IndexFile>(
stream stream
.map(|info| { .map(|info| {
let target = Arc::clone(&target); let target = Arc::clone(&target);
let chunk_reader = chunk_reader.clone(); let chunk_reader = chunk_reader.clone();
let bytes = Arc::clone(&bytes); let bytes = Arc::clone(&bytes);
let verify_and_write_channel = verify_and_write_channel.clone(); let verify_and_write_channel = verify_and_write_channel.clone();
Ok::<_, Error>(async move { Ok::<_, Error>(async move {
let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?; let chunk_exists = crate::tools::runtime::block_in_place(|| {
target.cond_touch_chunk(&info.digest, false)
})?;
if chunk_exists { if chunk_exists {
//worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
return Ok::<_, Error>(()); return Ok::<_, Error>(());
@ -84,12 +84,14 @@ async fn pull_index_chunks<I: IndexFile>(
let raw_size = chunk.raw_size() as usize; let raw_size = chunk.raw_size() as usize;
// decode, verify and write in a separate threads to maximize throughput // decode, verify and write in a separate threads to maximize throughput
crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?; crate::tools::runtime::block_in_place(|| {
verify_and_write_channel.send((chunk, info.digest, info.size()))
})?;
bytes.fetch_add(raw_size, Ordering::SeqCst); bytes.fetch_add(raw_size, Ordering::SeqCst);
Ok(()) Ok(())
}) })
}) })
.try_buffer_unordered(20) .try_buffer_unordered(20)
.try_for_each(|_res| futures::future::ok(())) .try_for_each(|_res| futures::future::ok(()))
@ -103,7 +105,11 @@ async fn pull_index_chunks<I: IndexFile>(
let bytes = bytes.load(Ordering::SeqCst); let bytes = bytes.load(Ordering::SeqCst);
worker.log(format!("downloaded {} bytes ({:.2} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed))); worker.log(format!(
"downloaded {} bytes ({:.2} MiB/s)",
bytes,
(bytes as f64) / (1024.0 * 1024.0 * elapsed)
));
Ok(()) Ok(())
} }
@ -112,7 +118,6 @@ async fn download_manifest(
reader: &BackupReader, reader: &BackupReader,
filename: &std::path::Path, filename: &std::path::Path,
) -> Result<std::fs::File, Error> { ) -> Result<std::fs::File, Error> {
let mut tmp_manifest_file = std::fs::OpenOptions::new() let mut tmp_manifest_file = std::fs::OpenOptions::new()
.write(true) .write(true)
.create(true) .create(true)
@ -120,20 +125,23 @@ async fn download_manifest(
.read(true) .read(true)
.open(&filename)?; .open(&filename)?;
reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?; reader
.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
.await?;
tmp_manifest_file.seek(SeekFrom::Start(0))?; tmp_manifest_file.seek(SeekFrom::Start(0))?;
Ok(tmp_manifest_file) Ok(tmp_manifest_file)
} }
fn verify_archive( fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
info: &FileInfo,
csum: &[u8; 32],
size: u64,
) -> Result<(), Error> {
if size != info.size { if size != info.size {
bail!("wrong size for file '{}' ({} != {})", info.filename, info.size, size); bail!(
"wrong size for file '{}' ({} != {})",
info.filename,
info.size,
size
);
} }
if csum != &info.csum { if csum != &info.csum {
@ -150,9 +158,8 @@ async fn pull_single_archive(
tgt_store: Arc<DataStore>, tgt_store: Arc<DataStore>,
snapshot: &BackupDir, snapshot: &BackupDir,
archive_info: &FileInfo, archive_info: &FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>, downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let archive_name = &archive_info.filename; let archive_name = &archive_info.filename;
let mut path = tgt_store.base_path(); let mut path = tgt_store.base_path();
path.push(snapshot.relative_path()); path.push(snapshot.relative_path());
@ -172,20 +179,36 @@ async fn pull_single_archive(
match archive_type(archive_name)? { match archive_type(archive_name)? {
ArchiveType::DynamicIndex => { ArchiveType::DynamicIndex => {
let index = DynamicIndexReader::new(tmpfile) let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
.map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?; format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
})?;
let (csum, size) = index.compute_csum(); let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?; verify_archive(archive_info, &csum, size)?;
pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?; pull_index_chunks(
worker,
chunk_reader.clone(),
tgt_store.clone(),
index,
downloaded_chunks,
)
.await?;
} }
ArchiveType::FixedIndex => { ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile) let index = FixedIndexReader::new(tmpfile).map_err(|err| {
.map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?; format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
})?;
let (csum, size) = index.compute_csum(); let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?; verify_archive(archive_info, &csum, size)?;
pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?; pull_index_chunks(
worker,
chunk_reader.clone(),
tgt_store.clone(),
index,
downloaded_chunks,
)
.await?;
} }
ArchiveType::Blob => { ArchiveType::Blob => {
let (csum, size) = compute_file_csum(&mut tmpfile)?; let (csum, size) = compute_file_csum(&mut tmpfile)?;
@ -205,7 +228,6 @@ async fn try_client_log_download(
reader: Arc<BackupReader>, reader: Arc<BackupReader>,
path: &std::path::Path, path: &std::path::Path,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut tmp_path = path.to_owned(); let mut tmp_path = path.to_owned();
tmp_path.set_extension("tmp"); tmp_path.set_extension("tmp");
@ -231,9 +253,8 @@ async fn pull_snapshot(
reader: Arc<BackupReader>, reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>, tgt_store: Arc<DataStore>,
snapshot: &BackupDir, snapshot: &BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>, downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut manifest_name = tgt_store.base_path(); let mut manifest_name = tgt_store.base_path();
manifest_name.push(snapshot.relative_path()); manifest_name.push(snapshot.relative_path());
manifest_name.push(MANIFEST_BLOB_NAME); manifest_name.push(MANIFEST_BLOB_NAME);
@ -250,34 +271,45 @@ async fn pull_snapshot(
Ok(manifest_file) => manifest_file, Ok(manifest_file) => manifest_file,
Err(err) => { Err(err) => {
match err.downcast_ref::<HttpError>() { match err.downcast_ref::<HttpError>() {
Some(HttpError { code, message }) => { Some(HttpError { code, message }) => match *code {
match *code { StatusCode::NOT_FOUND => {
StatusCode::NOT_FOUND => { worker.log(format!(
worker.log(format!("skipping snapshot {} - vanished since start of sync", snapshot)); "skipping snapshot {} - vanished since start of sync",
return Ok(()); snapshot
}, ));
_ => { return Ok(());
bail!("HTTP error {} - {}", code, message); }
}, _ => {
bail!("HTTP error {} - {}", code, message);
} }
}, },
None => { None => {
return Err(err); return Err(err);
}, }
}; };
}, }
}; };
let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
if manifest_name.exists() { if manifest_name.exists() {
let manifest_blob = proxmox::try_block!({ let manifest_blob = proxmox::try_block!({
let mut manifest_file = std::fs::File::open(&manifest_name) let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
.map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?; format_err!(
"unable to open local manifest {:?} - {}",
manifest_name,
err
)
})?;
let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?; let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
Ok(manifest_blob) Ok(manifest_blob)
}).map_err(|err: Error| { })
format_err!("unable to read local manifest {:?} - {}", manifest_name, err) .map_err(|err: Error| {
format_err!(
"unable to read local manifest {:?} - {}",
manifest_name,
err
)
})?; })?;
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
@ -332,7 +364,12 @@ async fn pull_snapshot(
} }
} }
let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, item.chunk_crypt_mode(), HashMap::new()); let mut chunk_reader = RemoteChunkReader::new(
reader.clone(),
None,
item.chunk_crypt_mode(),
HashMap::new(),
);
pull_single_archive( pull_single_archive(
worker, worker,
@ -342,7 +379,8 @@ async fn pull_snapshot(
snapshot, snapshot,
&item, &item,
downloaded_chunks.clone(), downloaded_chunks.clone(),
).await?; )
.await?;
} }
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@ -364,15 +402,22 @@ pub async fn pull_snapshot_from(
reader: Arc<BackupReader>, reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>, tgt_store: Arc<DataStore>,
snapshot: &BackupDir, snapshot: &BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>, downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
if is_new { if is_new {
worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await { if let Err(err) = pull_snapshot(
worker,
reader,
tgt_store.clone(),
&snapshot,
downloaded_chunks,
)
.await
{
if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
worker.log(format!("cleanup error - {}", cleanup_err)); worker.log(format!("cleanup error - {}", cleanup_err));
} }
@ -381,8 +426,18 @@ pub async fn pull_snapshot_from(
worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
} else { } else {
worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?; pull_snapshot(
worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path())); worker,
reader,
tgt_store.clone(),
&snapshot,
downloaded_chunks,
)
.await?;
worker.log(format!(
"re-sync snapshot {:?} done",
snapshot.relative_path()
));
} }
Ok(()) Ok(())
@ -397,7 +452,6 @@ pub async fn pull_group(
delete: bool, delete: bool,
progress: &mut StoreProgress, progress: &mut StoreProgress,
) -> Result<(), Error> { ) -> Result<(), Error> {
let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store()); let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
let args = json!({ let args = json!({
@ -419,7 +473,7 @@ pub async fn pull_group(
let mut remote_snapshots = std::collections::HashSet::new(); let mut remote_snapshots = std::collections::HashSet::new();
// start with 16384 chunks (up to 65GB) // start with 16384 chunks (up to 65GB)
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64))); let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
progress.group_snapshots = list.len() as u64; progress.group_snapshots = list.len() as u64;
@ -428,7 +482,10 @@ pub async fn pull_group(
// in-progress backups can't be synced // in-progress backups can't be synced
if item.size.is_none() { if item.size.is_none() {
worker.log(format!("skipping snapshot {} - in-progress backup", snapshot)); worker.log(format!(
"skipping snapshot {} - in-progress backup",
snapshot
));
continue; continue;
} }
@ -437,7 +494,9 @@ pub async fn pull_group(
remote_snapshots.insert(backup_time); remote_snapshots.insert(backup_time);
if let Some(last_sync_time) = last_sync { if let Some(last_sync_time) = last_sync {
if last_sync_time > backup_time { continue; } if last_sync_time > backup_time {
continue;
}
} }
// get updated auth_info (new tickets) // get updated auth_info (new tickets)
@ -447,7 +506,12 @@ pub async fn pull_group(
.password(Some(auth_info.ticket.clone())) .password(Some(auth_info.ticket.clone()))
.fingerprint(fingerprint.clone()); .fingerprint(fingerprint.clone());
let new_client = HttpClient::new(src_repo.host(), src_repo.port(), src_repo.auth_id(), options)?; let new_client = HttpClient::new(
src_repo.host(),
src_repo.port(),
src_repo.auth_id(),
options,
)?;
let reader = BackupReader::start( let reader = BackupReader::start(
new_client, new_client,
@ -457,9 +521,17 @@ pub async fn pull_group(
snapshot.group().backup_id(), snapshot.group().backup_id(),
backup_time, backup_time,
true, true,
).await?; )
.await?;
let result = pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await; let result = pull_snapshot_from(
worker,
reader,
tgt_store.clone(),
&snapshot,
downloaded_chunks.clone(),
)
.await;
progress.done_snapshots = pos as u64 + 1; progress.done_snapshots = pos as u64 + 1;
worker.log(format!("percentage done: {}", progress)); worker.log(format!("percentage done: {}", progress));
@ -471,8 +543,13 @@ pub async fn pull_group(
let local_list = group.list_backups(&tgt_store.base_path())?; let local_list = group.list_backups(&tgt_store.base_path())?;
for info in local_list { for info in local_list {
let backup_time = info.backup_dir.backup_time(); let backup_time = info.backup_dir.backup_time();
if remote_snapshots.contains(&backup_time) { continue; } if remote_snapshots.contains(&backup_time) {
worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path())); continue;
}
worker.log(format!(
"delete vanished snapshot {:?}",
info.backup_dir.relative_path()
));
tgt_store.remove_backup_dir(&info.backup_dir, false)?; tgt_store.remove_backup_dir(&info.backup_dir, false)?;
} }
} }
@ -488,7 +565,6 @@ pub async fn pull_store(
delete: bool, delete: bool,
auth_id: Authid, auth_id: Authid,
) -> Result<(), Error> { ) -> Result<(), Error> {
// explicit create shared lock to prevent GC on newly created chunks // explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
@ -528,19 +604,23 @@ pub async fn pull_store(
let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) { let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
Ok(result) => result, Ok(result) => result,
Err(err) => { Err(err) => {
worker.log(format!("sync group {}/{} failed - group lock failed: {}", worker.log(format!(
item.backup_type, item.backup_id, err)); "sync group {}/{} failed - group lock failed: {}",
item.backup_type, item.backup_id, err
));
errors = true; // do not stop here, instead continue errors = true; // do not stop here, instead continue
continue; continue;
} }
}; };
// permission check // permission check
if auth_id != owner { // only the owner is allowed to create additional snapshots if auth_id != owner {
worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})", // only the owner is allowed to create additional snapshots
item.backup_type, item.backup_id, auth_id, owner)); worker.log(format!(
"sync group {}/{} failed - owner check failed ({} != {})",
item.backup_type, item.backup_id, auth_id, owner
));
errors = true; // do not stop here, instead continue errors = true; // do not stop here, instead continue
} else if let Err(err) = pull_group( } else if let Err(err) = pull_group(
worker, worker,
client, client,
@ -549,12 +629,12 @@ pub async fn pull_store(
&group, &group,
delete, delete,
&mut progress, &mut progress,
).await { )
.await
{
worker.log(format!( worker.log(format!(
"sync group {}/{} failed - {}", "sync group {}/{} failed - {}",
item.backup_type, item.backup_type, item.backup_id, err,
item.backup_id,
err,
)); ));
errors = true; // do not stop here, instead continue errors = true; // do not stop here, instead continue
} }
@ -564,8 +644,14 @@ pub async fn pull_store(
let result: Result<(), Error> = proxmox::try_block!({ let result: Result<(), Error> = proxmox::try_block!({
let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?; let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
for local_group in local_groups { for local_group in local_groups {
if new_groups.contains(&local_group) { continue; } if new_groups.contains(&local_group) {
worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id())); continue;
}
worker.log(format!(
"delete vanished group '{}/{}'",
local_group.backup_type(),
local_group.backup_id()
));
if let Err(err) = tgt_store.remove_backup_group(&local_group) { if let Err(err) = tgt_store.remove_backup_group(&local_group) {
worker.log(err.to_string()); worker.log(err.to_string());
errors = true; errors = true;