src/backup/chunk_stream.rs: switch to async
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
		| @ -1,70 +1,72 @@ | |||||||
|  | use std::pin::Pin; | ||||||
|  | use std::task::{Context, Poll}; | ||||||
|  |  | ||||||
| use bytes::BytesMut; | use bytes::BytesMut; | ||||||
| use failure::*; | use failure::*; | ||||||
| use futures::stream::Stream; | use futures::ready; | ||||||
| use futures::{Async, Poll}; | use futures::stream::{Stream, TryStream}; | ||||||
|  |  | ||||||
| use super::Chunker; | use super::Chunker; | ||||||
|  |  | ||||||
| /// Split input stream into dynamic sized chunks | /// Split input stream into dynamic sized chunks | ||||||
| pub struct ChunkStream<S> { | pub struct ChunkStream<S: Unpin> { | ||||||
|     input: S, |     input: S, | ||||||
|     chunker: Chunker, |     chunker: Chunker, | ||||||
|     buffer: BytesMut, |     buffer: BytesMut, | ||||||
|     scan_pos: usize, |     scan_pos: usize, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl <S> ChunkStream<S> { | impl<S: Unpin> ChunkStream<S> { | ||||||
|     pub fn new(input: S, chunk_size: Option<usize>) -> Self { |     pub fn new(input: S, chunk_size: Option<usize>) -> Self { | ||||||
|         Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0} |         Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0} | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl <S> Stream for ChunkStream<S> | impl<S: Unpin> Unpin for ChunkStream<S> {} | ||||||
|     where S: Stream, |  | ||||||
|           S::Item: AsRef<[u8]>, | impl<S: Unpin> Stream for ChunkStream<S> | ||||||
|  | where | ||||||
|  |     S: TryStream, | ||||||
|  |     S::Ok: AsRef<[u8]>, | ||||||
|     S::Error: Into<Error>, |     S::Error: Into<Error>, | ||||||
| { | { | ||||||
|  |  | ||||||
|     type Item = BytesMut; |     type Item = Result<BytesMut, Error>; | ||||||
|     type Error = Error; |  | ||||||
|  |  | ||||||
|     fn poll(&mut self) -> Poll<Option<BytesMut>, Error> { |     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | ||||||
|  |         let this = self.get_mut(); | ||||||
|         loop { |         loop { | ||||||
|  |             if this.scan_pos < this.buffer.len() { | ||||||
|  |                 let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]); | ||||||
|  |  | ||||||
|             if self.scan_pos < self.buffer.len() { |                 let chunk_size = this.scan_pos + boundary; | ||||||
|                 let boundary = self.chunker.scan(&self.buffer[self.scan_pos..]); |  | ||||||
|  |  | ||||||
|                 let chunk_size = self.scan_pos + boundary; |  | ||||||
|  |  | ||||||
|                 if boundary == 0 { |                 if boundary == 0 { | ||||||
|                     self.scan_pos = self.buffer.len(); |                     this.scan_pos = this.buffer.len(); | ||||||
|                     // continue poll |                     // continue poll | ||||||
|                 } else if chunk_size <= self.buffer.len() { |                 } else if chunk_size <= this.buffer.len() { | ||||||
|                     let result = self.buffer.split_to(chunk_size); |                     let result = this.buffer.split_to(chunk_size); | ||||||
|                     self.scan_pos = 0; |                     this.scan_pos = 0; | ||||||
|                     return Ok(Async::Ready(Some(result))); |                     return Poll::Ready(Some(Ok(result))); | ||||||
|                 } else { |                 } else { | ||||||
|                     panic!("got unexpected chunk boundary from chunker"); |                     panic!("got unexpected chunk boundary from chunker"); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             match self.input.poll() { |             match ready!(Pin::new(&mut this.input).try_poll_next(cx)) { | ||||||
|                 Err(err) => { |                 Some(Err(err)) => { | ||||||
|                     return Err(err.into()); |                     return Poll::Ready(Some(Err(err.into()))); | ||||||
|                 } |                 } | ||||||
|                 Ok(Async::NotReady) => { |                 None => { | ||||||
|                     return Ok(Async::NotReady); |                     this.scan_pos = 0; | ||||||
|                 } |                     if this.buffer.len() > 0 { | ||||||
|                 Ok(Async::Ready(None)) => { |                         return Poll::Ready(Some(Ok(this.buffer.take()))); | ||||||
|                     self.scan_pos = 0; |  | ||||||
|                     if self.buffer.len() > 0 { |  | ||||||
|                         return Ok(Async::Ready(Some(self.buffer.take()))); |  | ||||||
|                     } else { |                     } else { | ||||||
|                         return Ok(Async::Ready(None)); |                         return Poll::Ready(None); | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|                 Ok(Async::Ready(Some(data))) => { |                 Some(Ok(data)) => { | ||||||
|                     self.buffer.extend_from_slice(data.as_ref()); |                     this.buffer.extend_from_slice(data.as_ref()); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| @ -72,54 +74,51 @@ impl <S> Stream for ChunkStream<S> | |||||||
| } | } | ||||||
|  |  | ||||||
| /// Split input stream into fixed sized chunks | /// Split input stream into fixed sized chunks | ||||||
| pub struct FixedChunkStream<S> { | pub struct FixedChunkStream<S: Unpin> { | ||||||
|     input: S, |     input: S, | ||||||
|     chunk_size: usize, |     chunk_size: usize, | ||||||
|     buffer: BytesMut, |     buffer: BytesMut, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl <S> FixedChunkStream<S> { | impl<S: Unpin> FixedChunkStream<S> { | ||||||
|  |  | ||||||
|     pub fn new(input: S, chunk_size: usize) -> Self { |     pub fn new(input: S, chunk_size: usize) -> Self { | ||||||
|         Self { input, chunk_size, buffer: BytesMut::new() } |         Self { input, chunk_size, buffer: BytesMut::new() } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl <S> Stream for FixedChunkStream<S> | impl<S: Unpin> Unpin for FixedChunkStream<S> {} | ||||||
|     where S: Stream, |  | ||||||
|           S::Item: AsRef<[u8]>, | impl<S: Unpin> Stream for FixedChunkStream<S> | ||||||
|  | where | ||||||
|  |     S: TryStream, | ||||||
|  |     S::Ok: AsRef<[u8]>, | ||||||
| { | { | ||||||
|  |     type Item = Result<BytesMut, S::Error>; | ||||||
|  |  | ||||||
|     type Item = BytesMut; |     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<BytesMut, S::Error>>> { | ||||||
|     type Error = S::Error; |         let this = self.get_mut(); | ||||||
|  |  | ||||||
|     fn poll(&mut self) -> Poll<Option<BytesMut>, S::Error> { |  | ||||||
|         loop { |         loop { | ||||||
|  |             if this.buffer.len() == this.chunk_size { | ||||||
|             if self.buffer.len() == self.chunk_size { |                 return Poll::Ready(Some(Ok(this.buffer.take()))); | ||||||
|                 return Ok(Async::Ready(Some(self.buffer.take()))); |             } else if this.buffer.len() > this.chunk_size { | ||||||
|             } else if self.buffer.len() > self.chunk_size { |                 let result = this.buffer.split_to(this.chunk_size); | ||||||
|                 let result = self.buffer.split_to(self.chunk_size); |                 return Poll::Ready(Some(Ok(result))); | ||||||
|                 return Ok(Async::Ready(Some(result))); |  | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             match self.input.poll() { |             match ready!(Pin::new(&mut this.input).try_poll_next(cx)) { | ||||||
|                 Err(err) => { |                 Some(Err(err)) => { | ||||||
|                     return Err(err); |                     return Poll::Ready(Some(Err(err))); | ||||||
|                 } |                 } | ||||||
|                 Ok(Async::NotReady) => { |                 None => { | ||||||
|                     return Ok(Async::NotReady); |  | ||||||
|                 } |  | ||||||
|                 Ok(Async::Ready(None)) => { |  | ||||||
|                     // last chunk can have any size |                     // last chunk can have any size | ||||||
|                     if self.buffer.len() > 0 { |                     if this.buffer.len() > 0 { | ||||||
|                         return Ok(Async::Ready(Some(self.buffer.take()))); |                         return Poll::Ready(Some(Ok(this.buffer.take()))); | ||||||
|                     } else { |                     } else { | ||||||
|                         return Ok(Async::Ready(None)); |                         return Poll::Ready(None); | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|                 Ok(Async::Ready(Some(data))) => { |                 Some(Ok(data)) => { | ||||||
|                     self.buffer.extend_from_slice(data.as_ref()); |                     this.buffer.extend_from_slice(data.as_ref()); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user