From 1f26fdef191061bc28ae00010ebbb5d70b1c80d9 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Sun, 11 Aug 2019 11:32:36 +0200 Subject: [PATCH] src/backup/data_blob.rs: impl. simple reader/writer To avoid loading blob into memory. --- src/backup/data_blob.rs | 92 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/src/backup/data_blob.rs b/src/backup/data_blob.rs index 35bf88bd..f2599302 100644 --- a/src/backup/data_blob.rs +++ b/src/backup/data_blob.rs @@ -289,3 +289,95 @@ impl DataBlob { } } + +// TODO: impl. other blob types + +use std::io::{Read, BufRead, Write, Seek, SeekFrom}; + +/// Write compressed data blobs +pub struct CompressedDataBlobWriter { + compr: Option>, + hasher: crc32fast::Hasher, +} + +impl CompressedDataBlobWriter { + + pub fn new(mut out: W) -> Result { + out.seek(SeekFrom::Start(0))?; + let head = DataBlobHeader { magic: COMPRESSED_BLOB_MAGIC_1_0, crc: [0; 4] }; + unsafe { + out.write_le_value(head)?; + } + let compr = zstd::stream::write::Encoder::new(out, 1)?; + Ok(Self { compr: Some(compr), hasher: crc32fast::Hasher::new() }) + } + + pub fn finish(mut self) -> Result { + let compr = self.compr.take().expect("blob writer already finished"); + let mut out = compr.finish()?; + + // write CRC + let crc = self.hasher.finalize(); + let head = DataBlobHeader { magic: COMPRESSED_BLOB_MAGIC_1_0, crc: crc.to_le_bytes() }; + + out.seek(SeekFrom::Start(0))?; + unsafe { + out.write_le_value(head)?; + } + + Ok(out) + } +} + +impl Write for CompressedDataBlobWriter { + + fn write(&mut self, buf: &[u8]) -> Result { + let compr = self.compr.as_mut().expect("blob writer already finished"); + self.hasher.update(buf); + compr.write(buf) + } + + fn flush(&mut self) -> Result<(), std::io::Error> { + let compr = self.compr.as_mut().expect("blob writer already finished"); + compr.flush() + } +} + +/// Read compressed data blobs +pub struct CompressedDataBlobReader { + decompr: zstd::stream::read::Decoder, + hasher: Option, + expected_crc: u32, +} + +impl CompressedDataBlobReader { + + pub fn new(mut reader: R) -> Result { + + let head: DataBlobHeader = unsafe { reader.read_le_value()? }; + if head.magic != COMPRESSED_BLOB_MAGIC_1_0 { + bail!("got wrong magic number"); + } + let expected_crc = u32::from_le_bytes(head.crc); + let decompr = zstd::stream::read::Decoder::with_buffer(reader)?; + Ok(Self { decompr: decompr, hasher: Some(crc32fast::Hasher::new()), expected_crc }) + } +} + +impl Read for CompressedDataBlobReader { + + fn read(&mut self, buf: &mut [u8]) -> Result { + let count = self.decompr.read(buf)?; + if count == 0 { // EOF, verify crc + let hasher = self.hasher.take().expect("blob reader already finished"); + let crc = hasher.finalize(); + if crc != self.expected_crc { + return Err(std::io::Error::new(std::io::ErrorKind::Other, "blob reader crc error")); + } + } else { + let hasher = self.hasher.as_mut().expect("blob reader already finished"); + hasher.update(buf); + } + Ok(count) + } +}