src/backup/data_blob.rs: impl. simple reader/writer
This avoids loading the whole blob into memory.
This commit is contained in:
		@ -289,3 +289,95 @@ impl DataBlob {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO: impl. other blob types
 | 
			
		||||
 | 
			
		||||
use std::io::{Read, BufRead, Write, Seek, SeekFrom};
 | 
			
		||||
 | 
			
		||||
/// Write compressed data blobs
 | 
			
		||||
pub struct CompressedDataBlobWriter<W: Write> {
 | 
			
		||||
    compr: Option<zstd::stream::write::Encoder<W>>,
 | 
			
		||||
    hasher: crc32fast::Hasher,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl <W: Write + Seek> CompressedDataBlobWriter<W> {
 | 
			
		||||
 | 
			
		||||
    pub fn new(mut out: W) -> Result<Self, Error> {
 | 
			
		||||
        out.seek(SeekFrom::Start(0))?;
 | 
			
		||||
        let head = DataBlobHeader { magic: COMPRESSED_BLOB_MAGIC_1_0, crc: [0; 4] };
 | 
			
		||||
        unsafe {
 | 
			
		||||
            out.write_le_value(head)?;
 | 
			
		||||
        }
 | 
			
		||||
        let compr = zstd::stream::write::Encoder::new(out, 1)?;
 | 
			
		||||
        Ok(Self { compr: Some(compr), hasher: crc32fast::Hasher::new() })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn finish(mut self) -> Result<W, Error> {
 | 
			
		||||
        let compr = self.compr.take().expect("blob writer already finished");
 | 
			
		||||
        let mut out = compr.finish()?;
 | 
			
		||||
 | 
			
		||||
        // write CRC
 | 
			
		||||
        let crc = self.hasher.finalize();
 | 
			
		||||
        let head = DataBlobHeader { magic: COMPRESSED_BLOB_MAGIC_1_0, crc: crc.to_le_bytes() };
 | 
			
		||||
 | 
			
		||||
        out.seek(SeekFrom::Start(0))?;
 | 
			
		||||
        unsafe {
 | 
			
		||||
            out.write_le_value(head)?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(out)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl <W: Write + Seek> Write for CompressedDataBlobWriter<W> {
 | 
			
		||||
 | 
			
		||||
    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
 | 
			
		||||
        let compr = self.compr.as_mut().expect("blob writer already finished");
 | 
			
		||||
        self.hasher.update(buf);
 | 
			
		||||
        compr.write(buf)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn flush(&mut self) -> Result<(), std::io::Error> {
 | 
			
		||||
        let compr = self.compr.as_mut().expect("blob writer already finished");
 | 
			
		||||
        compr.flush()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Read compressed data blobs
 | 
			
		||||
pub struct CompressedDataBlobReader<R: BufRead> {
 | 
			
		||||
    decompr: zstd::stream::read::Decoder<R>,
 | 
			
		||||
    hasher: Option<crc32fast::Hasher>,
 | 
			
		||||
    expected_crc: u32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl <R: BufRead> CompressedDataBlobReader<R> {
 | 
			
		||||
 | 
			
		||||
    pub fn new(mut reader: R) -> Result<Self, Error> {
 | 
			
		||||
 | 
			
		||||
        let head: DataBlobHeader = unsafe { reader.read_le_value()? };
 | 
			
		||||
        if head.magic != COMPRESSED_BLOB_MAGIC_1_0 {
 | 
			
		||||
            bail!("got wrong magic number");
 | 
			
		||||
        }
 | 
			
		||||
        let expected_crc = u32::from_le_bytes(head.crc);
 | 
			
		||||
        let decompr = zstd::stream::read::Decoder::with_buffer(reader)?;
 | 
			
		||||
        Ok(Self { decompr: decompr, hasher: Some(crc32fast::Hasher::new()), expected_crc })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl <R: BufRead> Read for CompressedDataBlobReader<R> {
 | 
			
		||||
 | 
			
		||||
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 | 
			
		||||
        let count = self.decompr.read(buf)?;
 | 
			
		||||
        if count == 0 { // EOF, verify crc
 | 
			
		||||
            let hasher = self.hasher.take().expect("blob reader already finished");
 | 
			
		||||
            let crc = hasher.finalize();
 | 
			
		||||
            if crc != self.expected_crc {
 | 
			
		||||
                return Err(std::io::Error::new(std::io::ErrorKind::Other, "blob reader crc error"));
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            let hasher = self.hasher.as_mut().expect("blob reader already finished");
 | 
			
		||||
            hasher.update(buf);
 | 
			
		||||
        }
 | 
			
		||||
        Ok(count)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user