update to tokio 0.2.0-alpha.4

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-09-02 15:16:21 +02:00
parent 1434f4f8df
commit 083ff3fd5d
8 changed files with 72 additions and 54 deletions

View File

@ -19,7 +19,6 @@ futures-preview = "0.3.0-alpha"
h2 = { version = "0.2.0-alpha", git = "https://github.com/hyperium/h2", features = ["stream"] } h2 = { version = "0.2.0-alpha", git = "https://github.com/hyperium/h2", features = ["stream"] }
http = "0.1" http = "0.1"
hyper = { version = "0.13.0-a.0", git = "https://github.com/hyperium/hyper" } hyper = { version = "0.13.0-a.0", git = "https://github.com/hyperium/hyper" }
hyper-openssl = { version = "0.7", path = "hyper-openssl" }
lazy_static = "1.3" lazy_static = "1.3"
libc = "0.2" libc = "0.2"
log = "0.4" log = "0.4"
@ -39,9 +38,9 @@ shellwords = "1.0"
siphasher = "0.3" siphasher = "0.3"
syslog = "4.0" syslog = "4.0"
textwrap = "0.11" textwrap = "0.11"
tokio = { version = "0.2.0-alpha.2" } tokio = { version = "0.2.0-alpha.4" }
tokio-executor = { version = "0.2.0-alpha.2" } tokio-executor = { version = "0.2.0-alpha.4" }
tokio-net = { version = "0.2.0-alpha.2", features = ["signal"] } tokio-net = { version = "0.2.0-alpha.4", features = ["signal"] }
tokio-openssl = "0.4.0-alpha.2" tokio-openssl = "0.4.0-alpha.2"
tower-service = "0.3.0-alpha.1" tower-service = "0.3.0-alpha.1"
url = "1.7" url = "1.7"

View File

@ -74,7 +74,7 @@ async fn main() -> Result<(), Error> {
let start = std::time::SystemTime::now(); let start = std::time::SystemTime::now();
let conn = TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()) let conn = TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008)))
.await?; .await?;
let (client, h2) = h2::client::Builder::new() let (client, h2) = h2::client::Builder::new()

View File

@ -71,7 +71,8 @@ fn send_request(
async fn main() -> Result<(), Error> { async fn main() -> Result<(), Error> {
let start = std::time::SystemTime::now(); let start = std::time::SystemTime::now();
let conn = tokio::net::TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()).await?; let conn =
tokio::net::TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
conn.set_nodelay(true).unwrap(); conn.set_nodelay(true).unwrap();
conn.set_recv_buffer_size(1024*1024).unwrap(); conn.set_recv_buffer_size(1024*1024).unwrap();

View File

@ -24,7 +24,7 @@ async fn main() -> Result<(), Error> {
let acceptor = Arc::new(acceptor.build()); let acceptor = Arc::new(acceptor.build());
let listener = TcpListener::bind(&"127.0.0.1:8008".parse().unwrap()).unwrap(); let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
println!("listening on {:?}", listener.local_addr()); println!("listening on {:?}", listener.local_addr());

View File

@ -10,7 +10,7 @@ use proxmox_backup::client::pipe_to_stream::PipeToSendStream;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Error> { async fn main() -> Result<(), Error> {
let listener = TcpListener::bind(&"127.0.0.1:8008".parse().unwrap()).unwrap(); let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
println!("listening on {:?}", listener.local_addr()); println!("listening on {:?}", listener.local_addr());

View File

@ -55,10 +55,13 @@ async fn run() -> Result<(), Error> {
// http server future: // http server future:
let server = daemon::create_daemon( let server = daemon::create_daemon(
([127,0,0,1], 82).into(), ([127,0,0,1], 82).into(),
move |listener| { move |listener, ready| {
Ok(hyper::Server::builder(listener.incoming()) Ok(ready
.and_then(|_| hyper::Server::builder(listener.incoming())
.serve(rest_server) .serve(rest_server)
.with_graceful_shutdown(server::shutdown_future()) .with_graceful_shutdown(server::shutdown_future())
.map_err(Error::from)
)
.map(|e| { .map(|e| {
if let Err(e) = e { if let Err(e) = e {
eprintln!("server error: {}", e); eprintln!("server error: {}", e);
@ -66,7 +69,7 @@ async fn run() -> Result<(), Error> {
}) })
) )
}, },
)?; );
daemon::systemd_notify(daemon::SystemdNotify::Ready)?; daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
@ -80,7 +83,7 @@ async fn run() -> Result<(), Error> {
bail!("unable to start daemon - {}", err); bail!("unable to start daemon - {}", err);
} }
server.await; server.await?;
log::info!("done - exit server"); log::info!("done - exit server");

View File

@ -71,7 +71,7 @@ async fn run() -> Result<(), Error> {
let server = daemon::create_daemon( let server = daemon::create_daemon(
([0,0,0,0,0,0,0,0], 8007).into(), ([0,0,0,0,0,0,0,0], 8007).into(),
|listener| { |listener, ready| {
let connections = listener let connections = listener
.incoming() .incoming()
.map_err(Error::from) .map_err(Error::from)
@ -87,14 +87,18 @@ async fn run() -> Result<(), Error> {
) )
} }
}); });
Ok(hyper::Server::builder(connections)
Ok(ready
.and_then(|_| hyper::Server::builder(connections)
.serve(rest_server) .serve(rest_server)
.with_graceful_shutdown(server::shutdown_future()) .with_graceful_shutdown(server::shutdown_future())
.map_err(Error::from)
)
.map_err(|err| eprintln!("server error: {}", err)) .map_err(|err| eprintln!("server error: {}", err))
.map(|_| ()) .map(|_| ())
) )
}, },
)?; );
daemon::systemd_notify(daemon::SystemdNotify::Ready)?; daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
@ -108,7 +112,7 @@ async fn run() -> Result<(), Error> {
bail!("unable to start daemon - {}", err); bail!("unable to start daemon - {}", err);
} }
server.await; server.await?;
log::info!("done - exit server"); log::info!("done - exit server");
Ok(()) Ok(())

View File

@ -1,13 +1,15 @@
//! Helpers for daemons/services. //! Helpers for daemons/services.
use std::ffi::CString; use std::ffi::CString;
use std::future::Future;
use std::os::raw::{c_char, c_int}; use std::os::raw::{c_char, c_int};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::ffi::OsStrExt; use std::os::unix::ffi::OsStrExt;
use std::panic::UnwindSafe; use std::panic::UnwindSafe;
use std::pin::Pin;
use std::task::{Context, Poll};
use failure::*; use failure::*;
use tokio::prelude::*;
use proxmox::tools::io::{ReadExt, WriteExt}; use proxmox::tools::io::{ReadExt, WriteExt};
@ -48,14 +50,15 @@ impl Reloader {
/// the function provided in the `or_create` parameter to instantiate the new "first" instance. /// the function provided in the `or_create` parameter to instantiate the new "first" instance.
/// ///
/// Values created via this method will be remembered for later re-execution. /// Values created via this method will be remembered for later re-execution.
pub fn restore<T, F>(&mut self, name: &'static str, or_create: F) -> Result<T, Error> pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
where where
T: Reloadable, T: Reloadable,
F: FnOnce() -> Result<T, Error>, F: FnOnce() -> U,
U: Future<Output = Result<T, Error>>,
{ {
let res = match std::env::var(name) { let res = match std::env::var(name) {
Ok(varstr) => T::restore(&varstr)?, Ok(varstr) => T::restore(&varstr)?,
Err(std::env::VarError::NotPresent) => or_create()?, Err(std::env::VarError::NotPresent) => or_create().await?,
Err(_) => bail!("variable {} has invalid value", name), Err(_) => bail!("variable {} has invalid value", name),
}; };
@ -194,33 +197,42 @@ impl Reloadable for tokio::net::TcpListener {
} }
} }
pub struct NotifyReady;
impl Future for NotifyReady {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
systemd_notify(SystemdNotify::Ready)?;
Poll::Ready(Ok(()))
}
}
/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP. /// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
/// If this is started regularly, a listening socket is created. In this case, the file descriptor /// If this is started regularly, a listening socket is created. In this case, the file descriptor
/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`. /// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
/// If the variable already exists, its contents will instead be used to restore the listening /// If the variable already exists, its contents will instead be used to restore the listening
/// socket. The finished listening socket is then passed to the `create_service` function which /// socket. The finished listening socket is then passed to the `create_service` function which
/// can be used to setup the TLS and the HTTP daemon. /// can be used to setup the TLS and the HTTP daemon.
pub fn create_daemon<F, S>( pub async fn create_daemon<F, S>(
address: std::net::SocketAddr, address: std::net::SocketAddr,
create_service: F, create_service: F,
) -> Result<impl Future<Output = ()>, Error> ) -> Result<(), Error>
where where
F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>, F: FnOnce(tokio::net::TcpListener, NotifyReady) -> Result<S, Error>,
S: Future<Output = ()>, S: Future<Output = ()>,
{ {
let mut reloader = Reloader::new(); let mut reloader = Reloader::new();
let listener: tokio::net::TcpListener = reloader.restore( let listener: tokio::net::TcpListener = reloader.restore(
"PROXMOX_BACKUP_LISTEN_FD", "PROXMOX_BACKUP_LISTEN_FD",
move || Ok(tokio::net::TcpListener::bind(&address)?), move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
)?; ).await?;
let service = create_service(listener)?; create_service(listener, NotifyReady)?.await;
let mut reloader = Some(reloader); let mut reloader = Some(reloader);
Ok(service
.map(move |_| {
crate::tools::request_shutdown(); // make sure we are in shutdown mode crate::tools::request_shutdown(); // make sure we are in shutdown mode
if server::is_reload_request() { if server::is_reload_request() {
log::info!("daemon reload..."); log::info!("daemon reload...");
@ -234,8 +246,7 @@ where
} else { } else {
log::info!("daemon shutting down..."); log::info!("daemon shutting down...");
} }
}) Ok(())
)
} }
#[link(name = "systemd")] #[link(name = "systemd")]