src/api2/pull.rs: implement delete flag for vanished groups
This commit is contained in:
		@ -240,6 +240,7 @@ pub async fn pull_group(
 | 
			
		||||
    src_repo: &BackupRepository,
 | 
			
		||||
    tgt_store: Arc<DataStore>,
 | 
			
		||||
    group: &BackupGroup,
 | 
			
		||||
    delete: bool,
 | 
			
		||||
) -> Result<(), Error> {
 | 
			
		||||
 | 
			
		||||
    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
 | 
			
		||||
@ -285,6 +286,10 @@ pub async fn pull_group(
 | 
			
		||||
        pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if delete {
 | 
			
		||||
        // fixme: implement me
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -293,6 +298,7 @@ pub async fn pull_store(
 | 
			
		||||
    client: &HttpClient,
 | 
			
		||||
    src_repo: &BackupRepository,
 | 
			
		||||
    tgt_store: Arc<DataStore>,
 | 
			
		||||
    delete: bool,
 | 
			
		||||
) -> Result<(), Error> {
 | 
			
		||||
 | 
			
		||||
    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
 | 
			
		||||
@ -312,13 +318,35 @@ pub async fn pull_store(
 | 
			
		||||
 | 
			
		||||
    let mut errors = false;
 | 
			
		||||
 | 
			
		||||
    let mut new_groups = std::collections::HashSet::new();
 | 
			
		||||
 | 
			
		||||
    for item in list {
 | 
			
		||||
        let group = BackupGroup::new(&item.backup_type, &item.backup_id);
 | 
			
		||||
        if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group).await {
 | 
			
		||||
        if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group, delete).await {
 | 
			
		||||
            worker.log(format!("sync group {}/{} failed - {}", item.backup_type, item.backup_id, err));
 | 
			
		||||
            errors = true;
 | 
			
		||||
            // continue
 | 
			
		||||
            // do not stop here, instead continue
 | 
			
		||||
        }
 | 
			
		||||
        new_groups.insert(group);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if delete {
 | 
			
		||||
        let result: Result<(), Error> = proxmox::tools::try_block!({
 | 
			
		||||
            let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?;
 | 
			
		||||
            for local_group in local_groups {
 | 
			
		||||
                if new_groups.contains(&local_group) { continue; }
 | 
			
		||||
                worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
 | 
			
		||||
                if let Err(err) = tgt_store.remove_backup_group(&local_group) {
 | 
			
		||||
                    worker.log(format!("delete failed: {}", err));
 | 
			
		||||
                    errors = true;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            Ok(())
 | 
			
		||||
        });
 | 
			
		||||
        if let Err(err) = result {
 | 
			
		||||
            worker.log(format!("error during cleanup: {}", err));
 | 
			
		||||
            errors = true;
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if errors {
 | 
			
		||||
@ -340,6 +368,12 @@ pub async fn pull_store(
 | 
			
		||||
            "remote-store": {
 | 
			
		||||
                schema: DATASTORE_SCHEMA,
 | 
			
		||||
            },
 | 
			
		||||
            delete: {
 | 
			
		||||
                description: "Delete vanished backups. This remove the local copy if the remote backup was deleted.",
 | 
			
		||||
                type: Boolean,
 | 
			
		||||
                optional: true,
 | 
			
		||||
                default: true,
 | 
			
		||||
            },
 | 
			
		||||
        },
 | 
			
		||||
    },
 | 
			
		||||
)]
 | 
			
		||||
@ -348,12 +382,15 @@ async fn pull (
 | 
			
		||||
    store: String,
 | 
			
		||||
    remote: String,
 | 
			
		||||
    remote_store: String,
 | 
			
		||||
    delete: Option<bool>,
 | 
			
		||||
    _info: &ApiMethod,
 | 
			
		||||
    rpcenv: &mut dyn RpcEnvironment,
 | 
			
		||||
) -> Result<String, Error> {
 | 
			
		||||
 | 
			
		||||
    let username = rpcenv.get_user().unwrap();
 | 
			
		||||
 | 
			
		||||
    let delete = delete.unwrap_or(true);
 | 
			
		||||
 | 
			
		||||
    let tgt_store = DataStore::lookup_datastore(&store)?;
 | 
			
		||||
 | 
			
		||||
    let (remote_config, _digest) = remote::config()?;
 | 
			
		||||
@ -374,7 +411,7 @@ async fn pull (
 | 
			
		||||
        // explicit create shared lock to prevent GC on newly created chunks
 | 
			
		||||
        let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
 | 
			
		||||
 | 
			
		||||
        pull_store(&worker, &client, &src_repo, tgt_store.clone()).await?;
 | 
			
		||||
        pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete).await?;
 | 
			
		||||
 | 
			
		||||
        worker.log(format!("sync datastore '{}' end", store));
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex};
 | 
			
		||||
use failure::*;
 | 
			
		||||
use lazy_static::lazy_static;
 | 
			
		||||
 | 
			
		||||
use super::backup_info::BackupDir;
 | 
			
		||||
use super::backup_info::{BackupGroup, BackupDir};
 | 
			
		||||
use super::chunk_store::{ChunkStore, GarbageCollectionStatus};
 | 
			
		||||
use super::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
 | 
			
		||||
use super::fixed_index::{FixedIndexReader, FixedIndexWriter};
 | 
			
		||||
@ -168,6 +168,20 @@ impl DataStore {
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Remove a complete backup group including all snapshots
 | 
			
		||||
    pub fn remove_backup_group(&self, backup_group: &BackupGroup,
 | 
			
		||||
    ) ->  Result<(), io::Error> {
 | 
			
		||||
 | 
			
		||||
        let mut full_path = self.base_path();
 | 
			
		||||
        full_path.push(backup_group.group_path());
 | 
			
		||||
 | 
			
		||||
        log::info!("removing backup group {:?}", full_path);
 | 
			
		||||
        std::fs::remove_dir_all(full_path)?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Remove a backup directory including all content
 | 
			
		||||
    pub fn remove_backup_dir(&self, backup_dir: &BackupDir,
 | 
			
		||||
    ) ->  Result<(), io::Error> {
 | 
			
		||||
 | 
			
		||||
@ -386,6 +386,7 @@ fn cert_mgmt_cli() -> CommandLineInterface {
 | 
			
		||||
    cmd_def.into()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// fixme: avoid API redefinition
 | 
			
		||||
#[api(
 | 
			
		||||
   input: {
 | 
			
		||||
        properties: {
 | 
			
		||||
@ -398,6 +399,12 @@ fn cert_mgmt_cli() -> CommandLineInterface {
 | 
			
		||||
            "remote-store": {
 | 
			
		||||
                schema: DATASTORE_SCHEMA,
 | 
			
		||||
            },
 | 
			
		||||
            delete: {
 | 
			
		||||
                description: "Delete vanished backups. This remove the local copy if the remote backup was deleted.",
 | 
			
		||||
                type: Boolean,
 | 
			
		||||
                optional: true,
 | 
			
		||||
                default: true,
 | 
			
		||||
            },
 | 
			
		||||
            "output-format": {
 | 
			
		||||
                schema: OUTPUT_FORMAT,
 | 
			
		||||
                optional: true,
 | 
			
		||||
@ -410,6 +417,7 @@ async fn pull_datastore(
 | 
			
		||||
    remote: String,
 | 
			
		||||
    remote_store: String,
 | 
			
		||||
    local_store: String,
 | 
			
		||||
    delete: Option<bool>,
 | 
			
		||||
    output_format: Option<String>,
 | 
			
		||||
) -> Result<Value, Error> {
 | 
			
		||||
 | 
			
		||||
@ -417,12 +425,16 @@ async fn pull_datastore(
 | 
			
		||||
 | 
			
		||||
    let mut client = connect()?;
 | 
			
		||||
 | 
			
		||||
    let args = json!({
 | 
			
		||||
    let mut args = json!({
 | 
			
		||||
        "store": local_store,
 | 
			
		||||
        "remote": remote,
 | 
			
		||||
        "remote-store": remote_store,
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    if let Some(delete) = delete {
 | 
			
		||||
        args["delete"] = delete.into();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let result = client.post("api2/json/pull", Some(args)).await?;
 | 
			
		||||
 | 
			
		||||
    view_task_result(client, result, &output_format).await?;
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user