rename src/api2/sync.rs -> src/api2/pull.rs
Use "pull" instead of "sync", because this also indicates a direction (like "sync from").
commit eb506c830c
parent a81af92f9d
diff --git a/src/api2.rs b/src/api2.rs
@@ -7,7 +7,7 @@ pub mod reader;
 mod subscription;
 pub mod types;
 pub mod version;
-pub mod sync;
+pub mod pull;
 
 use proxmox::api::list_subdirs_api_method;
 use proxmox::api::router::SubdirMap;
@@ -21,9 +21,9 @@ pub const SUBDIRS: SubdirMap = &[
     ("backup", &backup::ROUTER),
     ("config", &config::ROUTER),
     ("nodes", &NODES_ROUTER),
+    ("pull", &pull::ROUTER),
     ("reader", &reader::ROUTER),
     ("subscription", &subscription::ROUTER),
-    ("sync", &sync::ROUTER),
     ("version", &version::ROUTER),
 ];
 
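The SUBDIRS table above maps URL path segments to routers, so this hunk is what moves the job from POST api2/json/sync to POST api2/json/pull. A minimal, self-contained sketch of that lookup pattern (the `Router` here is a stub; the real type and the `SubdirMap` alias come from the proxmox api crate):

```rust
// Stub standing in for proxmox::api::router::Router, just to keep the
// example compilable.
struct Router;

// Same shape as the SubdirMap alias used above.
type SubdirMap = &'static [(&'static str, &'static Router)];

static PULL: Router = Router;
static VERSION: Router = Router;

// Entries stay alphabetically ordered, which is why "pull" is inserted
// between "nodes" and "reader" in the hunk above.
const SUBDIRS: SubdirMap = &[("pull", &PULL), ("version", &VERSION)];

fn find_subdir(map: SubdirMap, name: &str) -> Option<&'static Router> {
    map.iter().find(|(n, _)| *n == name).map(|(_, r)| *r)
}

fn main() {
    assert!(find_subdir(SUBDIRS, "pull").is_some());
    assert!(find_subdir(SUBDIRS, "sync").is_none()); // the old route is gone
}
```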
diff --git a/src/api2/sync.rs b/src/api2/pull.rs
rename from src/api2/sync.rs
rename to src/api2/pull.rs
@@ -1,3 +1,5 @@
+//! Sync datastore from remote server
+
 use failure::*;
 use serde_json::json;
 use std::convert::TryFrom;
@@ -15,9 +17,10 @@ use crate::client::*;
 use crate::api2::types::*;
 
 // fixme: implement filters
+// fixme: delete vanished groups
 // Todo: correctly lock backup groups
 
-async fn sync_index_chunks<I: IndexFile>(
+async fn pull_index_chunks<I: IndexFile>(
     _worker: &WorkerTask,
     chunk_reader: &mut RemoteChunkReader,
     target: Arc<DataStore>,
@@ -59,7 +62,7 @@ async fn download_manifest(
     Ok(tmp_manifest_file)
 }
 
-async fn sync_single_archive(
+async fn pull_single_archive(
     worker: &WorkerTask,
     reader: &BackupReader,
     chunk_reader: &mut RemoteChunkReader,
@@ -89,13 +92,13 @@ async fn sync_single_archive(
             let index = DynamicIndexReader::new(tmpfile)
                 .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
 
-            sync_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
+            pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile)
                 .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
 
-            sync_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
+            pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
         }
         ArchiveType::Blob => { /* nothing to do */ }
     }
@@ -105,7 +108,7 @@ async fn sync_single_archive(
     Ok(())
 }
 
-async fn sync_snapshot(
+async fn pull_snapshot(
     worker: &WorkerTask,
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
@@ -184,7 +187,7 @@ async fn sync_snapshot(
         }
     }
 
-    sync_single_archive(
+    pull_single_archive(
         worker,
         &reader,
         &mut chunk_reader,
@@ -204,7 +207,7 @@ async fn sync_snapshot(
     Ok(())
 }
 
-pub async fn sync_snapshot_from(
+pub async fn pull_snapshot_from(
     worker: &WorkerTask,
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
@@ -216,7 +219,7 @@ pub async fn sync_snapshot_from(
     if is_new {
         worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
 
-        if let Err(err) = sync_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
+        if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
             if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) {
                 worker.log(format!("cleanup error - {}", cleanup_err));
             }
@@ -224,13 +227,13 @@ pub async fn sync_snapshot_from(
         }
     } else {
         worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
-        sync_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?
+        pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?
     }
 
     Ok(())
 }
 
-pub async fn sync_group(
+pub async fn pull_group(
     worker: &WorkerTask,
     client: &HttpClient,
     src_repo: &BackupRepository,
@@ -278,13 +281,13 @@ pub async fn sync_group(
 
         let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);
 
-        sync_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
+        pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
     }
 
     Ok(())
 }
 
-pub async fn sync_store(
+pub async fn pull_store(
     worker: &WorkerTask,
     client: &HttpClient,
     src_repo: &BackupRepository,
@@ -319,7 +322,7 @@ pub async fn sync_store(
         let btype = item["backup-type"].as_str().unwrap();
 
         let group = BackupGroup::new(btype, id);
-        if let Err(err) = sync_group(worker, client, src_repo, tgt_store.clone(), &group).await {
+        if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group).await {
             worker.log(format!("sync group {}/{} failed - {}", btype, id, err));
             errors = true;
             // continue
@@ -357,8 +360,8 @@ pub async fn sync_store(
         },
     },
 )]
-/// Sync store from otherrepository
-async fn sync_from (
+/// Sync store from other repository
+async fn pull (
     store: String,
     remote_host: String,
     remote_store: String,
@@ -387,7 +390,7 @@ async fn sync_from (
     // explicit create shared lock to prevent GC on newly created chunks
     let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
 
-    sync_store(&worker, &client, &src_repo, tgt_store.clone()).await?;
+    pull_store(&worker, &client, &src_repo, tgt_store.clone()).await?;
 
     worker.log(format!("sync datastore '{}' end", store));
 
@@ -398,4 +401,4 @@ async fn sync_from (
 }
 
 pub const ROUTER: Router = Router::new()
-    .post(&API_METHOD_SYNC_FROM);
+    .post(&API_METHOD_PULL);
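With the rename complete, the helpers in this file read as one top-down chain, which the new names make explicit. A schematic sketch of just the nesting (hypothetical synchronous stubs; the real functions are async and take the worker/reader/datastore arguments shown in the hunks above):

```rust
// Stubs only, to make the call hierarchy concrete; no real I/O happens here.
fn pull_index_chunks() { /* fetch the chunks referenced by one index */ }
fn pull_single_archive() { pull_index_chunks() } // one archive of a snapshot
fn pull_snapshot() { pull_single_archive() }     // all archives of a snapshot
fn pull_snapshot_from() { pull_snapshot() }      // adds cleanup of new snapshots on error
fn pull_group() { pull_snapshot_from() }         // all snapshots of a backup group
fn pull_store() { pull_group() }                 // all groups of a datastore
fn main() { pull_store() }                       // driven by the `pull` API handler
```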
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
@@ -356,7 +356,7 @@ fn cert_mgmt_cli() -> CommandLineInterface {
 #[api(
     input: {
         properties: {
-            store: {
+            "local-store": {
                 schema: DATASTORE_SCHEMA,
             },
             remote: {
@@ -373,11 +373,11 @@ fn cert_mgmt_cli() -> CommandLineInterface {
         }
     }
 )]
-/// Start datastore sync
-async fn start_datastore_sync(
-    store: String,
+/// Sync datastore from another repository
+async fn pull_datastore(
     remote: String,
     remote_store: String,
+    local_store: String,
     output_format: Option<String>,
 ) -> Result<Value, Error> {
 
@@ -390,14 +390,14 @@ async fn start_datastore_sync(
     let remote: Remote = remote_config.lookup("remote", &remote)?;
 
     let args = json!({
-        "store": store,
+        "store": local_store,
         "remote-host": remote.host,
         "remote-user": remote.userid,
         "remote-store": remote_store,
         "remote-password": remote.password,
     });
 
-    let result = client.post("api2/json/sync", Some(args)).await?;
+    let result = client.post("api2/json/pull", Some(args)).await?;
 
     view_task_result(client, result, &output_format).await?;
 
@@ -412,10 +412,10 @@ fn main() {
         .insert("cert", cert_mgmt_cli())
         .insert("task", task_mgmt_cli())
         .insert(
-            "sync",
-            CliCommand::new(&API_METHOD_START_DATASTORE_SYNC)
-                .arg_param(&["store", "remote", "remote-store"])
-                .completion_cb("store", config::datastore::complete_datastore_name)
+            "pull",
+            CliCommand::new(&API_METHOD_PULL_DATASTORE)
+                .arg_param(&["remote", "remote-store", "local-store"])
+                .completion_cb("local-store", config::datastore::complete_datastore_name)
                 .completion_cb("remote", config::remotes::complete_remote_name)
         );
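On the CLI, the command is now invoked as `proxmox-backup-manager pull <remote> <remote-store> <local-store>`, per the new arg_param order. For illustration, a minimal sketch of the request body the renamed handler posts to the new endpoint, using only serde_json; the values are hypothetical placeholders for what the real pull_datastore() resolves from the configured remote:

```rust
use serde_json::json;

fn main() {
    // Field names match those sent by pull_datastore() above; the values
    // here stand in for what remote_config.lookup() would return.
    let args = json!({
        "store": "my-local-store",           // the new "local-store" argument
        "remote-host": "backup.example.com", // hypothetical remote definition
        "remote-user": "sync@pbs",
        "remote-store": "my-remote-store",
        "remote-password": "***",
    });

    // The manager now POSTs this to "api2/json/pull" instead of "api2/json/sync".
    println!("POST api2/json/pull\n{}", serde_json::to_string_pretty(&args).unwrap());
}
```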