tokio 1.0: update to new tokio-openssl interface
connect/accept are now happening on pinned SslStreams Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
This commit is contained in:
parent
7c66701366
commit
0f860f712f
|
@ -167,7 +167,7 @@ fn accept_connections(
|
||||||
mut listener: tokio::net::TcpListener,
|
mut listener: tokio::net::TcpListener,
|
||||||
acceptor: Arc<openssl::ssl::SslAcceptor>,
|
acceptor: Arc<openssl::ssl::SslAcceptor>,
|
||||||
debug: bool,
|
debug: bool,
|
||||||
) -> tokio::sync::mpsc::Receiver<Result<tokio_openssl::SslStream<tokio::net::TcpStream>, Error>> {
|
) -> tokio::sync::mpsc::Receiver<Result<std::pin::Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>, Error>> {
|
||||||
|
|
||||||
const MAX_PENDING_ACCEPTS: usize = 1024;
|
const MAX_PENDING_ACCEPTS: usize = 1024;
|
||||||
|
|
||||||
|
@ -185,7 +185,24 @@ fn accept_connections(
|
||||||
sock.set_nodelay(true).unwrap();
|
sock.set_nodelay(true).unwrap();
|
||||||
let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
|
let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
|
||||||
let acceptor = Arc::clone(&acceptor);
|
let acceptor = Arc::clone(&acceptor);
|
||||||
let mut sender = sender.clone();
|
|
||||||
|
let ssl = match openssl::ssl::Ssl::new(acceptor.context()) {
|
||||||
|
Ok(ssl) => ssl,
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("failed to create Ssl object from Acceptor context - {}", err);
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
let stream = match tokio_openssl::SslStream::new(ssl, sock) {
|
||||||
|
Ok(stream) => stream,
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("failed to create SslStream using ssl and connection socket - {}", err);
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut stream = Box::pin(stream);
|
||||||
|
let sender = sender.clone();
|
||||||
|
|
||||||
if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS {
|
if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS {
|
||||||
eprintln!("connection rejected - to many open connections");
|
eprintln!("connection rejected - to many open connections");
|
||||||
|
@ -195,13 +212,13 @@ fn accept_connections(
|
||||||
let accept_counter = accept_counter.clone();
|
let accept_counter = accept_counter.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let accept_future = tokio::time::timeout(
|
let accept_future = tokio::time::timeout(
|
||||||
Duration::new(10, 0), tokio_openssl::accept(&acceptor, sock));
|
Duration::new(10, 0), stream.as_mut().accept());
|
||||||
|
|
||||||
let result = accept_future.await;
|
let result = accept_future.await;
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(Ok(connection)) => {
|
Ok(Ok(())) => {
|
||||||
if let Err(_) = sender.send(Ok(connection)).await {
|
if let Err(_) = sender.send(Ok(stream)).await {
|
||||||
if debug {
|
if debug {
|
||||||
eprintln!("detect closed connection channel");
|
eprintln!("detect closed connection channel");
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ impl RestServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl tower_service::Service<&tokio_openssl::SslStream<tokio::net::TcpStream>> for RestServer {
|
impl tower_service::Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>> for RestServer {
|
||||||
type Response = ApiService;
|
type Response = ApiService;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
|
type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
|
||||||
|
@ -74,7 +74,7 @@ impl tower_service::Service<&tokio_openssl::SslStream<tokio::net::TcpStream>> fo
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, ctx: &tokio_openssl::SslStream<tokio::net::TcpStream>) -> Self::Future {
|
fn call(&mut self, ctx: &Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>) -> Self::Future {
|
||||||
match ctx.get_ref().peer_addr() {
|
match ctx.get_ref().peer_addr() {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
future::err(format_err!("unable to get peer address - {}", err)).boxed()
|
future::err(format_err!("unable to get peer address - {}", err)).boxed()
|
||||||
|
|
|
@ -74,7 +74,7 @@ impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L
|
||||||
// we need this for crate::client::http_client:
|
// we need this for crate::client::http_client:
|
||||||
impl Connection for EitherStream<
|
impl Connection for EitherStream<
|
||||||
tokio::net::TcpStream,
|
tokio::net::TcpStream,
|
||||||
tokio_openssl::SslStream<tokio::net::TcpStream>,
|
Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
|
||||||
> {
|
> {
|
||||||
fn connected(&self) -> hyper::client::connect::Connected {
|
fn connected(&self) -> hyper::client::connect::Connected {
|
||||||
match self {
|
match self {
|
||||||
|
|
|
@ -3,6 +3,7 @@ use lazy_static::lazy_static;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::os::unix::io::AsRawFd;
|
use std::os::unix::io::AsRawFd;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
use hyper::{Uri, Body};
|
use hyper::{Uri, Body};
|
||||||
use hyper::client::{Client, HttpConnector};
|
use hyper::client::{Client, HttpConnector};
|
||||||
|
@ -101,7 +102,7 @@ impl HttpsConnector {
|
||||||
|
|
||||||
type MaybeTlsStream = EitherStream<
|
type MaybeTlsStream = EitherStream<
|
||||||
tokio::net::TcpStream,
|
tokio::net::TcpStream,
|
||||||
tokio_openssl::SslStream<tokio::net::TcpStream>,
|
Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
impl hyper::service::Service<Uri> for HttpsConnector {
|
impl hyper::service::Service<Uri> for HttpsConnector {
|
||||||
|
@ -123,10 +124,6 @@ impl hyper::service::Service<Uri> for HttpsConnector {
|
||||||
.scheme()
|
.scheme()
|
||||||
.ok_or_else(|| format_err!("missing URL scheme"))?
|
.ok_or_else(|| format_err!("missing URL scheme"))?
|
||||||
== "https";
|
== "https";
|
||||||
let host = dst
|
|
||||||
.host()
|
|
||||||
.ok_or_else(|| format_err!("missing hostname in destination url?"))?
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
let config = this.ssl_connector.configure();
|
let config = this.ssl_connector.configure();
|
||||||
let dst_str = dst.to_string(); // for error messages
|
let dst_str = dst.to_string(); // for error messages
|
||||||
|
@ -139,7 +136,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
|
||||||
let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
|
let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
|
||||||
|
|
||||||
if is_https {
|
if is_https {
|
||||||
let conn = tokio_openssl::connect(config?, &host, conn).await?;
|
let conn: tokio_openssl::SslStream<tokio::net::TcpStream> = tokio_openssl::SslStream::new(config?.into_ssl(&dst_str)?, conn)?;
|
||||||
|
let mut conn = Box::pin(conn);
|
||||||
|
conn.as_mut().connect().await?;
|
||||||
Ok(MaybeTlsStream::Right(conn))
|
Ok(MaybeTlsStream::Right(conn))
|
||||||
} else {
|
} else {
|
||||||
Ok(MaybeTlsStream::Left(conn))
|
Ok(MaybeTlsStream::Left(conn))
|
||||||
|
|
Loading…
Reference in New Issue