fix #3106: correctly queue incoming connections
This commit is contained in:
parent
641862ddad
commit
48aa2b93b7
@ -1,4 +1,4 @@
|
||||
use std::sync::{Arc};
|
||||
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
@ -116,25 +116,12 @@ async fn run() -> Result<(), Error> {
|
||||
let server = daemon::create_daemon(
|
||||
([0,0,0,0,0,0,0,0], 8007).into(),
|
||||
|listener, ready| {
|
||||
let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
|
||||
.map_err(Error::from)
|
||||
.try_filter_map(move |(sock, _addr)| {
|
||||
let acceptor = Arc::clone(&acceptor);
|
||||
async move {
|
||||
sock.set_nodelay(true).unwrap();
|
||||
|
||||
let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
|
||||
|
||||
Ok(tokio_openssl::accept(&acceptor, sock)
|
||||
.await
|
||||
.ok() // handshake errors aren't be fatal, so return None to filter
|
||||
)
|
||||
}
|
||||
});
|
||||
let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
|
||||
let connections = accept_connections(listener, acceptor);
|
||||
let connections = hyper::server::accept::from_stream(connections);
|
||||
|
||||
Ok(ready
|
||||
.and_then(|_| hyper::Server::builder(connections)
|
||||
.and_then(|_| hyper::Server::builder(connections)
|
||||
.serve(rest_server)
|
||||
.with_graceful_shutdown(server::shutdown_future())
|
||||
.map_err(Error::from)
|
||||
@ -170,6 +157,66 @@ async fn run() -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn accept_connections(
|
||||
mut listener: tokio::net::TcpListener,
|
||||
acceptor: Arc<openssl::ssl::SslAcceptor>,
|
||||
) -> tokio::sync::mpsc::Receiver<Result<tokio_openssl::SslStream<tokio::net::TcpStream>, Error>> {
|
||||
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(1024);
|
||||
|
||||
let accept_counter = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
const MAX_PENDING_ACCEPTS: usize = 1024;
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Err(err) => {
|
||||
eprintln!("error accepting tcp connection: {}", err);
|
||||
}
|
||||
Ok((sock, _addr)) => {
|
||||
sock.set_nodelay(true).unwrap();
|
||||
let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
|
||||
let acceptor = Arc::clone(&acceptor);
|
||||
let mut sender = sender.clone();
|
||||
|
||||
if accept_counter.load(Ordering::SeqCst) > MAX_PENDING_ACCEPTS {
|
||||
eprintln!("connection rejected - to many open connections");
|
||||
continue;
|
||||
}
|
||||
accept_counter.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
let accept_counter = accept_counter.clone();
|
||||
tokio::spawn(async move {
|
||||
let accept_future = tokio::time::timeout(
|
||||
Duration::new(10, 0), tokio_openssl::accept(&acceptor, sock));
|
||||
|
||||
let result = accept_future.await;
|
||||
|
||||
match result {
|
||||
Ok(Ok(connection)) => {
|
||||
if let Err(_) = sender.send(Ok(connection)).await {
|
||||
eprintln!("detect closed connection channel");
|
||||
}
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
eprintln!("https handshake failed - {}", err);
|
||||
}
|
||||
Err(_) => {
|
||||
eprintln!("https handshake timeout");
|
||||
}
|
||||
}
|
||||
|
||||
accept_counter.fetch_sub(1, Ordering::SeqCst);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
receiver
|
||||
}
|
||||
|
||||
fn start_stat_generator() {
|
||||
let abort_future = server::shutdown_future();
|
||||
let future = Box::pin(run_stat_generator());
|
||||
|
Loading…
Reference in New Issue
Block a user