src/client/http_client.rs: cleanup h2 backup client

This commit is contained in:
Dietmar Maurer 2019-05-22 13:05:51 +02:00
parent a55fcd740f
commit 6ab34afa88
2 changed files with 82 additions and 30 deletions

View File

@ -13,8 +13,7 @@ fn upload_speed() -> Result<usize, Error> {
let mut client = HttpClient::new(host, username)?; let mut client = HttpClient::new(host, username)?;
let param = json!({"backup-type": "host", "backup-id": "speedtest" }); let upgrade = client.start_backup(datastore, "host", "speedtest");
let upgrade = client.h2upgrade(&format!("/api2/json/admin/datastore/{}/backup", datastore), Some(param));
let res = upgrade.and_then(|h2| { let res = upgrade.and_then(|h2| {
println!("start upload speed test"); println!("start upload speed test");

View File

@ -248,12 +248,16 @@ impl HttpClient {
self.request(req) self.request(req)
} }
pub fn h2upgrade( pub fn start_backup(
&mut self, path: &self,
&str, param: Option<Value> datastore: &str,
) -> impl Future<Item=H2Client, Error=Error> { backup_type: &str,
backup_id: &str,
) -> impl Future<Item=BackupClient, Error=Error> {
let mut req = Self::request_builder(&self.server, "GET", path, param).unwrap(); let path = format!("/api2/json/admin/datastore/{}/backup", datastore);
let param = json!({"backup-type": backup_type, "backup-id": backup_id});
let mut req = Self::request_builder(&self.server, "GET", &path, Some(param)).unwrap();
let login = self.auth.listen(); let login = self.auth.listen();
@ -271,7 +275,7 @@ impl HttpClient {
let status = resp.status(); let status = resp.status();
if status != http::StatusCode::SWITCHING_PROTOCOLS { if status != http::StatusCode::SWITCHING_PROTOCOLS {
bail!("h2upgrade failed with status {:?}", status); bail!("got status {:?} instead of protocol switch", status);
} }
Ok(resp.into_body().on_upgrade().map_err(Error::from)) Ok(resp.into_body().on_upgrade().map_err(Error::from))
@ -289,7 +293,7 @@ impl HttpClient {
// Wait until the `SendRequest` handle has available capacity. // Wait until the `SendRequest` handle has available capacity.
h2.ready() h2.ready()
.map(H2Client::new) .map(BackupClient::new)
.map_err(Error::from) .map_err(Error::from)
}) })
}) })
@ -395,12 +399,12 @@ impl HttpClient {
} }
} }
#[derive(Clone)] //#[derive(Clone)]
pub struct H2Client { pub struct BackupClient {
h2: h2::client::SendRequest<bytes::Bytes>, h2: h2::client::SendRequest<bytes::Bytes>,
} }
impl H2Client { impl BackupClient {
pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self { pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
Self { h2 } Self { h2 }
@ -408,17 +412,25 @@ impl H2Client {
pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> { pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
let req = Self::request_builder("localhost", "GET", path, param).unwrap(); let req = Self::request_builder("localhost", "GET", path, param).unwrap();
self.request(req) Self::request(self.h2.clone(), req)
} }
pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> { pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
let req = Self::request_builder("localhost", "PUT", path, param).unwrap(); let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
self.request(req) Self::request(self.h2.clone(), req)
} }
pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> { pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
Self::h2post(self.h2.clone(), path, param)
}
fn h2post(
h2: h2::client::SendRequest<bytes::Bytes>,
path: &str,
param: Option<Value>
) -> impl Future<Item=Value, Error=Error> {
let req = Self::request_builder("localhost", "POST", path, param).unwrap(); let req = Self::request_builder("localhost", "POST", path, param).unwrap();
self.request(req) Self::request(h2, req)
} }
pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> { pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
@ -438,7 +450,7 @@ impl H2Client {
}) })
} }
fn response_queue(self) -> ( fn response_queue() -> (
mpsc::Sender<h2::client::ResponseFuture>, mpsc::Sender<h2::client::ResponseFuture>,
sync::oneshot::Receiver<Result<(), Error>> sync::oneshot::Receiver<Result<(), Error>>
) { ) {
@ -467,8 +479,8 @@ impl H2Client {
(verify_queue_tx, verify_result_rx) (verify_queue_tx, verify_result_rx)
} }
pub fn download_chunk_list( fn download_chunk_list(
&self, h2: h2::client::SendRequest<bytes::Bytes>,
path: &str, path: &str,
archive_name: &str, archive_name: &str,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
@ -477,7 +489,7 @@ impl H2Client {
let param = json!({ "archive-name": archive_name }); let param = json!({ "archive-name": archive_name });
let request = Self::request_builder("localhost", "GET", path, Some(param)).unwrap(); let request = Self::request_builder("localhost", "GET", path, Some(param)).unwrap();
self.send_request(request, None) Self::send_request(h2.clone(), request, None)
.and_then(move |response| { .and_then(move |response| {
response response
.map_err(Error::from) .map_err(Error::from)
@ -506,8 +518,47 @@ impl H2Client {
}) })
} }
pub fn upload_stream( pub fn finish(&self) -> impl Future<Item=(), Error=Error> {
Self::h2post(self.h2.clone(), "finish", None)
.map(|_| ())
}
pub fn upload_dynamic_stream(
&self, &self,
archive_name: &str,
stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
) -> impl Future<Item=(), Error=Error> {
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
let h2 = self.h2.clone();
let h2_2 = self.h2.clone();
let h2_3 = self.h2.clone();
let h2_4 = self.h2.clone();
let param = json!({ "archive-name": archive_name });
Self::download_chunk_list(h2, "dynamic_index", archive_name, known_chunks.clone())
.and_then(move |_| {
Self::h2post(h2_2, "dynamic_index", Some(param))
})
.and_then(move |res| {
println!("GOT1 {:?}", res);
let wid = res.as_u64().unwrap();
//let dir_path = PathBuf::from("../casync-pve");
//let dir_path = PathBuf::from(".");
//upload_pxar(h2, known_chunks, &dir_path, wid).unwrap()
Self::upload_stream(h2_3, wid, stream, known_chunks.clone())
.and_then(move |size| {
Self::h2post(h2_4, "dynamic_close", Some(json!({ "wid": wid })))
})
.map(|_| ())
})
}
fn upload_stream(
h2: h2::client::SendRequest<bytes::Bytes>,
wid: u64, wid: u64,
stream: impl Stream<Item=bytes::BytesMut, Error=Error>, stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
@ -519,14 +570,15 @@ impl H2Client {
let stream_len = std::sync::Arc::new(AtomicUsize::new(0)); let stream_len = std::sync::Arc::new(AtomicUsize::new(0));
let stream_len2 = stream_len.clone(); let stream_len2 = stream_len.clone();
let (upload_queue, upload_result) = self.clone().response_queue(); let (upload_queue, upload_result) = Self::response_queue();
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
let self2 = self.clone();
stream stream
.for_each(move |data| { .for_each(move |data| {
let h2 = h2.clone();
repeat.fetch_add(1, Ordering::SeqCst); repeat.fetch_add(1, Ordering::SeqCst);
stream_len.fetch_add(data.len(), Ordering::SeqCst); stream_len.fetch_add(data.len(), Ordering::SeqCst);
@ -553,7 +605,7 @@ impl H2Client {
upload_data = Some(bytes::Bytes::from(data)); upload_data = Some(bytes::Bytes::from(data));
} }
self2.send_request(request, upload_data) Self::send_request(h2, request, upload_data)
.and_then(move |response| { .and_then(move |response| {
upload_queue.send(response) upload_queue.send(response)
.map(|_| ()).map_err(Error::from) .map(|_| ()).map_err(Error::from)
@ -595,11 +647,11 @@ impl H2Client {
let repeat = std::sync::Arc::new(AtomicUsize::new(0)); let repeat = std::sync::Arc::new(AtomicUsize::new(0));
let repeat2 = repeat.clone(); let repeat2 = repeat.clone();
let (upload_queue, upload_result) = self.clone().response_queue(); let (upload_queue, upload_result) = Self::response_queue();
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
let self2 = self.clone(); let h2 = self.h2.clone();
futures::stream::repeat(data) futures::stream::repeat(data)
.take_while(move |_| { .take_while(move |_| {
@ -607,12 +659,13 @@ impl H2Client {
Ok(start_time.elapsed().as_secs() < 5) Ok(start_time.elapsed().as_secs() < 5)
}) })
.for_each(move |data| { .for_each(move |data| {
let h2 = h2.clone();
let upload_queue = upload_queue.clone(); let upload_queue = upload_queue.clone();
println!("send test data ({} bytes)", data.len()); println!("send test data ({} bytes)", data.len());
let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap(); let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
self2.send_request(request, Some(bytes::Bytes::from(data))) Self::send_request(h2, request, Some(bytes::Bytes::from(data)))
.and_then(move |response| { .and_then(move |response| {
upload_queue.send(response) upload_queue.send(response)
.map(|_| ()).map_err(Error::from) .map(|_| ()).map_err(Error::from)
@ -637,11 +690,11 @@ impl H2Client {
} }
fn request( fn request(
&self, h2: h2::client::SendRequest<bytes::Bytes>,
request: Request<()>, request: Request<()>,
) -> impl Future<Item=Value, Error=Error> { ) -> impl Future<Item=Value, Error=Error> {
self.send_request(request, None) Self::send_request(h2, request, None)
.and_then(move |response| { .and_then(move |response| {
response response
.map_err(Error::from) .map_err(Error::from)
@ -650,12 +703,12 @@ impl H2Client {
} }
fn send_request( fn send_request(
&self, h2: h2::client::SendRequest<bytes::Bytes>,
request: Request<()>, request: Request<()>,
data: Option<bytes::Bytes>, data: Option<bytes::Bytes>,
) -> impl Future<Item=h2::client::ResponseFuture, Error=Error> { ) -> impl Future<Item=h2::client::ResponseFuture, Error=Error> {
self.h2.clone() h2
.ready() .ready()
.map_err(Error::from) .map_err(Error::from)
.and_then(move |mut send_request| { .and_then(move |mut send_request| {