src/client/pxar_backup_stream.rs: switch to async

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-08-23 14:36:51 +02:00
parent 2107bb40c1
commit 369a87e3a2
1 changed files with 19 additions and 11 deletions

View File

@ -1,12 +1,13 @@
use failure::*; use std::collections::HashSet;
use std::io::{Write, Seek}; use std::io::{Seek, Write};
use std::thread;
use std::sync::{Arc, Mutex};
use std::os::unix::io::FromRawFd; use std::os::unix::io::FromRawFd;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::collections::HashSet; use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::thread;
use futures::Poll; use failure::*;
use futures::stream::Stream; use futures::stream::Stream;
use nix::fcntl::OFlag; use nix::fcntl::OFlag;
@ -38,6 +39,7 @@ impl Drop for PxarBackupStream {
} }
impl PxarBackupStream { impl PxarBackupStream {
pin_utils::unsafe_pinned!(stream: Option<WrappedReaderStream<std::fs::File>>);
pub fn new<W: Write + Seek + Send + 'static>( pub fn new<W: Write + Seek + Send + 'static>(
mut dir: Dir, mut dir: Dir,
@ -93,16 +95,22 @@ impl PxarBackupStream {
impl Stream for PxarBackupStream { impl Stream for PxarBackupStream {
type Item = Vec<u8>; type Item = Result<Vec<u8>, Error>;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Vec<u8>>, Error> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
{ // limit lock scope { // limit lock scope
let error = self.error.lock().unwrap(); let error = self.error.lock().unwrap();
if let Some(ref msg) = *error { if let Some(ref msg) = *error {
return Err(format_err!("{}", msg)); return Poll::Ready(Some(Err(format_err!("{}", msg))));
} }
} }
self.stream.as_mut().unwrap().poll().map_err(Error::from) let res = self.as_mut()
.stream()
.as_pin_mut()
.unwrap()
.poll_next(cx);
Poll::Ready(futures::ready!(res)
.map(|v| v.map_err(Error::from))
)
} }
} }