diff --git a/src/client/pull.rs b/src/client/pull.rs index ed8256fa..f6ef7cde 100644 --- a/src/client/pull.rs +++ b/src/client/pull.rs @@ -2,22 +2,21 @@ use anyhow::{bail, format_err, Error}; use serde_json::json; +use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; -use std::sync::{Arc, Mutex}; -use std::collections::{HashSet, HashMap}; use std::io::{Seek, SeekFrom}; -use std::time::SystemTime; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::SystemTime; -use proxmox::api::error::{StatusCode, HttpError}; use crate::{ - tools::{ParallelHandler, compute_file_csum}, - server::WorkerTask, - backup::*, api2::types::*, + backup::*, client::*, + server::WorkerTask, + tools::{compute_file_csum, ParallelHandler}, }; - +use proxmox::api::error::{HttpError, StatusCode}; // fixme: implement filters // fixme: delete vanished groups @@ -28,9 +27,8 @@ async fn pull_index_chunks( chunk_reader: RemoteChunkReader, target: Arc, index: I, - downloaded_chunks: Arc>>, + downloaded_chunks: Arc>>, ) -> Result<(), Error> { - use futures::stream::{self, StreamExt, TryStreamExt}; let start_time = SystemTime::now(); @@ -47,18 +45,19 @@ async fn pull_index_chunks( guard.insert(info.digest); } !done - }) + }), ); let target2 = target.clone(); let verify_pool = ParallelHandler::new( - "sync chunk writer", 4, - move |(chunk, digest, size): (DataBlob, [u8;32], u64)| { + "sync chunk writer", + 4, + move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest)); chunk.verify_unencrypted(size as usize, &digest)?; target2.insert_chunk(&chunk, &digest)?; Ok(()) - } + }, ); let verify_and_write_channel = verify_pool.channel(); @@ -67,14 +66,15 @@ async fn pull_index_chunks( stream .map(|info| { - let target = Arc::clone(&target); let chunk_reader = chunk_reader.clone(); let bytes = Arc::clone(&bytes); let verify_and_write_channel = verify_and_write_channel.clone(); Ok::<_, Error>(async move { - let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?; + let chunk_exists = crate::tools::runtime::block_in_place(|| { + target.cond_touch_chunk(&info.digest, false) + })?; if chunk_exists { //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); return Ok::<_, Error>(()); @@ -84,12 +84,14 @@ async fn pull_index_chunks( let raw_size = chunk.raw_size() as usize; // decode, verify and write in a separate threads to maximize throughput - crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?; + crate::tools::runtime::block_in_place(|| { + verify_and_write_channel.send((chunk, info.digest, info.size())) + })?; bytes.fetch_add(raw_size, Ordering::SeqCst); Ok(()) - }) + }) }) .try_buffer_unordered(20) .try_for_each(|_res| futures::future::ok(())) @@ -103,7 +105,11 @@ async fn pull_index_chunks( let bytes = bytes.load(Ordering::SeqCst); - worker.log(format!("downloaded {} bytes ({:.2} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed))); + worker.log(format!( + "downloaded {} bytes ({:.2} MiB/s)", + bytes, + (bytes as f64) / (1024.0 * 1024.0 * elapsed) + )); Ok(()) } @@ -112,7 +118,6 @@ async fn download_manifest( reader: &BackupReader, filename: &std::path::Path, ) -> Result { - let mut tmp_manifest_file = std::fs::OpenOptions::new() .write(true) .create(true) @@ -120,20 +125,23 @@ async fn download_manifest( .read(true) .open(&filename)?; - reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?; + reader + .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file) + .await?; tmp_manifest_file.seek(SeekFrom::Start(0))?; Ok(tmp_manifest_file) } -fn verify_archive( - info: &FileInfo, - csum: &[u8; 32], - size: u64, -) -> Result<(), Error> { +fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { if size != info.size { - bail!("wrong size for file '{}' ({} != {})", info.filename, info.size, size); + bail!( + "wrong size for file '{}' ({} != {})", + info.filename, + info.size, + size + ); } if csum != &info.csum { @@ -150,9 +158,8 @@ async fn pull_single_archive( tgt_store: Arc, snapshot: &BackupDir, archive_info: &FileInfo, - downloaded_chunks: Arc>>, + downloaded_chunks: Arc>>, ) -> Result<(), Error> { - let archive_name = &archive_info.filename; let mut path = tgt_store.base_path(); path.push(snapshot.relative_path()); @@ -172,20 +179,36 @@ async fn pull_single_archive( match archive_type(archive_name)? { ArchiveType::DynamicIndex => { - let index = DynamicIndexReader::new(tmpfile) - .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?; + let index = DynamicIndexReader::new(tmpfile).map_err(|err| { + format_err!("unable to read dynamic index {:?} - {}", tmp_path, err) + })?; let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?; + pull_index_chunks( + worker, + chunk_reader.clone(), + tgt_store.clone(), + index, + downloaded_chunks, + ) + .await?; } ArchiveType::FixedIndex => { - let index = FixedIndexReader::new(tmpfile) - .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?; + let index = FixedIndexReader::new(tmpfile).map_err(|err| { + format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err) + })?; let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?; + pull_index_chunks( + worker, + chunk_reader.clone(), + tgt_store.clone(), + index, + downloaded_chunks, + ) + .await?; } ArchiveType::Blob => { let (csum, size) = compute_file_csum(&mut tmpfile)?; @@ -205,7 +228,6 @@ async fn try_client_log_download( reader: Arc, path: &std::path::Path, ) -> Result<(), Error> { - let mut tmp_path = path.to_owned(); tmp_path.set_extension("tmp"); @@ -231,9 +253,8 @@ async fn pull_snapshot( reader: Arc, tgt_store: Arc, snapshot: &BackupDir, - downloaded_chunks: Arc>>, + downloaded_chunks: Arc>>, ) -> Result<(), Error> { - let mut manifest_name = tgt_store.base_path(); manifest_name.push(snapshot.relative_path()); manifest_name.push(MANIFEST_BLOB_NAME); @@ -250,34 +271,45 @@ async fn pull_snapshot( Ok(manifest_file) => manifest_file, Err(err) => { match err.downcast_ref::() { - Some(HttpError { code, message }) => { - match *code { - StatusCode::NOT_FOUND => { - worker.log(format!("skipping snapshot {} - vanished since start of sync", snapshot)); - return Ok(()); - }, - _ => { - bail!("HTTP error {} - {}", code, message); - }, + Some(HttpError { code, message }) => match *code { + StatusCode::NOT_FOUND => { + worker.log(format!( + "skipping snapshot {} - vanished since start of sync", + snapshot + )); + return Ok(()); + } + _ => { + bail!("HTTP error {} - {}", code, message); } }, None => { return Err(err); - }, + } }; - }, + } }; let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; if manifest_name.exists() { let manifest_blob = proxmox::try_block!({ - let mut manifest_file = std::fs::File::open(&manifest_name) - .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?; + let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| { + format_err!( + "unable to open local manifest {:?} - {}", + manifest_name, + err + ) + })?; let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?; Ok(manifest_blob) - }).map_err(|err: Error| { - format_err!("unable to read local manifest {:?} - {}", manifest_name, err) + }) + .map_err(|err: Error| { + format_err!( + "unable to read local manifest {:?} - {}", + manifest_name, + err + ) })?; if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { @@ -332,7 +364,12 @@ async fn pull_snapshot( } } - let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, item.chunk_crypt_mode(), HashMap::new()); + let mut chunk_reader = RemoteChunkReader::new( + reader.clone(), + None, + item.chunk_crypt_mode(), + HashMap::new(), + ); pull_single_archive( worker, @@ -342,7 +379,8 @@ async fn pull_snapshot( snapshot, &item, downloaded_chunks.clone(), - ).await?; + ) + .await?; } if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { @@ -364,15 +402,22 @@ pub async fn pull_snapshot_from( reader: Arc, tgt_store: Arc, snapshot: &BackupDir, - downloaded_chunks: Arc>>, + downloaded_chunks: Arc>>, ) -> Result<(), Error> { - let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; if is_new { worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); - if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await { + if let Err(err) = pull_snapshot( + worker, + reader, + tgt_store.clone(), + &snapshot, + downloaded_chunks, + ) + .await + { if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { worker.log(format!("cleanup error - {}", cleanup_err)); } @@ -381,8 +426,18 @@ pub async fn pull_snapshot_from( worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); } else { worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); - pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?; - worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path())); + pull_snapshot( + worker, + reader, + tgt_store.clone(), + &snapshot, + downloaded_chunks, + ) + .await?; + worker.log(format!( + "re-sync snapshot {:?} done", + snapshot.relative_path() + )); } Ok(()) @@ -397,7 +452,6 @@ pub async fn pull_group( delete: bool, progress: &mut StoreProgress, ) -> Result<(), Error> { - let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store()); let args = json!({ @@ -419,7 +473,7 @@ pub async fn pull_group( let mut remote_snapshots = std::collections::HashSet::new(); // start with 16384 chunks (up to 65GB) - let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64))); + let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); progress.group_snapshots = list.len() as u64; @@ -428,7 +482,10 @@ pub async fn pull_group( // in-progress backups can't be synced if item.size.is_none() { - worker.log(format!("skipping snapshot {} - in-progress backup", snapshot)); + worker.log(format!( + "skipping snapshot {} - in-progress backup", + snapshot + )); continue; } @@ -437,7 +494,9 @@ pub async fn pull_group( remote_snapshots.insert(backup_time); if let Some(last_sync_time) = last_sync { - if last_sync_time > backup_time { continue; } + if last_sync_time > backup_time { + continue; + } } // get updated auth_info (new tickets) @@ -447,7 +506,12 @@ pub async fn pull_group( .password(Some(auth_info.ticket.clone())) .fingerprint(fingerprint.clone()); - let new_client = HttpClient::new(src_repo.host(), src_repo.port(), src_repo.auth_id(), options)?; + let new_client = HttpClient::new( + src_repo.host(), + src_repo.port(), + src_repo.auth_id(), + options, + )?; let reader = BackupReader::start( new_client, @@ -457,9 +521,17 @@ pub async fn pull_group( snapshot.group().backup_id(), backup_time, true, - ).await?; + ) + .await?; - let result = pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await; + let result = pull_snapshot_from( + worker, + reader, + tgt_store.clone(), + &snapshot, + downloaded_chunks.clone(), + ) + .await; progress.done_snapshots = pos as u64 + 1; worker.log(format!("percentage done: {}", progress)); @@ -471,8 +543,13 @@ pub async fn pull_group( let local_list = group.list_backups(&tgt_store.base_path())?; for info in local_list { let backup_time = info.backup_dir.backup_time(); - if remote_snapshots.contains(&backup_time) { continue; } - worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path())); + if remote_snapshots.contains(&backup_time) { + continue; + } + worker.log(format!( + "delete vanished snapshot {:?}", + info.backup_dir.relative_path() + )); tgt_store.remove_backup_dir(&info.backup_dir, false)?; } } @@ -488,7 +565,6 @@ pub async fn pull_store( delete: bool, auth_id: Authid, ) -> Result<(), Error> { - // explicit create shared lock to prevent GC on newly created chunks let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; @@ -528,19 +604,23 @@ pub async fn pull_store( let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) { Ok(result) => result, Err(err) => { - worker.log(format!("sync group {}/{} failed - group lock failed: {}", - item.backup_type, item.backup_id, err)); + worker.log(format!( + "sync group {}/{} failed - group lock failed: {}", + item.backup_type, item.backup_id, err + )); errors = true; // do not stop here, instead continue continue; } }; // permission check - if auth_id != owner { // only the owner is allowed to create additional snapshots - worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})", - item.backup_type, item.backup_id, auth_id, owner)); + if auth_id != owner { + // only the owner is allowed to create additional snapshots + worker.log(format!( + "sync group {}/{} failed - owner check failed ({} != {})", + item.backup_type, item.backup_id, auth_id, owner + )); errors = true; // do not stop here, instead continue - } else if let Err(err) = pull_group( worker, client, @@ -549,12 +629,12 @@ pub async fn pull_store( &group, delete, &mut progress, - ).await { + ) + .await + { worker.log(format!( "sync group {}/{} failed - {}", - item.backup_type, - item.backup_id, - err, + item.backup_type, item.backup_id, err, )); errors = true; // do not stop here, instead continue } @@ -564,8 +644,14 @@ pub async fn pull_store( let result: Result<(), Error> = proxmox::try_block!({ let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?; for local_group in local_groups { - if new_groups.contains(&local_group) { continue; } - worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id())); + if new_groups.contains(&local_group) { + continue; + } + worker.log(format!( + "delete vanished group '{}/{}'", + local_group.backup_type(), + local_group.backup_id() + )); if let Err(err) = tgt_store.remove_backup_group(&local_group) { worker.log(err.to_string()); errors = true;