implement AsyncSeek for AsyncIndexReader

Requires updating the AsyncRead implementation to cope with byte-wise
seeks to intra-chunk positions.

Uses chunk_from_offset to get locations within chunks, but tries to
avoid it for sequential read to not reduce performance from before.

AsyncSeek needs to use the temporary seek_to_pos to avoid changing the
position in case an invalid seek is given and it needs to error in
poll_complete.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
This commit is contained in:
Stefan Reiter 2020-07-22 15:56:22 +02:00 committed by Thomas Lamprecht
parent d0463b67ca
commit ec5f9d3525
2 changed files with 97 additions and 20 deletions

View File

@ -1,30 +1,35 @@
use std::future::Future; use std::future::Future;
use std::task::{Poll, Context}; use std::task::{Poll, Context};
use std::pin::Pin; use std::pin::Pin;
use std::io::SeekFrom;
use anyhow::Error; use anyhow::Error;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::ready; use futures::ready;
use tokio::io::AsyncRead; use tokio::io::{AsyncRead, AsyncSeek};
use proxmox::sys::error::io_err_other; use proxmox::sys::error::io_err_other;
use proxmox::io_format_err; use proxmox::io_format_err;
use super::IndexFile; use super::IndexFile;
use super::read_chunk::AsyncReadChunk; use super::read_chunk::AsyncReadChunk;
use super::index::ChunkReadInfo;
enum AsyncIndexReaderState<S> { enum AsyncIndexReaderState<S> {
NoData, NoData,
WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>), WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
HaveData(usize), HaveData,
} }
pub struct AsyncIndexReader<S, I: IndexFile> { pub struct AsyncIndexReader<S, I: IndexFile> {
store: Option<S>, store: Option<S>,
index: I, index: I,
read_buffer: Vec<u8>, read_buffer: Vec<u8>,
current_chunk_offset: u64,
current_chunk_idx: usize, current_chunk_idx: usize,
current_chunk_digest: [u8; 32], current_chunk_info: Option<ChunkReadInfo>,
position: u64,
seek_to_pos: i64,
state: AsyncIndexReaderState<S>, state: AsyncIndexReaderState<S>,
} }
@ -37,8 +42,11 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
store: Some(store), store: Some(store),
index, index,
read_buffer: Vec::with_capacity(1024 * 1024), read_buffer: Vec::with_capacity(1024 * 1024),
current_chunk_offset: 0,
current_chunk_idx: 0, current_chunk_idx: 0,
current_chunk_digest: [0u8; 32], current_chunk_info: None,
position: 0,
seek_to_pos: 0,
state: AsyncIndexReaderState::NoData, state: AsyncIndexReaderState::NoData,
} }
} }
@ -58,23 +66,41 @@ where
loop { loop {
match &mut this.state { match &mut this.state {
AsyncIndexReaderState::NoData => { AsyncIndexReaderState::NoData => {
if this.current_chunk_idx >= this.index.index_count() { let (idx, offset) = if this.current_chunk_info.is_some() &&
this.position == this.current_chunk_info.as_ref().unwrap().range.end
{
// optimization for sequential chunk read
let next_idx = this.current_chunk_idx + 1;
(next_idx, 0)
} else {
match this.index.chunk_from_offset(this.position) {
Some(res) => res,
None => return Poll::Ready(Ok(0))
}
};
if idx >= this.index.index_count() {
return Poll::Ready(Ok(0)); return Poll::Ready(Ok(0));
} }
let digest = this let info = this
.index .index
.index_digest(this.current_chunk_idx) .chunk_info(idx)
.ok_or(io_format_err!("could not get digest"))? .ok_or(io_format_err!("could not get digest"))?;
.clone();
if digest == this.current_chunk_digest { this.current_chunk_offset = offset;
this.state = AsyncIndexReaderState::HaveData(0); this.current_chunk_idx = idx;
let old_info = this.current_chunk_info.replace(info.clone());
if let Some(old_info) = old_info {
if old_info.digest == info.digest {
// hit, chunk is currently in cache
this.state = AsyncIndexReaderState::HaveData;
continue; continue;
} }
}
this.current_chunk_digest = digest; // miss, need to download new chunk
let store = match this.store.take() { let store = match this.store.take() {
Some(store) => store, Some(store) => store,
None => { None => {
@ -83,7 +109,7 @@ where
}; };
let future = async move { let future = async move {
store.read_chunk(&digest) store.read_chunk(&info.digest)
.await .await
.map(move |x| (store, x)) .map(move |x| (store, x))
}; };
@ -95,7 +121,7 @@ where
Ok((store, mut chunk_data)) => { Ok((store, mut chunk_data)) => {
this.read_buffer.clear(); this.read_buffer.clear();
this.read_buffer.append(&mut chunk_data); this.read_buffer.append(&mut chunk_data);
this.state = AsyncIndexReaderState::HaveData(0); this.state = AsyncIndexReaderState::HaveData;
this.store = Some(store); this.store = Some(store);
} }
Err(err) => { Err(err) => {
@ -103,8 +129,8 @@ where
} }
}; };
} }
AsyncIndexReaderState::HaveData(offset) => { AsyncIndexReaderState::HaveData => {
let offset = *offset; let offset = this.current_chunk_offset as usize;
let len = this.read_buffer.len(); let len = this.read_buffer.len();
let n = if len - offset < buf.len() { let n = if len - offset < buf.len() {
len - offset len - offset
@ -113,11 +139,13 @@ where
}; };
buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]); buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
this.position += n as u64;
if offset + n == len { if offset + n == len {
this.state = AsyncIndexReaderState::NoData; this.state = AsyncIndexReaderState::NoData;
this.current_chunk_idx += 1;
} else { } else {
this.state = AsyncIndexReaderState::HaveData(offset + n); this.current_chunk_offset += n as u64;
this.state = AsyncIndexReaderState::HaveData;
} }
return Poll::Ready(Ok(n)); return Poll::Ready(Ok(n));
@ -126,3 +154,51 @@ where
} }
} }
} }
impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
where
S: AsyncReadChunk + Unpin + Sync + 'static,
I: IndexFile + Unpin,
{
fn start_seek(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<tokio::io::Result<()>> {
let this = Pin::get_mut(self);
this.seek_to_pos = match pos {
SeekFrom::Start(offset) => {
offset as i64
},
SeekFrom::End(offset) => {
this.index.index_bytes() as i64 + offset
},
SeekFrom::Current(offset) => {
this.position as i64 + offset
}
};
Poll::Ready(Ok(()))
}
fn poll_complete(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<tokio::io::Result<u64>> {
let this = Pin::get_mut(self);
let index_bytes = this.index.index_bytes();
if this.seek_to_pos < 0 {
return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
} else if this.seek_to_pos > index_bytes as i64 {
this.position = index_bytes;
} else {
this.position = this.seek_to_pos as u64;
}
// even if seeking within one chunk, we need to go to NoData to
// recalculate the current_chunk_offset (data is cached anyway)
this.state = AsyncIndexReaderState::NoData;
Poll::Ready(Ok(this.position))
}
}

View File

@ -1,6 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::ops::Range; use std::ops::Range;
#[derive(Clone)]
pub struct ChunkReadInfo { pub struct ChunkReadInfo {
pub range: Range<u64>, pub range: Range<u64>,
pub digest: [u8; 32], pub digest: [u8; 32],