server: rustfmt
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
@ -8,27 +8,28 @@ use std::sync::{Arc, Mutex};
|
||||
use std::time::SystemTime;
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
use serde_json::json;
|
||||
use http::StatusCode;
|
||||
use serde_json::json;
|
||||
|
||||
use proxmox_router::HttpError;
|
||||
use proxmox_sys::task_log;
|
||||
|
||||
use pbs_api_types::{
|
||||
Authid, GroupFilter, GroupListItem, RateLimitConfig, Remote,
|
||||
Operation, SnapshotListItem,
|
||||
Authid, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem,
|
||||
};
|
||||
|
||||
use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
|
||||
use pbs_client::{
|
||||
BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
|
||||
};
|
||||
use pbs_datastore::data_blob::DataBlob;
|
||||
use pbs_datastore::dynamic_index::DynamicIndexReader;
|
||||
use pbs_datastore::fixed_index::FixedIndexReader;
|
||||
use pbs_datastore::index::IndexFile;
|
||||
use pbs_datastore::manifest::{
|
||||
CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
|
||||
archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
|
||||
};
|
||||
use pbs_datastore::{BackupDir, BackupGroup, BackupInfo, DataStore, StoreProgress};
|
||||
use pbs_tools::sha::sha256;
|
||||
use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
|
||||
use proxmox_rest_server::WorkerTask;
|
||||
|
||||
use crate::tools::parallel_handler::ParallelHandler;
|
||||
@ -71,7 +72,15 @@ impl PullParameters {
|
||||
remote_store.to_string(),
|
||||
);
|
||||
|
||||
Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit })
|
||||
Ok(Self {
|
||||
remote,
|
||||
source,
|
||||
store,
|
||||
owner,
|
||||
remove_vanished,
|
||||
group_filter,
|
||||
limit,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn client(&self) -> Result<HttpClient, Error> {
|
||||
@ -163,7 +172,7 @@ async fn pull_index_chunks<I: IndexFile>(
|
||||
let bytes = bytes.load(Ordering::SeqCst);
|
||||
|
||||
task_log!(
|
||||
worker,
|
||||
worker,
|
||||
"downloaded {} bytes ({:.2} MiB/s)",
|
||||
bytes,
|
||||
(bytes as f64) / (1024.0 * 1024.0 * elapsed)
|
||||
@ -495,7 +504,11 @@ pub async fn pull_snapshot_from(
|
||||
downloaded_chunks,
|
||||
)
|
||||
.await?;
|
||||
task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path());
|
||||
task_log!(
|
||||
worker,
|
||||
"re-sync snapshot {:?} done",
|
||||
snapshot.relative_path()
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -524,13 +537,11 @@ impl SkipInfo {
|
||||
match self.count {
|
||||
0 => Ok(String::new()),
|
||||
1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
|
||||
_ => {
|
||||
Ok(format!(
|
||||
"{} .. {}",
|
||||
proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
|
||||
proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
|
||||
))
|
||||
}
|
||||
_ => Ok(format!(
|
||||
"{} .. {}",
|
||||
proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
|
||||
proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -553,7 +564,10 @@ pub async fn pull_group(
|
||||
group: &BackupGroup,
|
||||
progress: &mut StoreProgress,
|
||||
) -> Result<(), Error> {
|
||||
let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store());
|
||||
let path = format!(
|
||||
"api2/json/admin/datastore/{}/snapshots",
|
||||
params.source.store()
|
||||
);
|
||||
|
||||
let args = json!({
|
||||
"backup-type": group.backup_type(),
|
||||
@ -589,7 +603,11 @@ pub async fn pull_group(
|
||||
|
||||
// in-progress backups can't be synced
|
||||
if item.size.is_none() {
|
||||
task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot);
|
||||
task_log!(
|
||||
worker,
|
||||
"skipping snapshot {} - in-progress backup",
|
||||
snapshot
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -607,8 +625,9 @@ pub async fn pull_group(
|
||||
// get updated auth_info (new tickets)
|
||||
let auth_info = client.login().await?;
|
||||
|
||||
let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
|
||||
.rate_limit(params.limit.clone());
|
||||
let options =
|
||||
HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
|
||||
.rate_limit(params.limit.clone());
|
||||
|
||||
let new_client = HttpClient::new(
|
||||
params.source.host(),
|
||||
@ -658,7 +677,11 @@ pub async fn pull_group(
|
||||
);
|
||||
continue;
|
||||
}
|
||||
task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
|
||||
task_log!(
|
||||
worker,
|
||||
"delete vanished snapshot {:?}",
|
||||
info.backup_dir.relative_path()
|
||||
);
|
||||
params.store.remove_backup_dir(&info.backup_dir, false)?;
|
||||
}
|
||||
}
|
||||
@ -698,25 +721,26 @@ pub async fn pull_store(
|
||||
});
|
||||
|
||||
let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
|
||||
filters
|
||||
.iter()
|
||||
.any(|filter| group.matches(filter))
|
||||
filters.iter().any(|filter| group.matches(filter))
|
||||
};
|
||||
|
||||
let list:Vec<BackupGroup> = list
|
||||
let list: Vec<BackupGroup> = list
|
||||
.into_iter()
|
||||
.map(|item| BackupGroup::new(item.backup_type, item.backup_id))
|
||||
.collect();
|
||||
|
||||
let list = if let Some(ref group_filter) = ¶ms.group_filter {
|
||||
let unfiltered_count = list.len();
|
||||
let list:Vec<BackupGroup> = list
|
||||
let list: Vec<BackupGroup> = list
|
||||
.into_iter()
|
||||
.filter(|group| {
|
||||
apply_filters(group, group_filter)
|
||||
})
|
||||
.filter(|group| apply_filters(group, group_filter))
|
||||
.collect();
|
||||
task_log!(worker, "found {} groups to sync (out of {} total)", list.len(), unfiltered_count);
|
||||
task_log!(
|
||||
worker,
|
||||
"found {} groups to sync (out of {} total)",
|
||||
list.len(),
|
||||
unfiltered_count
|
||||
);
|
||||
list
|
||||
} else {
|
||||
task_log!(worker, "found {} groups to sync", total_count);
|
||||
@ -737,13 +761,17 @@ pub async fn pull_store(
|
||||
progress.done_snapshots = 0;
|
||||
progress.group_snapshots = 0;
|
||||
|
||||
let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, ¶ms.owner) {
|
||||
let (owner, _lock_guard) = match params
|
||||
.store
|
||||
.create_locked_backup_group(&group, ¶ms.owner)
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(err) => {
|
||||
task_log!(
|
||||
worker,
|
||||
"sync group {} failed - group lock failed: {}",
|
||||
&group, err
|
||||
&group,
|
||||
err
|
||||
);
|
||||
errors = true; // do not stop here, instead continue
|
||||
continue;
|
||||
@ -756,23 +784,13 @@ pub async fn pull_store(
|
||||
task_log!(
|
||||
worker,
|
||||
"sync group {} failed - owner check failed ({} != {})",
|
||||
&group, params.owner, owner
|
||||
&group,
|
||||
params.owner,
|
||||
owner
|
||||
);
|
||||
errors = true; // do not stop here, instead continue
|
||||
} else if let Err(err) = pull_group(
|
||||
worker,
|
||||
client,
|
||||
params,
|
||||
&group,
|
||||
&mut progress,
|
||||
)
|
||||
.await
|
||||
{
|
||||
task_log!(
|
||||
worker,
|
||||
"sync group {} failed - {}",
|
||||
&group, err,
|
||||
);
|
||||
} else if let Err(err) = pull_group(worker, client, params, &group, &mut progress).await {
|
||||
task_log!(worker, "sync group {} failed - {}", &group, err,);
|
||||
errors = true; // do not stop here, instead continue
|
||||
}
|
||||
}
|
||||
@ -796,10 +814,14 @@ pub async fn pull_store(
|
||||
local_group.backup_id()
|
||||
);
|
||||
match params.store.remove_backup_group(&local_group) {
|
||||
Ok(true) => {},
|
||||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
task_log!(worker, "kept some protected snapshots of group '{}'", local_group);
|
||||
},
|
||||
task_log!(
|
||||
worker,
|
||||
"kept some protected snapshots of group '{}'",
|
||||
local_group
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
task_log!(worker, "{}", err);
|
||||
errors = true;
|
||||
|
Reference in New Issue
Block a user