src/client/http_client.rs: move low level H2 code into separate class
This commit is contained in:
parent
d2c48afc6e
commit
9af37c8f0e
@ -275,12 +275,11 @@ impl HttpClient {
|
|||||||
|
|
||||||
let status = resp.status();
|
let status = resp.status();
|
||||||
if status != http::StatusCode::SWITCHING_PROTOCOLS {
|
if status != http::StatusCode::SWITCHING_PROTOCOLS {
|
||||||
bail!("got status {:?} instead of protocol switch", status);
|
future::Either::A(Self::api_response(resp).and_then(|_| { bail!("unknown error"); }))
|
||||||
|
} else {
|
||||||
|
future::Either::B(resp.into_body().on_upgrade().map_err(Error::from))
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(resp.into_body().on_upgrade().map_err(Error::from))
|
|
||||||
})
|
})
|
||||||
.flatten()
|
|
||||||
.and_then(|upgraded| {
|
.and_then(|upgraded| {
|
||||||
h2::client::handshake(upgraded).map_err(Error::from)
|
h2::client::handshake(upgraded).map_err(Error::from)
|
||||||
})
|
})
|
||||||
@ -403,53 +402,25 @@ impl HttpClient {
|
|||||||
|
|
||||||
//#[derive(Clone)]
|
//#[derive(Clone)]
|
||||||
pub struct BackupClient {
|
pub struct BackupClient {
|
||||||
h2: h2::client::SendRequest<bytes::Bytes>,
|
h2: H2Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BackupClient {
|
impl BackupClient {
|
||||||
|
|
||||||
pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
|
pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
|
||||||
Self { h2 }
|
Self { h2: H2Client::new(h2) }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
let req = Self::request_builder("localhost", "GET", path, param).unwrap();
|
self.h2.get(path, param)
|
||||||
Self::request(self.h2.clone(), req)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
|
self.h2.put(path, param)
|
||||||
Self::request(self.h2.clone(), req)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
Self::h2post(self.h2.clone(), path, param)
|
self.h2.post(path, param)
|
||||||
}
|
|
||||||
|
|
||||||
fn h2post(
|
|
||||||
h2: h2::client::SendRequest<bytes::Bytes>,
|
|
||||||
path: &str,
|
|
||||||
param: Option<Value>
|
|
||||||
) -> impl Future<Item=Value, Error=Error> {
|
|
||||||
let req = Self::request_builder("localhost", "POST", path, param).unwrap();
|
|
||||||
Self::request(h2, req)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
|
|
||||||
let request = Self::request_builder("localhost", "POST", path, param).unwrap();
|
|
||||||
|
|
||||||
self.h2.clone()
|
|
||||||
.ready()
|
|
||||||
.map_err(Error::from)
|
|
||||||
.and_then(move |mut send_request| {
|
|
||||||
let (response, stream) = send_request.send_request(request, false).unwrap();
|
|
||||||
PipeToSendStream::new(bytes::Bytes::from(data), stream)
|
|
||||||
.and_then(|_| {
|
|
||||||
response
|
|
||||||
.map_err(Error::from)
|
|
||||||
.and_then(Self::h2api_response)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn response_queue() -> (
|
fn response_queue() -> (
|
||||||
@ -465,7 +436,7 @@ impl BackupClient {
|
|||||||
.for_each(|response: h2::client::ResponseFuture| {
|
.for_each(|response: h2::client::ResponseFuture| {
|
||||||
response
|
response
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
.and_then(Self::h2api_response)
|
.and_then(H2Client::h2api_response)
|
||||||
.and_then(|result| {
|
.and_then(|result| {
|
||||||
println!("RESPONSE: {:?}", result);
|
println!("RESPONSE: {:?}", result);
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -482,16 +453,16 @@ impl BackupClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn download_chunk_list(
|
fn download_chunk_list(
|
||||||
h2: h2::client::SendRequest<bytes::Bytes>,
|
h2: H2Client,
|
||||||
path: &str,
|
path: &str,
|
||||||
archive_name: &str,
|
archive_name: &str,
|
||||||
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
) -> impl Future<Item=(), Error=Error> {
|
) -> impl Future<Item=(), Error=Error> {
|
||||||
|
|
||||||
let param = json!({ "archive-name": archive_name });
|
let param = json!({ "archive-name": archive_name });
|
||||||
let request = Self::request_builder("localhost", "GET", path, Some(param)).unwrap();
|
let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap();
|
||||||
|
|
||||||
Self::send_request(h2.clone(), request, None)
|
h2.send_request(request, None)
|
||||||
.and_then(move |response| {
|
.and_then(move |response| {
|
||||||
response
|
response
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
@ -521,8 +492,7 @@ impl BackupClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn finish(&self) -> impl Future<Item=(), Error=Error> {
|
pub fn finish(&self) -> impl Future<Item=(), Error=Error> {
|
||||||
Self::h2post(self.h2.clone(), "finish", None)
|
self.h2.clone().post("finish", None).map(|_| ())
|
||||||
.map(|_| ())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn upload_dynamic_stream(
|
pub fn upload_dynamic_stream(
|
||||||
@ -542,25 +512,20 @@ impl BackupClient {
|
|||||||
|
|
||||||
Self::download_chunk_list(h2, "dynamic_index", archive_name, known_chunks.clone())
|
Self::download_chunk_list(h2, "dynamic_index", archive_name, known_chunks.clone())
|
||||||
.and_then(move |_| {
|
.and_then(move |_| {
|
||||||
Self::h2post(h2_2, "dynamic_index", Some(param))
|
h2_2.post("dynamic_index", Some(param))
|
||||||
})
|
})
|
||||||
.and_then(move |res| {
|
.and_then(move |res| {
|
||||||
println!("GOT1 {:?}", res);
|
|
||||||
let wid = res.as_u64().unwrap();
|
let wid = res.as_u64().unwrap();
|
||||||
//let dir_path = PathBuf::from("../casync-pve");
|
|
||||||
//let dir_path = PathBuf::from(".");
|
|
||||||
|
|
||||||
//upload_pxar(h2, known_chunks, &dir_path, wid).unwrap()
|
|
||||||
Self::upload_stream(h2_3, wid, stream, known_chunks.clone())
|
Self::upload_stream(h2_3, wid, stream, known_chunks.clone())
|
||||||
.and_then(move |_size| {
|
.and_then(move |_size| {
|
||||||
Self::h2post(h2_4, "dynamic_close", Some(json!({ "wid": wid })))
|
h2_4.post("dynamic_close", Some(json!({ "wid": wid })))
|
||||||
})
|
})
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn upload_stream(
|
fn upload_stream(
|
||||||
h2: h2::client::SendRequest<bytes::Bytes>,
|
h2: H2Client,
|
||||||
wid: u64,
|
wid: u64,
|
||||||
stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
|
stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
|
||||||
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
@ -576,7 +541,6 @@ impl BackupClient {
|
|||||||
|
|
||||||
let start_time = std::time::Instant::now();
|
let start_time = std::time::Instant::now();
|
||||||
|
|
||||||
|
|
||||||
stream
|
stream
|
||||||
.for_each(move |data| {
|
.for_each(move |data| {
|
||||||
let h2 = h2.clone();
|
let h2 = h2.clone();
|
||||||
@ -597,17 +561,17 @@ impl BackupClient {
|
|||||||
if chunk_is_known {
|
if chunk_is_known {
|
||||||
println!("append existing chunk ({} bytes)", data.len());
|
println!("append existing chunk ({} bytes)", data.len());
|
||||||
let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) });
|
let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) });
|
||||||
request = Self::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
|
request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
|
||||||
upload_data = None;
|
upload_data = None;
|
||||||
} else {
|
} else {
|
||||||
println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&digest), data.len());
|
println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&digest), data.len());
|
||||||
known_chunks.insert(digest);
|
known_chunks.insert(digest);
|
||||||
let param = json!({ "wid": wid, "size" : data.len() });
|
let param = json!({ "wid": wid, "size" : data.len() });
|
||||||
request = Self::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
|
request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
|
||||||
upload_data = Some(bytes::Bytes::from(data));
|
upload_data = Some(bytes::Bytes::from(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
Self::send_request(h2, request, upload_data)
|
h2.send_request(request, upload_data)
|
||||||
.and_then(move |response| {
|
.and_then(move |response| {
|
||||||
upload_queue.send(response)
|
upload_queue.send(response)
|
||||||
.map(|_| ()).map_err(Error::from)
|
.map(|_| ()).map_err(Error::from)
|
||||||
@ -666,8 +630,8 @@ impl BackupClient {
|
|||||||
let upload_queue = upload_queue.clone();
|
let upload_queue = upload_queue.clone();
|
||||||
|
|
||||||
println!("send test data ({} bytes)", data.len());
|
println!("send test data ({} bytes)", data.len());
|
||||||
let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
|
let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap();
|
||||||
Self::send_request(h2, request, Some(bytes::Bytes::from(data)))
|
h2.send_request(request, Some(bytes::Bytes::from(data)))
|
||||||
.and_then(move |response| {
|
.and_then(move |response| {
|
||||||
upload_queue.send(response)
|
upload_queue.send(response)
|
||||||
.map(|_| ()).map_err(Error::from)
|
.map(|_| ()).map_err(Error::from)
|
||||||
@ -690,13 +654,58 @@ impl BackupClient {
|
|||||||
Ok(speed)
|
Ok(speed)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct H2Client {
|
||||||
|
h2: h2::client::SendRequest<bytes::Bytes>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl H2Client {
|
||||||
|
|
||||||
|
pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
|
||||||
|
Self { h2 }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
let req = Self::request_builder("localhost", "GET", path, param).unwrap();
|
||||||
|
self.request(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
|
||||||
|
self.request(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
let req = Self::request_builder("localhost", "POST", path, param).unwrap();
|
||||||
|
self.request(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
let request = Self::request_builder("localhost", "POST", path, param).unwrap();
|
||||||
|
|
||||||
|
|
||||||
|
self.h2.clone()
|
||||||
|
.ready()
|
||||||
|
.map_err(Error::from)
|
||||||
|
.and_then(move |mut send_request| {
|
||||||
|
let (response, stream) = send_request.send_request(request, false).unwrap();
|
||||||
|
PipeToSendStream::new(bytes::Bytes::from(data), stream)
|
||||||
|
.and_then(|_| {
|
||||||
|
response
|
||||||
|
.map_err(Error::from)
|
||||||
|
.and_then(Self::h2api_response)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
fn request(
|
fn request(
|
||||||
h2: h2::client::SendRequest<bytes::Bytes>,
|
&self,
|
||||||
request: Request<()>,
|
request: Request<()>,
|
||||||
) -> impl Future<Item=Value, Error=Error> {
|
) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
|
||||||
Self::send_request(h2, request, None)
|
self.send_request(request, None)
|
||||||
.and_then(move |response| {
|
.and_then(move |response| {
|
||||||
response
|
response
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
@ -705,12 +714,12 @@ impl BackupClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn send_request(
|
fn send_request(
|
||||||
h2: h2::client::SendRequest<bytes::Bytes>,
|
&self,
|
||||||
request: Request<()>,
|
request: Request<()>,
|
||||||
data: Option<bytes::Bytes>,
|
data: Option<bytes::Bytes>,
|
||||||
) -> impl Future<Item=h2::client::ResponseFuture, Error=Error> {
|
) -> impl Future<Item=h2::client::ResponseFuture, Error=Error> {
|
||||||
|
|
||||||
h2
|
self.h2.clone()
|
||||||
.ready()
|
.ready()
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
.and_then(move |mut send_request| {
|
.and_then(move |mut send_request| {
|
||||||
|
Loading…
Reference in New Issue
Block a user