backup/archive_index.rs: implement BufferedArchiveReader
Implement relatively fast random reads using binary search.
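The helpers added below treat the chunk index as fixed-size 40-byte entries: a u64 chunk end offset (read in native byte order via a raw pointer cast) followed by a 32-byte chunk digest. A minimal sketch of that layout, with made-up names, purely for illustration:

    // Illustrative only -- mirrors the pos*40 / pos*40+8 arithmetic used by
    // chunk_end()/chunk_digest(); the names here are not from the patch.
    const INDEX_ENTRY_SIZE: usize = 40;

    struct IndexEntry {
        end: u64,          // archive offset just past this chunk
        digest: [u8; 32],  // digest used to fetch the chunk from the store
    }

    fn entry_at(index: &[u8], pos: usize) -> IndexEntry {
        let base = pos * INDEX_ENTRY_SIZE;
        let mut end = [0u8; 8];
        end.copy_from_slice(&index[base..base + 8]);
        let mut digest = [0u8; 32];
        digest.copy_from_slice(&index[base + 8..base + 40]);
        IndexEntry { end: u64::from_ne_bytes(end), digest }
    }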
@@ -116,12 +116,26 @@ impl <'a> ArchiveIndexReader<'a> {
         Ok(())
     }
 
+    #[inline]
+    fn chunk_end(&self, pos: usize) -> u64 {
+        if pos >= self.index_entries {
+            panic!("chunk index out of range");
+        }
+        unsafe { *(self.index.add(pos*40) as *const u64) }
+    }
+
+    #[inline]
+    fn chunk_digest(&self, pos: usize) -> &[u8] {
+        if pos >= self.index_entries {
+            panic!("chunk index out of range");
+        }
+        unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) }
+    }
+
     pub fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus) -> Result<(), Error> {
 
         for pos in 0..self.index_entries {
-            let offset = unsafe { *(self.index.add(pos*40) as *const u64) };
-            let digest = unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) };
-
+            let digest = self.chunk_digest(pos);
             if let Err(err) = self.store.touch_chunk(digest) {
                 bail!("unable to access chunk {}, required by {:?} - {}",
                       digest_to_hex(digest), self.filename, err);
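Because every index entry stores the end offset of its chunk, the offsets form a strictly increasing sequence, so mapping an archive byte offset to the chunk that contains it is an ordinary binary search. The second hunk implements this recursively over (start_idx, end_idx) bounds; the following standalone sketch shows the same lookup over a plain slice of end offsets (a hypothetical helper, not part of the patch):

    /// Sketch: find the chunk whose byte range [prev_end, end) contains `offset`.
    fn find_chunk(chunk_ends: &[u64], offset: u64) -> Option<usize> {
        match chunk_ends.last() {
            Some(&last) if offset < last => {
                // first entry whose end offset is greater than `offset`
                Some(chunk_ends.partition_point(|&end| end <= offset))
            }
            _ => None, // empty index, or offset past the end of the archive
        }
    }

    // find_chunk(&[10, 20, 30], 15) == Some(1)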
@@ -135,19 +149,116 @@ impl <'a> ArchiveIndexReader<'a> {
         let mut buffer = Vec::with_capacity(1024*1024);
 
         for pos in 0..self.index_entries {
-            let offset = unsafe { *(self.index.add(pos*40) as *const u64) };
-            let digest = unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) };
-
+            let end = self.chunk_end(pos);
+            let digest = self.chunk_digest(pos);
+            //println!("Dump {:08x}", end );
             self.store.read_chunk(digest, &mut buffer)?;
-            println!("Dump {:08x} {}", offset, buffer.len(), );
             writer.write_all(&buffer)?;
 
         }
 
         Ok(())
     }
 
+    fn binary_search(
+        &self,
+        start_idx: usize,
+        start: u64,
+        end_idx: usize,
+        end: u64,
+        offset: u64
+    ) -> Result<usize, Error> {
+
+        if (offset >= end) || (offset < start) {
+            bail!("offset out of range");
+        }
+
+        if end_idx == start_idx {
+            return Ok(start_idx); // found
+        }
+        let middle_idx = (start_idx + end_idx)/2;
+        let middle_end = self.chunk_end(middle_idx);
+
+        if offset < middle_end {
+            return self.binary_search(start_idx, start, middle_idx, middle_end, offset);
+        } else {
+            return self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset);
+        }
+    }
 }
 
+pub struct BufferedArchiveReader<'a> {
+    index: &'a ArchiveIndexReader<'a>,
+    archive_size: u64,
+    read_buffer: Vec<u8>,
+    buffered_chunk_idx: usize,
+    buffered_chunk_start: u64,
+    read_offset: u64,
+}
+
+impl <'a> BufferedArchiveReader<'a> {
+
+    pub fn new(index: &'a ArchiveIndexReader) -> Self {
+
+        let archive_size = index.chunk_end(index.index_entries - 1);
+        Self {
+            index: index,
+            archive_size: archive_size,
+            read_buffer: Vec::with_capacity(1024*1024),
+            buffered_chunk_idx: 0,
+            buffered_chunk_start: 0,
+            read_offset: 0,
+        }
+    }
+
+    pub fn archive_size(&self) -> u64 { self.archive_size }
+
+    pub fn read(&mut self, offset: u64) -> Result<&[u8], Error> {
+
+        let buffer_len = self.read_buffer.len();
+        let index = self.index;
+
+        // optimization for sequential read
+        if buffer_len > 0 &&
+            ((self.buffered_chunk_idx + 1) < index.index_entries) &&
+            (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
+        {
+            let next_idx = self.buffered_chunk_idx + 1;
+            let next_end = index.chunk_end(next_idx);
+            if offset < next_end {
+                self.buffer_chunk(next_idx)?;
+                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
+                return Ok(&self.read_buffer[buffer_offset..]);
+            }
+        }
+
+        if (buffer_len == 0) ||
+            (offset < self.buffered_chunk_start) ||
+            (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
+        {
+            let end_idx = index.index_entries - 1;
+            let end = index.chunk_end(end_idx);
+            let idx = index.binary_search(0, 0, end_idx, end, offset)?;
+            self.buffer_chunk(idx)?;
+        }
+
+        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
+        Ok(&self.read_buffer[buffer_offset..])
+    }
+
+    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
+
+        let index = self.index;
+        let end = index.chunk_end(idx);
+        let digest = index.chunk_digest(idx);
+        index.store.read_chunk(digest, &mut self.read_buffer)?;
+
+        self.buffered_chunk_idx = idx;
+        self.buffered_chunk_start = end - (self.read_buffer.len() as u64);
+        //println!("BUFFER {} {}",  self.buffered_chunk_start, end);
+        Ok(())
+    }
+}
+
 pub struct ArchiveIndexWriter<'a> {
     store: &'a ChunkStore,
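buffer_chunk() keeps exactly one decoded chunk in read_buffer and derives the chunk's start offset from its end offset: buffered_chunk_start = chunk_end - read_buffer.len(). read() then returns the buffer from position offset - buffered_chunk_start onward, so a caller gets everything from the requested offset up to the end of the current chunk, and the sequential fast path only has to check whether the next offset falls into the following chunk before falling back to the binary search. For example, if a chunk ends at byte 1310720 and decodes to 262144 bytes, buffered_chunk_start becomes 1048576, and a read at offset 1050000 returns the buffered data starting at position 1424.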
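A rough usage sketch of the new reader, assuming an ArchiveIndexReader value is already at hand (how it is opened is outside this diff) and using the same Error type as the surrounding file; the hypothetical helper below reads the archive front to back, which exercises the sequential fast path:

    fn copy_archive(index: &ArchiveIndexReader, mut out: impl std::io::Write) -> Result<(), Error> {
        let mut reader = BufferedArchiveReader::new(index);
        let size = reader.archive_size();

        let mut offset = 0u64;
        while offset < size {
            // read() returns a slice from `offset` to the end of the buffered
            // chunk; it stays valid until the next call to read().
            let data = reader.read(offset)?;
            out.write_all(data)?;
            offset += data.len() as u64;
        }
        Ok(())
    }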