diff --git a/src/backup.rs b/src/backup.rs index 5e1147b4..7bf29a5a 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -257,8 +257,5 @@ pub use verify::*; mod catalog_shell; pub use catalog_shell::*; -mod async_index_reader; -pub use async_index_reader::*; - mod cached_chunk_reader; pub use cached_chunk_reader::*; diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs deleted file mode 100644 index 20a37e7e..00000000 --- a/src/backup/async_index_reader.rs +++ /dev/null @@ -1,215 +0,0 @@ -use std::future::Future; -use std::task::{Poll, Context}; -use std::pin::Pin; -use std::io::SeekFrom; - -use anyhow::Error; -use futures::future::FutureExt; -use futures::ready; -use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; - -use proxmox::sys::error::io_err_other; -use proxmox::io_format_err; - -use super::IndexFile; -use super::read_chunk::AsyncReadChunk; -use super::index::ChunkReadInfo; - -type ReadFuture = dyn Future), Error>> + Send + 'static; - -// FIXME: This enum may not be required? -// - Put the `WaitForData` case directly into a `read_future: Option<>` -// - make the read loop as follows: -// * if read_buffer is not empty: -// use it -// * else if read_future is there: -// poll it -// if read: move data to read_buffer -// * else -// create read future -#[allow(clippy::enum_variant_names)] -enum AsyncIndexReaderState { - NoData, - WaitForData(Pin>>), - HaveData, -} - -pub struct AsyncIndexReader { - store: Option, - index: I, - read_buffer: Vec, - current_chunk_offset: u64, - current_chunk_idx: usize, - current_chunk_info: Option, - position: u64, - seek_to_pos: i64, - state: AsyncIndexReaderState, -} - -// ok because the only public interfaces operates on &mut Self -unsafe impl Sync for AsyncIndexReader {} - -impl AsyncIndexReader { - pub fn new(index: I, store: S) -> Self { - Self { - store: Some(store), - index, - read_buffer: Vec::with_capacity(1024 * 1024), - current_chunk_offset: 0, - current_chunk_idx: 0, - current_chunk_info: None, - position: 0, - seek_to_pos: 0, - state: AsyncIndexReaderState::NoData, - } - } -} - -impl AsyncRead for AsyncIndexReader -where - S: AsyncReadChunk + Unpin + Sync + 'static, - I: IndexFile + Unpin, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut ReadBuf, - ) -> Poll> { - let this = Pin::get_mut(self); - loop { - match &mut this.state { - AsyncIndexReaderState::NoData => { - let (idx, offset) = if this.current_chunk_info.is_some() && - this.position == this.current_chunk_info.as_ref().unwrap().range.end - { - // optimization for sequential chunk read - let next_idx = this.current_chunk_idx + 1; - (next_idx, 0) - } else { - match this.index.chunk_from_offset(this.position) { - Some(res) => res, - None => return Poll::Ready(Ok(())) - } - }; - - if idx >= this.index.index_count() { - return Poll::Ready(Ok(())); - } - - let info = this - .index - .chunk_info(idx) - .ok_or_else(|| io_format_err!("could not get digest"))?; - - this.current_chunk_offset = offset; - this.current_chunk_idx = idx; - let old_info = this.current_chunk_info.replace(info.clone()); - - if let Some(old_info) = old_info { - if old_info.digest == info.digest { - // hit, chunk is currently in cache - this.state = AsyncIndexReaderState::HaveData; - continue; - } - } - - // miss, need to download new chunk - let store = match this.store.take() { - Some(store) => store, - None => { - return Poll::Ready(Err(io_format_err!("could not find store"))); - } - }; - - let future = async move { - store.read_chunk(&info.digest) - .await - .map(move |x| (store, x)) - }; - - this.state = AsyncIndexReaderState::WaitForData(future.boxed()); - } - AsyncIndexReaderState::WaitForData(ref mut future) => { - match ready!(future.as_mut().poll(cx)) { - Ok((store, chunk_data)) => { - this.read_buffer = chunk_data; - this.state = AsyncIndexReaderState::HaveData; - this.store = Some(store); - } - Err(err) => { - return Poll::Ready(Err(io_err_other(err))); - } - }; - } - AsyncIndexReaderState::HaveData => { - let offset = this.current_chunk_offset as usize; - let len = this.read_buffer.len(); - let n = if len - offset < buf.remaining() { - len - offset - } else { - buf.remaining() - }; - - buf.put_slice(&this.read_buffer[offset..(offset + n)]); - this.position += n as u64; - - if offset + n == len { - this.state = AsyncIndexReaderState::NoData; - } else { - this.current_chunk_offset += n as u64; - this.state = AsyncIndexReaderState::HaveData; - } - - return Poll::Ready(Ok(())); - } - } - } - } -} - -impl AsyncSeek for AsyncIndexReader -where - S: AsyncReadChunk + Unpin + Sync + 'static, - I: IndexFile + Unpin, -{ - fn start_seek( - self: Pin<&mut Self>, - pos: SeekFrom, - ) -> tokio::io::Result<()> { - let this = Pin::get_mut(self); - this.seek_to_pos = match pos { - SeekFrom::Start(offset) => { - offset as i64 - }, - SeekFrom::End(offset) => { - this.index.index_bytes() as i64 + offset - }, - SeekFrom::Current(offset) => { - this.position as i64 + offset - } - }; - Ok(()) - } - - fn poll_complete( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { - let this = Pin::get_mut(self); - - let index_bytes = this.index.index_bytes(); - if this.seek_to_pos < 0 { - return Poll::Ready(Err(io_format_err!("cannot seek to negative values"))); - } else if this.seek_to_pos > index_bytes as i64 { - this.position = index_bytes; - } else { - this.position = this.seek_to_pos as u64; - } - - // even if seeking within one chunk, we need to go to NoData to - // recalculate the current_chunk_offset (data is cached anyway) - this.state = AsyncIndexReaderState::NoData; - - Poll::Ready(Ok(this.position)) - } -}