src/bin/proxmox-backup-client.rs: use new BackupClient to upload data
This commit is contained in:
parent
10241c20ea
commit
c4ff3dcefd
@ -25,6 +25,7 @@ use xdg::BaseDirectories;
|
|||||||
|
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use futures::*;
|
use futures::*;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref BACKUPSPEC_REGEX: Regex = Regex::new(r"^([a-zA-Z0-9_-]+\.(?:pxar|raw)):(.+)$").unwrap();
|
static ref BACKUPSPEC_REGEX: Regex = Regex::new(r"^([a-zA-Z0-9_-]+\.(?:pxar|raw)):(.+)$").unwrap();
|
||||||
@ -106,37 +107,34 @@ fn complete_repository(_arg: &str, _param: &HashMap<String, String>) -> Vec<Stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn backup_directory<P: AsRef<Path>>(
|
fn backup_directory<P: AsRef<Path>>(
|
||||||
client: &mut HttpClient,
|
client: &BackupClient,
|
||||||
repo: &BackupRepository,
|
|
||||||
dir_path: P,
|
dir_path: P,
|
||||||
archive_name: &str,
|
archive_name: &str,
|
||||||
backup_id: &str,
|
|
||||||
backup_time: DateTime<Local>,
|
|
||||||
chunk_size: Option<u64>,
|
chunk_size: Option<u64>,
|
||||||
all_file_systems: bool,
|
all_file_systems: bool,
|
||||||
verbose: bool,
|
verbose: bool,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
|
||||||
let mut param = json!({
|
if let Some(_size) = chunk_size {
|
||||||
"archive-name": archive_name,
|
unimplemented!();
|
||||||
"backup-type": "host",
|
|
||||||
"backup-id": backup_id,
|
|
||||||
"backup-time": backup_time.timestamp(),
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Some(size) = chunk_size {
|
|
||||||
param["chunk-size"] = size.into();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let query = tools::json_object_to_query(param)?;
|
let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?;
|
||||||
|
let chunk_stream = ChunkStream::new(pxar_stream);
|
||||||
|
|
||||||
let path = format!("api2/json/admin/datastore/{}/pxar?{}", repo.store(), query);
|
let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
|
||||||
|
|
||||||
let stream = PxarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?;
|
let stream = rx
|
||||||
|
.map_err(Error::from)
|
||||||
|
.and_then(|x| x); // flatten
|
||||||
|
|
||||||
let body = Body::wrap_stream(stream);
|
// spawn chunker inside a separate task so that it can run parallel
|
||||||
|
tokio::spawn(
|
||||||
|
tx.send_all(chunk_stream.then(|r| Ok(r)))
|
||||||
|
.map_err(|e| {}).map(|_| ())
|
||||||
|
);
|
||||||
|
|
||||||
client.upload("application/x-proxmox-backup-pxar", body, &path).wait()?;
|
client.upload_stream(archive_name, stream, "dynamic", None).wait()?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -440,20 +438,22 @@ fn create_backup(
|
|||||||
|
|
||||||
let backup_time = Local.timestamp(Local::now().timestamp(), 0);
|
let backup_time = Local.timestamp(Local::now().timestamp(), 0);
|
||||||
|
|
||||||
let mut client = HttpClient::new(repo.host(), repo.user())?;
|
let client = HttpClient::new(repo.host(), repo.user())?;
|
||||||
|
|
||||||
record_repository(&repo);
|
record_repository(&repo);
|
||||||
|
|
||||||
println!("Starting backup");
|
println!("Starting backup");
|
||||||
println!("Client name: {}", tools::nodename());
|
println!("Client name: {}", tools::nodename());
|
||||||
println!("Start Time: {}", backup_time.to_rfc3339());
|
println!("Start Time: {}", backup_time.to_rfc3339());
|
||||||
|
|
||||||
|
let client = client.start_backup(repo.store(), "host", &backup_id).wait()?;
|
||||||
|
|
||||||
for (filename, target) in upload_list {
|
for (filename, target) in upload_list {
|
||||||
println!("Upload '{}' to '{:?}' as {}", filename, repo, target);
|
println!("Upload '{}' to '{:?}' as {}", filename, repo, target);
|
||||||
backup_directory(&mut client, &repo, &filename, &target, backup_id, backup_time,
|
backup_directory(&client, &filename, &target, chunk_size_opt, all_file_systems, verbose)?;
|
||||||
chunk_size_opt, all_file_systems, verbose)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
client.finish().wait()?;
|
||||||
|
|
||||||
let end_time = Local.timestamp(Local::now().timestamp(), 0);
|
let end_time = Local.timestamp(Local::now().timestamp(), 0);
|
||||||
let elapsed = end_time.signed_duration_since(backup_time);
|
let elapsed = end_time.signed_duration_since(backup_time);
|
||||||
println!("Duration: {}", elapsed);
|
println!("Duration: {}", elapsed);
|
||||||
|
Loading…
Reference in New Issue
Block a user