update a chunk of stuff to the hyper release
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
@ -267,7 +267,21 @@ impl BackupWriter {
|
||||
let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
|
||||
let (verify_result_tx, verify_result_rx) = oneshot::channel();
|
||||
|
||||
hyper::rt::spawn(
|
||||
// FIXME: check if this works as expected as replacement for the combinator below?
|
||||
// tokio::spawn(async move {
|
||||
// let result: Result<(), Error> = (async move {
|
||||
// while let Some(response) = verify_queue_rx.recv().await {
|
||||
// match H2Client::h2api_response(response.await?).await {
|
||||
// Ok(result) => println!("RESPONSE: {:?}", result),
|
||||
// Err(err) => bail!("pipelined request failed: {}", err),
|
||||
// }
|
||||
// }
|
||||
// Ok(())
|
||||
// }).await;
|
||||
// let _ignore_closed_channel = verify_result_tx.send(result);
|
||||
// });
|
||||
// old code for reference?
|
||||
tokio::spawn(
|
||||
verify_queue_rx
|
||||
.map(Ok::<_, Error>)
|
||||
.try_for_each(|response: h2::client::ResponseFuture| {
|
||||
@ -294,7 +308,8 @@ impl BackupWriter {
|
||||
|
||||
let h2_2 = h2.clone();
|
||||
|
||||
hyper::rt::spawn(
|
||||
// FIXME: async-block-ify this code!
|
||||
tokio::spawn(
|
||||
verify_queue_rx
|
||||
.map(Ok::<_, Error>)
|
||||
.and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
|
||||
@ -329,7 +344,7 @@ impl BackupWriter {
|
||||
println!("append chunks list len ({})", digest_list.len());
|
||||
let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
|
||||
let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
|
||||
let param_data = bytes::Bytes::from(param.to_string().as_bytes());
|
||||
let param_data = bytes::Bytes::from(param.to_string().into_bytes());
|
||||
let upload_data = Some(param_data);
|
||||
h2_2.send_request(request, upload_data)
|
||||
.and_then(move |response| {
|
||||
@ -373,12 +388,12 @@ impl BackupWriter {
|
||||
}
|
||||
|
||||
let mut body = resp.into_body();
|
||||
let mut release_capacity = body.release_capacity().clone();
|
||||
let mut flow_control = body.flow_control().clone();
|
||||
|
||||
let mut stream = DigestListDecoder::new(body.map_err(Error::from));
|
||||
|
||||
while let Some(chunk) = stream.try_next().await? {
|
||||
let _ = release_capacity.release_capacity(chunk.len());
|
||||
let _ = flow_control.release_capacity(chunk.len());
|
||||
println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
|
||||
known_chunks.lock().unwrap().insert(chunk);
|
||||
}
|
||||
@ -466,7 +481,7 @@ impl BackupWriter {
|
||||
println!("upload new chunk {} ({} bytes, offset {})", digest_str,
|
||||
chunk_info.chunk_len, offset);
|
||||
|
||||
let chunk_data = chunk_info.chunk.raw_data();
|
||||
let chunk_data = chunk_info.chunk.into_inner();
|
||||
let param = json!({
|
||||
"wid": wid,
|
||||
"digest": digest_str,
|
||||
@ -487,7 +502,7 @@ impl BackupWriter {
|
||||
upload_queue
|
||||
.send((new_info, Some(response)))
|
||||
.await
|
||||
.map_err(Error::from)
|
||||
.map_err(|err| format_err!("failed to send to upload queue: {}", err))
|
||||
})
|
||||
)
|
||||
} else {
|
||||
@ -496,7 +511,7 @@ impl BackupWriter {
|
||||
upload_queue
|
||||
.send((merged_chunk_info, None))
|
||||
.await
|
||||
.map_err(Error::from)
|
||||
.map_err(|err| format_err!("failed to send to upload queue: {}", err))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
@ -1,4 +1,5 @@
|
||||
use std::io::Write;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use chrono::Utc;
|
||||
use failure::*;
|
||||
@ -329,7 +330,7 @@ impl HttpClient {
|
||||
let connection = connection.map(|_| ());
|
||||
|
||||
// Spawn a new task to drive the connection state
|
||||
hyper::rt::spawn(connection);
|
||||
tokio::spawn(connection);
|
||||
|
||||
// Wait until the `SendRequest` handle has available capacity.
|
||||
let c = h2.ready().await?;
|
||||
@ -358,10 +359,7 @@ impl HttpClient {
|
||||
|
||||
async fn api_response(response: Response<Body>) -> Result<Value, Error> {
|
||||
let status = response.status();
|
||||
let data = response
|
||||
.into_body()
|
||||
.try_concat()
|
||||
.await?;
|
||||
let data = hyper::body::to_bytes(response.into_body()).await?;
|
||||
|
||||
let text = String::from_utf8(data.to_vec()).unwrap();
|
||||
if status.is_success() {
|
||||
@ -487,10 +485,9 @@ impl H2Client {
|
||||
}
|
||||
|
||||
let mut body = resp.into_body();
|
||||
let mut release_capacity = body.release_capacity().clone();
|
||||
|
||||
while let Some(chunk) = body.try_next().await? {
|
||||
let _ = release_capacity.release_capacity(chunk.len());
|
||||
while let Some(chunk) = body.data().await {
|
||||
let chunk = chunk?;
|
||||
body.flow_control().release_capacity(chunk.len())?;
|
||||
output.write_all(&chunk)?;
|
||||
}
|
||||
|
||||
@ -561,18 +558,14 @@ impl H2Client {
|
||||
|
||||
let (_head, mut body) = response.into_parts();
|
||||
|
||||
// The `release_capacity` handle allows the caller to manage
|
||||
// flow control.
|
||||
//
|
||||
// Whenever data is received, the caller is responsible for
|
||||
// releasing capacity back to the server once it has freed
|
||||
// the data from memory.
|
||||
let mut release_capacity = body.release_capacity().clone();
|
||||
|
||||
let mut data = Vec::new();
|
||||
while let Some(chunk) = body.try_next().await? {
|
||||
while let Some(chunk) = body.data().await {
|
||||
let chunk = chunk?;
|
||||
// Whenever data is received, the caller is responsible for
|
||||
// releasing capacity back to the server once it has freed
|
||||
// the data from memory.
|
||||
// Let the server send more data.
|
||||
let _ = release_capacity.release_capacity(chunk.len());
|
||||
body.flow_control().release_capacity(chunk.len())?;
|
||||
data.extend(chunk);
|
||||
}
|
||||
|
||||
@ -632,9 +625,10 @@ impl H2Client {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct HttpsConnector {
|
||||
http: HttpConnector,
|
||||
ssl_connector: SslConnector,
|
||||
ssl_connector: std::sync::Arc<SslConnector>,
|
||||
}
|
||||
|
||||
impl HttpsConnector {
|
||||
@ -643,7 +637,7 @@ impl HttpsConnector {
|
||||
|
||||
Self {
|
||||
http,
|
||||
ssl_connector,
|
||||
ssl_connector: std::sync::Arc::new(ssl_connector),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -653,29 +647,38 @@ type MaybeTlsStream = EitherStream<
|
||||
tokio_openssl::SslStream<tokio::net::TcpStream>,
|
||||
>;
|
||||
|
||||
impl hyper::client::connect::Connect for HttpsConnector {
|
||||
type Transport = MaybeTlsStream;
|
||||
impl hyper::service::Service<Uri> for HttpsConnector {
|
||||
type Response = MaybeTlsStream;
|
||||
type Error = Error;
|
||||
type Future = Box<dyn Future<Output = Result<(
|
||||
Self::Transport,
|
||||
hyper::client::connect::Connected,
|
||||
), Error>> + Send + Unpin + 'static>;
|
||||
type Future = std::pin::Pin<Box<
|
||||
dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static
|
||||
>>;
|
||||
|
||||
fn connect(&self, dst: hyper::client::connect::Destination) -> Self::Future {
|
||||
let is_https = dst.scheme() == "https";
|
||||
let host = dst.host().to_string();
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
// This connector is always ready, but others might not be.
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
let config = self.ssl_connector.configure();
|
||||
let conn = self.http.connect(dst);
|
||||
fn call(&mut self, dst: Uri) -> Self::Future {
|
||||
let mut this = self.clone();
|
||||
async move {
|
||||
let is_https = dst
|
||||
.scheme()
|
||||
.ok_or_else(|| format_err!("missing URL scheme"))?
|
||||
== "https";
|
||||
let host = dst
|
||||
.host()
|
||||
.ok_or_else(|| format_err!("missing hostname in destination url?"))?
|
||||
.to_string();
|
||||
|
||||
Box::new(Box::pin(async move {
|
||||
let (conn, connected) = conn.await?;
|
||||
let config = this.ssl_connector.configure();
|
||||
let conn = this.http.call(dst).await?;
|
||||
if is_https {
|
||||
let conn = tokio_openssl::connect(config?, &host, conn).await?;
|
||||
Ok((MaybeTlsStream::Right(conn), connected))
|
||||
Ok(MaybeTlsStream::Right(conn))
|
||||
} else {
|
||||
Ok((MaybeTlsStream::Left(conn), connected))
|
||||
Ok(MaybeTlsStream::Left(conn))
|
||||
}
|
||||
}))
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user