runtime: fix blocking strategy:

- do not "double"-block_in_place() (it may not be nested)
- do not call block_in_place() in non-worker threads

is_in_tokio() isn't sufficient, we need to actually know
that we're in a worker-thread, so we do this by remembering
that we're blocking.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2020-02-17 09:48:22 +01:00
parent 1283d58ca9
commit 9e003074cb

View File

@ -10,7 +10,7 @@ use lazy_static::lazy_static;
use tokio::runtime::{self, Runtime}; use tokio::runtime::{self, Runtime};
thread_local! { thread_local! {
static HAS_RUNTIME: RefCell<bool> = RefCell::new(false); static BLOCKING: RefCell<bool> = RefCell::new(false);
} }
fn is_in_tokio() -> bool { fn is_in_tokio() -> bool {
@ -18,15 +18,15 @@ fn is_in_tokio() -> bool {
.is_ok() .is_ok()
} }
fn has_runtime() -> bool { fn is_blocking() -> bool {
HAS_RUNTIME.with(|v| *v.borrow()) BLOCKING.with(|v| *v.borrow())
} }
struct RuntimeGuard(bool); struct BlockingGuard(bool);
impl RuntimeGuard { impl BlockingGuard {
fn enter() -> Self { fn set() -> Self {
Self(HAS_RUNTIME.with(|v| { Self(BLOCKING.with(|v| {
let old = *v.borrow(); let old = *v.borrow();
*v.borrow_mut() = true; *v.borrow_mut() = true;
old old
@ -34,9 +34,9 @@ impl RuntimeGuard {
} }
} }
impl Drop for RuntimeGuard { impl Drop for BlockingGuard {
fn drop(&mut self) { fn drop(&mut self) {
HAS_RUNTIME.with(|v| { BLOCKING.with(|v| {
*v.borrow_mut() = self.0; *v.borrow_mut() = self.0;
}); });
} }
@ -60,37 +60,37 @@ pub fn get_runtime() -> &'static Runtime {
&RUNTIME &RUNTIME
} }
/// Associate the current newly spawned thread with the main tokio runtime.
pub fn enter_runtime<R>(f: impl FnOnce() -> R) -> R {
let _guard = RuntimeGuard::enter();
get_runtime().enter(f)
}
/// Block on a synchronous piece of code. /// Block on a synchronous piece of code.
pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R { pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
if is_in_tokio() { // don't double-exit the context (tokio doesn't like that)
// we are in an actual tokio worker thread, block it: // also, if we're not actually in a tokio-worker we must not use block_in_place() either
tokio::task::block_in_place(fut) if is_blocking() || !is_in_tokio() {
} else {
// we're not inside a tokio worker, so just run the code:
fut() fut()
} else {
// we are in an actual tokio worker thread, block it:
tokio::task::block_in_place(move || {
let _guard = BlockingGuard::set();
fut()
})
} }
} }
/// Block on a future in this thread. /// Block on a future in this thread.
pub fn block_on<F: Future>(fut: F) -> F::Output { pub fn block_on<F: Future>(fut: F) -> F::Output {
if is_in_tokio() { // don't double-exit the context (tokio doesn't like that)
// inside a tokio worker we need to tell tokio that we're about to really block: if is_blocking() {
tokio::task::block_in_place(move || block_on_local_future(fut))
} else if has_runtime() {
// we're already associated with a runtime, but we're not a worker-thread, we can just
// block this thread directly
// This is not strictly necessary, but it's a bit quicker tha the else branch below.
block_on_local_future(fut) block_on_local_future(fut)
} else if is_in_tokio() {
// inside a tokio worker we need to tell tokio that we're about to really block:
tokio::task::block_in_place(move || {
let _guard = BlockingGuard::set();
block_on_local_future(fut)
})
} else { } else {
// not a worker thread, not associated with a runtime, make sure we have a runtime (spawn // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
// it on demand if necessary), then enter it: // it on demand if necessary), then enter it
enter_runtime(move || block_on_local_future(fut)) let _guard = BlockingGuard::set();
get_runtime().enter(move || block_on_local_future(fut))
} }
} }