src/client/http_client.rs: implement http2 client wrapper
This commit is contained in:
parent
9e391bb7f0
commit
b57cb26406
|
@ -1,47 +1,9 @@
|
||||||
use failure::*;
|
use failure::*;
|
||||||
use futures::*;
|
use futures::*;
|
||||||
|
|
||||||
use serde_json::{json, Value};
|
use serde_json::json;
|
||||||
use proxmox_backup::client::*;
|
use proxmox_backup::client::*;
|
||||||
|
|
||||||
fn get(mut h2: h2::client::SendRequest<bytes::Bytes>, path: &str) -> impl Future<Item=Value, Error=Error> {
|
|
||||||
|
|
||||||
let request = http::Request::builder()
|
|
||||||
.method("GET")
|
|
||||||
.uri(format!("https://localhost/{}", path))
|
|
||||||
.header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
|
|
||||||
.body(()).unwrap();
|
|
||||||
|
|
||||||
println!("SEND GET {} REQUEST", path);
|
|
||||||
let (response, _stream) = h2.send_request(request, true).unwrap();
|
|
||||||
|
|
||||||
response
|
|
||||||
.map_err(Error::from)
|
|
||||||
.and_then(|response| {
|
|
||||||
let (head, mut body) = response.into_parts();
|
|
||||||
|
|
||||||
println!("Received response: {:?}", head);
|
|
||||||
|
|
||||||
// The `release_capacity` handle allows the caller to manage
|
|
||||||
// flow control.
|
|
||||||
//
|
|
||||||
// Whenever data is received, the caller is responsible for
|
|
||||||
// releasing capacity back to the server once it has freed
|
|
||||||
// the data from memory.
|
|
||||||
let mut release_capacity = body.release_capacity().clone();
|
|
||||||
|
|
||||||
body
|
|
||||||
.concat2()
|
|
||||||
.map_err(Error::from)
|
|
||||||
.and_then(move |data| {
|
|
||||||
println!("RX: {:?}", data);
|
|
||||||
|
|
||||||
// fixme:
|
|
||||||
Ok(Value::Null)
|
|
||||||
})
|
|
||||||
}).map_err(Error::from)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run() -> Result<(), Error> {
|
fn run() -> Result<(), Error> {
|
||||||
|
|
||||||
let host = "localhost";
|
let host = "localhost";
|
||||||
|
@ -51,13 +13,13 @@ fn run() -> Result<(), Error> {
|
||||||
let mut client = HttpClient::new(host, username)?;
|
let mut client = HttpClient::new(host, username)?;
|
||||||
|
|
||||||
let param = json!({"backup-type": "host", "backup-id": "test" });
|
let param = json!({"backup-type": "host", "backup-id": "test" });
|
||||||
let h2client = client.h2upgrade("/api2/json/admin/datastore/store2/backup", Some(param));
|
let upgrade = client.h2upgrade("/api2/json/admin/datastore/store2/backup", Some(param));
|
||||||
|
|
||||||
let res = h2client.and_then(|mut h2| {
|
let res = upgrade.and_then(|send_request| {
|
||||||
println!("start http2");
|
println!("start http2");
|
||||||
|
let h2 = H2Client::new(send_request);
|
||||||
let result1 = get(h2.clone(), "test1");
|
let result1 = h2.get("test1", None);
|
||||||
let result2 = get(h2.clone(), "test2");
|
let result2 = h2.get("test2", None);
|
||||||
|
|
||||||
result1.join(result2)
|
result1.join(result2)
|
||||||
}).wait()?;
|
}).wait()?;
|
||||||
|
|
|
@ -6,7 +6,7 @@ use hyper::client::Client;
|
||||||
use xdg::BaseDirectories;
|
use xdg::BaseDirectories;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
|
|
||||||
use http::Request;
|
use http::{Request, Response};
|
||||||
use http::header::HeaderValue;
|
use http::header::HeaderValue;
|
||||||
|
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
|
@ -385,3 +385,109 @@ impl HttpClient {
|
||||||
Ok(request)
|
Ok(request)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct H2Client {
|
||||||
|
h2: h2::client::SendRequest<bytes::Bytes>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl H2Client {
|
||||||
|
|
||||||
|
pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
|
||||||
|
Self { h2 }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
let req = Self::request_builder("localhost", "GET", path, param).unwrap();
|
||||||
|
self.request(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
let req = Self::request_builder("localhost", "POST", path, param).unwrap();
|
||||||
|
self.request(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn request(
|
||||||
|
&self,
|
||||||
|
request: Request<()>,
|
||||||
|
) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
|
||||||
|
self.h2.clone().ready().map_err(Error::from).
|
||||||
|
and_then(move |mut send_request| {
|
||||||
|
// fixme: what about stream/upload?
|
||||||
|
let (response, _stream) = send_request.send_request(request, true).unwrap();
|
||||||
|
response
|
||||||
|
.map_err(Error::from)
|
||||||
|
.and_then(Self::h2api_response)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn h2api_response(response: Response<h2::RecvStream>) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
|
||||||
|
let status = response.status();
|
||||||
|
|
||||||
|
let (_head, mut body) = response.into_parts();
|
||||||
|
|
||||||
|
// The `release_capacity` handle allows the caller to manage
|
||||||
|
// flow control.
|
||||||
|
//
|
||||||
|
// Whenever data is received, the caller is responsible for
|
||||||
|
// releasing capacity back to the server once it has freed
|
||||||
|
// the data from memory.
|
||||||
|
let mut release_capacity = body.release_capacity().clone();
|
||||||
|
|
||||||
|
body
|
||||||
|
.map(move |chunk| {
|
||||||
|
println!("RX: {} bytes", chunk.len());
|
||||||
|
// Let the server send more data.
|
||||||
|
let _ = release_capacity.release_capacity(chunk.len());
|
||||||
|
chunk
|
||||||
|
})
|
||||||
|
.concat2()
|
||||||
|
.map_err(Error::from)
|
||||||
|
.and_then(move |data| {
|
||||||
|
println!("RX: {:?}", data);
|
||||||
|
let text = String::from_utf8(data.to_vec()).unwrap();
|
||||||
|
if status.is_success() {
|
||||||
|
if text.len() > 0 {
|
||||||
|
let mut value: Value = serde_json::from_str(&text)?;
|
||||||
|
if let Some(map) = value.as_object_mut() {
|
||||||
|
if let Some(data) = map.remove("data") {
|
||||||
|
return Ok(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bail!("got result without data property");
|
||||||
|
} else {
|
||||||
|
Ok(Value::Null)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
bail!("HTTP Error {}: {}", status, text);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<()>, Error> {
|
||||||
|
let path = path.trim_matches('/');
|
||||||
|
let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
|
||||||
|
|
||||||
|
if let Some(data) = data {
|
||||||
|
let query = tools::json_object_to_query(data)?;
|
||||||
|
let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
|
||||||
|
let request = Request::builder()
|
||||||
|
.method(method)
|
||||||
|
.uri(url)
|
||||||
|
.header("User-Agent", "proxmox-backup-client/1.0")
|
||||||
|
.header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
|
||||||
|
.body(())?;
|
||||||
|
return Ok(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
let request = Request::builder()
|
||||||
|
.method(method)
|
||||||
|
.uri(url)
|
||||||
|
.header("User-Agent", "proxmox-backup-client/1.0")
|
||||||
|
.header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
|
||||||
|
.body(())?;
|
||||||
|
|
||||||
|
Ok(request)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue