proxmox-backup/src/api2/pull.rs

473 lines
15 KiB
Rust
Raw Normal View History

//! Sync datastore from remote server
use anyhow::{bail, format_err, Error};
use serde_json::json;
use std::convert::TryFrom;
use std::sync::Arc;
use std::collections::HashMap;
use std::io::{Seek, SeekFrom};
use chrono::{Utc, TimeZone};
use proxmox::api::api;
use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission};
use crate::server::{WorkerTask};
use crate::backup::*;
use crate::client::*;
use crate::config::remote;
use crate::api2::types::*;
use crate::config::acl::{PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ};
use crate::config::cached_user_info::CachedUserInfo;
// fixme: implement filters
// fixme: delete vanished groups
// Todo: correctly lock backup groups
async fn pull_index_chunks<I: IndexFile>(
_worker: &WorkerTask,
chunk_reader: &mut RemoteChunkReader,
target: Arc<DataStore>,
index: I,
) -> Result<(), Error> {
for pos in 0..index.index_count() {
let digest = index.index_digest(pos).unwrap();
let chunk_exists = target.cond_touch_chunk(digest, false)?;
if chunk_exists {
//worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
continue;
}
//worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
let chunk = chunk_reader.read_raw_chunk(&digest)?;
target.insert_chunk(&chunk, &digest)?;
}
Ok(())
}
async fn download_manifest(
reader: &BackupReader,
filename: &std::path::Path,
) -> Result<std::fs::File, Error> {
let tmp_manifest_file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.read(true)
.open(&filename)?;
let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?;
tmp_manifest_file.seek(SeekFrom::Start(0))?;
Ok(tmp_manifest_file)
}
async fn pull_single_archive(
worker: &WorkerTask,
reader: &BackupReader,
chunk_reader: &mut RemoteChunkReader,
tgt_store: Arc<DataStore>,
snapshot: &BackupDir,
archive_name: &str,
) -> Result<(), Error> {
let mut path = tgt_store.base_path();
path.push(snapshot.relative_path());
path.push(archive_name);
let mut tmp_path = path.clone();
tmp_path.set_extension("tmp");
worker.log(format!("sync archive {}", archive_name));
let tmpfile = std::fs::OpenOptions::new()
.write(true)
.create(true)
.read(true)
.open(&tmp_path)?;
let tmpfile = reader.download(archive_name, tmpfile).await?;
match archive_type(archive_name)? {
ArchiveType::DynamicIndex => {
let index = DynamicIndexReader::new(tmpfile)
.map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile)
.map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
}
ArchiveType::Blob => { /* nothing to do */ }
}
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
Ok(())
}
async fn pull_snapshot(
worker: &WorkerTask,
reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>,
snapshot: &BackupDir,
) -> Result<(), Error> {
let mut manifest_name = tgt_store.base_path();
manifest_name.push(snapshot.relative_path());
manifest_name.push(MANIFEST_BLOB_NAME);
let mut tmp_manifest_name = manifest_name.clone();
tmp_manifest_name.set_extension("tmp");
let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?;
let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?;
tmp_manifest_blob.verify_crc()?;
if manifest_name.exists() {
let manifest_blob = proxmox::try_block!({
let mut manifest_file = std::fs::File::open(&manifest_name)
.map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
let manifest_blob = DataBlob::load(&mut manifest_file)?;
manifest_blob.verify_crc()?;
Ok(manifest_blob)
}).map_err(|err: Error| {
format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
})?;
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
return Ok(()); // nothing changed
}
}
let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new());
for item in manifest.files() {
let mut path = tgt_store.base_path();
path.push(snapshot.relative_path());
path.push(&item.filename);
if path.exists() {
match archive_type(&item.filename)? {
ArchiveType::DynamicIndex => {
let index = DynamicIndexReader::open(&path)?;
let (csum, size) = index.compute_csum();
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
worker.log(format!("detected changed file {:?} - {}", path, err));
}
}
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::open(&path)?;
let (csum, size) = index.compute_csum();
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
worker.log(format!("detected changed file {:?} - {}", path, err));
}
}
}
ArchiveType::Blob => {
let mut tmpfile = std::fs::File::open(&path)?;
let (csum, size) = compute_file_csum(&mut tmpfile)?;
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
worker.log(format!("detected changed file {:?} - {}", path, err));
}
}
}
}
}
pull_single_archive(
worker,
&reader,
&mut chunk_reader,
tgt_store.clone(),
snapshot,
&item.filename,
).await?;
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
}
// cleanup - remove stale files
tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
Ok(())
}
pub async fn pull_snapshot_from(
worker: &WorkerTask,
reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>,
snapshot: &BackupDir,
) -> Result<(), Error> {
let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?;
if is_new {
worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) {
worker.log(format!("cleanup error - {}", cleanup_err));
}
return Err(err);
}
} else {
worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?
}
Ok(())
}
pub async fn pull_group(
worker: &WorkerTask,
client: &HttpClient,
src_repo: &BackupRepository,
tgt_store: Arc<DataStore>,
group: &BackupGroup,
delete: bool,
) -> Result<(), Error> {
let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
let args = json!({
"backup-type": group.backup_type(),
"backup-id": group.backup_id(),
});
let mut result = client.get(&path, Some(args)).await?;
let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
let auth_info = client.login().await?;
let fingerprint = client.fingerprint();
let last_sync = tgt_store.last_successful_backup(group)?;
let mut remote_snapshots = std::collections::HashSet::new();
for item in list {
let backup_time = Utc.timestamp(item.backup_time, 0);
remote_snapshots.insert(backup_time);
if let Some(last_sync_time) = last_sync {
if last_sync_time > backup_time { continue; }
}
let options = HttpClientOptions::new()
.password(Some(auth_info.ticket.clone()))
.fingerprint(fingerprint.clone());
let new_client = HttpClient::new(src_repo.host(), src_repo.user(), options)?;
let reader = BackupReader::start(
new_client,
None,
src_repo.store(),
&item.backup_type,
&item.backup_id,
backup_time,
true,
).await?;
let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);
pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
}
if delete {
let local_list = group.list_backups(&tgt_store.base_path())?;
for info in local_list {
let backup_time = info.backup_dir.backup_time();
if remote_snapshots.contains(&backup_time) { continue; }
worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
tgt_store.remove_backup_dir(&info.backup_dir)?;
}
}
Ok(())
}
pub async fn pull_store(
worker: &WorkerTask,
client: &HttpClient,
src_repo: &BackupRepository,
tgt_store: Arc<DataStore>,
delete: bool,
username: String,
) -> Result<(), Error> {
let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
let mut result = client.get(&path, None).await?;
let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
list.sort_unstable_by(|a, b| {
let type_order = a.backup_type.cmp(&b.backup_type);
if type_order == std::cmp::Ordering::Equal {
a.backup_id.cmp(&b.backup_id)
} else {
type_order
}
});
let mut errors = false;
let mut new_groups = std::collections::HashSet::new();
for item in list.iter() {
new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
}
for item in list {
let group = BackupGroup::new(&item.backup_type, &item.backup_id);
let owner = tgt_store.create_backup_group(&group, &username)?;
// permission check
if owner != username { // only the owner is allowed to create additional snapshots
worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})",
item.backup_type, item.backup_id, username, owner));
errors = true;
continue; // do not stop here, instead continue
}
if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group, delete).await {
worker.log(format!("sync group {}/{} failed - {}", item.backup_type, item.backup_id, err));
errors = true;
continue; // do not stop here, instead continue
}
}
if delete {
let result: Result<(), Error> = proxmox::try_block!({
let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?;
for local_group in local_groups {
if new_groups.contains(&local_group) { continue; }
worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
if let Err(err) = tgt_store.remove_backup_group(&local_group) {
worker.log(err.to_string());
errors = true;
}
}
Ok(())
});
if let Err(err) = result {
worker.log(format!("error during cleanup: {}", err));
errors = true;
};
}
if errors {
bail!("sync failed with some errors.");
}
Ok(())
}
#[api(
input: {
properties: {
store: {
schema: DATASTORE_SCHEMA,
},
remote: {
schema: REMOTE_ID_SCHEMA,
},
"remote-store": {
schema: DATASTORE_SCHEMA,
},
delete: {
description: "Delete vanished backups. This remove the local copy if the remote backup was deleted.",
type: Boolean,
optional: true,
default: true,
},
},
},
access: {
// Note: used parameters are no uri parameters, so we need to test inside function body
description: r###"The user needs Datastore.Backup privilege on '/datastore/{store}',
and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
"###,
permission: &Permission::Anybody,
},
)]
/// Sync store from other repository
async fn pull (
store: String,
remote: String,
remote_store: String,
delete: Option<bool>,
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
let user_info = CachedUserInfo::new()?;
let username = rpcenv.get_user().unwrap();
user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_BACKUP, false)?;
user_info.check_privs(&username, &["remote", &remote, &remote_store], PRIV_REMOTE_READ, false)?;
let delete = delete.unwrap_or(true);
if delete {
user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_PRUNE, false)?;
}
let tgt_store = DataStore::lookup_datastore(&store)?;
let (remote_config, _digest) = remote::config()?;
let remote: remote::Remote = remote_config.lookup("remote", &remote)?;
let options = HttpClientOptions::new()
.password(Some(remote.password.clone()))
.fingerprint(remote.fingerprint.clone());
let client = HttpClient::new(&remote.host, &remote.userid, options)?;
let _auth_info = client.login() // make sure we can auth
.await
.map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), remote_store);
// fixme: set to_stdout to false?
let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move {
worker.log(format!("sync datastore '{}' start", store));
// explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, username).await?;
worker.log(format!("sync datastore '{}' end", store));
Ok(())
})?;
Ok(upid_str)
}
pub const ROUTER: Router = Router::new()
.post(&API_METHOD_PULL);