//! Sync datastore from remote server use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::io::{Seek, SeekFrom}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::SystemTime; use anyhow::{bail, format_err, Error}; use serde_json::json; use proxmox::api::error::{HttpError, StatusCode}; use pbs_api_types::{Authid, SnapshotListItem, GroupListItem}; use pbs_datastore::{task_log, BackupInfo, BackupDir, BackupGroup, StoreProgress}; use pbs_datastore::data_blob::DataBlob; use pbs_datastore::dynamic_index::DynamicIndexReader; use pbs_datastore::fixed_index::FixedIndexReader; use pbs_datastore::index::IndexFile; use pbs_datastore::manifest::{ CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type }; use pbs_tools::sha::sha256; use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader}; use crate::{ backup::DataStore, server::WorkerTask, tools::ParallelHandler, }; // fixme: implement filters // fixme: delete vanished groups // Todo: correctly lock backup groups async fn pull_index_chunks( worker: &WorkerTask, chunk_reader: RemoteChunkReader, target: Arc, index: I, downloaded_chunks: Arc>>, ) -> Result<(), Error> { use futures::stream::{self, StreamExt, TryStreamExt}; let start_time = SystemTime::now(); let stream = stream::iter( (0..index.index_count()) .map(|pos| index.chunk_info(pos).unwrap()) .filter(|info| { let mut guard = downloaded_chunks.lock().unwrap(); let done = guard.contains(&info.digest); if !done { // Note: We mark a chunk as downloaded before its actually downloaded // to avoid duplicate downloads. guard.insert(info.digest); } !done }), ); let target2 = target.clone(); let verify_pool = ParallelHandler::new( "sync chunk writer", 4, move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest)); chunk.verify_unencrypted(size as usize, &digest)?; target2.insert_chunk(&chunk, &digest)?; Ok(()) }, ); let verify_and_write_channel = verify_pool.channel(); let bytes = Arc::new(AtomicUsize::new(0)); stream .map(|info| { let target = Arc::clone(&target); let chunk_reader = chunk_reader.clone(); let bytes = Arc::clone(&bytes); let verify_and_write_channel = verify_and_write_channel.clone(); Ok::<_, Error>(async move { let chunk_exists = pbs_runtime::block_in_place(|| { target.cond_touch_chunk(&info.digest, false) })?; if chunk_exists { //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); return Ok::<_, Error>(()); } //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; let raw_size = chunk.raw_size() as usize; // decode, verify and write in a separate threads to maximize throughput pbs_runtime::block_in_place(|| { verify_and_write_channel.send((chunk, info.digest, info.size())) })?; bytes.fetch_add(raw_size, Ordering::SeqCst); Ok(()) }) }) .try_buffer_unordered(20) .try_for_each(|_res| futures::future::ok(())) .await?; drop(verify_and_write_channel); verify_pool.complete()?; let elapsed = start_time.elapsed()?.as_secs_f64(); let bytes = bytes.load(Ordering::SeqCst); worker.log(format!( "downloaded {} bytes ({:.2} MiB/s)", bytes, (bytes as f64) / (1024.0 * 1024.0 * elapsed) )); Ok(()) } async fn download_manifest( reader: &BackupReader, filename: &std::path::Path, ) -> Result { let mut tmp_manifest_file = std::fs::OpenOptions::new() .write(true) .create(true) .truncate(true) .read(true) .open(&filename)?; reader .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file) .await?; tmp_manifest_file.seek(SeekFrom::Start(0))?; Ok(tmp_manifest_file) } fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { if size != info.size { bail!( "wrong size for file '{}' ({} != {})", info.filename, info.size, size ); } if csum != &info.csum { bail!("wrong checksum for file '{}'", info.filename); } Ok(()) } async fn pull_single_archive( worker: &WorkerTask, reader: &BackupReader, chunk_reader: &mut RemoteChunkReader, tgt_store: Arc, snapshot: &BackupDir, archive_info: &FileInfo, downloaded_chunks: Arc>>, ) -> Result<(), Error> { let archive_name = &archive_info.filename; let mut path = tgt_store.base_path(); path.push(snapshot.relative_path()); path.push(archive_name); let mut tmp_path = path.clone(); tmp_path.set_extension("tmp"); worker.log(format!("sync archive {}", archive_name)); let mut tmpfile = std::fs::OpenOptions::new() .write(true) .create(true) .read(true) .open(&tmp_path)?; reader.download(archive_name, &mut tmpfile).await?; match archive_type(archive_name)? { ArchiveType::DynamicIndex => { let index = DynamicIndexReader::new(tmpfile).map_err(|err| { format_err!("unable to read dynamic index {:?} - {}", tmp_path, err) })?; let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; pull_index_chunks( worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks, ) .await?; } ArchiveType::FixedIndex => { let index = FixedIndexReader::new(tmpfile).map_err(|err| { format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err) })?; let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; pull_index_chunks( worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks, ) .await?; } ArchiveType::Blob => { tmpfile.seek(SeekFrom::Start(0))?; let (csum, size) = sha256(&mut tmpfile)?; verify_archive(archive_info, &csum, size)?; } } if let Err(err) = std::fs::rename(&tmp_path, &path) { bail!("Atomic rename file {:?} failed - {}", path, err); } Ok(()) } // Note: The client.log.blob is uploaded after the backup, so it is // not mentioned in the manifest. async fn try_client_log_download( worker: &WorkerTask, reader: Arc, path: &std::path::Path, ) -> Result<(), Error> { let mut tmp_path = path.to_owned(); tmp_path.set_extension("tmp"); let tmpfile = std::fs::OpenOptions::new() .write(true) .create(true) .read(true) .open(&tmp_path)?; // Note: be silent if there is no log - only log successful download if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await { if let Err(err) = std::fs::rename(&tmp_path, &path) { bail!("Atomic rename file {:?} failed - {}", path, err); } worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME)); } Ok(()) } async fn pull_snapshot( worker: &WorkerTask, reader: Arc, tgt_store: Arc, snapshot: &BackupDir, downloaded_chunks: Arc>>, ) -> Result<(), Error> { let mut manifest_name = tgt_store.base_path(); manifest_name.push(snapshot.relative_path()); manifest_name.push(MANIFEST_BLOB_NAME); let mut client_log_name = tgt_store.base_path(); client_log_name.push(snapshot.relative_path()); client_log_name.push(CLIENT_LOG_BLOB_NAME); let mut tmp_manifest_name = manifest_name.clone(); tmp_manifest_name.set_extension("tmp"); let download_res = download_manifest(&reader, &tmp_manifest_name).await; let mut tmp_manifest_file = match download_res { Ok(manifest_file) => manifest_file, Err(err) => { match err.downcast_ref::() { Some(HttpError { code, message }) => match *code { StatusCode::NOT_FOUND => { worker.log(format!( "skipping snapshot {} - vanished since start of sync", snapshot )); return Ok(()); } _ => { bail!("HTTP error {} - {}", code, message); } }, None => { return Err(err); } }; } }; let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; if manifest_name.exists() { let manifest_blob = proxmox::try_block!({ let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| { format_err!( "unable to open local manifest {:?} - {}", manifest_name, err ) })?; let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?; Ok(manifest_blob) }) .map_err(|err: Error| { format_err!( "unable to read local manifest {:?} - {}", manifest_name, err ) })?; if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { if !client_log_name.exists() { try_client_log_download(worker, reader, &client_log_name).await?; } worker.log("no data changes"); let _ = std::fs::remove_file(&tmp_manifest_name); return Ok(()); // nothing changed } } let manifest = BackupManifest::try_from(tmp_manifest_blob)?; for item in manifest.files() { let mut path = tgt_store.base_path(); path.push(snapshot.relative_path()); path.push(&item.filename); if path.exists() { match archive_type(&item.filename)? { ArchiveType::DynamicIndex => { let index = DynamicIndexReader::open(&path)?; let (csum, size) = index.compute_csum(); match manifest.verify_file(&item.filename, &csum, size) { Ok(_) => continue, Err(err) => { worker.log(format!("detected changed file {:?} - {}", path, err)); } } } ArchiveType::FixedIndex => { let index = FixedIndexReader::open(&path)?; let (csum, size) = index.compute_csum(); match manifest.verify_file(&item.filename, &csum, size) { Ok(_) => continue, Err(err) => { worker.log(format!("detected changed file {:?} - {}", path, err)); } } } ArchiveType::Blob => { let mut tmpfile = std::fs::File::open(&path)?; let (csum, size) = sha256(&mut tmpfile)?; match manifest.verify_file(&item.filename, &csum, size) { Ok(_) => continue, Err(err) => { worker.log(format!("detected changed file {:?} - {}", path, err)); } } } } } let mut chunk_reader = RemoteChunkReader::new( reader.clone(), None, item.chunk_crypt_mode(), HashMap::new(), ); pull_single_archive( worker, &reader, &mut chunk_reader, tgt_store.clone(), snapshot, &item, downloaded_chunks.clone(), ) .await?; } if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { bail!("Atomic rename file {:?} failed - {}", manifest_name, err); } if !client_log_name.exists() { try_client_log_download(worker, reader, &client_log_name).await?; } // cleanup - remove stale files tgt_store.cleanup_backup_dir(snapshot, &manifest)?; Ok(()) } pub async fn pull_snapshot_from( worker: &WorkerTask, reader: Arc, tgt_store: Arc, snapshot: &BackupDir, downloaded_chunks: Arc>>, ) -> Result<(), Error> { let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; if is_new { worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); if let Err(err) = pull_snapshot( worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks, ) .await { if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { worker.log(format!("cleanup error - {}", cleanup_err)); } return Err(err); } worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); } else { worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); pull_snapshot( worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks, ) .await?; worker.log(format!( "re-sync snapshot {:?} done", snapshot.relative_path() )); } Ok(()) } struct SkipInfo { oldest: i64, newest: i64, count: u64, } impl SkipInfo { fn update(&mut self, backup_time: i64) { self.count += 1; if backup_time < self.oldest { self.oldest = backup_time; } if backup_time > self.newest { self.newest = backup_time; } } fn affected(&self) -> Result { match self.count { 0 => Ok(String::new()), 1 => proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest), _ => { Ok(format!( "{} .. {}", proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest)?, proxmox::tools::time::epoch_to_rfc3339_utc(self.newest)?, )) } } } } impl std::fmt::Display for SkipInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "skipped: {} snapshot(s) ({}) older than the newest local snapshot", self.count, self.affected().map_err(|_| std::fmt::Error)? ) } } pub async fn pull_group( worker: &WorkerTask, client: &HttpClient, src_repo: &BackupRepository, tgt_store: Arc, group: &BackupGroup, delete: bool, progress: &mut StoreProgress, ) -> Result<(), Error> { let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store()); let args = json!({ "backup-type": group.backup_type(), "backup-id": group.backup_id(), }); let mut result = client.get(&path, Some(args)).await?; let mut list: Vec = serde_json::from_value(result["data"].take())?; list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time)); client.login().await?; // make sure auth is complete let fingerprint = client.fingerprint(); let last_sync = tgt_store.last_successful_backup(group)?; let mut remote_snapshots = std::collections::HashSet::new(); // start with 16384 chunks (up to 65GB) let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); progress.group_snapshots = list.len() as u64; let mut skip_info = SkipInfo { oldest: i64::MAX, newest: i64::MIN, count: 0, }; for (pos, item) in list.into_iter().enumerate() { let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?; // in-progress backups can't be synced if item.size.is_none() { worker.log(format!( "skipping snapshot {} - in-progress backup", snapshot )); continue; } let backup_time = snapshot.backup_time(); remote_snapshots.insert(backup_time); if let Some(last_sync_time) = last_sync { if last_sync_time > backup_time { skip_info.update(backup_time); continue; } } // get updated auth_info (new tickets) let auth_info = client.login().await?; let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone()); let new_client = HttpClient::new( src_repo.host(), src_repo.port(), src_repo.auth_id(), options, )?; let reader = BackupReader::start( new_client, None, src_repo.store(), snapshot.group().backup_type(), snapshot.group().backup_id(), backup_time, true, ) .await?; let result = pull_snapshot_from( worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone(), ) .await; progress.done_snapshots = pos as u64 + 1; worker.log(format!("percentage done: {}", progress)); result?; // stop on error } if delete { let local_list = group.list_backups(&tgt_store.base_path())?; for info in local_list { let backup_time = info.backup_dir.backup_time(); if remote_snapshots.contains(&backup_time) { continue; } worker.log(format!( "delete vanished snapshot {:?}", info.backup_dir.relative_path() )); tgt_store.remove_backup_dir(&info.backup_dir, false)?; } } if skip_info.count > 0 { task_log!(worker, "{}", skip_info); } Ok(()) } pub async fn pull_store( worker: &WorkerTask, client: &HttpClient, src_repo: &BackupRepository, tgt_store: Arc, delete: bool, auth_id: Authid, ) -> Result<(), Error> { // explicit create shared lock to prevent GC on newly created chunks let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store()); let mut result = client .get(&path, None) .await .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?; let mut list: Vec = serde_json::from_value(result["data"].take())?; worker.log(format!("found {} groups to sync", list.len())); list.sort_unstable_by(|a, b| { let type_order = a.backup_type.cmp(&b.backup_type); if type_order == std::cmp::Ordering::Equal { a.backup_id.cmp(&b.backup_id) } else { type_order } }); let mut errors = false; let mut new_groups = std::collections::HashSet::new(); for item in list.iter() { new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id)); } let mut progress = StoreProgress::new(list.len() as u64); for (done, item) in list.into_iter().enumerate() { progress.done_groups = done as u64; progress.done_snapshots = 0; progress.group_snapshots = 0; let group = BackupGroup::new(&item.backup_type, &item.backup_id); let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) { Ok(result) => result, Err(err) => { worker.log(format!( "sync group {}/{} failed - group lock failed: {}", item.backup_type, item.backup_id, err )); errors = true; // do not stop here, instead continue continue; } }; // permission check if auth_id != owner { // only the owner is allowed to create additional snapshots worker.log(format!( "sync group {}/{} failed - owner check failed ({} != {})", item.backup_type, item.backup_id, auth_id, owner )); errors = true; // do not stop here, instead continue } else if let Err(err) = pull_group( worker, client, src_repo, tgt_store.clone(), &group, delete, &mut progress, ) .await { worker.log(format!( "sync group {}/{} failed - {}", item.backup_type, item.backup_id, err, )); errors = true; // do not stop here, instead continue } } if delete { let result: Result<(), Error> = proxmox::try_block!({ let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?; for local_group in local_groups { if new_groups.contains(&local_group) { continue; } worker.log(format!( "delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id() )); if let Err(err) = tgt_store.remove_backup_group(&local_group) { worker.log(err.to_string()); errors = true; } } Ok(()) }); if let Err(err) = result { worker.log(format!("error during cleanup: {}", err)); errors = true; }; } if errors { bail!("sync failed with some errors."); } Ok(()) }