From e9722f8bdede544df7ab5f871813deb12832aeca Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Wed, 28 Aug 2019 17:20:32 +0200 Subject: [PATCH] src/bin/proxmox-backup-client.rs: switch to async Signed-off-by: Wolfgang Bumiller --- src/bin/proxmox-backup-client.rs | 389 +++++++++++++++++-------------- 1 file changed, 220 insertions(+), 169 deletions(-) diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 9f402a54..507c8476 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -150,7 +150,7 @@ fn complete_repository(_arg: &str, _param: &HashMap) -> Vec>( +async fn backup_directory>( client: &BackupClient, dir_path: P, archive_name: &str, @@ -163,26 +163,26 @@ fn backup_directory>( ) -> Result { let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), device_set, verbose, skip_lost_and_found, catalog)?; - let chunk_stream = ChunkStream::new(pxar_stream, chunk_size); + let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); - let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks + let (mut tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = rx - .map_err(Error::from) - .and_then(|x| x); // flatten + .map_err(Error::from); // spawn chunker inside a separate task so that it can run parallel - tokio::spawn( - tx.send_all(chunk_stream.then(|r| Ok(r))) - .map_err(|_| {}).map(|_| ()) - ); + tokio::spawn(async move { + let _ = tx.send_all(&mut chunk_stream).await; + }); - let stats = client.upload_stream(archive_name, stream, "dynamic", None, crypt_config).wait()?; + let stats = client + .upload_stream(archive_name, stream, "dynamic", None, crypt_config) + .await?; Ok(stats) } -fn backup_image>( +async fn backup_image>( client: &BackupClient, image_path: P, archive_name: &str, @@ -194,14 +194,16 @@ fn backup_image>( let path = image_path.as_ref().to_owned(); - let file = tokio::fs::File::open(path).wait()?; + let file = tokio::fs::File::open(path).await?; let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) .map_err(Error::from); let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024)); - let stats = client.upload_stream(archive_name, stream, "fixed", Some(image_size), crypt_config).wait()?; + let stats = client + .upload_stream(archive_name, stream, "fixed", Some(image_size), crypt_config) + .await?; Ok(stats) } @@ -231,7 +233,9 @@ fn list_backup_groups( let path = format!("api2/json/admin/datastore/{}/groups", repo.store()); - let mut result = client.get(&path, None).wait()?; + let mut result = async_main(async move { + client.get(&path, None).await + })?; record_repository(&repo); @@ -316,7 +320,9 @@ fn list_snapshots( args["backup-id"] = group.backup_id().into(); } - let result = client.get(&path, Some(args)).wait()?; + let result = async_main(async move { + client.get(&path, Some(args)).await + })?; record_repository(&repo); @@ -378,11 +384,13 @@ fn forget_snapshots( let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store()); - let result = client.delete(&path, Some(json!({ - "backup-type": snapshot.group().backup_type(), - "backup-id": snapshot.group().backup_id(), - "backup-time": snapshot.backup_time().timestamp(), - }))).wait()?; + let result = async_main(async move { + client.delete(&path, Some(json!({ + "backup-type": snapshot.group().backup_type(), + "backup-id": snapshot.group().backup_id(), + "backup-time": snapshot.backup_time().timestamp(), + }))).await + })?; record_repository(&repo); @@ -398,7 +406,7 @@ fn api_login( let repo = extract_repository_from_value(¶m)?; let client = HttpClient::new(repo.host(), repo.user())?; - client.login().wait()?; + async_main(async move { client.login().await })?; record_repository(&repo); @@ -441,28 +449,32 @@ fn dump_catalog( let client = HttpClient::new(repo.host(), repo.user())?; - let client = client.start_backup_reader( - repo.store(), - &snapshot.group().backup_type(), - &snapshot.group().backup_id(), - snapshot.backup_time(), true).wait()?; + async_main(async move { + let client = client.start_backup_reader( + repo.store(), + &snapshot.group().backup_type(), + &snapshot.group().backup_id(), + snapshot.backup_time(), true).await?; - let blob_file = std::fs::OpenOptions::new() - .read(true) - .write(true) - .custom_flags(libc::O_TMPFILE) - .open("/tmp")?; + let blob_file = std::fs::OpenOptions::new() + .read(true) + .write(true) + .custom_flags(libc::O_TMPFILE) + .open("/tmp")?; - let mut blob_file = client.download("catalog.blob", blob_file).wait()?; + let mut blob_file = client.download("catalog.blob", blob_file).await?; - blob_file.seek(SeekFrom::Start(0))?; + blob_file.seek(SeekFrom::Start(0))?; - let reader = BufReader::new(blob_file); - let mut catalog_reader = CatalogBlobReader::new(reader, crypt_config)?; + let reader = BufReader::new(blob_file); + let mut catalog_reader = CatalogBlobReader::new(reader, crypt_config)?; - catalog_reader.dump()?; + catalog_reader.dump()?; - record_repository(&repo); + record_repository(&repo); + + Ok::<(), Error>(()) + })?; Ok(Value::Null) } @@ -484,11 +496,13 @@ fn list_snapshot_files( let path = format!("api2/json/admin/datastore/{}/files", repo.store()); - let mut result = client.get(&path, Some(json!({ - "backup-type": snapshot.group().backup_type(), - "backup-id": snapshot.group().backup_id(), - "backup-time": snapshot.backup_time().timestamp(), - }))).wait()?; + let mut result = async_main(async move { + client.get(&path, Some(json!({ + "backup-type": snapshot.group().backup_type(), + "backup-id": snapshot.group().backup_id(), + "backup-time": snapshot.backup_time().timestamp(), + }))).await + })?; record_repository(&repo); @@ -521,7 +535,7 @@ fn start_garbage_collection( let path = format!("api2/json/admin/datastore/{}/gc", repo.store()); - let result = client.post(&path, None).wait()?; + let result = async_main(async move { client.post(&path, None).await })?; record_repository(&repo); @@ -672,127 +686,139 @@ fn create_backup( } }; - let client = client.start_backup(repo.store(), backup_type, &backup_id, backup_time, verbose).wait()?; + async_main(async move { + let client = client + .start_backup(repo.store(), backup_type, &backup_id, backup_time, verbose) + .await?; - let mut file_list = vec![]; + let mut file_list = vec![]; - // fixme: encrypt/sign catalog? - let catalog_file = std::fs::OpenOptions::new() - .write(true) - .read(true) - .custom_flags(libc::O_TMPFILE) - .open("/tmp")?; + // fixme: encrypt/sign catalog? + let catalog_file = std::fs::OpenOptions::new() + .write(true) + .read(true) + .custom_flags(libc::O_TMPFILE) + .open("/tmp")?; - let catalog = Arc::new(Mutex::new(CatalogBlobWriter::new_compressed(catalog_file)?)); - let mut upload_catalog = false; + let catalog = Arc::new(Mutex::new(CatalogBlobWriter::new_compressed(catalog_file)?)); + let mut upload_catalog = false; - for (backup_type, filename, target, size) in upload_list { - match backup_type { - BackupType::CONFIG => { - println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target); - let stats = client.upload_blob_from_file(&filename, &target, crypt_config.clone(), true).wait()?; - file_list.push((target, stats)); - } - BackupType::LOGFILE => { // fixme: remove - not needed anymore ? - println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target); - let stats = client.upload_blob_from_file(&filename, &target, crypt_config.clone(), true).wait()?; - file_list.push((target, stats)); - } - BackupType::PXAR => { - upload_catalog = true; - println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target); - catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?; - let stats = backup_directory( - &client, - &filename, - &target, - chunk_size_opt, - devices.clone(), - verbose, - skip_lost_and_found, - crypt_config.clone(), - catalog.clone(), - )?; - file_list.push((target, stats)); - catalog.lock().unwrap().end_directory()?; - } - BackupType::IMAGE => { - println!("Upload image '{}' to '{:?}' as {}", filename, repo, target); - let stats = backup_image( - &client, - &filename, - &target, - size, - chunk_size_opt, - verbose, - crypt_config.clone(), - )?; - file_list.push((target, stats)); + for (backup_type, filename, target, size) in upload_list { + match backup_type { + BackupType::CONFIG => { + println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target); + let stats = client + .upload_blob_from_file(&filename, &target, crypt_config.clone(), true) + .await?; + file_list.push((target, stats)); + } + BackupType::LOGFILE => { // fixme: remove - not needed anymore ? + println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target); + let stats = client + .upload_blob_from_file(&filename, &target, crypt_config.clone(), true) + .await?; + file_list.push((target, stats)); + } + BackupType::PXAR => { + upload_catalog = true; + println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target); + catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?; + let stats = backup_directory( + &client, + &filename, + &target, + chunk_size_opt, + devices.clone(), + verbose, + skip_lost_and_found, + crypt_config.clone(), + catalog.clone(), + ).await?; + file_list.push((target, stats)); + catalog.lock().unwrap().end_directory()?; + } + BackupType::IMAGE => { + println!("Upload image '{}' to '{:?}' as {}", filename, repo, target); + let stats = backup_image( + &client, + &filename, + &target, + size, + chunk_size_opt, + verbose, + crypt_config.clone(), + ).await?; + file_list.push((target, stats)); + } } } - } - // finalize and upload catalog - if upload_catalog { - let mutex = Arc::try_unwrap(catalog) - .map_err(|_| format_err!("unable to get catalog (still used)"))?; - let mut catalog_file = mutex.into_inner().unwrap().finish()?; + // finalize and upload catalog + if upload_catalog { + let mutex = Arc::try_unwrap(catalog) + .map_err(|_| format_err!("unable to get catalog (still used)"))?; + let mut catalog_file = mutex.into_inner().unwrap().finish()?; - let target = "catalog.blob"; + let target = "catalog.blob"; - catalog_file.seek(SeekFrom::Start(0))?; + catalog_file.seek(SeekFrom::Start(0))?; - let stats = client.upload_blob(catalog_file, target).wait()?; - file_list.push((target.to_owned(), stats)); - } + let stats = client.upload_blob(catalog_file, target).await?; + file_list.push((target.to_owned(), stats)); + } - if let Some(rsa_encrypted_key) = rsa_encrypted_key { - let target = "rsa-encrypted.key"; - println!("Upload RSA encoded key to '{:?}' as {}", repo, target); - let stats = client.upload_blob_from_data(rsa_encrypted_key, target, None, false, false).wait()?; - file_list.push((format!("{}.blob", target), stats)); + if let Some(rsa_encrypted_key) = rsa_encrypted_key { + let target = "rsa-encrypted.key"; + println!("Upload RSA encoded key to '{:?}' as {}", repo, target); + let stats = client + .upload_blob_from_data(rsa_encrypted_key, target, None, false, false) + .await?; + file_list.push((format!("{}.blob", target), stats)); - // openssl rsautl -decrypt -inkey master-private.pem -in rsa-encrypted.key -out t - /* - let mut buffer2 = vec![0u8; rsa.size() as usize]; - let pem_data = file_get_contents("master-private.pem")?; - let rsa = openssl::rsa::Rsa::private_key_from_pem(&pem_data)?; - let len = rsa.private_decrypt(&buffer, &mut buffer2, openssl::rsa::Padding::PKCS1)?; - println!("TEST {} {:?}", len, buffer2); - */ - } + // openssl rsautl -decrypt -inkey master-private.pem -in rsa-encrypted.key -out t + /* + let mut buffer2 = vec![0u8; rsa.size() as usize]; + let pem_data = file_get_contents("master-private.pem")?; + let rsa = openssl::rsa::Rsa::private_key_from_pem(&pem_data)?; + let len = rsa.private_decrypt(&buffer, &mut buffer2, openssl::rsa::Padding::PKCS1)?; + println!("TEST {} {:?}", len, buffer2); + */ + } - // create index.json - let file_list = file_list.iter() - .fold(vec![], |mut acc, (filename, stats)| { - acc.push(json!({ - "filename": filename, - "size": stats.size, - "csum": proxmox::tools::digest_to_hex(&stats.csum), - })); - acc + // create index.json + let file_list = file_list.iter() + .fold(vec![], |mut acc, (filename, stats)| { + acc.push(json!({ + "filename": filename, + "size": stats.size, + "csum": proxmox::tools::digest_to_hex(&stats.csum), + })); + acc + }); + + let index = json!({ + "backup-type": backup_type, + "backup-id": backup_id, + "backup-time": backup_time.timestamp(), + "files": file_list, }); - let index = json!({ - "backup-type": backup_type, - "backup-id": backup_id, - "backup-time": backup_time.timestamp(), - "files": file_list, - }); + println!("Upload index.json to '{:?}'", repo); + let index_data = serde_json::to_string_pretty(&index)?.into(); + client + .upload_blob_from_data(index_data, "index.json.blob", crypt_config.clone(), true, true) + .await?; - println!("Upload index.json to '{:?}'", repo); - let index_data = serde_json::to_string_pretty(&index)?.into(); - client.upload_blob_from_data(index_data, "index.json.blob", crypt_config.clone(), true, true).wait()?; + client.finish().await?; - client.finish().wait()?; + let end_time = Local::now(); + let elapsed = end_time.signed_duration_since(start_time); + println!("Duration: {}", elapsed); - let end_time = Local::now(); - let elapsed = end_time.signed_duration_since(start_time); - println!("Duration: {}", elapsed); + println!("End Time: {}", end_time.to_rfc3339_opts(chrono::SecondsFormat::Secs, false)); - println!("End Time: {}", end_time.to_rfc3339_opts(chrono::SecondsFormat::Secs, false)); - - Ok(Value::Null) + Ok(Value::Null) + }) } fn complete_backup_source(arg: &str, param: &HashMap) -> Vec { @@ -821,7 +847,10 @@ fn restore( _info: &ApiMethod, _rpcenv: &mut dyn RpcEnvironment, ) -> Result { + async_main(restore_do(param)) +} +async fn restore_do(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; let verbose = param["verbose"].as_bool().unwrap_or(false); @@ -843,7 +872,7 @@ fn restore( let result = client.get(&path, Some(json!({ "backup-type": group.backup_type(), "backup-id": group.backup_id(), - }))).wait()?; + }))).await?; let list = result["data"].as_array().unwrap(); if list.len() == 0 { @@ -879,7 +908,9 @@ fn restore( format!("{}.blob", archive_name) }; - let client = client.start_backup_reader(repo.store(), &backup_type, &backup_id, backup_time, true).wait()?; + let client = client + .start_backup_reader(repo.store(), &backup_type, &backup_id, backup_time, true) + .await?; let tmpfile = std::fs::OpenOptions::new() .write(true) @@ -889,7 +920,7 @@ fn restore( const INDEX_BLOB_NAME: &str = "index.json.blob"; - let index_data = client.download(INDEX_BLOB_NAME, Vec::with_capacity(64*1024)).wait()?; + let index_data = client.download(INDEX_BLOB_NAME, Vec::with_capacity(64*1024)).await?; let blob = DataBlob::from_raw(index_data)?; blob.verify_crc()?; let backup_index_data = blob.decode(crypt_config.clone())?; @@ -926,7 +957,7 @@ fn restore( } } else if server_archive_name.ends_with(".didx") { - let tmpfile = client.download(&server_archive_name, tmpfile).wait()?; + let tmpfile = client.download(&server_archive_name, tmpfile).await?; let index = DynamicIndexReader::new(tmpfile) .map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?; @@ -958,7 +989,7 @@ fn restore( .map_err(|err| format_err!("unable to pipe data - {}", err))?; } } else if server_archive_name.ends_with(".fidx") { - let tmpfile = client.download(&server_archive_name, tmpfile).wait()?; + let tmpfile = client.download(&server_archive_name, tmpfile).await?; let index = FixedIndexReader::new(tmpfile) .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?; @@ -1034,9 +1065,9 @@ fn upload_log( let body = hyper::Body::from(raw_data); - let result = client.upload("application/octet-stream", body, &path, Some(args)).wait()?; - - Ok(result) + async_main(async move { + client.upload("application/octet-stream", body, &path, Some(args)).await + }) } fn prune( @@ -1060,7 +1091,7 @@ fn prune( param["backup-type"] = group.backup_type().into(); param["backup-id"] = group.backup_id().into(); - let _result = client.post(&path, Some(param)).wait()?; + let _result = async_main(async move { client.post(&path, Some(param)).await })?; record_repository(&repo); @@ -1081,7 +1112,7 @@ fn status( let path = format!("api2/json/admin/datastore/{}/status", repo.store()); - let result = client.get(&path, None).wait()?; + let result = async_main(async move { client.get(&path, None).await })?; let data = &result["data"]; record_repository(&repo); @@ -1107,14 +1138,14 @@ fn status( } // like get, but simply ignore errors and return Null instead -fn try_get(repo: &BackupRepository, url: &str) -> Value { +async fn try_get(repo: &BackupRepository, url: &str) -> Value { let client = match HttpClient::new(repo.host(), repo.user()) { Ok(v) => v, _ => return Value::Null, }; - let mut resp = match client.get(url, None).wait() { + let mut resp = match client.get(url, None).await { Ok(v) => v, _ => return Value::Null, }; @@ -1128,6 +1159,10 @@ fn try_get(repo: &BackupRepository, url: &str) -> Value { } fn complete_backup_group(_arg: &str, param: &HashMap) -> Vec { + async_main(async { complete_backup_group_do(param).await }) +} + +async fn complete_backup_group_do(param: &HashMap) -> Vec { let mut result = vec![]; @@ -1138,7 +1173,7 @@ fn complete_backup_group(_arg: &str, param: &HashMap) -> Vec) -> Vec) -> Vec { + async_main(async { complete_group_or_snapshot_do(arg, param).await }) +} + +async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap) -> Vec { if arg.matches('/').count() < 2 { - let groups = complete_backup_group(arg, param); + let groups = complete_backup_group_do(param).await; let mut result = vec![]; for group in groups { result.push(group.to_string()); @@ -1165,10 +1204,14 @@ fn complete_group_or_snapshot(arg: &str, param: &HashMap) -> Vec return result; } - complete_backup_snapshot(arg, param) + complete_backup_snapshot_do(param).await } fn complete_backup_snapshot(_arg: &str, param: &HashMap) -> Vec { + async_main(async { complete_backup_snapshot_do(param).await }) +} + +async fn complete_backup_snapshot_do(param: &HashMap) -> Vec { let mut result = vec![]; @@ -1179,7 +1222,7 @@ fn complete_backup_snapshot(_arg: &str, param: &HashMap) -> Vec< let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store()); - let data = try_get(&repo, &path); + let data = try_get(&repo, &path).await; if let Some(list) = data.as_array() { for item in list { @@ -1196,6 +1239,10 @@ fn complete_backup_snapshot(_arg: &str, param: &HashMap) -> Vec< } fn complete_server_file_name(_arg: &str, param: &HashMap) -> Vec { + async_main(async { complete_server_file_name_do(param).await }) +} + +async fn complete_server_file_name_do(param: &HashMap) -> Vec { let mut result = vec![]; @@ -1222,7 +1269,7 @@ fn complete_server_file_name(_arg: &str, param: &HashMap) -> Vec let path = format!("api2/json/admin/datastore/{}/files?{}", repo.store(), query); - let data = try_get(&repo, &path); + let data = try_get(&repo, &path).await; if let Some(list) = data.as_array() { for item in list { @@ -1236,9 +1283,10 @@ fn complete_server_file_name(_arg: &str, param: &HashMap) -> Vec } fn complete_archive_name(arg: &str, param: &HashMap) -> Vec { - complete_server_file_name(arg, param) - .iter().map(|v| strip_server_file_expenstion(&v)).collect() + .iter() + .map(|v| strip_server_file_expenstion(&v)) + .collect() } fn complete_chunk_size(_arg: &str, _param: &HashMap) -> Vec { @@ -1721,9 +1769,12 @@ We do not extraxt '.pxar' archives when writing to stdandard output. .insert("status".to_owned(), status_cmd_def.into()) .insert("key".to_owned(), key_mgmt_cli().into()); - hyper::rt::run(futures::future::lazy(move || { - run_cli_command(cmd_def.into()); - Ok(()) - })); - + run_cli_command(cmd_def.into()); +} + +fn async_main(fut: F) -> ::Output { + let rt = tokio::runtime::Runtime::new().unwrap(); + let ret = rt.block_on(fut); + rt.shutdown_now(); + ret }