diff --git a/src/tools/zip.rs b/src/tools/zip.rs index afacfbb9..37483d4b 100644 --- a/src/tools/zip.rs +++ b/src/tools/zip.rs @@ -10,15 +10,19 @@ use std::io; use std::mem::size_of; use std::os::unix::ffi::OsStrExt; use std::path::{Component, Path, PathBuf}; +use std::pin::Pin; +use std::task::{Context, Poll}; -use anyhow::{Error, Result}; +use anyhow::{format_err, Error, Result}; use endian_trait::Endian; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use futures::ready; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}; use crc32fast::Hasher; -use proxmox::tools::byte_buffer::ByteBuffer; use proxmox::tools::time::gmtime; +use crate::tools::compression::{DeflateEncoder, Level}; + const LOCAL_FH_SIG: u32 = 0x04034B50; const LOCAL_FF_SIG: u32 = 0x08074B50; const CENTRAL_DIRECTORY_FH_SIG: u32 = 0x02014B50; @@ -245,7 +249,7 @@ impl ZipEntry { signature: LOCAL_FH_SIG, version_needed: 0x2d, flags: 1 << 3, - compression: 0, + compression: 0x8, time, date, crc32: 0, @@ -328,7 +332,7 @@ impl ZipEntry { version_made_by: VERSION_MADE_BY, version_needed: VERSION_NEEDED, flags: 1 << 3, - compression: 0, + compression: 0x8, time, date, crc32: self.crc32, @@ -366,6 +370,47 @@ impl ZipEntry { } } +// wraps an asyncreader and calculates the hash +struct HashWrapper { + inner: R, + hasher: Hasher, +} + +impl HashWrapper { + fn new(inner: R) -> Self { + Self { + inner, + hasher: Hasher::new(), + } + } + + // consumes self and returns the hash and the reader + fn finish(self) -> (u32, R) { + let crc32 = self.hasher.finalize(); + (crc32, self.inner) + } +} + +impl AsyncRead for HashWrapper +where + R: AsyncRead + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.get_mut(); + let old_len = buf.filled().len(); + ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?; + let new_len = buf.filled().len(); + if new_len > old_len { + this.hasher.update(&buf.filled()[old_len..new_len]); + } + Poll::Ready(Ok(())) + } +} + /// Wraps a writer that implements AsyncWrite for creating a ZIP archive /// /// This will create a ZIP archive on the fly with files added with @@ -400,8 +445,7 @@ where { byte_count: usize, files: Vec, - target: W, - buf: ByteBuffer, + target: Option, } impl ZipEncoder { @@ -409,8 +453,7 @@ impl ZipEncoder { Self { byte_count: 0, files: Vec::new(), - target, - buf: ByteBuffer::with_capacity(1024 * 1024), + target: Some(target), } } @@ -419,31 +462,31 @@ impl ZipEncoder { mut entry: ZipEntry, content: Option, ) -> Result<(), Error> { + let mut target = self + .target + .take() + .ok_or_else(|| format_err!("had no target during add entry"))?; entry.offset = self.byte_count.try_into()?; - self.byte_count += entry.write_local_header(&mut self.target).await?; - if let Some(mut content) = content { - let mut hasher = Hasher::new(); - let mut size = 0; - loop { - let count = self.buf.read_from_async(&mut content).await?; + self.byte_count += entry.write_local_header(&mut target).await?; + if let Some(content) = content { + let mut reader = HashWrapper::new(content); + let mut enc = DeflateEncoder::with_quality(target, Level::Fastest); - // end of file - if count == 0 { - break; - } + enc.compress(&mut reader).await?; + let total_in = enc.total_in(); + let total_out = enc.total_out(); + target = enc.into_inner(); - size += count; - hasher.update(&self.buf); - self.target.write_all(&self.buf).await?; - self.buf.consume(count); - } + let (crc32, _reader) = reader.finish(); - self.byte_count += size; - entry.compressed_size = size.try_into()?; - entry.uncompressed_size = size.try_into()?; - entry.crc32 = hasher.finalize(); + self.byte_count += total_out as usize; + entry.compressed_size = total_out; + entry.uncompressed_size = total_in; + + entry.crc32 = crc32; } - self.byte_count += entry.write_data_descriptor(&mut self.target).await?; + self.byte_count += entry.write_data_descriptor(&mut target).await?; + self.target = Some(target); self.files.push(entry); @@ -456,6 +499,10 @@ impl ZipEncoder { central_dir_offset: usize, ) -> Result<(), Error> { let entrycount = self.files.len(); + let mut target = self + .target + .take() + .ok_or_else(|| format_err!("had no target during write_eocd"))?; let mut count = entrycount as u16; let mut directory_size = central_dir_size as u32; @@ -470,7 +517,7 @@ impl ZipEncoder { directory_offset = 0xFFFFFFFF; write_struct( - &mut self.target, + &mut target, Zip64EOCDRecord { signature: ZIP64_EOCD_RECORD, field_size: 44, @@ -489,7 +536,7 @@ impl ZipEncoder { let locator_offset = central_dir_offset + central_dir_size; write_struct( - &mut self.target, + &mut target, Zip64EOCDLocator { signature: ZIP64_EOCD_LOCATOR, disk_number: 0, @@ -501,7 +548,7 @@ impl ZipEncoder { } write_struct( - &mut self.target, + &mut target, EndOfCentralDir { signature: END_OF_CENTRAL_DIR, disk_number: 0, @@ -515,23 +562,32 @@ impl ZipEncoder { ) .await?; + self.target = Some(target); + Ok(()) } pub async fn finish(&mut self) -> Result<(), Error> { + let mut target = self + .target + .take() + .ok_or_else(|| format_err!("had no target during finish"))?; let central_dir_offset = self.byte_count; let mut central_dir_size = 0; for file in &self.files { - central_dir_size += file - .write_central_directory_header(&mut self.target) - .await?; + central_dir_size += file.write_central_directory_header(&mut target).await?; } + self.target = Some(target); self.write_eocd(central_dir_size, central_dir_offset) .await?; - self.target.flush().await?; + self.target + .take() + .ok_or_else(|| format_err!("had no target for flush"))? + .flush() + .await?; Ok(()) }