move MaybeTlsStream wrapper to proxmox_http

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
This commit is contained in:
Fabian Grünbichler 2021-05-14 15:44:52 +02:00 committed by Dietmar Maurer
parent 3241392117
commit 5b43cc4487
3 changed files with 4 additions and 123 deletions

View File

@ -57,7 +57,7 @@ proxmox = { version = "0.11.5", features = [ "sortable-macro", "api-macro" ] }
#proxmox = { git = "git://git.proxmox.com/git/proxmox", version = "0.1.2", features = [ "sortable-macro", "api-macro" ] } #proxmox = { git = "git://git.proxmox.com/git/proxmox", version = "0.1.2", features = [ "sortable-macro", "api-macro" ] }
#proxmox = { path = "../proxmox/proxmox", features = [ "sortable-macro", "api-macro" ] } #proxmox = { path = "../proxmox/proxmox", features = [ "sortable-macro", "api-macro" ] }
proxmox-fuse = "0.1.1" proxmox-fuse = "0.1.1"
proxmox-http = { version = "0.1.0", path = "../proxmox/proxmox-http", features = [ "websocket" ] } proxmox-http = { version = "0.1.0", path = "../proxmox/proxmox-http", features = [ "http-helpers", "websocket" ] }
pxar = { version = "0.10.1", features = [ "tokio-io" ] } pxar = { version = "0.10.1", features = [ "tokio-io" ] }
#pxar = { path = "../pxar", features = [ "tokio-io" ] } #pxar = { path = "../pxar", features = [ "tokio-io" ] }
regex = "1.2" regex = "1.2"

View File

@ -1,131 +1,14 @@
//! AsyncRead/AsyncWrite utilities. //! AsyncRead/AsyncWrite utilities.
use std::io;
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures::stream::{Stream, TryStream}; use futures::stream::{Stream, TryStream};
use futures::ready; use futures::ready;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio_openssl::SslStream;
use hyper::client::connect::{Connection, Connected};
/// Asynchronous stream, possibly encrypted and proxied
///
/// Usefule for HTTP client implementations using hyper.
pub enum MaybeTlsStream<S> {
Normal(S),
Proxied(S),
Secured(SslStream<S>),
}
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for MaybeTlsStream<S> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut ReadBuf,
) -> Poll<Result<(), io::Error>> {
match self.get_mut() {
MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_read(cx, buf)
}
MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_read(cx, buf)
}
MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_read(cx, buf)
}
}
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for MaybeTlsStream<S> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
match self.get_mut() {
MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_write(cx, buf)
}
MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_write(cx, buf)
}
MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_write(cx, buf)
}
}
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
match self.get_mut() {
MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_write_vectored(cx, bufs)
}
MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_write_vectored(cx, bufs)
}
MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_write_vectored(cx, bufs)
}
}
}
fn is_write_vectored(&self) -> bool {
match self {
MaybeTlsStream::Normal(s) => s.is_write_vectored(),
MaybeTlsStream::Proxied(s) => s.is_write_vectored(),
MaybeTlsStream::Secured(s) => s.is_write_vectored(),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
match self.get_mut() {
MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_flush(cx)
}
MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_flush(cx)
}
MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_flush(cx)
}
}
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
match self.get_mut() {
MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_shutdown(cx)
}
MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_shutdown(cx)
}
MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_shutdown(cx)
}
}
}
}
// we need this for the hyper http client
impl <S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for MaybeTlsStream<S>
{
fn connected(&self) -> Connected {
match self {
MaybeTlsStream::Normal(s) => s.connected(),
MaybeTlsStream::Proxied(s) => s.connected().proxy(true),
MaybeTlsStream::Secured(s) => s.get_ref().connected(),
}
}
}
/// Tokio's `Incoming` now is a reference type and hyper's `AddrIncoming` misses some standard /// Tokio's `Incoming` now is a reference type and hyper's `AddrIncoming` misses some standard
/// stuff like `AsRawFd`, so here's something implementing hyper's `Accept` from a `TcpListener` /// stuff like `AsRawFd`, so here's something implementing hyper's `Accept` from a `TcpListener`

View File

@ -19,11 +19,9 @@ use tokio::{
use tokio_openssl::SslStream; use tokio_openssl::SslStream;
use proxmox::sys::linux::socket::set_tcp_keepalive; use proxmox::sys::linux::socket::set_tcp_keepalive;
use proxmox_http::http::MaybeTlsStream;
use crate::tools::{ use crate::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
async_io::MaybeTlsStream,
};
// Build a http::uri::Authority ("host:port"), use '[..]' around IPv6 addresses // Build a http::uri::Authority ("host:port"), use '[..]' around IPv6 addresses
pub(crate) fn build_authority(host: &str, port: u16) -> Result<Authority, Error> { pub(crate) fn build_authority(host: &str, port: u16) -> Result<Authority, Error> {