proxy: factor out accept_connection

no functional changes, moved code and named the channel's
type for more readability

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2021-05-11 15:53:54 +02:00 committed by Thomas Lamprecht
parent 137309cc4e
commit a5e3be4992
1 changed files with 79 additions and 70 deletions

View File

@ -170,87 +170,96 @@ async fn run() -> Result<(), Error> {
Ok(())
}
type ClientStreamResult =
Result<std::pin::Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>, Error>;
const MAX_PENDING_ACCEPTS: usize = 1024;
fn accept_connections(
listener: tokio::net::TcpListener,
acceptor: Arc<openssl::ssl::SslAcceptor>,
debug: bool,
) -> tokio::sync::mpsc::Receiver<Result<std::pin::Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>, Error>> {
const MAX_PENDING_ACCEPTS: usize = 1024;
) -> tokio::sync::mpsc::Receiver<ClientStreamResult> {
let (sender, receiver) = tokio::sync::mpsc::channel(MAX_PENDING_ACCEPTS);
let accept_counter = Arc::new(());
tokio::spawn(async move {
loop {
match listener.accept().await {
Err(err) => {
eprintln!("error accepting tcp connection: {}", err);
}
Ok((sock, _addr)) => {
sock.set_nodelay(true).unwrap();
let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
let acceptor = Arc::clone(&acceptor);
let ssl = match openssl::ssl::Ssl::new(acceptor.context()) {
Ok(ssl) => ssl,
Err(err) => {
eprintln!("failed to create Ssl object from Acceptor context - {}", err);
continue;
},
};
let stream = match tokio_openssl::SslStream::new(ssl, sock) {
Ok(stream) => stream,
Err(err) => {
eprintln!("failed to create SslStream using ssl and connection socket - {}", err);
continue;
},
};
let mut stream = Box::pin(stream);
let sender = sender.clone();
if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS {
eprintln!("connection rejected - to many open connections");
continue;
}
let accept_counter = accept_counter.clone();
tokio::spawn(async move {
let accept_future = tokio::time::timeout(
Duration::new(10, 0), stream.as_mut().accept());
let result = accept_future.await;
match result {
Ok(Ok(())) => {
if sender.send(Ok(stream)).await.is_err() && debug {
eprintln!("detect closed connection channel");
}
}
Ok(Err(err)) => {
if debug {
eprintln!("https handshake failed - {}", err);
}
}
Err(_) => {
if debug {
eprintln!("https handshake timeout");
}
}
}
drop(accept_counter); // decrease reference count
});
}
}
}
});
tokio::spawn(accept_connection(listener, acceptor, debug, sender));
receiver
}
async fn accept_connection(
listener: tokio::net::TcpListener,
acceptor: Arc<openssl::ssl::SslAcceptor>,
debug: bool,
sender: tokio::sync::mpsc::Sender<ClientStreamResult>,
) {
let accept_counter = Arc::new(());
loop {
match listener.accept().await {
Err(err) => {
eprintln!("error accepting tcp connection: {}", err);
}
Ok((sock, _addr)) => {
sock.set_nodelay(true).unwrap();
let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
let acceptor = Arc::clone(&acceptor);
let ssl = match openssl::ssl::Ssl::new(acceptor.context()) {
Ok(ssl) => ssl,
Err(err) => {
eprintln!("failed to create Ssl object from Acceptor context - {}", err);
continue;
},
};
let stream = match tokio_openssl::SslStream::new(ssl, sock) {
Ok(stream) => stream,
Err(err) => {
eprintln!("failed to create SslStream using ssl and connection socket - {}", err);
continue;
},
};
let mut stream = Box::pin(stream);
let sender = sender.clone();
if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS {
eprintln!("connection rejected - to many open connections");
continue;
}
let accept_counter = accept_counter.clone();
tokio::spawn(async move {
let accept_future = tokio::time::timeout(
Duration::new(10, 0), stream.as_mut().accept());
let result = accept_future.await;
match result {
Ok(Ok(())) => {
if sender.send(Ok(stream)).await.is_err() && debug {
eprintln!("detect closed connection channel");
}
}
Ok(Err(err)) => {
if debug {
eprintln!("https handshake failed - {}", err);
}
}
Err(_) => {
if debug {
eprintln!("https handshake timeout");
}
}
}
drop(accept_counter); // decrease reference count
});
}
}
}
}
fn start_stat_generator() {
let abort_future = server::shutdown_future();
let future = Box::pin(run_stat_generator());