From a5e3be499281382fd6ea2adf76f01d7928bf368e Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Tue, 11 May 2021 15:53:54 +0200 Subject: [PATCH] proxy: factor out accept_connection no functional changes, moved code and named the channel's type for more readability Signed-off-by: Wolfgang Bumiller --- src/bin/proxmox-backup-proxy.rs | 149 +++++++++++++++++--------------- 1 file changed, 79 insertions(+), 70 deletions(-) diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 31dc8332..27d1cbeb 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -170,87 +170,96 @@ async fn run() -> Result<(), Error> { Ok(()) } +type ClientStreamResult = + Result>>, Error>; +const MAX_PENDING_ACCEPTS: usize = 1024; + fn accept_connections( listener: tokio::net::TcpListener, acceptor: Arc, debug: bool, -) -> tokio::sync::mpsc::Receiver>>, Error>> { - - const MAX_PENDING_ACCEPTS: usize = 1024; +) -> tokio::sync::mpsc::Receiver { let (sender, receiver) = tokio::sync::mpsc::channel(MAX_PENDING_ACCEPTS); - let accept_counter = Arc::new(()); - - tokio::spawn(async move { - loop { - match listener.accept().await { - Err(err) => { - eprintln!("error accepting tcp connection: {}", err); - } - Ok((sock, _addr)) => { - sock.set_nodelay(true).unwrap(); - let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); - let acceptor = Arc::clone(&acceptor); - - let ssl = match openssl::ssl::Ssl::new(acceptor.context()) { - Ok(ssl) => ssl, - Err(err) => { - eprintln!("failed to create Ssl object from Acceptor context - {}", err); - continue; - }, - }; - let stream = match tokio_openssl::SslStream::new(ssl, sock) { - Ok(stream) => stream, - Err(err) => { - eprintln!("failed to create SslStream using ssl and connection socket - {}", err); - continue; - }, - }; - - let mut stream = Box::pin(stream); - let sender = sender.clone(); - - if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS { - eprintln!("connection rejected - to many open connections"); - continue; - } - - let accept_counter = accept_counter.clone(); - tokio::spawn(async move { - let accept_future = tokio::time::timeout( - Duration::new(10, 0), stream.as_mut().accept()); - - let result = accept_future.await; - - match result { - Ok(Ok(())) => { - if sender.send(Ok(stream)).await.is_err() && debug { - eprintln!("detect closed connection channel"); - } - } - Ok(Err(err)) => { - if debug { - eprintln!("https handshake failed - {}", err); - } - } - Err(_) => { - if debug { - eprintln!("https handshake timeout"); - } - } - } - - drop(accept_counter); // decrease reference count - }); - } - } - } - }); + tokio::spawn(accept_connection(listener, acceptor, debug, sender)); receiver } +async fn accept_connection( + listener: tokio::net::TcpListener, + acceptor: Arc, + debug: bool, + sender: tokio::sync::mpsc::Sender, +) { + let accept_counter = Arc::new(()); + + loop { + match listener.accept().await { + Err(err) => { + eprintln!("error accepting tcp connection: {}", err); + } + Ok((sock, _addr)) => { + sock.set_nodelay(true).unwrap(); + let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); + let acceptor = Arc::clone(&acceptor); + + let ssl = match openssl::ssl::Ssl::new(acceptor.context()) { + Ok(ssl) => ssl, + Err(err) => { + eprintln!("failed to create Ssl object from Acceptor context - {}", err); + continue; + }, + }; + let stream = match tokio_openssl::SslStream::new(ssl, sock) { + Ok(stream) => stream, + Err(err) => { + eprintln!("failed to create SslStream using ssl and connection socket - {}", err); + continue; + }, + }; + + let mut stream = Box::pin(stream); + let sender = sender.clone(); + + if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS { + eprintln!("connection rejected - to many open connections"); + continue; + } + + let accept_counter = accept_counter.clone(); + tokio::spawn(async move { + let accept_future = tokio::time::timeout( + Duration::new(10, 0), stream.as_mut().accept()); + + let result = accept_future.await; + + match result { + Ok(Ok(())) => { + if sender.send(Ok(stream)).await.is_err() && debug { + eprintln!("detect closed connection channel"); + } + } + Ok(Err(err)) => { + if debug { + eprintln!("https handshake failed - {}", err); + } + } + Err(_) => { + if debug { + eprintln!("https handshake timeout"); + } + } + } + + drop(accept_counter); // decrease reference count + }); + } + } + } +} + fn start_stat_generator() { let abort_future = server::shutdown_future(); let future = Box::pin(run_stat_generator());