src/client/backup_reader.rs: split BackupReader code into separate file
This commit is contained in:
parent
fd04ca7a5a
commit
9e490a7479
|
@ -4,7 +4,7 @@ use failure::*;
|
||||||
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
|
|
||||||
use proxmox_backup::client::HttpClient;
|
use proxmox_backup::client::{HttpClient, BackupReader};
|
||||||
|
|
||||||
pub struct DummyWriter {
|
pub struct DummyWriter {
|
||||||
bytes: usize,
|
bytes: usize,
|
||||||
|
@ -33,8 +33,7 @@ async fn run() -> Result<(), Error> {
|
||||||
|
|
||||||
let backup_time = "2019-06-28T10:49:48Z".parse::<DateTime<Utc>>()?;
|
let backup_time = "2019-06-28T10:49:48Z".parse::<DateTime<Utc>>()?;
|
||||||
|
|
||||||
let client = client
|
let client = BackupReader::start(client, "store2", "host", "elsa", backup_time, true)
|
||||||
.start_backup_reader("store2", "host", "elsa", backup_time, true)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let start = std::time::SystemTime::now();
|
let start = std::time::SystemTime::now();
|
||||||
|
|
|
@ -479,11 +479,14 @@ fn dump_catalog(
|
||||||
let client = HttpClient::new(repo.host(), repo.user(), None)?;
|
let client = HttpClient::new(repo.host(), repo.user(), None)?;
|
||||||
|
|
||||||
async_main(async move {
|
async_main(async move {
|
||||||
let client = client.start_backup_reader(
|
let client = BackupReader::start(
|
||||||
|
client,
|
||||||
repo.store(),
|
repo.store(),
|
||||||
&snapshot.group().backup_type(),
|
&snapshot.group().backup_type(),
|
||||||
&snapshot.group().backup_id(),
|
&snapshot.group().backup_id(),
|
||||||
snapshot.backup_time(), true).await?;
|
snapshot.backup_time(),
|
||||||
|
true,
|
||||||
|
).await?;
|
||||||
|
|
||||||
let backup_index_data = download_index_blob(client.clone(), crypt_config.clone()).await?;
|
let backup_index_data = download_index_blob(client.clone(), crypt_config.clone()).await?;
|
||||||
let backup_index: Value = serde_json::from_slice(&backup_index_data[..])?;
|
let backup_index: Value = serde_json::from_slice(&backup_index_data[..])?;
|
||||||
|
@ -1034,9 +1037,7 @@ async fn restore_do(param: Value) -> Result<Value, Error> {
|
||||||
format!("{}.blob", archive_name)
|
format!("{}.blob", archive_name)
|
||||||
};
|
};
|
||||||
|
|
||||||
let client = client
|
let client = BackupReader::start(client, repo.store(), &backup_type, &backup_id, backup_time, true).await?;
|
||||||
.start_backup_reader(repo.store(), &backup_type, &backup_id, backup_time, true)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let tmpfile = std::fs::OpenOptions::new()
|
let tmpfile = std::fs::OpenOptions::new()
|
||||||
.write(true)
|
.write(true)
|
||||||
|
@ -1753,9 +1754,7 @@ async fn mount_do(param: Value, pipe: Option<RawFd>) -> Result<Value, Error> {
|
||||||
bail!("Can only mount pxar archives.");
|
bail!("Can only mount pxar archives.");
|
||||||
};
|
};
|
||||||
|
|
||||||
let client = client
|
let client = BackupReader::start(client, repo.store(), &backup_type, &backup_id, backup_time, true).await?;
|
||||||
.start_backup_reader(repo.store(), &backup_type, &backup_id, backup_time, true)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let tmpfile = std::fs::OpenOptions::new()
|
let tmpfile = std::fs::OpenOptions::new()
|
||||||
.write(true)
|
.write(true)
|
||||||
|
|
|
@ -9,6 +9,9 @@ mod merge_known_chunks;
|
||||||
mod http_client;
|
mod http_client;
|
||||||
pub use http_client::*;
|
pub use http_client::*;
|
||||||
|
|
||||||
|
mod backup_reader;
|
||||||
|
pub use backup_reader::*;
|
||||||
|
|
||||||
mod remote_chunk_reader;
|
mod remote_chunk_reader;
|
||||||
pub use remote_chunk_reader::*;
|
pub use remote_chunk_reader::*;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
use failure::*;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use serde_json::{json, Value};
|
||||||
|
|
||||||
|
use proxmox::tools::digest_to_hex;
|
||||||
|
|
||||||
|
use crate::tools::futures::Canceller;
|
||||||
|
|
||||||
|
use super::{HttpClient, H2Client};
|
||||||
|
|
||||||
|
/// Backup
|
||||||
|
pub struct BackupReader {
|
||||||
|
h2: H2Client,
|
||||||
|
canceller: Canceller,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for BackupReader {
|
||||||
|
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.canceller.cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BackupReader {
|
||||||
|
|
||||||
|
fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
|
||||||
|
Arc::new(Self { h2, canceller })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start(
|
||||||
|
client: HttpClient,
|
||||||
|
datastore: &str,
|
||||||
|
backup_type: &str,
|
||||||
|
backup_id: &str,
|
||||||
|
backup_time: DateTime<Utc>,
|
||||||
|
debug: bool,
|
||||||
|
) -> Result<Arc<BackupReader>, Error> {
|
||||||
|
|
||||||
|
let param = json!({
|
||||||
|
"backup-type": backup_type,
|
||||||
|
"backup-id": backup_id,
|
||||||
|
"backup-time": backup_time.timestamp(),
|
||||||
|
"store": datastore,
|
||||||
|
"debug": debug,
|
||||||
|
});
|
||||||
|
let req = HttpClient::request_builder(client.server(), "GET", "/api2/json/reader", Some(param)).unwrap();
|
||||||
|
|
||||||
|
let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
|
||||||
|
|
||||||
|
Ok(BackupReader::new(h2, canceller))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get(
|
||||||
|
&self,
|
||||||
|
path: &str,
|
||||||
|
param: Option<Value>,
|
||||||
|
) -> Result<Value, Error> {
|
||||||
|
self.h2.get(path, param).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn put(
|
||||||
|
&self,
|
||||||
|
path: &str,
|
||||||
|
param: Option<Value>,
|
||||||
|
) -> Result<Value, Error> {
|
||||||
|
self.h2.put(path, param).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn post(
|
||||||
|
&self,
|
||||||
|
path: &str,
|
||||||
|
param: Option<Value>,
|
||||||
|
) -> Result<Value, Error> {
|
||||||
|
self.h2.post(path, param).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn download<W: Write + Send>(
|
||||||
|
&self,
|
||||||
|
file_name: &str,
|
||||||
|
output: W,
|
||||||
|
) -> Result<W, Error> {
|
||||||
|
let path = "download";
|
||||||
|
let param = json!({ "file-name": file_name });
|
||||||
|
self.h2.download(path, Some(param), output).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn speedtest<W: Write + Send>(
|
||||||
|
&self,
|
||||||
|
output: W,
|
||||||
|
) -> Result<W, Error> {
|
||||||
|
self.h2.download("speedtest", None, output).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn download_chunk<W: Write + Send>(
|
||||||
|
&self,
|
||||||
|
digest: &[u8; 32],
|
||||||
|
output: W,
|
||||||
|
) -> Result<W, Error> {
|
||||||
|
let path = "chunk";
|
||||||
|
let param = json!({ "digest": digest_to_hex(digest) });
|
||||||
|
self.h2.download(path, Some(param), output).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn force_close(self) {
|
||||||
|
self.canceller.cancel();
|
||||||
|
}
|
||||||
|
}
|
|
@ -315,29 +315,6 @@ impl HttpClient {
|
||||||
Ok(BackupClient::new(h2, canceller))
|
Ok(BackupClient::new(h2, canceller))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_backup_reader(
|
|
||||||
&self,
|
|
||||||
datastore: &str,
|
|
||||||
backup_type: &str,
|
|
||||||
backup_id: &str,
|
|
||||||
backup_time: DateTime<Utc>,
|
|
||||||
debug: bool,
|
|
||||||
) -> Result<Arc<BackupReader>, Error> {
|
|
||||||
|
|
||||||
let param = json!({
|
|
||||||
"backup-type": backup_type,
|
|
||||||
"backup-id": backup_id,
|
|
||||||
"backup-time": backup_time.timestamp(),
|
|
||||||
"store": datastore,
|
|
||||||
"debug": debug,
|
|
||||||
});
|
|
||||||
let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap();
|
|
||||||
|
|
||||||
let (h2, canceller) = self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
|
|
||||||
|
|
||||||
Ok(BackupReader::new(h2, canceller))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn start_h2_connection(
|
pub async fn start_h2_connection(
|
||||||
&self,
|
&self,
|
||||||
mut req: Request<Body>,
|
mut req: Request<Body>,
|
||||||
|
@ -443,6 +420,11 @@ impl HttpClient {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Read-only access to server property
|
||||||
|
pub fn server(&self) -> &str {
|
||||||
|
&self.server
|
||||||
|
}
|
||||||
|
|
||||||
pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
|
pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
|
||||||
let path = path.trim_matches('/');
|
let path = path.trim_matches('/');
|
||||||
let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
|
let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
|
||||||
|
@ -480,81 +462,6 @@ impl HttpClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub struct BackupReader {
|
|
||||||
h2: H2Client,
|
|
||||||
canceller: Canceller,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for BackupReader {
|
|
||||||
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.canceller.cancel();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BackupReader {
|
|
||||||
|
|
||||||
pub fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
|
|
||||||
Arc::new(Self { h2, canceller })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get(
|
|
||||||
&self,
|
|
||||||
path: &str,
|
|
||||||
param: Option<Value>,
|
|
||||||
) -> Result<Value, Error> {
|
|
||||||
self.h2.get(path, param).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn put(
|
|
||||||
&self,
|
|
||||||
path: &str,
|
|
||||||
param: Option<Value>,
|
|
||||||
) -> Result<Value, Error> {
|
|
||||||
self.h2.put(path, param).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn post(
|
|
||||||
&self,
|
|
||||||
path: &str,
|
|
||||||
param: Option<Value>,
|
|
||||||
) -> Result<Value, Error> {
|
|
||||||
self.h2.post(path, param).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn download<W: Write + Send>(
|
|
||||||
&self,
|
|
||||||
file_name: &str,
|
|
||||||
output: W,
|
|
||||||
) -> Result<W, Error> {
|
|
||||||
let path = "download";
|
|
||||||
let param = json!({ "file-name": file_name });
|
|
||||||
self.h2.download(path, Some(param), output).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn speedtest<W: Write + Send>(
|
|
||||||
&self,
|
|
||||||
output: W,
|
|
||||||
) -> Result<W, Error> {
|
|
||||||
self.h2.download("speedtest", None, output).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn download_chunk<W: Write + Send>(
|
|
||||||
&self,
|
|
||||||
digest: &[u8; 32],
|
|
||||||
output: W,
|
|
||||||
) -> Result<W, Error> {
|
|
||||||
let path = "chunk";
|
|
||||||
let param = json!({ "digest": digest_to_hex(digest) });
|
|
||||||
self.h2.download(path, Some(param), output).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn force_close(self) {
|
|
||||||
self.canceller.cancel();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct BackupClient {
|
pub struct BackupClient {
|
||||||
h2: H2Client,
|
h2: H2Client,
|
||||||
canceller: Canceller,
|
canceller: Canceller,
|
||||||
|
|
Loading…
Reference in New Issue