tools/zip: compress zips with deflate

by using our DeflateEncoder

for this to work, we have to create wrapper reader that generates the crc32
checksum while reading.

also we need to put the target writer in an Option, so that we can take
it out of self and move it into the DeflateEncoder while writing
compressed

we can drop the internal buffer then, since that is managed by the
deflate encoder now

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
Dominik Csapak 2021-04-06 11:03:47 +02:00 committed by Thomas Lamprecht
parent e8656da70d
commit d84e4073af

View File

@ -10,15 +10,19 @@ use std::io;
use std::mem::size_of; use std::mem::size_of;
use std::os::unix::ffi::OsStrExt; use std::os::unix::ffi::OsStrExt;
use std::path::{Component, Path, PathBuf}; use std::path::{Component, Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{Error, Result}; use anyhow::{format_err, Error, Result};
use endian_trait::Endian; use endian_trait::Endian;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use futures::ready;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use crc32fast::Hasher; use crc32fast::Hasher;
use proxmox::tools::byte_buffer::ByteBuffer;
use proxmox::tools::time::gmtime; use proxmox::tools::time::gmtime;
use crate::tools::compression::{DeflateEncoder, Level};
const LOCAL_FH_SIG: u32 = 0x04034B50; const LOCAL_FH_SIG: u32 = 0x04034B50;
const LOCAL_FF_SIG: u32 = 0x08074B50; const LOCAL_FF_SIG: u32 = 0x08074B50;
const CENTRAL_DIRECTORY_FH_SIG: u32 = 0x02014B50; const CENTRAL_DIRECTORY_FH_SIG: u32 = 0x02014B50;
@ -245,7 +249,7 @@ impl ZipEntry {
signature: LOCAL_FH_SIG, signature: LOCAL_FH_SIG,
version_needed: 0x2d, version_needed: 0x2d,
flags: 1 << 3, flags: 1 << 3,
compression: 0, compression: 0x8,
time, time,
date, date,
crc32: 0, crc32: 0,
@ -328,7 +332,7 @@ impl ZipEntry {
version_made_by: VERSION_MADE_BY, version_made_by: VERSION_MADE_BY,
version_needed: VERSION_NEEDED, version_needed: VERSION_NEEDED,
flags: 1 << 3, flags: 1 << 3,
compression: 0, compression: 0x8,
time, time,
date, date,
crc32: self.crc32, crc32: self.crc32,
@ -366,6 +370,47 @@ impl ZipEntry {
} }
} }
// wraps an asyncreader and calculates the hash
struct HashWrapper<R> {
inner: R,
hasher: Hasher,
}
impl<R> HashWrapper<R> {
fn new(inner: R) -> Self {
Self {
inner,
hasher: Hasher::new(),
}
}
// consumes self and returns the hash and the reader
fn finish(self) -> (u32, R) {
let crc32 = self.hasher.finalize();
(crc32, self.inner)
}
}
impl<R> AsyncRead for HashWrapper<R>
where
R: AsyncRead + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), io::Error>> {
let this = self.get_mut();
let old_len = buf.filled().len();
ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?;
let new_len = buf.filled().len();
if new_len > old_len {
this.hasher.update(&buf.filled()[old_len..new_len]);
}
Poll::Ready(Ok(()))
}
}
/// Wraps a writer that implements AsyncWrite for creating a ZIP archive /// Wraps a writer that implements AsyncWrite for creating a ZIP archive
/// ///
/// This will create a ZIP archive on the fly with files added with /// This will create a ZIP archive on the fly with files added with
@ -400,8 +445,7 @@ where
{ {
byte_count: usize, byte_count: usize,
files: Vec<ZipEntry>, files: Vec<ZipEntry>,
target: W, target: Option<W>,
buf: ByteBuffer,
} }
impl<W: AsyncWrite + Unpin> ZipEncoder<W> { impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
@ -409,8 +453,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
Self { Self {
byte_count: 0, byte_count: 0,
files: Vec::new(), files: Vec::new(),
target, target: Some(target),
buf: ByteBuffer::with_capacity(1024 * 1024),
} }
} }
@ -419,31 +462,31 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
mut entry: ZipEntry, mut entry: ZipEntry,
content: Option<R>, content: Option<R>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut target = self
.target
.take()
.ok_or_else(|| format_err!("had no target during add entry"))?;
entry.offset = self.byte_count.try_into()?; entry.offset = self.byte_count.try_into()?;
self.byte_count += entry.write_local_header(&mut self.target).await?; self.byte_count += entry.write_local_header(&mut target).await?;
if let Some(mut content) = content { if let Some(content) = content {
let mut hasher = Hasher::new(); let mut reader = HashWrapper::new(content);
let mut size = 0; let mut enc = DeflateEncoder::with_quality(target, Level::Fastest);
loop {
let count = self.buf.read_from_async(&mut content).await?;
// end of file enc.compress(&mut reader).await?;
if count == 0 { let total_in = enc.total_in();
break; let total_out = enc.total_out();
} target = enc.into_inner();
size += count; let (crc32, _reader) = reader.finish();
hasher.update(&self.buf);
self.target.write_all(&self.buf).await?;
self.buf.consume(count);
}
self.byte_count += size; self.byte_count += total_out as usize;
entry.compressed_size = size.try_into()?; entry.compressed_size = total_out;
entry.uncompressed_size = size.try_into()?; entry.uncompressed_size = total_in;
entry.crc32 = hasher.finalize();
entry.crc32 = crc32;
} }
self.byte_count += entry.write_data_descriptor(&mut self.target).await?; self.byte_count += entry.write_data_descriptor(&mut target).await?;
self.target = Some(target);
self.files.push(entry); self.files.push(entry);
@ -456,6 +499,10 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
central_dir_offset: usize, central_dir_offset: usize,
) -> Result<(), Error> { ) -> Result<(), Error> {
let entrycount = self.files.len(); let entrycount = self.files.len();
let mut target = self
.target
.take()
.ok_or_else(|| format_err!("had no target during write_eocd"))?;
let mut count = entrycount as u16; let mut count = entrycount as u16;
let mut directory_size = central_dir_size as u32; let mut directory_size = central_dir_size as u32;
@ -470,7 +517,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
directory_offset = 0xFFFFFFFF; directory_offset = 0xFFFFFFFF;
write_struct( write_struct(
&mut self.target, &mut target,
Zip64EOCDRecord { Zip64EOCDRecord {
signature: ZIP64_EOCD_RECORD, signature: ZIP64_EOCD_RECORD,
field_size: 44, field_size: 44,
@ -489,7 +536,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
let locator_offset = central_dir_offset + central_dir_size; let locator_offset = central_dir_offset + central_dir_size;
write_struct( write_struct(
&mut self.target, &mut target,
Zip64EOCDLocator { Zip64EOCDLocator {
signature: ZIP64_EOCD_LOCATOR, signature: ZIP64_EOCD_LOCATOR,
disk_number: 0, disk_number: 0,
@ -501,7 +548,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
} }
write_struct( write_struct(
&mut self.target, &mut target,
EndOfCentralDir { EndOfCentralDir {
signature: END_OF_CENTRAL_DIR, signature: END_OF_CENTRAL_DIR,
disk_number: 0, disk_number: 0,
@ -515,23 +562,32 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
) )
.await?; .await?;
self.target = Some(target);
Ok(()) Ok(())
} }
pub async fn finish(&mut self) -> Result<(), Error> { pub async fn finish(&mut self) -> Result<(), Error> {
let mut target = self
.target
.take()
.ok_or_else(|| format_err!("had no target during finish"))?;
let central_dir_offset = self.byte_count; let central_dir_offset = self.byte_count;
let mut central_dir_size = 0; let mut central_dir_size = 0;
for file in &self.files { for file in &self.files {
central_dir_size += file central_dir_size += file.write_central_directory_header(&mut target).await?;
.write_central_directory_header(&mut self.target)
.await?;
} }
self.target = Some(target);
self.write_eocd(central_dir_size, central_dir_offset) self.write_eocd(central_dir_size, central_dir_offset)
.await?; .await?;
self.target.flush().await?; self.target
.take()
.ok_or_else(|| format_err!("had no target for flush"))?
.flush()
.await?;
Ok(()) Ok(())
} }