remove unused qemu-io code
This commit is contained in:
parent
239d9bae95
commit
693f5d5ee8
@ -1,29 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "qemu-io"
|
|
||||||
version = "0.1.0"
|
|
||||||
authors = [
|
|
||||||
"Wolfgang Bumiller <w.bumiller@proxmox.com>",
|
|
||||||
]
|
|
||||||
edition = "2018"
|
|
||||||
|
|
||||||
#[lib]
|
|
||||||
#crate-type = ['lib', 'cdylib']
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
failure = "0.1"
|
|
||||||
mio = "0.6"
|
|
||||||
|
|
||||||
# In this crate 'future' by default means standard-future.
|
|
||||||
# The 0.1-futures are exposed under the name 'futures_01'.
|
|
||||||
|
|
||||||
[dependencies.futures-preview]
|
|
||||||
version = "0.3.0-alpha.15"
|
|
||||||
features = ["compat", "io-compat"]
|
|
||||||
|
|
||||||
[dependencies.futures_01]
|
|
||||||
package = "futures"
|
|
||||||
version = "0.1"
|
|
||||||
|
|
||||||
[features]
|
|
||||||
default = ["standalone"]
|
|
||||||
standalone = []
|
|
@ -1,11 +0,0 @@
|
|||||||
//! Provides a handle to an AioContext.
|
|
||||||
|
|
||||||
#[cfg(feature="standalone")]
|
|
||||||
mod standalone;
|
|
||||||
#[cfg(feature="standalone")]
|
|
||||||
pub use standalone::AioContext;
|
|
||||||
|
|
||||||
// TODO: Add the non-standalone variant to be linked with Qemu:
|
|
||||||
// The AioContext struct should provide a high-level version of `set_fd_handler` with the same
|
|
||||||
// interface the standalone version provides out of the box (transparently turning closures into
|
|
||||||
// `extern "C" fn(opaque: *const c_void)` calls.
|
|
@ -1,188 +0,0 @@
|
|||||||
//! This implements the parts of qemu's AioContext interface we need for testing outside qemu.
|
|
||||||
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::os::unix::io::RawFd;
|
|
||||||
use std::sync::{Arc, Mutex, RwLock};
|
|
||||||
use std::thread;
|
|
||||||
|
|
||||||
use failure::Error;
|
|
||||||
use mio::{Events, Poll, Token};
|
|
||||||
use mio::unix::EventedFd;
|
|
||||||
|
|
||||||
use crate::util::{AioCb, AioHandlerState};
|
|
||||||
|
|
||||||
/// This is a reference to a standalone `AioContextImpl` and allows instantiating a new context
|
|
||||||
/// with a polling thread.
|
|
||||||
#[derive(Clone)]
|
|
||||||
#[repr(transparent)]
|
|
||||||
pub struct AioContext(Arc<AioContextImpl>);
|
|
||||||
|
|
||||||
impl std::ops::Deref for AioContext {
|
|
||||||
type Target = AioContextImpl;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&*self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AioContext {
|
|
||||||
/// Create a new `AioContext` instance with an associated polling thread, which will live as
|
|
||||||
/// long as there are references to it.
|
|
||||||
pub fn new() -> Result<Self, Error> {
|
|
||||||
Ok(Self(AioContextImpl::new()?))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct AioContextImpl {
|
|
||||||
poll: Poll,
|
|
||||||
handlers: RwLock<HashMap<Token, AioHandlerState>>,
|
|
||||||
poll_thread: Mutex<Option<thread::JoinHandle<()>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AioContextImpl {
|
|
||||||
pub fn new() -> Result<Arc<Self>, Error> {
|
|
||||||
let this = Arc::new(Self {
|
|
||||||
poll: Poll::new()?,
|
|
||||||
handlers: RwLock::new(HashMap::new()),
|
|
||||||
poll_thread: Mutex::new(None),
|
|
||||||
});
|
|
||||||
|
|
||||||
let this2 = Arc::clone(&this);
|
|
||||||
this.poll_thread.lock().unwrap().replace(thread::spawn(|| this2.main_loop()));
|
|
||||||
|
|
||||||
Ok(this)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Qemu's aio_set_fd_handler. We're skipping the `io_poll` parameter for this implementation
|
|
||||||
/// as we don't use it.
|
|
||||||
/// ```
|
|
||||||
/// void aio_set_fd_handler(AioContext *ctx,
|
|
||||||
/// int fd,
|
|
||||||
/// bool is_external,
|
|
||||||
/// IOHandler *io_read,
|
|
||||||
/// IOHandler *io_write,
|
|
||||||
/// AioPollFn *io_poll,
|
|
||||||
/// void *opaque);
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
/// Since this does not have any ways of returning errors, wrong usage will cause a panic in
|
|
||||||
/// this test implementation.
|
|
||||||
pub fn set_fd_handler(
|
|
||||||
&self,
|
|
||||||
fd: RawFd,
|
|
||||||
io_read: Option<AioCb>,
|
|
||||||
io_write: Option<AioCb>,
|
|
||||||
// skipping io_poll,
|
|
||||||
//opaque: *const (),
|
|
||||||
) {
|
|
||||||
self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::level())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This is going to be a proposed new api for Qemu's AioContext.
|
|
||||||
pub fn set_fd_handler_edge(
|
|
||||||
&self,
|
|
||||||
fd: RawFd,
|
|
||||||
io_read: Option<AioCb>,
|
|
||||||
io_write: Option<AioCb>,
|
|
||||||
// skipping io_poll,
|
|
||||||
//opaque: *const (),
|
|
||||||
) {
|
|
||||||
self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::edge())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_fd_handler_impl(
|
|
||||||
&self,
|
|
||||||
fd: RawFd,
|
|
||||||
io_read: Option<AioCb>,
|
|
||||||
io_write: Option<AioCb>,
|
|
||||||
// skipping io_poll,
|
|
||||||
//opaque: *const (),
|
|
||||||
poll_opt: mio::PollOpt,
|
|
||||||
) {
|
|
||||||
if io_read.is_none() && io_write.is_none() {
|
|
||||||
return self.remove_fd_handler(fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
let handlers = AioHandlerState {
|
|
||||||
read: io_read,
|
|
||||||
write: io_write,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mio_ready = handlers.mio_ready();
|
|
||||||
|
|
||||||
let token = Token(fd as usize);
|
|
||||||
|
|
||||||
use std::collections::hash_map::Entry;
|
|
||||||
match self.handlers.write().unwrap().entry(token) {
|
|
||||||
Entry::Vacant(entry) => {
|
|
||||||
self.poll.register(&EventedFd(&fd), token, mio_ready, poll_opt)
|
|
||||||
.expect("failed to register a new fd for polling");
|
|
||||||
entry.insert(handlers);
|
|
||||||
}
|
|
||||||
Entry::Occupied(mut entry) => {
|
|
||||||
self.poll.reregister(&EventedFd(&fd), token, mio_ready, poll_opt)
|
|
||||||
.expect("failed to update an existing poll fd");
|
|
||||||
entry.insert(handlers);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_fd_handler(&self, fd: RawFd) {
|
|
||||||
let mut guard = self.handlers.write().unwrap();
|
|
||||||
self.poll.deregister(&EventedFd(&fd))
|
|
||||||
.expect("failed to remove an existing poll fd");
|
|
||||||
guard.remove(&Token(fd as usize));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// We don't use qemu's aio_poll, so let's make this easy:
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// bool aio_poll(AioContext *ctx, bool blocking);
|
|
||||||
/// ```
|
|
||||||
pub fn poll(&self) -> Result<(), Error> {
|
|
||||||
let timeout = Some(std::time::Duration::from_millis(100));
|
|
||||||
|
|
||||||
let mut events = Events::with_capacity(16);
|
|
||||||
|
|
||||||
if self.poll.poll(&mut events, timeout)? == 0 {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
for event in events.iter() {
|
|
||||||
let token = event.token();
|
|
||||||
let ready = event.readiness();
|
|
||||||
// NOTE: We need to read-lock while fetching handlers, but handlers need a write-lock!!!
|
|
||||||
// because they need to be edge-triggered and therefore *update* this handler list!
|
|
||||||
//
|
|
||||||
// While we could instead do this here (or use edge triggering from mio), this would
|
|
||||||
// not properly simulate Qemu's AioContext, so we enforce this behavior here as well.
|
|
||||||
//
|
|
||||||
// This means we cannot just hold a read lock during the events.iter() iteration
|
|
||||||
// though.
|
|
||||||
let handler = self.handlers.read().unwrap().get(&token).map(|h| AioHandlerState {
|
|
||||||
// Those are Option<Arc>!
|
|
||||||
read: h.read.clone(),
|
|
||||||
write: h.write.clone(),
|
|
||||||
});
|
|
||||||
if let Some(handler) = handler {
|
|
||||||
if ready.is_readable() {
|
|
||||||
handler.read.as_ref().map(|func| func());
|
|
||||||
}
|
|
||||||
if ready.is_writable() {
|
|
||||||
handler.write.as_ref().map(|func| func());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main_loop(mut self: Arc<Self>) {
|
|
||||||
while Arc::get_mut(&mut self).is_none() {
|
|
||||||
if let Err(err) = self.poll() {
|
|
||||||
dbg!("error AioContextImpl::poll(): {}", err);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,10 +0,0 @@
|
|||||||
// used for testing
|
|
||||||
|
|
||||||
mod util;
|
|
||||||
mod with_aio_context;
|
|
||||||
|
|
||||||
#[cfg(feature="standalone")]
|
|
||||||
mod aio_context;
|
|
||||||
|
|
||||||
pub use with_aio_context::WithAioContext;
|
|
||||||
pub use aio_context::AioContext;
|
|
@ -1,39 +0,0 @@
|
|||||||
//! Some types used by both our internal testing AioContext implementation as well as our
|
|
||||||
//! WithAioContext wrapper.
|
|
||||||
|
|
||||||
/// An Aio Callback. Qemu's AioContext actually uses a void function taking an opaque pointer.
|
|
||||||
/// For simplicity we stick to closures for now.
|
|
||||||
pub type AioCb = std::sync::Arc<dyn Fn() + Send + Sync>;
|
|
||||||
|
|
||||||
/// This keeps track of our poll state (whether we wait to be notified for read or write
|
|
||||||
/// readiness.)
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct AioHandlerState {
|
|
||||||
pub read: Option<AioCb>,
|
|
||||||
pub write: Option<AioCb>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AioHandlerState {
|
|
||||||
/// Get an mio::Ready with readable set if `read` is `Some`, and writable
|
|
||||||
/// set if `write` is `Some`.
|
|
||||||
pub fn mio_ready(&self) -> mio::Ready {
|
|
||||||
use mio::Ready;
|
|
||||||
|
|
||||||
let mut ready = Ready::empty();
|
|
||||||
if self.read.is_some() {
|
|
||||||
ready |= Ready::readable();
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.write.is_some() {
|
|
||||||
ready |= Ready::writable();
|
|
||||||
}
|
|
||||||
|
|
||||||
ready
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Shortcut
|
|
||||||
pub fn clear(&mut self) {
|
|
||||||
self.read = None;
|
|
||||||
self.write = None;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,211 +0,0 @@
|
|||||||
//! This module provides `WithAioContext`, which is a helper to connect any raw I/O file descriptor
|
|
||||||
//! (`T: AsRawFd`) with an `AioContext`.
|
|
||||||
|
|
||||||
use std::io;
|
|
||||||
use std::os::unix::io::{AsRawFd, RawFd};
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::sync::{Arc, Mutex, MutexGuard};
|
|
||||||
use std::task::{Context, Poll};
|
|
||||||
|
|
||||||
use mio::Ready;
|
|
||||||
|
|
||||||
use crate::AioContext;
|
|
||||||
use crate::util::{AioCb, AioHandlerState};
|
|
||||||
|
|
||||||
/// This provides a basic mechanism to connect a type containing a file descriptor (i.e. it
|
|
||||||
/// implements `AsRawFd`) to an `AioContext`.
|
|
||||||
///
|
|
||||||
/// If the underlying type implements `Read` this wrapper also provides an `AsyncRead`
|
|
||||||
/// implementation. Likewise it'll provide `AsyncWrite` for types implementing `Write`.
|
|
||||||
/// For this to function properly, the underlying type needs to return `io::Error` of kind
|
|
||||||
/// `io::ErrorKind::WouldBlock` on blocking operations which should be retried when the file
|
|
||||||
/// descriptor becomes ready.
|
|
||||||
///
|
|
||||||
/// `WithAioContext` _owns_ the underlying object. This is because our Drop handler wants to
|
|
||||||
/// unregister the file descriptor, but systems like linux' epoll do that automatically when the fd
|
|
||||||
/// is closed, so we cannot have our file descriptor vanish before de-registering it, otherwise we
|
|
||||||
/// may be de-registering an already re-used number.
|
|
||||||
///
|
|
||||||
/// Implements `Deref<T>` so any methods of `T` still work on a `WithAioContext<T>`.
|
|
||||||
pub struct WithAioContext<T: AsRawFd> {
|
|
||||||
aio_context: AioContext,
|
|
||||||
fd: RawFd,
|
|
||||||
handlers: Arc<Mutex<AioHandlerState>>,
|
|
||||||
inner: Option<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsRawFd> std::ops::Deref for WithAioContext<T> {
|
|
||||||
type Target = T;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
self.inner.as_ref().unwrap()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsRawFd> std::ops::DerefMut for WithAioContext<T> {
|
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
||||||
self.inner.as_mut().unwrap()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsRawFd> WithAioContext<T> {
|
|
||||||
pub fn new(aio_context: AioContext, inner: T) -> Self {
|
|
||||||
Self {
|
|
||||||
aio_context,
|
|
||||||
fd: inner.as_raw_fd(),
|
|
||||||
handlers: Arc::new(Mutex::new(Default::default())),
|
|
||||||
inner: Some(inner),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Deregister from the `AioContext` and return the inner file handle.
|
|
||||||
pub fn into_inner(mut self) -> T {
|
|
||||||
let out = self.inner.take().unwrap();
|
|
||||||
std::mem::drop(self);
|
|
||||||
out
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Shortcut around the `unwrap()`. The `Option<>` around `inner` is only there because we have
|
|
||||||
/// a `Drop` implementation which prevents us to move-out the value in the `into_inner()`
|
|
||||||
/// method.
|
|
||||||
fn inner_mut(&mut self) -> &mut T {
|
|
||||||
self.inner.as_mut().unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Shortcut around the `unwrap()`, immutable variant:
|
|
||||||
//fn inner(&self) -> &T {
|
|
||||||
// self.inner.as_ref().unwrap()
|
|
||||||
//}
|
|
||||||
|
|
||||||
/// Shortcut to set_fd_handlers. For the "real" qemu interface we'll have to turn the closures
|
|
||||||
/// into raw function pointers here (they'll get an opaque pointer parameter).
|
|
||||||
fn commit_handlers(
|
|
||||||
aio_context: &AioContext,
|
|
||||||
fd: RawFd,
|
|
||||||
handlers: &mut MutexGuard<AioHandlerState>,
|
|
||||||
) {
|
|
||||||
aio_context.set_fd_handler(
|
|
||||||
fd,
|
|
||||||
handlers.read.as_ref().map(|x| (*x).clone()),
|
|
||||||
handlers.write.as_ref().map(|x| (*x).clone()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a waker closure for a context for a specific ready state. When a file descriptor is
|
|
||||||
/// ready for reading or writing, we need to remove the corresponding handler from the
|
|
||||||
/// `AioContext` (make it an edge-trigger instead of a level trigger) before finally calling
|
|
||||||
/// `waker.wake_by_ref()` to queue the task for polling.
|
|
||||||
fn make_wake_fn(&self, cx: &mut Context, ready: Ready) -> AioCb {
|
|
||||||
let waker = cx.waker().clone();
|
|
||||||
|
|
||||||
// we don't want to be publicly clonable so clone manually here:
|
|
||||||
let aio_context = self.aio_context.clone();
|
|
||||||
let fd = self.fd;
|
|
||||||
let handlers = Arc::clone(&self.handlers);
|
|
||||||
Arc::new(move || {
|
|
||||||
let mut guard = handlers.lock().unwrap();
|
|
||||||
|
|
||||||
if ready.is_readable() {
|
|
||||||
guard.read = None;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ready.is_writable() {
|
|
||||||
guard.write = None;
|
|
||||||
}
|
|
||||||
|
|
||||||
Self::commit_handlers(&aio_context, fd, &mut guard);
|
|
||||||
waker.wake_by_ref();
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Register our file descriptor with the `AioContext` for reading or writing.
|
|
||||||
/// This only affects the directions present in the provided `ready` value, and will leave the
|
|
||||||
/// other directions unchanged.
|
|
||||||
pub fn register(&self, cx: &mut Context, ready: Ready) {
|
|
||||||
let mut guard = self.handlers.lock().unwrap();
|
|
||||||
|
|
||||||
if ready.is_readable() {
|
|
||||||
guard.read = Some(self.make_wake_fn(cx, ready));
|
|
||||||
}
|
|
||||||
|
|
||||||
if ready.is_writable() {
|
|
||||||
guard.write = Some(self.make_wake_fn(cx, ready));
|
|
||||||
}
|
|
||||||
|
|
||||||
Self::commit_handlers(&self.aio_context, self.fd, &mut guard)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Helper to handle an `io::Result<T>`, turning `Result<T>` into `Poll<Result<T>>`, by
|
|
||||||
/// changing an `io::ErrorKind::WouldBlock` into `Poll::Pending` and taking care of registering
|
|
||||||
/// the file descriptor with the AioContext for the next wake-up.
|
|
||||||
/// `Ok` and errors other than the above will be passed through wrapped in `Poll::Ready`.
|
|
||||||
pub fn handle_aio_result<R>(
|
|
||||||
&self,
|
|
||||||
cx: &mut Context,
|
|
||||||
result: io::Result<R>,
|
|
||||||
ready: Ready,
|
|
||||||
) -> Poll<io::Result<R>> {
|
|
||||||
match result {
|
|
||||||
Ok(res) => Poll::Ready(Ok(res)),
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
|
||||||
self.register(cx, ready);
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
Err(err) => Poll::Ready(Err(err)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsRawFd> Drop for WithAioContext<T> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
let mut guard = self.handlers.lock().unwrap();
|
|
||||||
(*guard).clear();
|
|
||||||
if !guard.mio_ready().is_empty() {
|
|
||||||
Self::commit_handlers(&self.aio_context, self.fd, &mut guard);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> futures::io::AsyncRead for WithAioContext<T>
|
|
||||||
where
|
|
||||||
T: AsRawFd + io::Read + Unpin,
|
|
||||||
{
|
|
||||||
fn poll_read(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context,
|
|
||||||
buf: &mut [u8],
|
|
||||||
) -> Poll<io::Result<usize>> {
|
|
||||||
let res = self.inner_mut().read(buf);
|
|
||||||
self.handle_aio_result(cx, res, mio::Ready::readable())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> futures::io::AsyncWrite for WithAioContext<T>
|
|
||||||
where
|
|
||||||
T: AsRawFd + io::Write + Unpin,
|
|
||||||
{
|
|
||||||
fn poll_write(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context,
|
|
||||||
buf: &[u8],
|
|
||||||
) -> Poll<io::Result<usize>> {
|
|
||||||
let result = self.inner_mut().write(buf);
|
|
||||||
self.handle_aio_result(cx, result, mio::Ready::writable())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
|
||||||
let result = self.inner_mut().flush();
|
|
||||||
self.handle_aio_result(cx, result, mio::Ready::writable())
|
|
||||||
}
|
|
||||||
|
|
||||||
// I'm not sure what they expect me to do here. The `close()` syscall has no async variant, so
|
|
||||||
// all I can do is `flush()` and then drop the inner stream...
|
|
||||||
//
|
|
||||||
// Using `.into_inner()` after this will cause a panic.
|
|
||||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
|
||||||
let result = self.inner_mut().flush();
|
|
||||||
let _ = futures::ready!(self.handle_aio_result(cx, result, mio::Ready::writable()));
|
|
||||||
std::mem::drop(self.inner.take());
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user