diff --git a/qemu-io/Cargo.toml b/qemu-io/Cargo.toml deleted file mode 100644 index 75a00f7c..00000000 --- a/qemu-io/Cargo.toml +++ /dev/null @@ -1,29 +0,0 @@ -[package] -name = "qemu-io" -version = "0.1.0" -authors = [ - "Wolfgang Bumiller ", -] -edition = "2018" - -#[lib] -#crate-type = ['lib', 'cdylib'] - -[dependencies] -failure = "0.1" -mio = "0.6" - -# In this crate 'future' by default means standard-future. -# The 0.1-futures are exposed under the name 'futures_01'. - -[dependencies.futures-preview] -version = "0.3.0-alpha.15" -features = ["compat", "io-compat"] - -[dependencies.futures_01] -package = "futures" -version = "0.1" - -[features] -default = ["standalone"] -standalone = [] diff --git a/qemu-io/src/aio_context.rs b/qemu-io/src/aio_context.rs deleted file mode 100644 index b2b1c10b..00000000 --- a/qemu-io/src/aio_context.rs +++ /dev/null @@ -1,11 +0,0 @@ -//! Provides a handle to an AioContext. - -#[cfg(feature="standalone")] -mod standalone; -#[cfg(feature="standalone")] -pub use standalone::AioContext; - -// TODO: Add the non-standalone variant to be linked with Qemu: -// The AioContext struct should provide a high-level version of `set_fd_handler` with the same -// interface the standalone version provides out of the box (transparently turning closures into -// `extern "C" fn(opaque: *const c_void)` calls. diff --git a/qemu-io/src/aio_context/standalone.rs b/qemu-io/src/aio_context/standalone.rs deleted file mode 100644 index 5b425675..00000000 --- a/qemu-io/src/aio_context/standalone.rs +++ /dev/null @@ -1,188 +0,0 @@ -//! This implements the parts of qemu's AioContext interface we need for testing outside qemu. - -use std::collections::HashMap; -use std::os::unix::io::RawFd; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread; - -use failure::Error; -use mio::{Events, Poll, Token}; -use mio::unix::EventedFd; - -use crate::util::{AioCb, AioHandlerState}; - -/// This is a reference to a standalone `AioContextImpl` and allows instantiating a new context -/// with a polling thread. -#[derive(Clone)] -#[repr(transparent)] -pub struct AioContext(Arc); - -impl std::ops::Deref for AioContext { - type Target = AioContextImpl; - - fn deref(&self) -> &Self::Target { - &*self.0 - } -} - -impl AioContext { - /// Create a new `AioContext` instance with an associated polling thread, which will live as - /// long as there are references to it. - pub fn new() -> Result { - Ok(Self(AioContextImpl::new()?)) - } -} - -pub struct AioContextImpl { - poll: Poll, - handlers: RwLock>, - poll_thread: Mutex>>, -} - -impl AioContextImpl { - pub fn new() -> Result, Error> { - let this = Arc::new(Self { - poll: Poll::new()?, - handlers: RwLock::new(HashMap::new()), - poll_thread: Mutex::new(None), - }); - - let this2 = Arc::clone(&this); - this.poll_thread.lock().unwrap().replace(thread::spawn(|| this2.main_loop())); - - Ok(this) - } - - /// Qemu's aio_set_fd_handler. We're skipping the `io_poll` parameter for this implementation - /// as we don't use it. - /// ``` - /// void aio_set_fd_handler(AioContext *ctx, - /// int fd, - /// bool is_external, - /// IOHandler *io_read, - /// IOHandler *io_write, - /// AioPollFn *io_poll, - /// void *opaque); - /// ``` - /// - /// Since this does not have any ways of returning errors, wrong usage will cause a panic in - /// this test implementation. - pub fn set_fd_handler( - &self, - fd: RawFd, - io_read: Option, - io_write: Option, - // skipping io_poll, - //opaque: *const (), - ) { - self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::level()) - } - - /// This is going to be a proposed new api for Qemu's AioContext. - pub fn set_fd_handler_edge( - &self, - fd: RawFd, - io_read: Option, - io_write: Option, - // skipping io_poll, - //opaque: *const (), - ) { - self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::edge()) - } - - fn set_fd_handler_impl( - &self, - fd: RawFd, - io_read: Option, - io_write: Option, - // skipping io_poll, - //opaque: *const (), - poll_opt: mio::PollOpt, - ) { - if io_read.is_none() && io_write.is_none() { - return self.remove_fd_handler(fd); - } - - let handlers = AioHandlerState { - read: io_read, - write: io_write, - }; - - let mio_ready = handlers.mio_ready(); - - let token = Token(fd as usize); - - use std::collections::hash_map::Entry; - match self.handlers.write().unwrap().entry(token) { - Entry::Vacant(entry) => { - self.poll.register(&EventedFd(&fd), token, mio_ready, poll_opt) - .expect("failed to register a new fd for polling"); - entry.insert(handlers); - } - Entry::Occupied(mut entry) => { - self.poll.reregister(&EventedFd(&fd), token, mio_ready, poll_opt) - .expect("failed to update an existing poll fd"); - entry.insert(handlers); - } - } - } - - fn remove_fd_handler(&self, fd: RawFd) { - let mut guard = self.handlers.write().unwrap(); - self.poll.deregister(&EventedFd(&fd)) - .expect("failed to remove an existing poll fd"); - guard.remove(&Token(fd as usize)); - } - - /// We don't use qemu's aio_poll, so let's make this easy: - /// - /// ``` - /// bool aio_poll(AioContext *ctx, bool blocking); - /// ``` - pub fn poll(&self) -> Result<(), Error> { - let timeout = Some(std::time::Duration::from_millis(100)); - - let mut events = Events::with_capacity(16); - - if self.poll.poll(&mut events, timeout)? == 0 { - return Ok(()); - } - - for event in events.iter() { - let token = event.token(); - let ready = event.readiness(); - // NOTE: We need to read-lock while fetching handlers, but handlers need a write-lock!!! - // because they need to be edge-triggered and therefore *update* this handler list! - // - // While we could instead do this here (or use edge triggering from mio), this would - // not properly simulate Qemu's AioContext, so we enforce this behavior here as well. - // - // This means we cannot just hold a read lock during the events.iter() iteration - // though. - let handler = self.handlers.read().unwrap().get(&token).map(|h| AioHandlerState { - // Those are Option! - read: h.read.clone(), - write: h.write.clone(), - }); - if let Some(handler) = handler { - if ready.is_readable() { - handler.read.as_ref().map(|func| func()); - } - if ready.is_writable() { - handler.write.as_ref().map(|func| func()); - } - } - } - - Ok(()) - } - - fn main_loop(mut self: Arc) { - while Arc::get_mut(&mut self).is_none() { - if let Err(err) = self.poll() { - dbg!("error AioContextImpl::poll(): {}", err); - break; - } - } - } -} diff --git a/qemu-io/src/lib.rs b/qemu-io/src/lib.rs deleted file mode 100644 index 073c2c43..00000000 --- a/qemu-io/src/lib.rs +++ /dev/null @@ -1,10 +0,0 @@ -// used for testing - -mod util; -mod with_aio_context; - -#[cfg(feature="standalone")] -mod aio_context; - -pub use with_aio_context::WithAioContext; -pub use aio_context::AioContext; diff --git a/qemu-io/src/util.rs b/qemu-io/src/util.rs deleted file mode 100644 index 6fed0164..00000000 --- a/qemu-io/src/util.rs +++ /dev/null @@ -1,39 +0,0 @@ -//! Some types used by both our internal testing AioContext implementation as well as our -//! WithAioContext wrapper. - -/// An Aio Callback. Qemu's AioContext actually uses a void function taking an opaque pointer. -/// For simplicity we stick to closures for now. -pub type AioCb = std::sync::Arc; - -/// This keeps track of our poll state (whether we wait to be notified for read or write -/// readiness.) -#[derive(Default)] -pub struct AioHandlerState { - pub read: Option, - pub write: Option, -} - -impl AioHandlerState { - /// Get an mio::Ready with readable set if `read` is `Some`, and writable - /// set if `write` is `Some`. - pub fn mio_ready(&self) -> mio::Ready { - use mio::Ready; - - let mut ready = Ready::empty(); - if self.read.is_some() { - ready |= Ready::readable(); - } - - if self.write.is_some() { - ready |= Ready::writable(); - } - - ready - } - - /// Shortcut - pub fn clear(&mut self) { - self.read = None; - self.write = None; - } -} diff --git a/qemu-io/src/with_aio_context.rs b/qemu-io/src/with_aio_context.rs deleted file mode 100644 index 3ed0b94b..00000000 --- a/qemu-io/src/with_aio_context.rs +++ /dev/null @@ -1,211 +0,0 @@ -//! This module provides `WithAioContext`, which is a helper to connect any raw I/O file descriptor -//! (`T: AsRawFd`) with an `AioContext`. - -use std::io; -use std::os::unix::io::{AsRawFd, RawFd}; -use std::pin::Pin; -use std::sync::{Arc, Mutex, MutexGuard}; -use std::task::{Context, Poll}; - -use mio::Ready; - -use crate::AioContext; -use crate::util::{AioCb, AioHandlerState}; - -/// This provides a basic mechanism to connect a type containing a file descriptor (i.e. it -/// implements `AsRawFd`) to an `AioContext`. -/// -/// If the underlying type implements `Read` this wrapper also provides an `AsyncRead` -/// implementation. Likewise it'll provide `AsyncWrite` for types implementing `Write`. -/// For this to function properly, the underlying type needs to return `io::Error` of kind -/// `io::ErrorKind::WouldBlock` on blocking operations which should be retried when the file -/// descriptor becomes ready. -/// -/// `WithAioContext` _owns_ the underlying object. This is because our Drop handler wants to -/// unregister the file descriptor, but systems like linux' epoll do that automatically when the fd -/// is closed, so we cannot have our file descriptor vanish before de-registering it, otherwise we -/// may be de-registering an already re-used number. -/// -/// Implements `Deref` so any methods of `T` still work on a `WithAioContext`. -pub struct WithAioContext { - aio_context: AioContext, - fd: RawFd, - handlers: Arc>, - inner: Option, -} - -impl std::ops::Deref for WithAioContext { - type Target = T; - - fn deref(&self) -> &Self::Target { - self.inner.as_ref().unwrap() - } -} - -impl std::ops::DerefMut for WithAioContext { - fn deref_mut(&mut self) -> &mut Self::Target { - self.inner.as_mut().unwrap() - } -} - -impl WithAioContext { - pub fn new(aio_context: AioContext, inner: T) -> Self { - Self { - aio_context, - fd: inner.as_raw_fd(), - handlers: Arc::new(Mutex::new(Default::default())), - inner: Some(inner), - } - } - - /// Deregister from the `AioContext` and return the inner file handle. - pub fn into_inner(mut self) -> T { - let out = self.inner.take().unwrap(); - std::mem::drop(self); - out - } - - /// Shortcut around the `unwrap()`. The `Option<>` around `inner` is only there because we have - /// a `Drop` implementation which prevents us to move-out the value in the `into_inner()` - /// method. - fn inner_mut(&mut self) -> &mut T { - self.inner.as_mut().unwrap() - } - - /// Shortcut around the `unwrap()`, immutable variant: - //fn inner(&self) -> &T { - // self.inner.as_ref().unwrap() - //} - - /// Shortcut to set_fd_handlers. For the "real" qemu interface we'll have to turn the closures - /// into raw function pointers here (they'll get an opaque pointer parameter). - fn commit_handlers( - aio_context: &AioContext, - fd: RawFd, - handlers: &mut MutexGuard, - ) { - aio_context.set_fd_handler( - fd, - handlers.read.as_ref().map(|x| (*x).clone()), - handlers.write.as_ref().map(|x| (*x).clone()), - ) - } - - /// Create a waker closure for a context for a specific ready state. When a file descriptor is - /// ready for reading or writing, we need to remove the corresponding handler from the - /// `AioContext` (make it an edge-trigger instead of a level trigger) before finally calling - /// `waker.wake_by_ref()` to queue the task for polling. - fn make_wake_fn(&self, cx: &mut Context, ready: Ready) -> AioCb { - let waker = cx.waker().clone(); - - // we don't want to be publicly clonable so clone manually here: - let aio_context = self.aio_context.clone(); - let fd = self.fd; - let handlers = Arc::clone(&self.handlers); - Arc::new(move || { - let mut guard = handlers.lock().unwrap(); - - if ready.is_readable() { - guard.read = None; - } - - if ready.is_writable() { - guard.write = None; - } - - Self::commit_handlers(&aio_context, fd, &mut guard); - waker.wake_by_ref(); - }) - } - - /// Register our file descriptor with the `AioContext` for reading or writing. - /// This only affects the directions present in the provided `ready` value, and will leave the - /// other directions unchanged. - pub fn register(&self, cx: &mut Context, ready: Ready) { - let mut guard = self.handlers.lock().unwrap(); - - if ready.is_readable() { - guard.read = Some(self.make_wake_fn(cx, ready)); - } - - if ready.is_writable() { - guard.write = Some(self.make_wake_fn(cx, ready)); - } - - Self::commit_handlers(&self.aio_context, self.fd, &mut guard) - } - - /// Helper to handle an `io::Result`, turning `Result` into `Poll>`, by - /// changing an `io::ErrorKind::WouldBlock` into `Poll::Pending` and taking care of registering - /// the file descriptor with the AioContext for the next wake-up. - /// `Ok` and errors other than the above will be passed through wrapped in `Poll::Ready`. - pub fn handle_aio_result( - &self, - cx: &mut Context, - result: io::Result, - ready: Ready, - ) -> Poll> { - match result { - Ok(res) => Poll::Ready(Ok(res)), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.register(cx, ready); - Poll::Pending - } - Err(err) => Poll::Ready(Err(err)), - } - } -} - -impl Drop for WithAioContext { - fn drop(&mut self) { - let mut guard = self.handlers.lock().unwrap(); - (*guard).clear(); - if !guard.mio_ready().is_empty() { - Self::commit_handlers(&self.aio_context, self.fd, &mut guard); - } - } -} - -impl futures::io::AsyncRead for WithAioContext -where - T: AsRawFd + io::Read + Unpin, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll> { - let res = self.inner_mut().read(buf); - self.handle_aio_result(cx, res, mio::Ready::readable()) - } -} - -impl futures::io::AsyncWrite for WithAioContext -where - T: AsRawFd + io::Write + Unpin, -{ - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8], - ) -> Poll> { - let result = self.inner_mut().write(buf); - self.handle_aio_result(cx, result, mio::Ready::writable()) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let result = self.inner_mut().flush(); - self.handle_aio_result(cx, result, mio::Ready::writable()) - } - - // I'm not sure what they expect me to do here. The `close()` syscall has no async variant, so - // all I can do is `flush()` and then drop the inner stream... - // - // Using `.into_inner()` after this will cause a panic. - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let result = self.inner_mut().flush(); - let _ = futures::ready!(self.handle_aio_result(cx, result, mio::Ready::writable())); - std::mem::drop(self.inner.take()); - Poll::Ready(Ok(())) - } -}