2020-05-22 06:04:20 +00:00
|
|
|
//! Sync datastore from remote server
|
|
|
|
|
|
|
|
use anyhow::{bail, format_err, Error};
|
|
|
|
use serde_json::json;
|
|
|
|
use std::convert::TryFrom;
|
2020-09-22 10:34:06 +00:00
|
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use std::collections::{HashSet, HashMap};
|
2020-05-22 06:04:20 +00:00
|
|
|
use std::io::{Seek, SeekFrom};
|
2020-09-24 10:58:53 +00:00
|
|
|
use std::time::SystemTime;
|
2020-09-25 10:14:59 +00:00
|
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
2020-05-22 06:04:20 +00:00
|
|
|
|
2020-07-21 13:03:35 +00:00
|
|
|
use proxmox::api::error::{StatusCode, HttpError};
|
2020-09-17 08:27:04 +00:00
|
|
|
use crate::{
|
2020-09-25 10:14:59 +00:00
|
|
|
tools::{ParallelHandler, compute_file_csum},
|
2020-09-17 08:27:04 +00:00
|
|
|
server::WorkerTask,
|
|
|
|
backup::*,
|
|
|
|
api2::types::*,
|
|
|
|
client::*,
|
|
|
|
};
|
2020-05-22 06:04:20 +00:00
|
|
|
|
|
|
|
|
|
|
|
// fixme: implement filters
|
|
|
|
// fixme: delete vanished groups
|
|
|
|
// Todo: correctly lock backup groups
|
|
|
|
|
|
|
|
async fn pull_index_chunks<I: IndexFile>(
|
2020-09-24 10:58:53 +00:00
|
|
|
worker: &WorkerTask,
|
2020-09-22 07:52:14 +00:00
|
|
|
chunk_reader: RemoteChunkReader,
|
2020-05-22 06:04:20 +00:00
|
|
|
target: Arc<DataStore>,
|
|
|
|
index: I,
|
2020-09-22 10:34:06 +00:00
|
|
|
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
2020-05-22 06:04:20 +00:00
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
2020-09-22 07:52:14 +00:00
|
|
|
use futures::stream::{self, StreamExt, TryStreamExt};
|
2020-05-22 06:04:20 +00:00
|
|
|
|
2020-09-24 10:58:53 +00:00
|
|
|
let start_time = SystemTime::now();
|
|
|
|
|
2020-09-22 10:34:06 +00:00
|
|
|
let stream = stream::iter(
|
|
|
|
(0..index.index_count())
|
|
|
|
.map(|pos| index.chunk_info(pos).unwrap())
|
|
|
|
.filter(|info| {
|
|
|
|
let mut guard = downloaded_chunks.lock().unwrap();
|
|
|
|
let done = guard.contains(&info.digest);
|
|
|
|
if !done {
|
|
|
|
// Note: We mark a chunk as downloaded before its actually downloaded
|
|
|
|
// to avoid duplicate downloads.
|
|
|
|
guard.insert(info.digest);
|
|
|
|
}
|
|
|
|
!done
|
|
|
|
})
|
|
|
|
);
|
2020-05-22 06:04:20 +00:00
|
|
|
|
2020-10-01 12:48:49 +00:00
|
|
|
let target2 = target.clone();
|
2020-09-25 10:14:59 +00:00
|
|
|
let verify_pool = ParallelHandler::new(
|
|
|
|
"sync chunk writer", 4,
|
2020-10-01 12:48:49 +00:00
|
|
|
move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
|
2020-09-25 10:14:59 +00:00
|
|
|
// println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
|
|
|
|
chunk.verify_unencrypted(size as usize, &digest)?;
|
2020-10-01 12:48:49 +00:00
|
|
|
target2.insert_chunk(&chunk, &digest)?;
|
2020-09-25 10:14:59 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
);
|
|
|
|
|
|
|
|
let verify_and_write_channel = verify_pool.channel();
|
2020-09-24 10:58:53 +00:00
|
|
|
|
2020-09-25 10:14:59 +00:00
|
|
|
let bytes = Arc::new(AtomicUsize::new(0));
|
|
|
|
|
|
|
|
stream
|
2020-09-22 07:52:14 +00:00
|
|
|
.map(|info| {
|
2020-08-03 12:10:44 +00:00
|
|
|
|
2020-09-22 07:52:14 +00:00
|
|
|
let target = Arc::clone(&target);
|
|
|
|
let chunk_reader = chunk_reader.clone();
|
2020-09-25 10:14:59 +00:00
|
|
|
let bytes = Arc::clone(&bytes);
|
|
|
|
let verify_and_write_channel = verify_and_write_channel.clone();
|
2020-09-22 07:52:14 +00:00
|
|
|
|
|
|
|
Ok::<_, Error>(async move {
|
|
|
|
let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
|
|
|
|
if chunk_exists {
|
|
|
|
//worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
|
|
|
|
return Ok::<_, Error>(());
|
|
|
|
}
|
|
|
|
//worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
|
|
|
|
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
|
2020-09-25 10:14:59 +00:00
|
|
|
let raw_size = chunk.raw_size() as usize;
|
2020-09-22 07:52:14 +00:00
|
|
|
|
2020-09-24 10:58:53 +00:00
|
|
|
// decode, verify and write in a separate threads to maximize throughput
|
2020-09-25 10:14:59 +00:00
|
|
|
crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?;
|
|
|
|
|
|
|
|
bytes.fetch_add(raw_size, Ordering::SeqCst);
|
2020-09-24 10:58:53 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
})
|
2020-09-22 07:52:14 +00:00
|
|
|
})
|
|
|
|
.try_buffer_unordered(20)
|
|
|
|
.try_for_each(|_res| futures::future::ok(()))
|
2020-09-25 10:14:59 +00:00
|
|
|
.await?;
|
2020-09-24 10:58:53 +00:00
|
|
|
|
2020-09-25 10:14:59 +00:00
|
|
|
drop(verify_and_write_channel);
|
2020-09-24 10:58:53 +00:00
|
|
|
|
2020-09-25 10:14:59 +00:00
|
|
|
verify_pool.complete()?;
|
2020-09-24 10:58:53 +00:00
|
|
|
|
|
|
|
let elapsed = start_time.elapsed()?.as_secs_f64();
|
|
|
|
|
2020-09-25 10:14:59 +00:00
|
|
|
let bytes = bytes.load(Ordering::SeqCst);
|
|
|
|
|
2020-09-24 10:58:53 +00:00
|
|
|
worker.log(format!("downloaded {} bytes ({} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));
|
2020-05-22 06:04:20 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn download_manifest(
|
|
|
|
reader: &BackupReader,
|
|
|
|
filename: &std::path::Path,
|
|
|
|
) -> Result<std::fs::File, Error> {
|
|
|
|
|
2020-06-12 09:46:42 +00:00
|
|
|
let mut tmp_manifest_file = std::fs::OpenOptions::new()
|
2020-05-22 06:04:20 +00:00
|
|
|
.write(true)
|
|
|
|
.create(true)
|
2020-09-21 11:53:35 +00:00
|
|
|
.truncate(true)
|
2020-05-22 06:04:20 +00:00
|
|
|
.read(true)
|
|
|
|
.open(&filename)?;
|
|
|
|
|
2020-06-12 09:46:42 +00:00
|
|
|
reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?;
|
2020-05-22 06:04:20 +00:00
|
|
|
|
|
|
|
tmp_manifest_file.seek(SeekFrom::Start(0))?;
|
|
|
|
|
|
|
|
Ok(tmp_manifest_file)
|
|
|
|
}
|
|
|
|
|
2020-08-03 12:10:45 +00:00
|
|
|
fn verify_archive(
|
|
|
|
info: &FileInfo,
|
|
|
|
csum: &[u8; 32],
|
|
|
|
size: u64,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
if size != info.size {
|
|
|
|
bail!("wrong size for file '{}' ({} != {})", info.filename, info.size, size);
|
|
|
|
}
|
|
|
|
|
|
|
|
if csum != &info.csum {
|
|
|
|
bail!("wrong checksum for file '{}'", info.filename);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2020-05-22 06:04:20 +00:00
|
|
|
async fn pull_single_archive(
|
|
|
|
worker: &WorkerTask,
|
|
|
|
reader: &BackupReader,
|
|
|
|
chunk_reader: &mut RemoteChunkReader,
|
|
|
|
tgt_store: Arc<DataStore>,
|
|
|
|
snapshot: &BackupDir,
|
2020-08-03 12:10:45 +00:00
|
|
|
archive_info: &FileInfo,
|
2020-09-22 10:34:06 +00:00
|
|
|
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
2020-05-22 06:04:20 +00:00
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
2020-08-03 12:10:45 +00:00
|
|
|
let archive_name = &archive_info.filename;
|
2020-05-22 06:04:20 +00:00
|
|
|
let mut path = tgt_store.base_path();
|
|
|
|
path.push(snapshot.relative_path());
|
|
|
|
path.push(archive_name);
|
|
|
|
|
|
|
|
let mut tmp_path = path.clone();
|
|
|
|
tmp_path.set_extension("tmp");
|
|
|
|
|
|
|
|
worker.log(format!("sync archive {}", archive_name));
|
2020-06-12 09:46:42 +00:00
|
|
|
let mut tmpfile = std::fs::OpenOptions::new()
|
2020-05-22 06:04:20 +00:00
|
|
|
.write(true)
|
|
|
|
.create(true)
|
|
|
|
.read(true)
|
|
|
|
.open(&tmp_path)?;
|
|
|
|
|
2020-06-12 09:46:42 +00:00
|
|
|
reader.download(archive_name, &mut tmpfile).await?;
|
2020-05-22 06:04:20 +00:00
|
|
|
|
|
|
|
match archive_type(archive_name)? {
|
|
|
|
ArchiveType::DynamicIndex => {
|
|
|
|
let index = DynamicIndexReader::new(tmpfile)
|
|
|
|
.map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
|
2020-08-03 12:10:45 +00:00
|
|
|
let (csum, size) = index.compute_csum();
|
|
|
|
verify_archive(archive_info, &csum, size)?;
|
2020-05-22 06:04:20 +00:00
|
|
|
|
2020-09-22 10:34:06 +00:00
|
|
|
pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
|
2020-05-22 06:04:20 +00:00
|
|
|
}
|
|
|
|
ArchiveType::FixedIndex => {
|
|
|
|
let index = FixedIndexReader::new(tmpfile)
|
|
|
|
.map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
|
2020-08-03 12:10:45 +00:00
|
|
|
let (csum, size) = index.compute_csum();
|
|
|
|
verify_archive(archive_info, &csum, size)?;
|
2020-05-22 06:04:20 +00:00
|
|
|
|
2020-09-22 10:34:06 +00:00
|
|
|
pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
|
2020-05-22 06:04:20 +00:00
|
|
|
}
|
2020-08-03 12:10:45 +00:00
|
|
|
ArchiveType::Blob => {
|
|
|
|
let (csum, size) = compute_file_csum(&mut tmpfile)?;
|
|
|
|
verify_archive(archive_info, &csum, size)?;
|
|
|
|
}
|
2020-05-22 06:04:20 +00:00
|
|
|
}
|
|
|
|
if let Err(err) = std::fs::rename(&tmp_path, &path) {
|
|
|
|
bail!("Atomic rename file {:?} failed - {}", path, err);
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2020-05-30 12:39:38 +00:00
|
|
|
// Note: The client.log.blob is uploaded after the backup, so it is
|
|
|
|
// not mentioned in the manifest.
|
|
|
|
async fn try_client_log_download(
|
|
|
|
worker: &WorkerTask,
|
|
|
|
reader: Arc<BackupReader>,
|
|
|
|
path: &std::path::Path,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
|
|
|
let mut tmp_path = path.to_owned();
|
|
|
|
tmp_path.set_extension("tmp");
|
|
|
|
|
|
|
|
let tmpfile = std::fs::OpenOptions::new()
|
|
|
|
.write(true)
|
|
|
|
.create(true)
|
|
|
|
.read(true)
|
|
|
|
.open(&tmp_path)?;
|
|
|
|
|
2020-05-30 14:37:33 +00:00
|
|
|
// Note: be silent if there is no log - only log successful download
|
2020-06-12 09:46:42 +00:00
|
|
|
if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
|
2020-05-30 12:39:38 +00:00
|
|
|
if let Err(err) = std::fs::rename(&tmp_path, &path) {
|
|
|
|
bail!("Atomic rename file {:?} failed - {}", path, err);
|
|
|
|
}
|
2020-05-30 14:37:33 +00:00
|
|
|
worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
|
2020-05-30 12:39:38 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2020-05-22 06:04:20 +00:00
|
|
|
async fn pull_snapshot(
|
|
|
|
worker: &WorkerTask,
|
|
|
|
reader: Arc<BackupReader>,
|
|
|
|
tgt_store: Arc<DataStore>,
|
|
|
|
snapshot: &BackupDir,
|
2020-09-22 10:34:06 +00:00
|
|
|
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
2020-05-22 06:04:20 +00:00
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
|
|
|
let mut manifest_name = tgt_store.base_path();
|
|
|
|
manifest_name.push(snapshot.relative_path());
|
|
|
|
manifest_name.push(MANIFEST_BLOB_NAME);
|
|
|
|
|
2020-05-30 12:39:38 +00:00
|
|
|
let mut client_log_name = tgt_store.base_path();
|
|
|
|
client_log_name.push(snapshot.relative_path());
|
|
|
|
client_log_name.push(CLIENT_LOG_BLOB_NAME);
|
|
|
|
|
2020-05-22 06:04:20 +00:00
|
|
|
let mut tmp_manifest_name = manifest_name.clone();
|
|
|
|
tmp_manifest_name.set_extension("tmp");
|
|
|
|
|
2020-07-21 13:03:35 +00:00
|
|
|
let download_res = download_manifest(&reader, &tmp_manifest_name).await;
|
|
|
|
let mut tmp_manifest_file = match download_res {
|
|
|
|
Ok(manifest_file) => manifest_file,
|
|
|
|
Err(err) => {
|
|
|
|
match err.downcast_ref::<HttpError>() {
|
|
|
|
Some(HttpError { code, message }) => {
|
2020-10-14 09:18:26 +00:00
|
|
|
match *code {
|
|
|
|
StatusCode::NOT_FOUND => {
|
2020-07-21 13:03:35 +00:00
|
|
|
worker.log(format!("skipping snapshot {} - vanished since start of sync", snapshot));
|
|
|
|
return Ok(());
|
|
|
|
},
|
|
|
|
_ => {
|
|
|
|
bail!("HTTP error {} - {}", code, message);
|
|
|
|
},
|
|
|
|
}
|
|
|
|
},
|
|
|
|
None => {
|
|
|
|
return Err(err);
|
|
|
|
},
|
|
|
|
};
|
|
|
|
},
|
|
|
|
};
|
2020-07-28 08:23:16 +00:00
|
|
|
let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
|
2020-05-22 06:04:20 +00:00
|
|
|
|
|
|
|
if manifest_name.exists() {
|
|
|
|
let manifest_blob = proxmox::try_block!({
|
|
|
|
let mut manifest_file = std::fs::File::open(&manifest_name)
|
|
|
|
.map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
|
|
|
|
|
2020-07-28 08:23:16 +00:00
|
|
|
let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
|
2020-05-22 06:04:20 +00:00
|
|
|
Ok(manifest_blob)
|
|
|
|
}).map_err(|err: Error| {
|
|
|
|
format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
|
|
|
|
})?;
|
|
|
|
|
|
|
|
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
|
2020-05-30 12:39:38 +00:00
|
|
|
if !client_log_name.exists() {
|
|
|
|
try_client_log_download(worker, reader, &client_log_name).await?;
|
|
|
|
}
|
|
|
|
worker.log("no data changes");
|
2020-09-21 12:03:01 +00:00
|
|
|
let _ = std::fs::remove_file(&tmp_manifest_name);
|
2020-05-22 06:04:20 +00:00
|
|
|
return Ok(()); // nothing changed
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
|
|
|
|
|
|
|
|
for item in manifest.files() {
|
|
|
|
let mut path = tgt_store.base_path();
|
|
|
|
path.push(snapshot.relative_path());
|
|
|
|
path.push(&item.filename);
|
|
|
|
|
|
|
|
if path.exists() {
|
|
|
|
match archive_type(&item.filename)? {
|
|
|
|
ArchiveType::DynamicIndex => {
|
|
|
|
let index = DynamicIndexReader::open(&path)?;
|
|
|
|
let (csum, size) = index.compute_csum();
|
|
|
|
match manifest.verify_file(&item.filename, &csum, size) {
|
|
|
|
Ok(_) => continue,
|
|
|
|
Err(err) => {
|
|
|
|
worker.log(format!("detected changed file {:?} - {}", path, err));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ArchiveType::FixedIndex => {
|
|
|
|
let index = FixedIndexReader::open(&path)?;
|
|
|
|
let (csum, size) = index.compute_csum();
|
|
|
|
match manifest.verify_file(&item.filename, &csum, size) {
|
|
|
|
Ok(_) => continue,
|
|
|
|
Err(err) => {
|
|
|
|
worker.log(format!("detected changed file {:?} - {}", path, err));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ArchiveType::Blob => {
|
|
|
|
let mut tmpfile = std::fs::File::open(&path)?;
|
|
|
|
let (csum, size) = compute_file_csum(&mut tmpfile)?;
|
|
|
|
match manifest.verify_file(&item.filename, &csum, size) {
|
|
|
|
Ok(_) => continue,
|
|
|
|
Err(err) => {
|
|
|
|
worker.log(format!("detected changed file {:?} - {}", path, err));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-10 11:25:07 +00:00
|
|
|
let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, item.chunk_crypt_mode(), HashMap::new());
|
|
|
|
|
2020-05-22 06:04:20 +00:00
|
|
|
pull_single_archive(
|
|
|
|
worker,
|
|
|
|
&reader,
|
|
|
|
&mut chunk_reader,
|
|
|
|
tgt_store.clone(),
|
|
|
|
snapshot,
|
2020-08-03 12:10:45 +00:00
|
|
|
&item,
|
2020-09-22 10:34:06 +00:00
|
|
|
downloaded_chunks.clone(),
|
2020-05-22 06:04:20 +00:00
|
|
|
).await?;
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
|
|
|
|
bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
|
|
|
|
}
|
|
|
|
|
2020-05-30 12:39:38 +00:00
|
|
|
if !client_log_name.exists() {
|
|
|
|
try_client_log_download(worker, reader, &client_log_name).await?;
|
|
|
|
}
|
|
|
|
|
2020-05-22 06:04:20 +00:00
|
|
|
// cleanup - remove stale files
|
|
|
|
tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn pull_snapshot_from(
|
|
|
|
worker: &WorkerTask,
|
|
|
|
reader: Arc<BackupReader>,
|
|
|
|
tgt_store: Arc<DataStore>,
|
|
|
|
snapshot: &BackupDir,
|
2020-09-22 10:34:06 +00:00
|
|
|
downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
2020-05-22 06:04:20 +00:00
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
2020-08-11 08:50:39 +00:00
|
|
|
let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
|
2020-05-22 06:04:20 +00:00
|
|
|
|
|
|
|
if is_new {
|
|
|
|
worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
|
|
|
|
|
2020-09-22 10:34:06 +00:00
|
|
|
if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
|
2020-07-29 12:33:11 +00:00
|
|
|
if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
|
2020-05-22 06:04:20 +00:00
|
|
|
worker.log(format!("cleanup error - {}", cleanup_err));
|
|
|
|
}
|
|
|
|
return Err(err);
|
|
|
|
}
|
2020-05-30 06:12:43 +00:00
|
|
|
worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
|
2020-05-22 06:04:20 +00:00
|
|
|
} else {
|
|
|
|
worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
|
2020-09-22 10:34:06 +00:00
|
|
|
pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
|
2020-05-30 06:12:43 +00:00
|
|
|
worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
|
2020-05-22 06:04:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn pull_group(
|
|
|
|
worker: &WorkerTask,
|
|
|
|
client: &HttpClient,
|
|
|
|
src_repo: &BackupRepository,
|
|
|
|
tgt_store: Arc<DataStore>,
|
|
|
|
group: &BackupGroup,
|
|
|
|
delete: bool,
|
2020-09-30 11:35:09 +00:00
|
|
|
progress: Option<(usize, usize)>, // (groups_done, group_count)
|
2020-05-22 06:04:20 +00:00
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
|
|
|
let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
|
|
|
|
|
|
|
|
let args = json!({
|
|
|
|
"backup-type": group.backup_type(),
|
|
|
|
"backup-id": group.backup_id(),
|
|
|
|
});
|
|
|
|
|
|
|
|
let mut result = client.get(&path, Some(args)).await?;
|
|
|
|
let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
|
|
|
|
|
|
|
|
list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
|
|
|
|
|
|
|
|
let auth_info = client.login().await?;
|
|
|
|
let fingerprint = client.fingerprint();
|
|
|
|
|
|
|
|
let last_sync = tgt_store.last_successful_backup(group)?;
|
|
|
|
|
|
|
|
let mut remote_snapshots = std::collections::HashSet::new();
|
|
|
|
|
2020-09-30 11:35:09 +00:00
|
|
|
let (per_start, per_group) = if let Some((groups_done, group_count)) = progress {
|
|
|
|
let per_start = (groups_done as f64)/(group_count as f64);
|
|
|
|
let per_group = 1.0/(group_count as f64);
|
|
|
|
(per_start, per_group)
|
|
|
|
} else {
|
|
|
|
(0.0, 1.0)
|
|
|
|
};
|
|
|
|
|
2020-09-22 10:34:06 +00:00
|
|
|
// start with 16384 chunks (up to 65GB)
|
|
|
|
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));
|
|
|
|
|
2020-09-30 11:35:09 +00:00
|
|
|
let snapshot_count = list.len();
|
|
|
|
|
|
|
|
for (pos, item) in list.into_iter().enumerate() {
|
2020-09-11 12:34:38 +00:00
|
|
|
let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
|
2020-07-21 13:03:33 +00:00
|
|
|
|
|
|
|
// in-progress backups can't be synced
|
2020-10-14 09:18:26 +00:00
|
|
|
if item.size.is_none() {
|
2020-07-21 13:03:33 +00:00
|
|
|
worker.log(format!("skipping snapshot {} - in-progress backup", snapshot));
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
let backup_time = snapshot.backup_time();
|
|
|
|
|
2020-05-22 06:04:20 +00:00
|
|
|
remote_snapshots.insert(backup_time);
|
|
|
|
|
|
|
|
if let Some(last_sync_time) = last_sync {
|
|
|
|
if last_sync_time > backup_time { continue; }
|
|
|
|
}
|
|
|
|
|
|
|
|
let options = HttpClientOptions::new()
|
|
|
|
.password(Some(auth_info.ticket.clone()))
|
|
|
|
.fingerprint(fingerprint.clone());
|
|
|
|
|
2020-10-08 13:19:39 +00:00
|
|
|
let new_client = HttpClient::new(src_repo.host(), src_repo.port(), src_repo.auth_id(), options)?;
|
2020-05-22 06:04:20 +00:00
|
|
|
|
|
|
|
let reader = BackupReader::start(
|
|
|
|
new_client,
|
|
|
|
None,
|
|
|
|
src_repo.store(),
|
2020-07-21 13:03:33 +00:00
|
|
|
snapshot.group().backup_type(),
|
|
|
|
snapshot.group().backup_id(),
|
2020-05-22 06:04:20 +00:00
|
|
|
backup_time,
|
|
|
|
true,
|
|
|
|
).await?;
|
|
|
|
|
2020-09-30 11:35:09 +00:00
|
|
|
let result = pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await;
|
|
|
|
|
|
|
|
let percentage = (pos as f64)/(snapshot_count as f64);
|
|
|
|
let percentage = per_start + percentage*per_group;
|
|
|
|
worker.log(format!("percentage done: {:.2}%", percentage*100.0));
|
|
|
|
|
|
|
|
result?; // stop on error
|
2020-05-22 06:04:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if delete {
|
|
|
|
let local_list = group.list_backups(&tgt_store.base_path())?;
|
|
|
|
for info in local_list {
|
|
|
|
let backup_time = info.backup_dir.backup_time();
|
|
|
|
if remote_snapshots.contains(&backup_time) { continue; }
|
|
|
|
worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
|
2020-07-29 12:33:11 +00:00
|
|
|
tgt_store.remove_backup_dir(&info.backup_dir, false)?;
|
2020-05-22 06:04:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn pull_store(
|
|
|
|
worker: &WorkerTask,
|
|
|
|
client: &HttpClient,
|
|
|
|
src_repo: &BackupRepository,
|
|
|
|
tgt_store: Arc<DataStore>,
|
|
|
|
delete: bool,
|
2020-10-23 11:33:21 +00:00
|
|
|
auth_id: Authid,
|
2020-05-22 06:04:20 +00:00
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
|
|
|
// explicit create shared lock to prevent GC on newly created chunks
|
|
|
|
let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
|
|
|
|
|
|
|
|
let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
|
|
|
|
|
|
|
|
let mut result = client.get(&path, None).await?;
|
|
|
|
|
|
|
|
let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
|
|
|
|
|
|
|
|
list.sort_unstable_by(|a, b| {
|
|
|
|
let type_order = a.backup_type.cmp(&b.backup_type);
|
|
|
|
if type_order == std::cmp::Ordering::Equal {
|
|
|
|
a.backup_id.cmp(&b.backup_id)
|
|
|
|
} else {
|
|
|
|
type_order
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
let mut errors = false;
|
|
|
|
|
|
|
|
let mut new_groups = std::collections::HashSet::new();
|
|
|
|
for item in list.iter() {
|
|
|
|
new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
|
|
|
|
}
|
|
|
|
|
2020-09-30 11:35:09 +00:00
|
|
|
let group_count = list.len();
|
|
|
|
|
2020-10-14 09:18:26 +00:00
|
|
|
for (groups_done, item) in list.into_iter().enumerate() {
|
2020-05-22 06:04:20 +00:00
|
|
|
let group = BackupGroup::new(&item.backup_type, &item.backup_id);
|
|
|
|
|
2020-10-23 11:33:21 +00:00
|
|
|
let (owner, _lock_guard) = tgt_store.create_locked_backup_group(&group, &auth_id)?;
|
2020-05-22 06:04:20 +00:00
|
|
|
// permission check
|
2020-10-23 11:33:21 +00:00
|
|
|
if auth_id != owner { // only the owner is allowed to create additional snapshots
|
2020-05-22 06:04:20 +00:00
|
|
|
worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})",
|
2020-10-23 11:33:21 +00:00
|
|
|
item.backup_type, item.backup_id, auth_id, owner));
|
2020-09-30 11:35:09 +00:00
|
|
|
errors = true; // do not stop here, instead continue
|
2020-05-22 06:04:20 +00:00
|
|
|
|
2020-10-14 12:22:38 +00:00
|
|
|
} else if let Err(err) = pull_group(
|
|
|
|
worker,
|
|
|
|
client,
|
|
|
|
src_repo,
|
|
|
|
tgt_store.clone(),
|
|
|
|
&group,
|
|
|
|
delete,
|
|
|
|
Some((groups_done, group_count)),
|
|
|
|
).await {
|
|
|
|
worker.log(format!(
|
|
|
|
"sync group {}/{} failed - {}",
|
|
|
|
item.backup_type,
|
|
|
|
item.backup_id,
|
|
|
|
err,
|
|
|
|
));
|
|
|
|
errors = true; // do not stop here, instead continue
|
2020-05-22 06:04:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if delete {
|
|
|
|
let result: Result<(), Error> = proxmox::try_block!({
|
|
|
|
let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?;
|
|
|
|
for local_group in local_groups {
|
|
|
|
if new_groups.contains(&local_group) { continue; }
|
|
|
|
worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
|
|
|
|
if let Err(err) = tgt_store.remove_backup_group(&local_group) {
|
|
|
|
worker.log(err.to_string());
|
|
|
|
errors = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
});
|
|
|
|
if let Err(err) = result {
|
|
|
|
worker.log(format!("error during cleanup: {}", err));
|
|
|
|
errors = true;
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
if errors {
|
|
|
|
bail!("sync failed with some errors.");
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|