src/backup/data_blob.rs: make DataBlobWriter more generic
Allow to write compressed and uncompressed blobs.
This commit is contained in:
parent
1f26fdef19
commit
a762ce54fd
@ -294,54 +294,99 @@ impl DataBlob {
|
|||||||
|
|
||||||
use std::io::{Read, BufRead, Write, Seek, SeekFrom};
|
use std::io::{Read, BufRead, Write, Seek, SeekFrom};
|
||||||
|
|
||||||
/// Write compressed data blobs
|
enum BlobWriterState<W: Write> {
|
||||||
pub struct CompressedDataBlobWriter<W: Write> {
|
Uncompressed { writer: W, hasher: crc32fast::Hasher },
|
||||||
compr: Option<zstd::stream::write::Encoder<W>>,
|
Compressed { compr: zstd::stream::write::Encoder<W>, hasher: crc32fast::Hasher },
|
||||||
hasher: crc32fast::Hasher,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl <W: Write + Seek> CompressedDataBlobWriter<W> {
|
/// Write compressed data blobs
|
||||||
|
pub struct DataBlobWriter<W: Write> {
|
||||||
|
state: BlobWriterState<W>,
|
||||||
|
}
|
||||||
|
|
||||||
pub fn new(mut out: W) -> Result<Self, Error> {
|
impl <W: Write + Seek> DataBlobWriter<W> {
|
||||||
out.seek(SeekFrom::Start(0))?;
|
|
||||||
|
pub fn new_uncompressed(mut writer: W) -> Result<Self, Error> {
|
||||||
|
let hasher = crc32fast::Hasher::new();
|
||||||
|
writer.seek(SeekFrom::Start(0))?;
|
||||||
|
let head = DataBlobHeader { magic: UNCOMPRESSED_BLOB_MAGIC_1_0, crc: [0; 4] };
|
||||||
|
unsafe {
|
||||||
|
writer.write_le_value(head)?;
|
||||||
|
}
|
||||||
|
let state = BlobWriterState::Uncompressed { writer, hasher };
|
||||||
|
Ok(Self { state })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_compressed(mut writer: W) -> Result<Self, Error> {
|
||||||
|
let hasher = crc32fast::Hasher::new();
|
||||||
|
writer.seek(SeekFrom::Start(0))?;
|
||||||
let head = DataBlobHeader { magic: COMPRESSED_BLOB_MAGIC_1_0, crc: [0; 4] };
|
let head = DataBlobHeader { magic: COMPRESSED_BLOB_MAGIC_1_0, crc: [0; 4] };
|
||||||
unsafe {
|
unsafe {
|
||||||
out.write_le_value(head)?;
|
writer.write_le_value(head)?;
|
||||||
}
|
}
|
||||||
let compr = zstd::stream::write::Encoder::new(out, 1)?;
|
let compr = zstd::stream::write::Encoder::new(writer, 1)?;
|
||||||
Ok(Self { compr: Some(compr), hasher: crc32fast::Hasher::new() })
|
let state = BlobWriterState::Compressed { compr, hasher };
|
||||||
|
Ok(Self { state })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn finish(mut self) -> Result<W, Error> {
|
pub fn finish(self) -> Result<W, Error> {
|
||||||
let compr = self.compr.take().expect("blob writer already finished");
|
match self.state {
|
||||||
let mut out = compr.finish()?;
|
BlobWriterState::Uncompressed { mut writer, hasher } => {
|
||||||
|
|
||||||
// write CRC
|
// write CRC
|
||||||
let crc = self.hasher.finalize();
|
let crc = hasher.finalize();
|
||||||
let head = DataBlobHeader { magic: COMPRESSED_BLOB_MAGIC_1_0, crc: crc.to_le_bytes() };
|
let head = DataBlobHeader { magic: COMPRESSED_BLOB_MAGIC_1_0, crc: crc.to_le_bytes() };
|
||||||
|
|
||||||
out.seek(SeekFrom::Start(0))?;
|
writer.seek(SeekFrom::Start(0))?;
|
||||||
unsafe {
|
unsafe {
|
||||||
out.write_le_value(head)?;
|
writer.write_le_value(head)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(out)
|
return Ok(writer)
|
||||||
|
}
|
||||||
|
BlobWriterState::Compressed { compr, hasher } => {
|
||||||
|
let mut writer = compr.finish()?;
|
||||||
|
|
||||||
|
// write CRC
|
||||||
|
let crc = hasher.finalize();
|
||||||
|
let head = DataBlobHeader { magic: COMPRESSED_BLOB_MAGIC_1_0, crc: crc.to_le_bytes() };
|
||||||
|
|
||||||
|
writer.seek(SeekFrom::Start(0))?;
|
||||||
|
unsafe {
|
||||||
|
writer.write_le_value(head)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(writer)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl <W: Write + Seek> Write for CompressedDataBlobWriter<W> {
|
impl <W: Write + Seek> Write for DataBlobWriter<W> {
|
||||||
|
|
||||||
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
||||||
let compr = self.compr.as_mut().expect("blob writer already finished");
|
match self.state {
|
||||||
self.hasher.update(buf);
|
BlobWriterState::Uncompressed { ref mut writer, ref mut hasher } => {
|
||||||
|
hasher.update(buf);
|
||||||
|
writer.write(buf)
|
||||||
|
}
|
||||||
|
BlobWriterState::Compressed { ref mut compr, ref mut hasher } => {
|
||||||
|
hasher.update(buf);
|
||||||
compr.write(buf)
|
compr.write(buf)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn flush(&mut self) -> Result<(), std::io::Error> {
|
fn flush(&mut self) -> Result<(), std::io::Error> {
|
||||||
let compr = self.compr.as_mut().expect("blob writer already finished");
|
match self.state {
|
||||||
|
BlobWriterState::Uncompressed { ref mut writer, .. } => {
|
||||||
|
writer.flush()
|
||||||
|
}
|
||||||
|
BlobWriterState::Compressed { ref mut compr, .. } => {
|
||||||
compr.flush()
|
compr.flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Read compressed data blobs
|
/// Read compressed data blobs
|
||||||
pub struct CompressedDataBlobReader<R: BufRead> {
|
pub struct CompressedDataBlobReader<R: BufRead> {
|
||||||
|
Loading…
Reference in New Issue
Block a user