diff --git a/src/tools/async_io.rs b/src/tools/async_io.rs index 844afaa9..963f6fdd 100644 --- a/src/tools/async_io.rs +++ b/src/tools/async_io.rs @@ -1,4 +1,4 @@ -//! Generic AsyncRead/AsyncWrite utilities. +//! AsyncRead/AsyncWrite utilities. use std::io; use std::os::unix::io::{AsRawFd, RawFd}; @@ -9,41 +9,52 @@ use futures::stream::{Stream, TryStream}; use futures::ready; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpListener; -use hyper::client::connect::Connection; +use tokio_openssl::SslStream; +use hyper::client::connect::{Connection, Connected}; -pub enum EitherStream { - Left(L), - Right(R), +/// Asynchronous stream, possibly encrypted and proxied +/// +/// Usefule for HTTP client implementations using hyper. +pub enum MaybeTlsStream { + Normal(S), + Proxied(S), + Secured(SslStream), } -impl AsyncRead for EitherStream { +impl AsyncRead for MaybeTlsStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf, ) -> Poll> { match self.get_mut() { - EitherStream::Left(ref mut s) => { + MaybeTlsStream::Normal(ref mut s) => { Pin::new(s).poll_read(cx, buf) } - EitherStream::Right(ref mut s) => { + MaybeTlsStream::Proxied(ref mut s) => { + Pin::new(s).poll_read(cx, buf) + } + MaybeTlsStream::Secured(ref mut s) => { Pin::new(s).poll_read(cx, buf) } } } } -impl AsyncWrite for EitherStream { +impl AsyncWrite for MaybeTlsStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context, buf: &[u8], ) -> Poll> { match self.get_mut() { - EitherStream::Left(ref mut s) => { + MaybeTlsStream::Normal(ref mut s) => { Pin::new(s).poll_write(cx, buf) } - EitherStream::Right(ref mut s) => { + MaybeTlsStream::Proxied(ref mut s) => { + Pin::new(s).poll_write(cx, buf) + } + MaybeTlsStream::Secured(ref mut s) => { Pin::new(s).poll_write(cx, buf) } } @@ -51,10 +62,13 @@ impl AsyncWrite for EitherStream, cx: &mut Context) -> Poll> { match self.get_mut() { - EitherStream::Left(ref mut s) => { + MaybeTlsStream::Normal(ref mut s) => { Pin::new(s).poll_flush(cx) } - EitherStream::Right(ref mut s) => { + MaybeTlsStream::Proxied(ref mut s) => { + Pin::new(s).poll_flush(cx) + } + MaybeTlsStream::Secured(ref mut s) => { Pin::new(s).poll_flush(cx) } } @@ -62,25 +76,27 @@ impl AsyncWrite for EitherStream, cx: &mut Context) -> Poll> { match self.get_mut() { - EitherStream::Left(ref mut s) => { + MaybeTlsStream::Normal(ref mut s) => { Pin::new(s).poll_shutdown(cx) } - EitherStream::Right(ref mut s) => { + MaybeTlsStream::Proxied(ref mut s) => { + Pin::new(s).poll_shutdown(cx) + } + MaybeTlsStream::Secured(ref mut s) => { Pin::new(s).poll_shutdown(cx) } } } } -// we need this for crate::client::http_client: -impl Connection for EitherStream< - tokio::net::TcpStream, - Pin>>, -> { - fn connected(&self) -> hyper::client::connect::Connected { +// we need this for the hyper http client +impl Connection for MaybeTlsStream +{ + fn connected(&self) -> Connected { match self { - EitherStream::Left(s) => s.connected(), - EitherStream::Right(s) => s.get_ref().connected(), + MaybeTlsStream::Normal(s) => s.connected(), + MaybeTlsStream::Proxied(s) => s.connected().proxy(true), + MaybeTlsStream::Secured(s) => s.get_ref().connected(), } } } diff --git a/src/tools/http.rs b/src/tools/http.rs index d08ce451..3cd3af4e 100644 --- a/src/tools/http.rs +++ b/src/tools/http.rs @@ -10,9 +10,11 @@ use hyper::client::{Client, HttpConnector}; use http::{Request, Response}; use openssl::ssl::{SslConnector, SslMethod}; use futures::*; +use tokio::net::TcpStream; +use tokio_openssl::SslStream; use crate::tools::{ - async_io::EitherStream, + async_io::MaybeTlsStream, socket::{ set_tcp_keepalive, PROXMOX_BACKUP_TCP_KEEPALIVE_TIME, @@ -100,13 +102,8 @@ impl HttpsConnector { } } -type MaybeTlsStream = EitherStream< - tokio::net::TcpStream, - Pin>>, ->; - impl hyper::service::Service for HttpsConnector { - type Response = MaybeTlsStream; + type Response = MaybeTlsStream; type Error = Error; #[allow(clippy::type_complexity)] type Future = Pin> + Send + 'static>>; @@ -140,12 +137,11 @@ impl hyper::service::Service for HttpsConnector { let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); if is_https { - let conn: tokio_openssl::SslStream = tokio_openssl::SslStream::new(config?.into_ssl(&host)?, conn)?; - let mut conn = Box::pin(conn); - conn.as_mut().connect().await?; - Ok(MaybeTlsStream::Right(conn)) + let mut conn: SslStream = SslStream::new(config?.into_ssl(&host)?, conn)?; + Pin::new(&mut conn).connect().await?; + Ok(MaybeTlsStream::Secured(conn)) } else { - Ok(MaybeTlsStream::Left(conn)) + Ok(MaybeTlsStream::Normal(conn)) } }.boxed() }