api2/admin/datastore: refactor create_zip into pxar/extract
we will reuse that code in the client, so we need to move it to where we can access it from the client Signed-off-by: Dominik Csapak <d.csapak@proxmox.com> [clippy fixes] Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
This commit is contained in:
parent
5279ee745f
commit
2e21948156
@ -3,8 +3,6 @@
|
|||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::ffi::OsStr;
|
use std::ffi::OsStr;
|
||||||
use std::os::unix::ffi::OsStrExt;
|
use std::os::unix::ffi::OsStrExt;
|
||||||
use std::path::{Path, PathBuf};
|
|
||||||
use std::pin::Pin;
|
|
||||||
|
|
||||||
use anyhow::{bail, format_err, Error};
|
use anyhow::{bail, format_err, Error};
|
||||||
use futures::*;
|
use futures::*;
|
||||||
@ -22,7 +20,7 @@ use proxmox::api::schema::*;
|
|||||||
use proxmox::tools::fs::{replace_file, CreateOptions};
|
use proxmox::tools::fs::{replace_file, CreateOptions};
|
||||||
use proxmox::{http_err, identity, list_subdirs_api_method, sortable};
|
use proxmox::{http_err, identity, list_subdirs_api_method, sortable};
|
||||||
|
|
||||||
use pxar::accessor::aio::{Accessor, FileContents, FileEntry};
|
use pxar::accessor::aio::Accessor;
|
||||||
use pxar::EntryKind;
|
use pxar::EntryKind;
|
||||||
|
|
||||||
use crate::api2::types::*;
|
use crate::api2::types::*;
|
||||||
@ -31,11 +29,11 @@ use crate::api2::helpers;
|
|||||||
use crate::backup::*;
|
use crate::backup::*;
|
||||||
use crate::config::datastore;
|
use crate::config::datastore;
|
||||||
use crate::config::cached_user_info::CachedUserInfo;
|
use crate::config::cached_user_info::CachedUserInfo;
|
||||||
|
use crate::pxar::create_zip;
|
||||||
|
|
||||||
use crate::server::{jobstate::Job, WorkerTask};
|
use crate::server::{jobstate::Job, WorkerTask};
|
||||||
use crate::tools::{
|
use crate::tools::{
|
||||||
self,
|
self,
|
||||||
zip::{ZipEncoder, ZipEntry},
|
|
||||||
AsyncChannelWriter, AsyncReaderStream, WrappedReaderStream,
|
AsyncChannelWriter, AsyncReaderStream, WrappedReaderStream,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1337,66 +1335,6 @@ pub fn catalog(
|
|||||||
helpers::list_dir_content(&mut catalog_reader, &path)
|
helpers::list_dir_content(&mut catalog_reader, &path)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recurse_files<'a, T, W>(
|
|
||||||
zip: &'a mut ZipEncoder<W>,
|
|
||||||
decoder: &'a mut Accessor<T>,
|
|
||||||
prefix: &'a Path,
|
|
||||||
file: FileEntry<T>,
|
|
||||||
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>
|
|
||||||
where
|
|
||||||
T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
|
|
||||||
W: tokio::io::AsyncWrite + Unpin + Send + 'static,
|
|
||||||
{
|
|
||||||
Box::pin(async move {
|
|
||||||
let metadata = file.entry().metadata();
|
|
||||||
let path = file.entry().path().strip_prefix(&prefix)?.to_path_buf();
|
|
||||||
|
|
||||||
match file.kind() {
|
|
||||||
EntryKind::File { .. } => {
|
|
||||||
let entry = ZipEntry::new(
|
|
||||||
path,
|
|
||||||
metadata.stat.mtime.secs,
|
|
||||||
metadata.stat.mode as u16,
|
|
||||||
true,
|
|
||||||
);
|
|
||||||
zip.add_entry(entry, Some(file.contents().await?))
|
|
||||||
.await
|
|
||||||
.map_err(|err| format_err!("could not send file entry: {}", err))?;
|
|
||||||
}
|
|
||||||
EntryKind::Hardlink(_) => {
|
|
||||||
let realfile = decoder.follow_hardlink(&file).await?;
|
|
||||||
let entry = ZipEntry::new(
|
|
||||||
path,
|
|
||||||
metadata.stat.mtime.secs,
|
|
||||||
metadata.stat.mode as u16,
|
|
||||||
true,
|
|
||||||
);
|
|
||||||
zip.add_entry(entry, Some(realfile.contents().await?))
|
|
||||||
.await
|
|
||||||
.map_err(|err| format_err!("could not send file entry: {}", err))?;
|
|
||||||
}
|
|
||||||
EntryKind::Directory => {
|
|
||||||
let dir = file.enter_directory().await?;
|
|
||||||
let mut readdir = dir.read_dir();
|
|
||||||
let entry = ZipEntry::new(
|
|
||||||
path,
|
|
||||||
metadata.stat.mtime.secs,
|
|
||||||
metadata.stat.mode as u16,
|
|
||||||
false,
|
|
||||||
);
|
|
||||||
zip.add_entry::<FileContents<T>>(entry, None).await?;
|
|
||||||
while let Some(entry) = readdir.next().await {
|
|
||||||
let entry = entry?.decode_entry().await?;
|
|
||||||
recurse_files(zip, decoder, prefix, entry).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {} // ignore all else
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
#[sortable]
|
#[sortable]
|
||||||
pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
|
pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
|
||||||
&ApiHandler::AsyncHttp(&pxar_file_download),
|
&ApiHandler::AsyncHttp(&pxar_file_download),
|
||||||
@ -1472,9 +1410,10 @@ pub fn pxar_file_download(
|
|||||||
|
|
||||||
let decoder = Accessor::new(reader, archive_size).await?;
|
let decoder = Accessor::new(reader, archive_size).await?;
|
||||||
let root = decoder.open_root().await?;
|
let root = decoder.open_root().await?;
|
||||||
|
let path = OsStr::from_bytes(file_path).to_os_string();
|
||||||
let file = root
|
let file = root
|
||||||
.lookup(OsStr::from_bytes(file_path)).await?
|
.lookup(&path).await?
|
||||||
.ok_or_else(|| format_err!("error opening '{:?}'", file_path))?;
|
.ok_or_else(|| format_err!("error opening '{:?}'", path))?;
|
||||||
|
|
||||||
let body = match file.kind() {
|
let body = match file.kind() {
|
||||||
EntryKind::File { .. } => Body::wrap_stream(
|
EntryKind::File { .. } => Body::wrap_stream(
|
||||||
@ -1488,37 +1427,19 @@ pub fn pxar_file_download(
|
|||||||
.map_err(move |err| {
|
.map_err(move |err| {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"error during streaming of hardlink '{:?}' - {}",
|
"error during streaming of hardlink '{:?}' - {}",
|
||||||
filepath, err
|
path, err
|
||||||
);
|
);
|
||||||
err
|
err
|
||||||
}),
|
}),
|
||||||
),
|
),
|
||||||
EntryKind::Directory => {
|
EntryKind::Directory => {
|
||||||
let (sender, receiver) = tokio::sync::mpsc::channel(100);
|
let (sender, receiver) = tokio::sync::mpsc::channel(100);
|
||||||
let mut prefix = PathBuf::new();
|
|
||||||
let mut components = file.entry().path().components();
|
|
||||||
components.next_back(); // discar last
|
|
||||||
for comp in components {
|
|
||||||
prefix.push(comp);
|
|
||||||
}
|
|
||||||
|
|
||||||
let channelwriter = AsyncChannelWriter::new(sender, 1024 * 1024);
|
let channelwriter = AsyncChannelWriter::new(sender, 1024 * 1024);
|
||||||
|
crate::server::spawn_internal_task(
|
||||||
crate::server::spawn_internal_task(async move {
|
create_zip(channelwriter, decoder, path.clone(), false)
|
||||||
let mut zipencoder = ZipEncoder::new(channelwriter);
|
);
|
||||||
let mut decoder = decoder;
|
|
||||||
recurse_files(&mut zipencoder, &mut decoder, &prefix, file)
|
|
||||||
.await
|
|
||||||
.map_err(|err| eprintln!("error during creating of zip: {}", err))?;
|
|
||||||
|
|
||||||
zipencoder
|
|
||||||
.finish()
|
|
||||||
.await
|
|
||||||
.map_err(|err| eprintln!("error during finishing of zip: {}", err))
|
|
||||||
});
|
|
||||||
|
|
||||||
Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| {
|
Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| {
|
||||||
eprintln!("error during streaming of zip '{:?}' - {}", filepath, err);
|
eprintln!("error during streaming of zip '{:?}' - {}", path, err);
|
||||||
err
|
err
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
@ -5,9 +5,11 @@ use std::ffi::{CStr, CString, OsStr, OsString};
|
|||||||
use std::io;
|
use std::io;
|
||||||
use std::os::unix::ffi::OsStrExt;
|
use std::os::unix::ffi::OsStrExt;
|
||||||
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
|
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
|
||||||
use std::path::Path;
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
use futures::future::Future;
|
||||||
use anyhow::{bail, format_err, Error};
|
use anyhow::{bail, format_err, Error};
|
||||||
use nix::dir::Dir;
|
use nix::dir::Dir;
|
||||||
use nix::fcntl::OFlag;
|
use nix::fcntl::OFlag;
|
||||||
@ -16,6 +18,7 @@ use nix::sys::stat::Mode;
|
|||||||
use pathpatterns::{MatchEntry, MatchList, MatchType};
|
use pathpatterns::{MatchEntry, MatchList, MatchType};
|
||||||
use pxar::format::Device;
|
use pxar::format::Device;
|
||||||
use pxar::Metadata;
|
use pxar::Metadata;
|
||||||
|
use pxar::accessor::aio::{Accessor, FileContents, FileEntry};
|
||||||
|
|
||||||
use proxmox::c_result;
|
use proxmox::c_result;
|
||||||
use proxmox::tools::fs::{create_path, CreateOptions};
|
use proxmox::tools::fs::{create_path, CreateOptions};
|
||||||
@ -24,6 +27,8 @@ use crate::pxar::dir_stack::PxarDirStack;
|
|||||||
use crate::pxar::metadata;
|
use crate::pxar::metadata;
|
||||||
use crate::pxar::Flags;
|
use crate::pxar::Flags;
|
||||||
|
|
||||||
|
use crate::tools::zip::{ZipEncoder, ZipEntry};
|
||||||
|
|
||||||
pub struct PxarExtractOptions<'a> {
|
pub struct PxarExtractOptions<'a> {
|
||||||
pub match_list: &'a[MatchEntry],
|
pub match_list: &'a[MatchEntry],
|
||||||
pub extract_match_default: bool,
|
pub extract_match_default: bool,
|
||||||
@ -465,3 +470,116 @@ impl Extractor {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn create_zip<T, W, P>(
|
||||||
|
output: W,
|
||||||
|
decoder: Accessor<T>,
|
||||||
|
path: P,
|
||||||
|
verbose: bool,
|
||||||
|
) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
|
||||||
|
W: tokio::io::AsyncWrite + Unpin + Send + 'static,
|
||||||
|
P: AsRef<Path>,
|
||||||
|
{
|
||||||
|
let root = decoder.open_root().await?;
|
||||||
|
let file = root
|
||||||
|
.lookup(&path).await?
|
||||||
|
.ok_or(format_err!("error opening '{:?}'", path.as_ref()))?;
|
||||||
|
|
||||||
|
let mut prefix = PathBuf::new();
|
||||||
|
let mut components = file.entry().path().components();
|
||||||
|
components.next_back(); // discar last
|
||||||
|
for comp in components {
|
||||||
|
prefix.push(comp);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut zipencoder = ZipEncoder::new(output);
|
||||||
|
let mut decoder = decoder;
|
||||||
|
recurse_files_zip(&mut zipencoder, &mut decoder, &prefix, file, verbose)
|
||||||
|
.await
|
||||||
|
.map_err(|err| {
|
||||||
|
eprintln!("error during creating of zip: {}", err);
|
||||||
|
err
|
||||||
|
})?;
|
||||||
|
|
||||||
|
zipencoder
|
||||||
|
.finish()
|
||||||
|
.await
|
||||||
|
.map_err(|err| {
|
||||||
|
eprintln!("error during finishing of zip: {}", err);
|
||||||
|
err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn recurse_files_zip<'a, T, W>(
|
||||||
|
zip: &'a mut ZipEncoder<W>,
|
||||||
|
decoder: &'a mut Accessor<T>,
|
||||||
|
prefix: &'a Path,
|
||||||
|
file: FileEntry<T>,
|
||||||
|
verbose: bool,
|
||||||
|
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>
|
||||||
|
where
|
||||||
|
T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
|
||||||
|
W: tokio::io::AsyncWrite + Unpin + Send + 'static,
|
||||||
|
{
|
||||||
|
use pxar::EntryKind;
|
||||||
|
Box::pin(async move {
|
||||||
|
let metadata = file.entry().metadata();
|
||||||
|
let path = file.entry().path().strip_prefix(&prefix)?.to_path_buf();
|
||||||
|
|
||||||
|
match file.kind() {
|
||||||
|
EntryKind::File { .. } => {
|
||||||
|
if verbose {
|
||||||
|
eprintln!("adding '{}' to zip", path.display());
|
||||||
|
}
|
||||||
|
let entry = ZipEntry::new(
|
||||||
|
path,
|
||||||
|
metadata.stat.mtime.secs,
|
||||||
|
metadata.stat.mode as u16,
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
zip.add_entry(entry, Some(file.contents().await?))
|
||||||
|
.await
|
||||||
|
.map_err(|err| format_err!("could not send file entry: {}", err))?;
|
||||||
|
}
|
||||||
|
EntryKind::Hardlink(_) => {
|
||||||
|
let realfile = decoder.follow_hardlink(&file).await?;
|
||||||
|
if verbose {
|
||||||
|
eprintln!("adding '{}' to zip", path.display());
|
||||||
|
}
|
||||||
|
let entry = ZipEntry::new(
|
||||||
|
path,
|
||||||
|
metadata.stat.mtime.secs,
|
||||||
|
metadata.stat.mode as u16,
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
zip.add_entry(entry, Some(realfile.contents().await?))
|
||||||
|
.await
|
||||||
|
.map_err(|err| format_err!("could not send file entry: {}", err))?;
|
||||||
|
}
|
||||||
|
EntryKind::Directory => {
|
||||||
|
let dir = file.enter_directory().await?;
|
||||||
|
let mut readdir = dir.read_dir();
|
||||||
|
if verbose {
|
||||||
|
eprintln!("adding '{}' to zip", path.display());
|
||||||
|
}
|
||||||
|
let entry = ZipEntry::new(
|
||||||
|
path,
|
||||||
|
metadata.stat.mtime.secs,
|
||||||
|
metadata.stat.mode as u16,
|
||||||
|
false,
|
||||||
|
);
|
||||||
|
zip.add_entry::<FileContents<T>>(entry, None).await?;
|
||||||
|
while let Some(entry) = readdir.next().await {
|
||||||
|
let entry = entry?.decode_entry().await?;
|
||||||
|
recurse_files_zip(zip, decoder, prefix, entry, verbose).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {} // ignore all else
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ mod flags;
|
|||||||
pub use flags::Flags;
|
pub use flags::Flags;
|
||||||
|
|
||||||
pub use create::{create_archive, PxarCreateOptions};
|
pub use create::{create_archive, PxarCreateOptions};
|
||||||
pub use extract::{extract_archive, ErrorHandler, PxarExtractOptions};
|
pub use extract::{create_zip, extract_archive, ErrorHandler, PxarExtractOptions};
|
||||||
|
|
||||||
/// The format requires to build sorted directory lookup tables in
|
/// The format requires to build sorted directory lookup tables in
|
||||||
/// memory, so we restrict the number of allowed entries to limit
|
/// memory, so we restrict the number of allowed entries to limit
|
||||||
|
Loading…
Reference in New Issue
Block a user