switch to external pxar and fuse crates

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
Author: Wolfgang Bumiller <w.bumiller@proxmox.com>
Date:   2020-03-23 15:03:18 +01:00
parent  ab1092392f
commit  c443f58b09

28 changed files with 4213 additions and 6191 deletions

src/bin/proxmox-backup-client.rs

@ -1,13 +1,25 @@
-use anyhow::{bail, format_err, Error};
-use nix::unistd::{fork, ForkResult, pipe};
-use std::os::unix::io::RawFd;
-use chrono::{Local, DateTime, Utc, TimeZone};
-use std::path::{Path, PathBuf};
 use std::collections::{HashSet, HashMap};
 use std::ffi::OsStr;
-use std::io::{Write, Seek, SeekFrom};
+use std::io::{self, Write, Seek, SeekFrom};
 use std::os::unix::fs::OpenOptionsExt;
+use std::os::unix::io::RawFd;
+use std::path::{Path, PathBuf};
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+use anyhow::{bail, format_err, Error};
+use chrono::{Local, DateTime, Utc, TimeZone};
+use futures::future::FutureExt;
+use futures::select;
+use futures::stream::{StreamExt, TryStreamExt};
+use nix::unistd::{fork, ForkResult, pipe};
+use serde_json::{json, Value};
+use tokio::signal::unix::{signal, SignalKind};
+use tokio::sync::mpsc;
+use xdg::BaseDirectories;
+use pathpatterns::{MatchEntry, MatchType, PatternFlag};
 use proxmox::{sortable, identity};
 use proxmox::tools::fs::{file_get_contents, file_get_json, replace_file, CreateOptions, image_size};
 use proxmox::sys::linux::tty;
@ -20,16 +32,7 @@ use proxmox_backup::tools;
 use proxmox_backup::api2::types::*;
 use proxmox_backup::client::*;
 use proxmox_backup::backup::*;
-use proxmox_backup::pxar::{ self, catalog::* };
-use serde_json::{json, Value};
-//use hyper::Body;
-use std::sync::{Arc, Mutex};
-//use regex::Regex;
-use xdg::BaseDirectories;
-use futures::*;
-use tokio::sync::mpsc;
+use proxmox_backup::pxar::catalog::*;
 
 const ENV_VAR_PBS_FINGERPRINT: &str = "PBS_FINGERPRINT";
 const ENV_VAR_PBS_PASSWORD: &str = "PBS_PASSWORD";
@ -243,7 +246,7 @@ async fn backup_directory<P: AsRef<Path>>(
     skip_lost_and_found: bool,
     crypt_config: Option<Arc<CryptConfig>>,
     catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
-    exclude_pattern: Vec<pxar::MatchPattern>,
+    exclude_pattern: Vec<MatchEntry>,
     entries_max: usize,
 ) -> Result<BackupStats, Error> {
@ -769,7 +772,7 @@ fn spawn_catalog_upload(
                 type: Integer,
                 description: "Max number of entries to hold in memory.",
                 optional: true,
-                default: pxar::ENCODER_MAX_ENTRIES as isize,
+                default: proxmox_backup::pxar::ENCODER_MAX_ENTRIES as isize,
             },
             "verbose": {
                 type: Boolean,
@ -812,17 +815,19 @@ async fn create_backup(
let include_dev = param["include-dev"].as_array();
let entries_max = param["entries-max"].as_u64().unwrap_or(pxar::ENCODER_MAX_ENTRIES as u64);
let entries_max = param["entries-max"].as_u64()
.unwrap_or(proxmox_backup::pxar::ENCODER_MAX_ENTRIES as u64);
let empty = Vec::new();
let arg_pattern = param["exclude"].as_array().unwrap_or(&empty);
let exclude_args = param["exclude"].as_array().unwrap_or(&empty);
let mut pattern_list = Vec::with_capacity(arg_pattern.len());
for s in arg_pattern {
let l = s.as_str().ok_or_else(|| format_err!("Invalid pattern string slice"))?;
let p = pxar::MatchPattern::from_line(l.as_bytes())?
.ok_or_else(|| format_err!("Invalid match pattern in arguments"))?;
pattern_list.push(p);
let mut exclude_list = Vec::with_capacity(exclude_args.len());
for entry in exclude_args {
let entry = entry.as_str().ok_or_else(|| format_err!("Invalid pattern string slice"))?;
exclude_list.push(
MatchEntry::parse_pattern(entry, PatternFlag::PATH_NAME, MatchType::Exclude)
.map_err(|err| format_err!("invalid exclude pattern entry: {}", err))?
);
}
let mut devices = if all_file_systems { None } else { Some(HashSet::new()) };
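Exclude patterns now come from the external pathpatterns crate instead of the old pxar::MatchPattern. For context, here is a minimal sketch of how such a list can be built and queried, assuming the pathpatterns 0.1 API used at this point (later releases changed some signatures); the sample patterns and paths are invented, and `matches` comes from the crate's MatchList trait:

```rust
use anyhow::{format_err, Error};
use pathpatterns::{MatchEntry, MatchList, MatchType, PatternFlag};

fn main() -> Result<(), Error> {
    // Built the same way as exclude_list in create_backup() above.
    let exclude_list: Vec<MatchEntry> = vec![
        MatchEntry::parse_pattern("*.tmp", PatternFlag::PATH_NAME, MatchType::Exclude)
            .map_err(|err| format_err!("invalid exclude pattern entry: {}", err))?,
        MatchEntry::parse_pattern("/var/cache", PatternFlag::PATH_NAME, MatchType::Exclude)
            .map_err(|err| format_err!("invalid exclude pattern entry: {}", err))?,
    ];

    // MatchList::matches() reports whether any entry applies to a path;
    // the second argument optionally restricts matching by file mode.
    for &path in &["/var/cache", "/etc/hostname", "/home/user/x.tmp"] {
        let excluded = matches!(exclude_list.matches(path, None), Some(MatchType::Exclude));
        println!("{}: excluded = {}", path, excluded);
    }
    Ok(())
}
```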
@ -966,7 +971,7 @@ async fn create_backup(
         skip_lost_and_found,
         crypt_config.clone(),
         catalog.clone(),
-        pattern_list.clone(),
+        exclude_list.clone(),
         entries_max as usize,
     ).await?;
     manifest.add_file(target, stats.size, stats.csum)?;
@ -1246,18 +1251,19 @@ async fn restore(param: Value) -> Result<Value, Error> {
     let mut reader = BufferedDynamicReader::new(index, chunk_reader);
     if let Some(target) = target {
-        let feature_flags = pxar::flags::DEFAULT;
-        let mut decoder = pxar::SequentialDecoder::new(&mut reader, feature_flags);
-        decoder.set_callback(move |path| {
-            if verbose {
-                eprintln!("{:?}", path);
-            }
-            Ok(())
-        });
-        decoder.set_allow_existing_dirs(allow_existing_dirs);
-        decoder.restore(Path::new(target), &Vec::new())?;
+        proxmox_backup::pxar::extract_archive(
+            pxar::decoder::Decoder::from_std(reader)?,
+            Path::new(target),
+            &[],
+            proxmox_backup::pxar::flags::DEFAULT,
+            allow_existing_dirs,
+            |path| {
+                if verbose {
+                    println!("{:?}", path);
+                }
+            },
+        )
+        .map_err(|err| format_err!("error extracting archive - {}", err))?;
     } else {
         let mut writer = std::fs::OpenOptions::new()
             .write(true)
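The stateful SequentialDecoder is replaced by the free function proxmox_backup::pxar::extract_archive, fed with a decoder from the external pxar crate. The same entry point works for any source implementing std::io::Read; a hedged sketch restoring a local archive file instead of the remote BufferedDynamicReader (file and target paths are placeholders, and the comments describe assumed parameter semantics):

```rust
use std::fs::File;
use std::path::Path;

use anyhow::{format_err, Error};

fn main() -> Result<(), Error> {
    let file = File::open("archive.pxar")?;
    proxmox_backup::pxar::extract_archive(
        // from_std wraps any blocking std::io::Read into a pxar decoder
        pxar::decoder::Decoder::from_std(file)?,
        Path::new("/tmp/restored"),
        &[],                                  // no match entries: extract everything
        proxmox_backup::pxar::flags::DEFAULT, // default metadata feature flags
        false,                                // allow_existing_dirs = false
        |path| println!("{:?}", path),        // per-entry progress callback
    )
    .map_err(|err| format_err!("error extracting archive - {}", err))?;
    Ok(())
}
```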
@ -1966,6 +1972,41 @@ fn mount(
     }
 }
 
+use proxmox_backup::client::RemoteChunkReader;
+
+/// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better
+/// async use!
+///
+/// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`,
+/// so that we can properly access it from multiple threads simultaneously while not issuing
+/// duplicate simultaneous reads over http.
+struct BufferedDynamicReadAt {
+    inner: Mutex<BufferedDynamicReader<RemoteChunkReader>>,
+}
+
+impl BufferedDynamicReadAt {
+    fn new(inner: BufferedDynamicReader<RemoteChunkReader>) -> Self {
+        Self {
+            inner: Mutex::new(inner),
+        }
+    }
+}
+
+impl pxar::accessor::ReadAt for BufferedDynamicReadAt {
+    fn poll_read_at(
+        self: Pin<&Self>,
+        _cx: &mut Context,
+        buf: &mut [u8],
+        offset: u64,
+    ) -> Poll<io::Result<usize>> {
+        use std::io::Read;
+        tokio::task::block_in_place(move || {
+            let mut reader = self.inner.lock().unwrap();
+            reader.seek(SeekFrom::Start(offset))?;
+            Poll::Ready(Ok(reader.read(buf)?))
+        })
+    }
+}
+
 async fn mount_do(param: Value, pipe: Option<RawFd>) -> Result<Value, Error> {
     let repo = extract_repository_from_value(&param)?;
     let archive_name = tools::required_string_param(&param, "archive-name")?;
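The Mutex plus tokio::task::block_in_place combination above is only needed because BufferedDynamicReader is stateful (a seek-then-read interface behind one shared cursor). For contrast, an adapter over a type with true positional reads needs neither; the trait shape is copied from the impl above, while FileReadAt itself is a hypothetical example, not part of this commit:

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;
use std::pin::Pin;
use std::task::{Context, Poll};

struct FileReadAt(File);

impl pxar::accessor::ReadAt for FileReadAt {
    fn poll_read_at(
        self: Pin<&Self>,
        _cx: &mut Context,
        buf: &mut [u8],
        offset: u64,
    ) -> Poll<io::Result<usize>> {
        // read_at() is pread(2): it never touches a shared file cursor,
        // so no Mutex and no block_in_place are required here.
        Poll::Ready(self.0.read_at(buf, offset))
    }
}
```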
@ -2015,15 +2056,19 @@ async fn mount_do(param: Value, pipe: Option<RawFd>) -> Result<Value, Error> {
         let most_used = index.find_most_used_chunks(8);
         let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
         let reader = BufferedDynamicReader::new(index, chunk_reader);
-        let decoder = pxar::Decoder::new(reader)?;
+        let archive_size = reader.archive_size();
+        let reader: proxmox_backup::pxar::fuse::Reader =
+            Arc::new(BufferedDynamicReadAt::new(reader));
+        let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?;
         let options = OsStr::new("ro,default_permissions");
-        let mut session = pxar::fuse::Session::new(decoder, &options, pipe.is_none())
-            .map_err(|err| format_err!("pxar mount failed: {}", err))?;
-
-        // Mount the session but not call fuse deamonize as this will cause
-        // issues with the runtime after the fork
-        let deamonize = false;
-        session.mount(&Path::new(target), deamonize)?;
+        let session = proxmox_backup::pxar::fuse::Session::mount(
+            decoder,
+            &options,
+            false,
+            Path::new(target),
+        )
+        .map_err(|err| format_err!("pxar mount failed: {}", err))?;
 
         if let Some(pipe) = pipe {
             nix::unistd::chdir(Path::new("/")).unwrap();
@ -2045,8 +2090,13 @@ async fn mount_do(param: Value, pipe: Option<RawFd>) -> Result<Value, Error> {
             nix::unistd::close(pipe).unwrap();
         }
 
-        let multithreaded = true;
-        session.run_loop(multithreaded)?;
+        let mut interrupt = signal(SignalKind::interrupt())?;
+        select! {
+            res = session.fuse() => res?,
+            _ = interrupt.recv().fuse() => {
+                // exit on interrupted
+            }
+        }
     } else {
         bail!("unknown archive file extension (expected .pxar)");
     }
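session.run_loop() is gone: since the session is now a future, it is raced against SIGINT with futures::select!, so Ctrl-C simply falls through and drops (and thereby unmounts) the session. The same shutdown pattern in isolation, using the tokio 0.2 signal API and the futures imports added at the top of the file (run_until_interrupted is a made-up name):

```rust
use anyhow::Error;
use futures::future::FutureExt;
use futures::select;
use tokio::signal::unix::{signal, SignalKind};

async fn run_until_interrupted(
    work: impl std::future::Future<Output = Result<(), Error>>,
) -> Result<(), Error> {
    let mut interrupt = signal(SignalKind::interrupt())?;
    futures::pin_mut!(work);
    select! {
        // Either the work finishes on its own...
        res = work.fuse() => res?,
        // ...or Ctrl-C arrives first and we fall through,
        // dropping `work` (for a fuse session, that unmounts).
        _ = interrupt.recv().fuse() => {}
    }
    Ok(())
}
```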
@ -2129,11 +2179,10 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
     let most_used = index.find_most_used_chunks(8);
     let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config.clone(), most_used);
     let reader = BufferedDynamicReader::new(index, chunk_reader);
-    let mut decoder = pxar::Decoder::new(reader)?;
-    decoder.set_callback(|path| {
-        println!("{:?}", path);
-        Ok(())
-    });
+    let archive_size = reader.archive_size();
+    let reader: proxmox_backup::pxar::fuse::Reader =
+        Arc::new(BufferedDynamicReadAt::new(reader));
+    let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?;
 
     let tmpfile = client.download(CATALOG_NAME, tmpfile).await?;
     let index = DynamicIndexReader::new(tmpfile)
@ -2161,10 +2210,10 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
         catalog_reader,
         &server_archive_name,
         decoder,
-    )?;
+    ).await?;
 
     println!("Starting interactive shell");
-    state.shell()?;
+    state.shell().await?;
 
     record_repository(&repo);