diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs index ff476e37..c9ca4773 100644 --- a/src/backup/cached_chunk_reader.rs +++ b/src/backup/cached_chunk_reader.rs @@ -1,12 +1,19 @@ //! An async and concurrency safe data reader backed by a local LRU cache. use anyhow::Error; +use futures::future::Future; +use futures::ready; +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; -use std::future::Future; +use std::io::SeekFrom; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; -use crate::backup::{AsyncReadChunk, IndexFile}; +use super::{AsyncReadChunk, IndexFile}; use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache}; +use proxmox::io_format_err; +use proxmox::sys::error::io_err_other; struct AsyncChunkCacher { reader: Arc, @@ -87,3 +94,96 @@ impl CachedChunkReader< Ok(read) } } + +impl + CachedChunkReader +{ + /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and + /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred + /// otherwise. + pub fn seekable(self) -> SeekableCachedChunkReader { + SeekableCachedChunkReader { + index_bytes: self.index.index_bytes(), + reader: Arc::new(self), + position: 0, + read_future: None, + } + } +} + +pub struct SeekableCachedChunkReader< + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +> { + reader: Arc>, + index_bytes: u64, + position: u64, + read_future: Option, usize), Error>> + Send>>>, +} + +impl AsyncSeek for SeekableCachedChunkReader +where + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +{ + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> { + let this = Pin::get_mut(self); + let seek_to_pos = match pos { + SeekFrom::Start(offset) => offset as i64, + SeekFrom::End(offset) => this.index_bytes as i64 + offset, + SeekFrom::Current(offset) => this.position as i64 + offset, + }; + if seek_to_pos < 0 { + return Err(io_format_err!("cannot seek to negative values")); + } else if seek_to_pos > this.index_bytes as i64 { + this.position = this.index_bytes; + } else { + this.position = seek_to_pos as u64; + } + Ok(()) + } + + fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(self.position)) + } +} + +impl AsyncRead for SeekableCachedChunkReader +where + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut ReadBuf, + ) -> Poll> { + let this = Pin::get_mut(self); + + let offset = this.position; + let wanted = buf.capacity(); + let reader = Arc::clone(&this.reader); + + let fut = this.read_future.get_or_insert_with(|| { + Box::pin(async move { + let mut read_buf = vec![0u8; wanted]; + let read = reader.read_at(&mut read_buf[..wanted], offset).await?; + Ok((read_buf, read)) + }) + }); + + let ret = match ready!(fut.as_mut().poll(cx)) { + Ok((read_buf, read)) => { + buf.put_slice(&read_buf[..read]); + this.position += read as u64; + Ok(()) + } + Err(err) => Err(io_err_other(err)), + }; + + // future completed, drop + this.read_future = None; + + Poll::Ready(ret) + } +}