introduce new runtime tokio helpers

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2020-01-20 12:52:22 +01:00
parent aac9dbf635
commit d973aa827c
13 changed files with 173 additions and 43 deletions

View File

@ -8,10 +8,12 @@ use std::convert::TryFrom;
use chrono::offset::{TimeZone, Local};
use proxmox::tools::io::ReadExt;
use proxmox::sys::error::io_err_other;
use crate::pxar::catalog::BackupCatalogWriter;
use crate::pxar::{MatchPattern, MatchPatternSlice, MatchType};
use crate::backup::file_formats::PROXMOX_CATALOG_FILE_MAGIC_1_0;
use crate::tools::runtime::block_on;
#[repr(u8)]
#[derive(Copy,Clone,PartialEq)]
@ -384,12 +386,12 @@ impl SenderWriter {
impl Write for SenderWriter {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
tokio::task::block_in_place(|| {
futures::executor::block_on(async move {
self.0.send(Ok(buf.to_vec())).await
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
Ok(buf.len())
})
block_on(async move {
self.0
.send(Ok(buf.to_vec()))
.await
.map_err(io_err_other)
.and(Ok(buf.len()))
})
}

View File

@ -57,7 +57,7 @@ async fn run() -> Result<(), Error> {
#[tokio::main]
async fn main() {
if let Err(err) = run().await {
if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
eprintln!("ERROR: {}", err);
}
println!("DONE");

View File

@ -69,8 +69,11 @@ fn send_request(
})
}
#[tokio::main]
async fn main() -> Result<(), Error> {
fn main() -> Result<(), Error> {
proxmox_backup::tools::runtime::main(run())
}
async fn run() -> Result<(), Error> {
let start = std::time::SystemTime::now();

View File

@ -67,8 +67,11 @@ fn send_request(
})
}
#[tokio::main]
async fn main() -> Result<(), Error> {
fn main() -> Result<(), Error> {
proxmox_backup::tools::runtime::main(run())
}
async fn run() -> Result<(), Error> {
let start = std::time::SystemTime::now();
let conn =

View File

@ -10,8 +10,11 @@ use proxmox_backup::configdir;
// Simple H2 server to test H2 speed with h2s-client.rs
#[tokio::main]
async fn main() -> Result<(), Error> {
fn main() -> Result<(), Error> {
proxmox_backup::tools::runtime::main(run())
}
async fn run() -> Result<(), Error> {
let key_path = configdir!("/proxy.key");
let cert_path = configdir!("/proxy.pem");

View File

@ -8,8 +8,11 @@ use tokio::io::{AsyncRead, AsyncWrite};
use proxmox_backup::client::pipe_to_stream::PipeToSendStream;
#[tokio::main]
async fn main() -> Result<(), Error> {
fn main() -> Result<(), Error> {
proxmox_backup::tools::runtime::main(run())
}
async fn run() -> Result<(), Error> {
let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
println!("listening on {:?}", listener.local_addr());

View File

@ -13,9 +13,8 @@ use proxmox_backup::auth_helpers::*;
use proxmox_backup::config;
use proxmox_backup::buildcfg;
#[tokio::main]
async fn main() {
if let Err(err) = run().await {
fn main() {
if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
eprintln!("Error: {}", err);
std::process::exit(-1);
}

View File

@ -15,9 +15,8 @@ use proxmox_backup::tools::daemon;
use proxmox_backup::server::{ApiConfig, rest::*};
use proxmox_backup::auth_helpers::*;
#[tokio::main]
async fn main() {
if let Err(err) = run().await {
fn main() {
if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
eprintln!("Error: {}", err);
std::process::exit(-1);
}

View File

@ -12,9 +12,8 @@ use proxmox_backup::backup::*;
//
// Note: I can currently get about 830MB/s
#[tokio::main]
async fn main() {
if let Err(err) = run().await {
fn main() {
if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
panic!("ERROR: {}", err);
}
}

View File

@ -21,9 +21,8 @@ async fn upload_speed() -> Result<usize, Error> {
Ok(res)
}
#[tokio::main]
async fn main() {
match upload_speed().await {
fn main() {
match proxmox_backup::tools::runtime::main(upload_speed()) {
Ok(mbs) => {
println!("average upload speed: {} MB/s", mbs);
}

View File

@ -5,6 +5,7 @@ use failure::*;
use super::BackupReader;
use crate::backup::{ReadChunk, DataBlob, CryptConfig};
use crate::tools::runtime::block_on;
/// Read chunks from remote host using ``BackupReader``
pub struct RemoteChunkReader {
@ -35,7 +36,14 @@ impl ReadChunk for RemoteChunkReader {
let mut chunk_data = Vec::with_capacity(4*1024*1024);
tokio::task::block_in_place(|| futures::executor::block_on(self.client.download_chunk(&digest, &mut chunk_data)))?;
//tokio::task::block_in_place(|| futures::executor::block_on(self.client.download_chunk(&digest, &mut chunk_data)))?;
block_on(async {
// download_chunk returns the writer back to us, but we need to return a 'static value
self.client
.download_chunk(&digest, &mut chunk_data)
.await
.map(drop)
})?;
let chunk = DataBlob::from_raw(chunk_data)?;
chunk.verify_crc()?;

View File

@ -1,20 +1,131 @@
//! Helpers for quirks of the current tokio runtime.
use std::cell::RefCell;
use std::future::Future;
pub fn main<F, T>(fut: F) -> T
where
F: Future<Output = T> + Send + 'static,
T: std::fmt::Debug + Send + 'static,
{
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let (tx, rx) = tokio::sync::oneshot::channel();
use lazy_static::lazy_static;
use tokio::runtime::{self, Runtime};
tokio::spawn(async move {
tx.send(fut.await).unwrap()
});
rx.await.unwrap()
})
thread_local! {
static HAS_RUNTIME: RefCell<bool> = RefCell::new(false);
static IN_TOKIO: RefCell<bool> = RefCell::new(false);
}
fn is_in_tokio() -> bool {
IN_TOKIO.with(|v| *v.borrow())
}
fn has_runtime() -> bool {
HAS_RUNTIME.with(|v| *v.borrow())
}
struct RuntimeGuard(bool);
impl RuntimeGuard {
fn enter() -> Self {
Self(HAS_RUNTIME.with(|v| {
let old = *v.borrow();
*v.borrow_mut() = true;
old
}))
}
}
impl Drop for RuntimeGuard {
fn drop(&mut self) {
HAS_RUNTIME.with(|v| {
*v.borrow_mut() = self.0;
});
}
}
lazy_static! {
static ref RUNTIME: Runtime = {
runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.on_thread_start(|| IN_TOKIO.with(|v| *v.borrow_mut() = true))
.build()
.expect("failed to spawn tokio runtime")
};
}
/// Get or create the current main tokio runtime.
///
/// This makes sure that tokio's worker threads are marked for us so that we know whether we
/// can/need to use `block_in_place` in our `block_on` helper.
pub fn get_runtime() -> &'static Runtime {
&RUNTIME
}
/// Associate the current newly spawned thread with the main tokio runtime.
pub fn enter_runtime<R>(f: impl FnOnce() -> R) -> R {
let _guard = RuntimeGuard::enter();
get_runtime().enter(f)
}
/// Block on a synchronous piece of code.
pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
if is_in_tokio() {
// we are in an actual tokio worker thread, block it:
tokio::task::block_in_place(fut)
} else {
// we're not inside a tokio worker, so just run the code:
fut()
}
}
/// Block on a future in this thread.
pub fn block_on<R, F>(fut: F) -> R
where
R: Send + 'static,
F: Future<Output = R> + Send,
{
if is_in_tokio() {
// inside a tokio worker we need to tell tokio that we're about to really block:
tokio::task::block_in_place(move || futures::executor::block_on(fut))
} else if has_runtime() {
// we're already associated with a runtime, but we're not a worker-thread, we can just
// block this thread directly
// This is not strictly necessary, but it's a bit quicker tha the else branch below.
futures::executor::block_on(fut)
} else {
// not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
// it on demand if necessary), then enter it:
enter_runtime(move || futures::executor::block_on(fut))
}
}
/*
fn block_on_impl<F>(mut fut: F) -> F::Output
where
F: Future + Send,
F::Output: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
tokio::spawn(async move {
let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
tx
.send(fut.await)
.map_err(drop)
.expect("failed to send block_on result to channel")
});
futures::executor::block_on(async move {
rx.await.expect("failed to receive block_on result from channel")
})
std::mem::forget(fut);
}
*/
/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
/// compatibility, which will perform all the necessary tasks on-demand anyway.
pub fn main<F>(fut: F) -> F::Output
where
F: Future + Send,
F::Output: Send + 'static,
{
block_on(fut)
}

View File

@ -2,9 +2,10 @@ use std::io::{self, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::task::block_in_place;
use futures::stream::Stream;
use crate::tools::runtime::block_in_place;
pub struct WrappedReaderStream<R: Read + Unpin> {
reader: R,
buffer: Vec<u8>,