HttpsConnector: add proxy support

This commit is contained in:
Dietmar Maurer 2021-04-21 13:17:02 +02:00 committed by Thomas Lamprecht
parent 02a58862dd
commit 9104152a83
3 changed files with 156 additions and 24 deletions

View File

@ -194,7 +194,7 @@ fn apt_get_changelog(
bail!("Package '{}' not found", name); bail!("Package '{}' not found", name);
} }
let mut client = SimpleHttp::new(); let mut client = SimpleHttp::new(None); // TODO: pass proxy_config
let changelog_url = &pkg_info[0].change_log_url; let changelog_url = &pkg_info[0].change_log_url;
// FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it // FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it

View File

@ -10,7 +10,14 @@ use hyper::client::{Client, HttpConnector};
use http::{Request, Response}; use http::{Request, Response};
use openssl::ssl::{SslConnector, SslMethod}; use openssl::ssl::{SslConnector, SslMethod};
use futures::*; use futures::*;
use tokio::net::TcpStream; use tokio::{
io::{
AsyncBufReadExt,
AsyncWriteExt,
BufStream,
},
net::TcpStream,
};
use tokio_openssl::SslStream; use tokio_openssl::SslStream;
use crate::tools::{ use crate::tools::{
@ -21,6 +28,14 @@ use crate::tools::{
}, },
}; };
/// HTTP Proxy Configuration
#[derive(Clone)]
pub struct ProxyConfig {
pub host: String,
pub port: u16,
pub force_connect: bool,
}
/// Asyncrounous HTTP client implementation /// Asyncrounous HTTP client implementation
pub struct SimpleHttp { pub struct SimpleHttp {
client: Client<HttpsConnector, Body>, client: Client<HttpsConnector, Body>,
@ -28,18 +43,27 @@ pub struct SimpleHttp {
impl SimpleHttp { impl SimpleHttp {
pub fn new() -> Self { pub fn new(proxy_config: Option<ProxyConfig>) -> Self {
let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build(); let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build();
Self::with_ssl_connector(ssl_connector) Self::with_ssl_connector(ssl_connector, proxy_config)
} }
pub fn with_ssl_connector(ssl_connector: SslConnector) -> Self { pub fn with_ssl_connector(ssl_connector: SslConnector, proxy_config: Option<ProxyConfig>) -> Self {
let connector = HttpConnector::new(); let connector = HttpConnector::new();
let https = HttpsConnector::with_connector(connector, ssl_connector); let mut https = HttpsConnector::with_connector(connector, ssl_connector);
if let Some(proxy_config) = proxy_config {
https.set_proxy(proxy_config);
}
let client = Client::builder().build(https); let client = Client::builder().build(https);
Self { client } Self { client }
} }
pub async fn request(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
self.client.request(request)
.map_err(Error::from)
.await
}
pub async fn post( pub async fn post(
&mut self, &mut self,
uri: &str, uri: &str,
@ -106,6 +130,7 @@ impl SimpleHttp {
pub struct HttpsConnector { pub struct HttpsConnector {
connector: HttpConnector, connector: HttpConnector,
ssl_connector: Arc<SslConnector>, ssl_connector: Arc<SslConnector>,
proxy: Option<ProxyConfig>,
} }
impl HttpsConnector { impl HttpsConnector {
@ -114,8 +139,56 @@ impl HttpsConnector {
Self { Self {
connector, connector,
ssl_connector: Arc::new(ssl_connector), ssl_connector: Arc::new(ssl_connector),
proxy: None,
} }
} }
pub fn set_proxy(&mut self, proxy: ProxyConfig) {
self.proxy = Some(proxy);
}
async fn secure_stream(
tcp_stream: TcpStream,
ssl_connector: &SslConnector,
host: &str,
) -> Result<MaybeTlsStream<TcpStream>, Error> {
let config = ssl_connector.configure()?;
let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
Pin::new(&mut conn).connect().await?;
Ok(MaybeTlsStream::Secured(conn))
}
async fn parse_connect_status(
stream: &mut BufStream<TcpStream>,
) -> Result<(), Error> {
let mut status_str = String::new();
// TODO: limit read-length
if stream.read_line(&mut status_str).await? == 0 {
bail!("proxy connect failed - unexpected EOF")
}
if !(status_str.starts_with("HTTP/1.1 200") || status_str.starts_with("HTTP/1.0 200")) {
bail!("proxy connect failed - invalid status: {}", status_str)
}
loop {
// skip rest until \r\n
let mut response = String::new();
if stream.read_line(&mut response).await? == 0 {
bail!("proxy connect failed - unexpected EOF")
}
if response.len() > 8192 {
bail!("proxy connect failed - long lines in connect rtesponse")
}
if response == "\r\n" {
break;
}
}
Ok(())
}
} }
impl hyper::service::Service<Uri> for HttpsConnector { impl hyper::service::Service<Uri> for HttpsConnector {
@ -124,9 +197,10 @@ impl hyper::service::Service<Uri> for HttpsConnector {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// This connector is always ready, but others might not be. self.connector
Poll::Ready(Ok(())) .poll_ready(ctx)
.map_err(|err| err.into())
} }
fn call(&mut self, dst: Uri) -> Self::Future { fn call(&mut self, dst: Uri) -> Self::Future {
@ -139,24 +213,82 @@ impl hyper::service::Service<Uri> for HttpsConnector {
return futures::future::err(format_err!("missing URL scheme")).boxed(); return futures::future::err(format_err!("missing URL scheme")).boxed();
} }
}; };
let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
async move { if let Some(ref proxy) = self.proxy {
let config = ssl_connector.configure()?;
let dst_str = dst.to_string(); // for error messages
let conn = connector
.call(dst)
.await
.map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); let use_connect = is_https || proxy.force_connect;
if is_https { let proxy_url = format!("{}:{}", proxy.host, proxy.port);
let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(&host)?, conn)?; let proxy_uri = match Uri::builder()
Pin::new(&mut conn).connect().await?; .scheme("http")
Ok(MaybeTlsStream::Secured(conn)) .authority(proxy_url.as_str())
.path_and_query("/")
.build()
{
Ok(uri) => uri,
Err(err) => return futures::future::err(err.into()).boxed(),
};
if use_connect {
async move {
let proxy_stream = connector
.call(proxy_uri)
.await
.map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
let _ = set_tcp_keepalive(proxy_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
let mut stream = BufStream::new(proxy_stream);
let connect_request = format!(
"CONNECT {0}:{1} HTTP/1.1\r\n\
Host: {0}:{1}\r\n\r\n",
host, port,
);
stream.write_all(connect_request.as_bytes()).await?;
stream.flush().await?;
Self::parse_connect_status(&mut stream).await?;
let tcp_stream = stream.into_inner();
if is_https {
Self::secure_stream(tcp_stream, &ssl_connector, &host).await
} else {
Ok(MaybeTlsStream::Normal(tcp_stream))
}
}.boxed()
} else { } else {
Ok(MaybeTlsStream::Normal(conn)) async move {
let tcp_stream = connector
.call(proxy_uri)
.await
.map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
Ok(MaybeTlsStream::Proxied(tcp_stream))
}.boxed()
} }
}.boxed() } else {
async move {
let dst_str = dst.to_string(); // for error messages
let tcp_stream = connector
.call(dst)
.await
.map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
if is_https {
Self::secure_stream(tcp_stream, &ssl_connector, &host).await
} else {
Ok(MaybeTlsStream::Normal(tcp_stream))
}
}.boxed()
}
} }
} }

View File

@ -102,7 +102,7 @@ async fn register_subscription(
"check_token": challenge, "check_token": challenge,
}); });
let mut client = SimpleHttp::new(); let mut client = SimpleHttp::new(None); // TODO: pass proxy_config
let uri = "https://shop.maurer-it.com/modules/servers/licensing/verify.php"; let uri = "https://shop.maurer-it.com/modules/servers/licensing/verify.php";
let query = tools::json_object_to_query(params)?; let query = tools::json_object_to_query(params)?;