http proxy: implement read_connect_response()
Limit memory usage in case we get strange data from proxy.
This commit is contained in:
parent
4adf47b606
commit
b6c06dce9d
@ -12,9 +12,9 @@ use openssl::ssl::{SslConnector, SslMethod};
|
|||||||
use futures::*;
|
use futures::*;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{
|
io::{
|
||||||
AsyncBufReadExt,
|
AsyncRead,
|
||||||
|
AsyncReadExt,
|
||||||
AsyncWriteExt,
|
AsyncWriteExt,
|
||||||
BufStream,
|
|
||||||
},
|
},
|
||||||
net::TcpStream,
|
net::TcpStream,
|
||||||
};
|
};
|
||||||
@ -158,35 +158,51 @@ impl HttpsConnector {
|
|||||||
Ok(MaybeTlsStream::Secured(conn))
|
Ok(MaybeTlsStream::Secured(conn))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn parse_connect_status(
|
async fn read_connect_response<R: AsyncRead + Unpin>(
|
||||||
stream: &mut BufStream<TcpStream>,
|
stream: &mut R,
|
||||||
|
) -> Result<String, Error> {
|
||||||
|
|
||||||
|
let mut data: Vec<u8> = Vec::new();
|
||||||
|
let mut buffer = [0u8; 256];
|
||||||
|
const END_MARK: &[u8; 4] = b"\r\n\r\n";
|
||||||
|
|
||||||
|
'outer: loop {
|
||||||
|
let n = stream.read(&mut buffer[..]).await?;
|
||||||
|
if n == 0 { break; }
|
||||||
|
let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 };
|
||||||
|
data.extend(&buffer[..n]);
|
||||||
|
if data.len() >= END_MARK.len() {
|
||||||
|
if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) {
|
||||||
|
if pos != data.len() - END_MARK.len() {
|
||||||
|
bail!("unexpected data after connect response");
|
||||||
|
}
|
||||||
|
break 'outer;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if data.len() > 1024*32 { // max 32K (random chosen limit)
|
||||||
|
bail!("too many bytes");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = String::from_utf8_lossy(&data);
|
||||||
|
|
||||||
|
match response.split("\r\n").next() {
|
||||||
|
Some(status) => Ok(status.to_owned()),
|
||||||
|
None => bail!("missing newline"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn parse_connect_status<R: AsyncRead + Unpin>(
|
||||||
|
stream: &mut R,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
|
||||||
let mut status_str = String::new();
|
let status_str = Self::read_connect_response(stream).await
|
||||||
|
.map_err(|err| format_err!("invalid connect response: {}", err))?;
|
||||||
// TODO: limit read-length
|
|
||||||
|
|
||||||
if stream.read_line(&mut status_str).await? == 0 {
|
|
||||||
bail!("proxy connect failed - unexpected EOF")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !(status_str.starts_with("HTTP/1.1 200") || status_str.starts_with("HTTP/1.0 200")) {
|
if !(status_str.starts_with("HTTP/1.1 200") || status_str.starts_with("HTTP/1.0 200")) {
|
||||||
bail!("proxy connect failed - invalid status: {}", status_str)
|
bail!("proxy connect failed - invalid status: {}", status_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
|
||||||
// skip rest until \r\n
|
|
||||||
let mut response = String::new();
|
|
||||||
if stream.read_line(&mut response).await? == 0 {
|
|
||||||
bail!("proxy connect failed - unexpected EOF")
|
|
||||||
}
|
|
||||||
if response.len() > 8192 {
|
|
||||||
bail!("proxy connect failed - long lines in connect rtesponse")
|
|
||||||
}
|
|
||||||
if response == "\r\n" {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -233,14 +249,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
|
|||||||
if use_connect {
|
if use_connect {
|
||||||
async move {
|
async move {
|
||||||
|
|
||||||
let proxy_stream = connector
|
let mut tcp_stream = connector
|
||||||
.call(proxy_uri)
|
.call(proxy_uri)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
|
.map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
|
||||||
|
|
||||||
let _ = set_tcp_keepalive(proxy_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
|
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
|
||||||
|
|
||||||
let mut stream = BufStream::new(proxy_stream);
|
|
||||||
|
|
||||||
let connect_request = format!(
|
let connect_request = format!(
|
||||||
"CONNECT {0}:{1} HTTP/1.1\r\n\
|
"CONNECT {0}:{1} HTTP/1.1\r\n\
|
||||||
@ -248,12 +262,10 @@ impl hyper::service::Service<Uri> for HttpsConnector {
|
|||||||
host, port,
|
host, port,
|
||||||
);
|
);
|
||||||
|
|
||||||
stream.write_all(connect_request.as_bytes()).await?;
|
tcp_stream.write_all(connect_request.as_bytes()).await?;
|
||||||
stream.flush().await?;
|
tcp_stream.flush().await?;
|
||||||
|
|
||||||
Self::parse_connect_status(&mut stream).await?;
|
Self::parse_connect_status(&mut tcp_stream).await?;
|
||||||
|
|
||||||
let tcp_stream = stream.into_inner();
|
|
||||||
|
|
||||||
if is_https {
|
if is_https {
|
||||||
Self::secure_stream(tcp_stream, &ssl_connector, &host).await
|
Self::secure_stream(tcp_stream, &ssl_connector, &host).await
|
||||||
|
Loading…
Reference in New Issue
Block a user