add qemu-io crate, AioContext reactor helper
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
		
							
								
								
									
										11
									
								
								qemu-io/src/aio_context.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								qemu-io/src/aio_context.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,11 @@
 | 
			
		||||
//! Provides a handle to an AioContext.
 | 
			
		||||
 | 
			
		||||
#[cfg(feature="standalone")]
 | 
			
		||||
mod standalone;
 | 
			
		||||
#[cfg(feature="standalone")]
 | 
			
		||||
pub use standalone::AioContext;
 | 
			
		||||
 | 
			
		||||
// TODO: Add the non-standalone variant to be linked with Qemu:
 | 
			
		||||
//    The AioContext struct should provide a high-level version of `set_fd_handler` with the same
 | 
			
		||||
//    interface the standalone version provides out of the box (transparently turning closures into
 | 
			
		||||
//    `extern "C" fn(opaque: *const c_void)` calls.
 | 
			
		||||
							
								
								
									
										188
									
								
								qemu-io/src/aio_context/standalone.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										188
									
								
								qemu-io/src/aio_context/standalone.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,188 @@
 | 
			
		||||
//! This implements the parts of qemu's AioContext interface we need for testing outside qemu.
 | 
			
		||||
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use std::os::unix::io::RawFd;
 | 
			
		||||
use std::sync::{Arc, Mutex, RwLock};
 | 
			
		||||
use std::thread;
 | 
			
		||||
 | 
			
		||||
use failure::Error;
 | 
			
		||||
use mio::{Events, Poll, Token};
 | 
			
		||||
use mio::unix::EventedFd;
 | 
			
		||||
 | 
			
		||||
use crate::util::{AioCb, AioHandlerState};
 | 
			
		||||
 | 
			
		||||
/// This is a reference to a standalone `AioContextImpl` and allows instantiating a new context
 | 
			
		||||
/// with a polling thread.
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
#[repr(transparent)]
 | 
			
		||||
pub struct AioContext(Arc<AioContextImpl>);
 | 
			
		||||
 | 
			
		||||
impl std::ops::Deref for AioContext {
 | 
			
		||||
    type Target = AioContextImpl;
 | 
			
		||||
 | 
			
		||||
    fn deref(&self) -> &Self::Target {
 | 
			
		||||
        &*self.0
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl AioContext {
 | 
			
		||||
    /// Create a new `AioContext` instance with an associated polling thread, which will live as
 | 
			
		||||
    /// long as there are references to it.
 | 
			
		||||
    pub fn new() -> Result<Self, Error> {
 | 
			
		||||
        Ok(Self(AioContextImpl::new()?))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct AioContextImpl {
 | 
			
		||||
    poll: Poll,
 | 
			
		||||
    handlers: RwLock<HashMap<Token, AioHandlerState>>,
 | 
			
		||||
    poll_thread: Mutex<Option<thread::JoinHandle<()>>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl AioContextImpl {
 | 
			
		||||
    pub fn new() -> Result<Arc<Self>, Error> {
 | 
			
		||||
        let this = Arc::new(Self {
 | 
			
		||||
            poll: Poll::new()?,
 | 
			
		||||
            handlers: RwLock::new(HashMap::new()),
 | 
			
		||||
            poll_thread: Mutex::new(None),
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        let this2 = Arc::clone(&this);
 | 
			
		||||
        this.poll_thread.lock().unwrap().replace(thread::spawn(|| this2.main_loop()));
 | 
			
		||||
 | 
			
		||||
        Ok(this)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Qemu's aio_set_fd_handler. We're skipping the `io_poll` parameter for this implementation
 | 
			
		||||
    /// as we don't use it.
 | 
			
		||||
    /// ```
 | 
			
		||||
    /// void aio_set_fd_handler(AioContext *ctx,
 | 
			
		||||
    ///                         int fd,
 | 
			
		||||
    ///                         bool is_external,
 | 
			
		||||
    ///                         IOHandler *io_read,
 | 
			
		||||
    ///                         IOHandler *io_write,
 | 
			
		||||
    ///                         AioPollFn *io_poll,
 | 
			
		||||
    ///                         void *opaque);
 | 
			
		||||
    /// ```
 | 
			
		||||
    ///
 | 
			
		||||
    /// Since this does not have any ways of returning errors, wrong usage will cause a panic in
 | 
			
		||||
    /// this test implementation.
 | 
			
		||||
    pub fn set_fd_handler(
 | 
			
		||||
        &self,
 | 
			
		||||
        fd: RawFd,
 | 
			
		||||
        io_read: Option<AioCb>,
 | 
			
		||||
        io_write: Option<AioCb>,
 | 
			
		||||
        // skipping io_poll,
 | 
			
		||||
        //opaque: *const (),
 | 
			
		||||
    ) {
 | 
			
		||||
        self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::level())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// This is going to be a proposed new api for Qemu's AioContext.
 | 
			
		||||
    pub fn set_fd_handler_edge(
 | 
			
		||||
        &self,
 | 
			
		||||
        fd: RawFd,
 | 
			
		||||
        io_read: Option<AioCb>,
 | 
			
		||||
        io_write: Option<AioCb>,
 | 
			
		||||
        // skipping io_poll,
 | 
			
		||||
        //opaque: *const (),
 | 
			
		||||
    ) {
 | 
			
		||||
        self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::edge())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn set_fd_handler_impl(
 | 
			
		||||
        &self,
 | 
			
		||||
        fd: RawFd,
 | 
			
		||||
        io_read: Option<AioCb>,
 | 
			
		||||
        io_write: Option<AioCb>,
 | 
			
		||||
        // skipping io_poll,
 | 
			
		||||
        //opaque: *const (),
 | 
			
		||||
        poll_opt: mio::PollOpt,
 | 
			
		||||
    ) {
 | 
			
		||||
        if io_read.is_none() && io_write.is_none() {
 | 
			
		||||
            return self.remove_fd_handler(fd);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let handlers = AioHandlerState {
 | 
			
		||||
            read: io_read,
 | 
			
		||||
            write: io_write,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let mio_ready = handlers.mio_ready();
 | 
			
		||||
 | 
			
		||||
        let token = Token(fd as usize);
 | 
			
		||||
 | 
			
		||||
        use std::collections::hash_map::Entry;
 | 
			
		||||
        match self.handlers.write().unwrap().entry(token) {
 | 
			
		||||
            Entry::Vacant(entry) => {
 | 
			
		||||
                self.poll.register(&EventedFd(&fd), token, mio_ready, poll_opt)
 | 
			
		||||
                    .expect("failed to register a new fd for polling");
 | 
			
		||||
                entry.insert(handlers);
 | 
			
		||||
            }
 | 
			
		||||
            Entry::Occupied(mut entry) => {
 | 
			
		||||
                self.poll.reregister(&EventedFd(&fd), token, mio_ready, poll_opt)
 | 
			
		||||
                    .expect("failed to update an existing poll fd");
 | 
			
		||||
                entry.insert(handlers);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn remove_fd_handler(&self, fd: RawFd) {
 | 
			
		||||
        let mut guard = self.handlers.write().unwrap();
 | 
			
		||||
        self.poll.deregister(&EventedFd(&fd))
 | 
			
		||||
            .expect("failed to remove an existing poll fd");
 | 
			
		||||
        guard.remove(&Token(fd as usize));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// We don't use qemu's aio_poll, so let's make this easy:
 | 
			
		||||
    ///
 | 
			
		||||
    /// ```
 | 
			
		||||
    /// bool aio_poll(AioContext *ctx, bool blocking);
 | 
			
		||||
    /// ```
 | 
			
		||||
    pub fn poll(&self) -> Result<(), Error> {
 | 
			
		||||
        let timeout = Some(std::time::Duration::from_millis(100));
 | 
			
		||||
 | 
			
		||||
        let mut events = Events::with_capacity(16);
 | 
			
		||||
 | 
			
		||||
        if self.poll.poll(&mut events, timeout)? == 0 {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for event in events.iter() {
 | 
			
		||||
            let token = event.token();
 | 
			
		||||
            let ready = event.readiness();
 | 
			
		||||
            // NOTE: We need to read-lock while fetching handlers, but handlers need a write-lock!!!
 | 
			
		||||
            // because they need to be edge-triggered and therefore *update* this handler list!
 | 
			
		||||
            //
 | 
			
		||||
            // While we could instead do this here (or use edge triggering from mio), this would
 | 
			
		||||
            // not properly simulate Qemu's AioContext, so we enforce this behavior here as well.
 | 
			
		||||
            //
 | 
			
		||||
            // This means we cannot just hold a read lock during the events.iter() iteration
 | 
			
		||||
            // though.
 | 
			
		||||
            let handler = self.handlers.read().unwrap().get(&token).map(|h| AioHandlerState {
 | 
			
		||||
                // Those are Option<Arc>!
 | 
			
		||||
                read: h.read.clone(),
 | 
			
		||||
                write: h.write.clone(),
 | 
			
		||||
            });
 | 
			
		||||
            if let Some(handler) = handler {
 | 
			
		||||
                if ready.is_readable() {
 | 
			
		||||
                    handler.read.as_ref().map(|func| func());
 | 
			
		||||
                }
 | 
			
		||||
                if ready.is_writable() {
 | 
			
		||||
                    handler.write.as_ref().map(|func| func());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn main_loop(mut self: Arc<Self>) {
 | 
			
		||||
        while Arc::get_mut(&mut self).is_none() {
 | 
			
		||||
            if let Err(err) = self.poll() {
 | 
			
		||||
                dbg!("error AioContextImpl::poll(): {}", err);
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										10
									
								
								qemu-io/src/lib.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								qemu-io/src/lib.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,10 @@
 | 
			
		||||
// used for testing
 | 
			
		||||
 | 
			
		||||
mod util;
 | 
			
		||||
mod with_aio_context;
 | 
			
		||||
 | 
			
		||||
#[cfg(feature="standalone")]
 | 
			
		||||
mod aio_context;
 | 
			
		||||
 | 
			
		||||
pub use with_aio_context::WithAioContext;
 | 
			
		||||
pub use aio_context::AioContext;
 | 
			
		||||
							
								
								
									
										39
									
								
								qemu-io/src/util.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								qemu-io/src/util.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,39 @@
 | 
			
		||||
//! Some types used by both our internal testing AioContext implementation as well as our
 | 
			
		||||
//! WithAioContext wrapper.
 | 
			
		||||
 | 
			
		||||
/// An Aio Callback. Qemu's AioContext actually uses a void function taking an opaque pointer.
 | 
			
		||||
/// For simplicity we stick to closures for now.
 | 
			
		||||
pub type AioCb = std::sync::Arc<dyn Fn() + Send + Sync>;
 | 
			
		||||
 | 
			
		||||
/// This keeps track of our poll state (whether we wait to be notified for read or write
 | 
			
		||||
/// readiness.)
 | 
			
		||||
#[derive(Default)]
 | 
			
		||||
pub struct AioHandlerState {
 | 
			
		||||
    pub read: Option<AioCb>,
 | 
			
		||||
    pub write: Option<AioCb>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl AioHandlerState {
 | 
			
		||||
    /// Get an mio::Ready with readable set if `read` is `Some`, and writable
 | 
			
		||||
    /// set if `write` is `Some`.
 | 
			
		||||
    pub fn mio_ready(&self) -> mio::Ready {
 | 
			
		||||
        use mio::Ready;
 | 
			
		||||
 | 
			
		||||
        let mut ready = Ready::empty();
 | 
			
		||||
        if self.read.is_some() {
 | 
			
		||||
            ready |= Ready::readable();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if self.write.is_some() {
 | 
			
		||||
            ready |= Ready::writable();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        ready
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Shortcut
 | 
			
		||||
    pub fn clear(&mut self) {
 | 
			
		||||
        self.read = None;
 | 
			
		||||
        self.write = None;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										211
									
								
								qemu-io/src/with_aio_context.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										211
									
								
								qemu-io/src/with_aio_context.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,211 @@
 | 
			
		||||
//! This module provides `WithAioContext`, which is a helper to connect any raw I/O file descriptor
 | 
			
		||||
//! (`T: AsRawFd`) with an `AioContext`.
 | 
			
		||||
 | 
			
		||||
use std::io;
 | 
			
		||||
use std::os::unix::io::{AsRawFd, RawFd};
 | 
			
		||||
use std::pin::Pin;
 | 
			
		||||
use std::sync::{Arc, Mutex, MutexGuard};
 | 
			
		||||
use std::task::{Context, Poll};
 | 
			
		||||
 | 
			
		||||
use mio::Ready;
 | 
			
		||||
 | 
			
		||||
use crate::AioContext;
 | 
			
		||||
use crate::util::{AioCb, AioHandlerState};
 | 
			
		||||
 | 
			
		||||
/// This provides a basic mechanism to connect a type containing a file descriptor (i.e. it
 | 
			
		||||
/// implements `AsRawFd`) to an `AioContext`.
 | 
			
		||||
///
 | 
			
		||||
/// If the underlying type implements `Read` this wrapper also provides an `AsyncRead`
 | 
			
		||||
/// implementation. Likewise it'll provide `AsyncWrite` for types implementing `Write`.
 | 
			
		||||
/// For this to function properly, the underlying type needs to return `io::Error` of kind
 | 
			
		||||
/// `io::ErrorKind::WouldBlock` on blocking operations which should be retried when the file
 | 
			
		||||
/// descriptor becomes ready.
 | 
			
		||||
///
 | 
			
		||||
/// `WithAioContext` _owns_ the underlying object. This is because our Drop handler wants to
 | 
			
		||||
/// unregister the file descriptor, but systems like linux' epoll do that automatically when the fd
 | 
			
		||||
/// is closed, so we cannot have our file descriptor vanish before de-registering it, otherwise we
 | 
			
		||||
/// may be de-registering an already re-used number.
 | 
			
		||||
///
 | 
			
		||||
/// Implements `Deref<T>` so any methods of `T` still work on a `WithAioContext<T>`.
 | 
			
		||||
pub struct WithAioContext<T: AsRawFd> {
 | 
			
		||||
    aio_context: AioContext,
 | 
			
		||||
    fd: RawFd,
 | 
			
		||||
    handlers: Arc<Mutex<AioHandlerState>>,
 | 
			
		||||
    inner: Option<T>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: AsRawFd> std::ops::Deref for WithAioContext<T> {
 | 
			
		||||
    type Target = T;
 | 
			
		||||
 | 
			
		||||
    fn deref(&self) -> &Self::Target {
 | 
			
		||||
        self.inner.as_ref().unwrap()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: AsRawFd> std::ops::DerefMut for WithAioContext<T> {
 | 
			
		||||
    fn deref_mut(&mut self) -> &mut Self::Target {
 | 
			
		||||
        self.inner.as_mut().unwrap()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: AsRawFd> WithAioContext<T> {
 | 
			
		||||
    pub fn new(aio_context: AioContext, inner: T) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            aio_context,
 | 
			
		||||
            fd: inner.as_raw_fd(),
 | 
			
		||||
            handlers: Arc::new(Mutex::new(Default::default())),
 | 
			
		||||
            inner: Some(inner),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Deregister from the `AioContext` and return the inner file handle.
 | 
			
		||||
    pub fn into_inner(mut self) -> T {
 | 
			
		||||
        let out = self.inner.take().unwrap();
 | 
			
		||||
        std::mem::drop(self);
 | 
			
		||||
        out
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Shortcut around the `unwrap()`. The `Option<>` around `inner` is only there because we have
 | 
			
		||||
    /// a `Drop` implementation which prevents us to move-out the value in the `into_inner()`
 | 
			
		||||
    /// method.
 | 
			
		||||
    fn inner_mut(&mut self) -> &mut T {
 | 
			
		||||
        self.inner.as_mut().unwrap()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Shortcut around the `unwrap()`, immutable variant:
 | 
			
		||||
    //fn inner(&self) -> &T {
 | 
			
		||||
    //    self.inner.as_ref().unwrap()
 | 
			
		||||
    //}
 | 
			
		||||
 | 
			
		||||
    /// Shortcut to set_fd_handlers. For the "real" qemu interface we'll have to turn the closures
 | 
			
		||||
    /// into raw function pointers here (they'll get an opaque pointer parameter).
 | 
			
		||||
    fn commit_handlers(
 | 
			
		||||
        aio_context: &AioContext,
 | 
			
		||||
        fd: RawFd,
 | 
			
		||||
        handlers: &mut MutexGuard<AioHandlerState>,
 | 
			
		||||
    ) {
 | 
			
		||||
        aio_context.set_fd_handler(
 | 
			
		||||
            fd,
 | 
			
		||||
            handlers.read.as_ref().map(|x| (*x).clone()),
 | 
			
		||||
            handlers.write.as_ref().map(|x| (*x).clone()),
 | 
			
		||||
        )
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Create a waker closure for a context for a specific ready state. When a file descriptor is
 | 
			
		||||
    /// ready for reading or writing, we need to remove the corresponding handler from the
 | 
			
		||||
    /// `AioContext` (make it an edge-trigger instead of a level trigger) before finally calling
 | 
			
		||||
    /// `waker.wake_by_ref()` to queue the task for polling.
 | 
			
		||||
    fn make_wake_fn(&self, cx: &mut Context, ready: Ready) -> AioCb {
 | 
			
		||||
        let waker = cx.waker().clone();
 | 
			
		||||
 | 
			
		||||
        // we don't want to be publicly clonable so clone manually here:
 | 
			
		||||
        let aio_context = self.aio_context.clone();
 | 
			
		||||
        let fd = self.fd;
 | 
			
		||||
        let handlers = Arc::clone(&self.handlers);
 | 
			
		||||
        Arc::new(move || {
 | 
			
		||||
            let mut guard = handlers.lock().unwrap();
 | 
			
		||||
 | 
			
		||||
            if ready.is_readable() {
 | 
			
		||||
                guard.read = None;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if ready.is_writable() {
 | 
			
		||||
                guard.write = None;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            Self::commit_handlers(&aio_context, fd, &mut guard);
 | 
			
		||||
            waker.wake_by_ref();
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Register our file descriptor with the `AioContext` for reading or writing.
 | 
			
		||||
    /// This only affects the directions present in the provided `ready` value, and will leave the
 | 
			
		||||
    /// other directions unchanged.
 | 
			
		||||
    pub fn register(&self, cx: &mut Context, ready: Ready) {
 | 
			
		||||
        let mut guard = self.handlers.lock().unwrap();
 | 
			
		||||
 | 
			
		||||
        if ready.is_readable() {
 | 
			
		||||
            guard.read = Some(self.make_wake_fn(cx, ready));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if ready.is_writable() {
 | 
			
		||||
            guard.write = Some(self.make_wake_fn(cx, ready));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Self::commit_handlers(&self.aio_context, self.fd, &mut guard)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Helper to handle an `io::Result<T>`, turning `Result<T>` into `Poll<Result<T>>`, by
 | 
			
		||||
    /// changing an `io::ErrorKind::WouldBlock` into `Poll::Pending` and taking care of registering
 | 
			
		||||
    /// the file descriptor with the AioContext for the next wake-up.
 | 
			
		||||
    /// `Ok` and errors other than the above will be passed through wrapped in `Poll::Ready`.
 | 
			
		||||
    pub fn handle_aio_result<R>(
 | 
			
		||||
        &self,
 | 
			
		||||
        cx: &mut Context,
 | 
			
		||||
        result: io::Result<R>,
 | 
			
		||||
        ready: Ready,
 | 
			
		||||
    ) -> Poll<io::Result<R>> {
 | 
			
		||||
        match result {
 | 
			
		||||
            Ok(res) => Poll::Ready(Ok(res)),
 | 
			
		||||
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
 | 
			
		||||
                self.register(cx, ready);
 | 
			
		||||
                Poll::Pending
 | 
			
		||||
            }
 | 
			
		||||
            Err(err) => Poll::Ready(Err(err)),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: AsRawFd> Drop for WithAioContext<T> {
 | 
			
		||||
    fn drop(&mut self) {
 | 
			
		||||
        let mut guard = self.handlers.lock().unwrap();
 | 
			
		||||
        (*guard).clear();
 | 
			
		||||
        if !guard.mio_ready().is_empty() {
 | 
			
		||||
            Self::commit_handlers(&self.aio_context, self.fd, &mut guard);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T> futures::io::AsyncRead for WithAioContext<T>
 | 
			
		||||
where
 | 
			
		||||
    T: AsRawFd + io::Read + Unpin,
 | 
			
		||||
{
 | 
			
		||||
    fn poll_read(
 | 
			
		||||
        mut self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context,
 | 
			
		||||
        buf: &mut [u8],
 | 
			
		||||
    ) -> Poll<io::Result<usize>> {
 | 
			
		||||
        let res = self.inner_mut().read(buf);
 | 
			
		||||
        self.handle_aio_result(cx, res, mio::Ready::readable())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T> futures::io::AsyncWrite for WithAioContext<T>
 | 
			
		||||
where
 | 
			
		||||
    T: AsRawFd + io::Write + Unpin,
 | 
			
		||||
{
 | 
			
		||||
    fn poll_write(
 | 
			
		||||
        mut self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context,
 | 
			
		||||
        buf: &[u8],
 | 
			
		||||
    ) -> Poll<io::Result<usize>> {
 | 
			
		||||
        let result = self.inner_mut().write(buf);
 | 
			
		||||
        self.handle_aio_result(cx, result, mio::Ready::writable())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
 | 
			
		||||
        let result = self.inner_mut().flush();
 | 
			
		||||
        self.handle_aio_result(cx, result, mio::Ready::writable())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // I'm not sure what they expect me to do here. The `close()` syscall has no async variant, so
 | 
			
		||||
    // all I can do is `flush()` and then drop the inner stream...
 | 
			
		||||
    //
 | 
			
		||||
    // Using `.into_inner()` after this will cause a panic.
 | 
			
		||||
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
 | 
			
		||||
        let result = self.inner_mut().flush();
 | 
			
		||||
        let _ = futures::ready!(self.handle_aio_result(cx, result, mio::Ready::writable()));
 | 
			
		||||
        std::mem::drop(self.inner.take());
 | 
			
		||||
        Poll::Ready(Ok(()))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user