client: rustfmt

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
Thomas Lamprecht 2022-04-14 14:08:48 +02:00
parent f9a5beaa15
commit bdfa637058
17 changed files with 722 additions and 529 deletions

View File

@ -1,23 +1,23 @@
use anyhow::{format_err, Error}; use anyhow::{format_err, Error};
use std::io::{Write, Seek, SeekFrom};
use std::fs::File; use std::fs::File;
use std::sync::Arc; use std::io::{Seek, SeekFrom, Write};
use std::os::unix::fs::OpenOptionsExt; use std::os::unix::fs::OpenOptionsExt;
use std::sync::Arc;
use futures::future::AbortHandle; use futures::future::AbortHandle;
use serde_json::{json, Value}; use serde_json::{json, Value};
use pbs_tools::crypt_config::CryptConfig;
use pbs_tools::sha::sha256;
use pbs_datastore::{PROXMOX_BACKUP_READER_PROTOCOL_ID_V1, BackupManifest};
use pbs_datastore::data_blob::DataBlob; use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::data_blob_reader::DataBlobReader; use pbs_datastore::data_blob_reader::DataBlobReader;
use pbs_datastore::dynamic_index::DynamicIndexReader; use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader; use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile; use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::MANIFEST_BLOB_NAME; use pbs_datastore::manifest::MANIFEST_BLOB_NAME;
use pbs_datastore::{BackupManifest, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
use pbs_tools::crypt_config::CryptConfig;
use pbs_tools::sha::sha256;
use super::{HttpClient, H2Client}; use super::{H2Client, HttpClient};
/// Backup Reader /// Backup Reader
pub struct BackupReader { pub struct BackupReader {
@ -27,16 +27,18 @@ pub struct BackupReader {
} }
impl Drop for BackupReader { impl Drop for BackupReader {
fn drop(&mut self) { fn drop(&mut self) {
self.abort.abort(); self.abort.abort();
} }
} }
impl BackupReader { impl BackupReader {
fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> { fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
Arc::new(Self { h2, abort, crypt_config}) Arc::new(Self {
h2,
abort,
crypt_config,
})
} }
/// Create a new instance by upgrading the connection at '/api2/json/reader' /// Create a new instance by upgrading the connection at '/api2/json/reader'
@ -49,7 +51,6 @@ impl BackupReader {
backup_time: i64, backup_time: i64,
debug: bool, debug: bool,
) -> Result<Arc<BackupReader>, Error> { ) -> Result<Arc<BackupReader>, Error> {
let param = json!({ let param = json!({
"backup-type": backup_type, "backup-type": backup_type,
"backup-id": backup_id, "backup-id": backup_id,
@ -57,46 +58,39 @@ impl BackupReader {
"store": datastore, "store": datastore,
"debug": debug, "debug": debug,
}); });
let req = HttpClient::request_builder(client.server(), client.port(), "GET", "/api2/json/reader", Some(param)).unwrap(); let req = HttpClient::request_builder(
client.server(),
client.port(),
"GET",
"/api2/json/reader",
Some(param),
)
.unwrap();
let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?; let (h2, abort) = client
.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
.await?;
Ok(BackupReader::new(h2, abort, crypt_config)) Ok(BackupReader::new(h2, abort, crypt_config))
} }
/// Execute a GET request /// Execute a GET request
pub async fn get( pub async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
&self,
path: &str,
param: Option<Value>,
) -> Result<Value, Error> {
self.h2.get(path, param).await self.h2.get(path, param).await
} }
/// Execute a PUT request /// Execute a PUT request
pub async fn put( pub async fn put(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
&self,
path: &str,
param: Option<Value>,
) -> Result<Value, Error> {
self.h2.put(path, param).await self.h2.put(path, param).await
} }
/// Execute a POST request /// Execute a POST request
pub async fn post( pub async fn post(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
&self,
path: &str,
param: Option<Value>,
) -> Result<Value, Error> {
self.h2.post(path, param).await self.h2.post(path, param).await
} }
/// Execute a GET request and send output to a writer /// Execute a GET request and send output to a writer
pub async fn download<W: Write + Send>( pub async fn download<W: Write + Send>(&self, file_name: &str, output: W) -> Result<(), Error> {
&self,
file_name: &str,
output: W,
) -> Result<(), Error> {
let path = "download"; let path = "download";
let param = json!({ "file-name": file_name }); let param = json!({ "file-name": file_name });
self.h2.download(path, Some(param), output).await self.h2.download(path, Some(param), output).await
@ -105,10 +99,7 @@ impl BackupReader {
/// Execute a special GET request and send output to a writer /// Execute a special GET request and send output to a writer
/// ///
/// This writes random data, and is only useful to test download speed. /// This writes random data, and is only useful to test download speed.
pub async fn speedtest<W: Write + Send>( pub async fn speedtest<W: Write + Send>(&self, output: W) -> Result<(), Error> {
&self,
output: W,
) -> Result<(), Error> {
self.h2.download("speedtest", None, output).await self.h2.download("speedtest", None, output).await
} }
@ -131,14 +122,14 @@ impl BackupReader {
/// ///
/// The manifest signature is verified if we have a crypt_config. /// The manifest signature is verified if we have a crypt_config.
pub async fn download_manifest(&self) -> Result<(BackupManifest, Vec<u8>), Error> { pub async fn download_manifest(&self) -> Result<(BackupManifest, Vec<u8>), Error> {
let mut raw_data = Vec::with_capacity(64 * 1024); let mut raw_data = Vec::with_capacity(64 * 1024);
self.download(MANIFEST_BLOB_NAME, &mut raw_data).await?; self.download(MANIFEST_BLOB_NAME, &mut raw_data).await?;
let blob = DataBlob::load_from_reader(&mut &raw_data[..])?; let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
// no expected digest available // no expected digest available
let data = blob.decode(None, None)?; let data = blob.decode(None, None)?;
let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?; let manifest =
BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
Ok((manifest, data)) Ok((manifest, data))
} }
@ -152,7 +143,6 @@ impl BackupReader {
manifest: &BackupManifest, manifest: &BackupManifest,
name: &str, name: &str,
) -> Result<DataBlobReader<'_, File>, Error> { ) -> Result<DataBlobReader<'_, File>, Error> {
let mut tmpfile = std::fs::OpenOptions::new() let mut tmpfile = std::fs::OpenOptions::new()
.write(true) .write(true)
.read(true) .read(true)
@ -179,7 +169,6 @@ impl BackupReader {
manifest: &BackupManifest, manifest: &BackupManifest,
name: &str, name: &str,
) -> Result<DynamicIndexReader, Error> { ) -> Result<DynamicIndexReader, Error> {
let mut tmpfile = std::fs::OpenOptions::new() let mut tmpfile = std::fs::OpenOptions::new()
.write(true) .write(true)
.read(true) .read(true)
@ -207,7 +196,6 @@ impl BackupReader {
manifest: &BackupManifest, manifest: &BackupManifest,
name: &str, name: &str,
) -> Result<FixedIndexReader, Error> { ) -> Result<FixedIndexReader, Error> {
let mut tmpfile = std::fs::OpenOptions::new() let mut tmpfile = std::fs::OpenOptions::new()
.write(true) .write(true)
.read(true) .read(true)

View File

@ -3,7 +3,7 @@ use std::fmt;
use anyhow::{format_err, Error}; use anyhow::{format_err, Error};
use pbs_api_types::{BACKUP_REPO_URL_REGEX, IP_V6_REGEX, Authid, Userid}; use pbs_api_types::{Authid, Userid, BACKUP_REPO_URL_REGEX, IP_V6_REGEX};
/// Reference remote backup locations /// Reference remote backup locations
/// ///
@ -21,15 +21,22 @@ pub struct BackupRepository {
} }
impl BackupRepository { impl BackupRepository {
pub fn new(
pub fn new(auth_id: Option<Authid>, host: Option<String>, port: Option<u16>, store: String) -> Self { auth_id: Option<Authid>,
host: Option<String>,
port: Option<u16>,
store: String,
) -> Self {
let host = match host { let host = match host {
Some(host) if (IP_V6_REGEX.regex_obj)().is_match(&host) => { Some(host) if (IP_V6_REGEX.regex_obj)().is_match(&host) => Some(format!("[{}]", host)),
Some(format!("[{}]", host))
},
other => other, other => other,
}; };
Self { auth_id, host, port, store } Self {
auth_id,
host,
port,
store,
}
} }
pub fn auth_id(&self) -> &Authid { pub fn auth_id(&self) -> &Authid {
@ -70,7 +77,14 @@ impl BackupRepository {
impl fmt::Display for BackupRepository { impl fmt::Display for BackupRepository {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match (&self.auth_id, &self.host, self.port) { match (&self.auth_id, &self.host, self.port) {
(Some(auth_id), _, _) => write!(f, "{}@{}:{}:{}", auth_id, self.host(), self.port(), self.store), (Some(auth_id), _, _) => write!(
f,
"{}@{}:{}:{}",
auth_id,
self.host(),
self.port(),
self.store
),
(None, Some(host), None) => write!(f, "{}:{}", host, self.store), (None, Some(host), None) => write!(f, "{}:{}", host, self.store),
(None, _, Some(port)) => write!(f, "{}:{}:{}", self.host(), port, self.store), (None, _, Some(port)) => write!(f, "{}:{}:{}", self.host(), port, self.store),
(None, None, None) => write!(f, "{}", self.store), (None, None, None) => write!(f, "{}", self.store),
@ -87,12 +101,15 @@ impl std::str::FromStr for BackupRepository {
/// `host` parts are optional, where `host` defaults to the local /// `host` parts are optional, where `host` defaults to the local
/// host, and `user` defaults to `root@pam`. /// host, and `user` defaults to `root@pam`.
fn from_str(url: &str) -> Result<Self, Self::Err> { fn from_str(url: &str) -> Result<Self, Self::Err> {
let cap = (BACKUP_REPO_URL_REGEX.regex_obj)()
let cap = (BACKUP_REPO_URL_REGEX.regex_obj)().captures(url) .captures(url)
.ok_or_else(|| format_err!("unable to parse repository url '{}'", url))?; .ok_or_else(|| format_err!("unable to parse repository url '{}'", url))?;
Ok(Self { Ok(Self {
auth_id: cap.get(1).map(|m| Authid::try_from(m.as_str().to_owned())).transpose()?, auth_id: cap
.get(1)
.map(|m| Authid::try_from(m.as_str().to_owned()))
.transpose()?,
host: cap.get(2).map(|m| m.as_str().to_owned()), host: cap.get(2).map(|m| m.as_str().to_owned()),
port: cap.get(3).map(|m| m.as_str().parse::<u16>()).transpose()?, port: cap.get(3).map(|m| m.as_str().parse::<u16>()).transpose()?,
store: cap[4].to_owned(), store: cap[4].to_owned(),

View File

@ -6,25 +6,29 @@ const_regex! {
BACKUPSPEC_REGEX = r"^([a-zA-Z0-9_-]+\.(pxar|img|conf|log)):(.+)$"; BACKUPSPEC_REGEX = r"^([a-zA-Z0-9_-]+\.(pxar|img|conf|log)):(.+)$";
} }
pub const BACKUP_SOURCE_SCHEMA: Schema = StringSchema::new( pub const BACKUP_SOURCE_SCHEMA: Schema =
"Backup source specification ([<label>:<path>]).") StringSchema::new("Backup source specification ([<label>:<path>]).")
.format(&ApiStringFormat::Pattern(&BACKUPSPEC_REGEX)) .format(&ApiStringFormat::Pattern(&BACKUPSPEC_REGEX))
.schema(); .schema();
pub enum BackupSpecificationType { PXAR, IMAGE, CONFIG, LOGFILE } pub enum BackupSpecificationType {
PXAR,
IMAGE,
CONFIG,
LOGFILE,
}
pub struct BackupSpecification { pub struct BackupSpecification {
pub archive_name: String, // left part pub archive_name: String, // left part
pub config_string: String, // right part pub config_string: String, // right part
pub spec_type: BackupSpecificationType, pub spec_type: BackupSpecificationType,
} }
pub fn parse_backup_specification(value: &str) -> Result<BackupSpecification, Error> { pub fn parse_backup_specification(value: &str) -> Result<BackupSpecification, Error> {
if let Some(caps) = (BACKUPSPEC_REGEX.regex_obj)().captures(value) { if let Some(caps) = (BACKUPSPEC_REGEX.regex_obj)().captures(value) {
let archive_name = caps.get(1).unwrap().as_str().into(); let archive_name = caps.get(1).unwrap().as_str().into();
let extension = caps.get(2).unwrap().as_str(); let extension = caps.get(2).unwrap().as_str();
let config_string = caps.get(3).unwrap().as_str().into(); let config_string = caps.get(3).unwrap().as_str().into();
let spec_type = match extension { let spec_type = match extension {
"pxar" => BackupSpecificationType::PXAR, "pxar" => BackupSpecificationType::PXAR,
"img" => BackupSpecificationType::IMAGE, "img" => BackupSpecificationType::IMAGE,
@ -32,7 +36,11 @@ pub fn parse_backup_specification(value: &str) -> Result<BackupSpecification, Er
"log" => BackupSpecificationType::LOGFILE, "log" => BackupSpecificationType::LOGFILE,
_ => bail!("unknown backup source type '{}'", extension), _ => bail!("unknown backup source type '{}'", extension),
}; };
return Ok(BackupSpecification { archive_name, config_string, spec_type }); return Ok(BackupSpecification {
archive_name,
config_string,
spec_type,
});
} }
bail!("unable to parse backup source specification '{}'", value); bail!("unable to parse backup source specification '{}'", value);

View File

@ -13,13 +13,13 @@ use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use pbs_api_types::HumanByte; use pbs_api_types::HumanByte;
use pbs_tools::crypt_config::CryptConfig;
use pbs_datastore::{CATALOG_NAME, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_datastore::data_blob::{ChunkInfo, DataBlob, DataChunkBuilder}; use pbs_datastore::data_blob::{ChunkInfo, DataBlob, DataChunkBuilder};
use pbs_datastore::dynamic_index::DynamicIndexReader; use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader; use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile; use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{ArchiveType, BackupManifest, MANIFEST_BLOB_NAME}; use pbs_datastore::manifest::{ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
use pbs_datastore::{CATALOG_NAME, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_tools::crypt_config::CryptConfig;
use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo}; use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};

View File

@ -14,16 +14,16 @@ use nix::fcntl::OFlag;
use nix::sys::stat::Mode; use nix::sys::stat::Mode;
use pathpatterns::{MatchEntry, MatchList, MatchPattern, MatchType, PatternFlag}; use pathpatterns::{MatchEntry, MatchList, MatchPattern, MatchType, PatternFlag};
use proxmox_sys::fs::{create_path, CreateOptions};
use proxmox_router::cli::{self, CliCommand, CliCommandMap, CliHelper, CommandLineInterface}; use proxmox_router::cli::{self, CliCommand, CliCommandMap, CliHelper, CommandLineInterface};
use proxmox_schema::api; use proxmox_schema::api;
use proxmox_sys::fs::{create_path, CreateOptions};
use pxar::{EntryKind, Metadata}; use pxar::{EntryKind, Metadata};
use proxmox_async::runtime::block_in_place;
use pbs_datastore::catalog::{self, DirEntryAttribute}; use pbs_datastore::catalog::{self, DirEntryAttribute};
use proxmox_async::runtime::block_in_place;
use crate::pxar::Flags;
use crate::pxar::fuse::{Accessor, FileEntry}; use crate::pxar::fuse::{Accessor, FileEntry};
use crate::pxar::Flags;
type CatalogReader = pbs_datastore::catalog::CatalogReader<std::fs::File>; type CatalogReader = pbs_datastore::catalog::CatalogReader<std::fs::File>;
@ -91,10 +91,7 @@ pub fn catalog_shell_cli() -> CommandLineInterface {
"find", "find",
CliCommand::new(&API_METHOD_FIND_COMMAND).arg_param(&["pattern"]), CliCommand::new(&API_METHOD_FIND_COMMAND).arg_param(&["pattern"]),
) )
.insert( .insert("exit", CliCommand::new(&API_METHOD_EXIT))
"exit",
CliCommand::new(&API_METHOD_EXIT),
)
.insert_help(), .insert_help(),
) )
} }
@ -1070,7 +1067,8 @@ impl<'a> ExtractorState<'a> {
} }
self.path.extend(&entry.name); self.path.extend(&entry.name);
self.extractor.set_path(OsString::from_vec(self.path.clone())); self.extractor
.set_path(OsString::from_vec(self.path.clone()));
self.handle_entry(entry).await?; self.handle_entry(entry).await?;
} }
@ -1122,7 +1120,8 @@ impl<'a> ExtractorState<'a> {
let dir_pxar = self.dir_stack.last().unwrap().pxar.as_ref().unwrap(); let dir_pxar = self.dir_stack.last().unwrap().pxar.as_ref().unwrap();
let dir_meta = dir_pxar.entry().metadata().clone(); let dir_meta = dir_pxar.entry().metadata().clone();
let create = self.matches && match_result != Some(MatchType::Exclude); let create = self.matches && match_result != Some(MatchType::Exclude);
self.extractor.enter_directory(dir_pxar.file_name().to_os_string(), dir_meta, create)?; self.extractor
.enter_directory(dir_pxar.file_name().to_os_string(), dir_meta, create)?;
Ok(()) Ok(())
} }
@ -1172,13 +1171,9 @@ impl<'a> ExtractorState<'a> {
pxar::EntryKind::File { size, .. } => { pxar::EntryKind::File { size, .. } => {
let file_name = CString::new(entry.file_name().as_bytes())?; let file_name = CString::new(entry.file_name().as_bytes())?;
let mut contents = entry.contents().await?; let mut contents = entry.contents().await?;
self.extractor.async_extract_file( self.extractor
&file_name, .async_extract_file(&file_name, entry.metadata(), *size, &mut contents)
entry.metadata(), .await
*size,
&mut contents,
)
.await
} }
_ => { _ => {
bail!( bail!(
@ -1197,11 +1192,13 @@ impl<'a> ExtractorState<'a> {
let file_name = CString::new(entry.file_name().as_bytes())?; let file_name = CString::new(entry.file_name().as_bytes())?;
match (catalog_attr, entry.kind()) { match (catalog_attr, entry.kind()) {
(DirEntryAttribute::Symlink, pxar::EntryKind::Symlink(symlink)) => { (DirEntryAttribute::Symlink, pxar::EntryKind::Symlink(symlink)) => {
block_in_place(|| self.extractor.extract_symlink( block_in_place(|| {
&file_name, self.extractor.extract_symlink(
entry.metadata(), &file_name,
symlink.as_os_str(), entry.metadata(),
)) symlink.as_os_str(),
)
})
} }
(DirEntryAttribute::Symlink, _) => { (DirEntryAttribute::Symlink, _) => {
bail!( bail!(
@ -1211,7 +1208,10 @@ impl<'a> ExtractorState<'a> {
} }
(DirEntryAttribute::Hardlink, pxar::EntryKind::Hardlink(hardlink)) => { (DirEntryAttribute::Hardlink, pxar::EntryKind::Hardlink(hardlink)) => {
block_in_place(|| self.extractor.extract_hardlink(&file_name, hardlink.as_os_str())) block_in_place(|| {
self.extractor
.extract_hardlink(&file_name, hardlink.as_os_str())
})
} }
(DirEntryAttribute::Hardlink, _) => { (DirEntryAttribute::Hardlink, _) => {
bail!( bail!(
@ -1224,16 +1224,18 @@ impl<'a> ExtractorState<'a> {
self.extract_device(attr.clone(), &file_name, device, entry.metadata()) self.extract_device(attr.clone(), &file_name, device, entry.metadata())
} }
(DirEntryAttribute::Fifo, pxar::EntryKind::Fifo) => { (DirEntryAttribute::Fifo, pxar::EntryKind::Fifo) => block_in_place(|| {
block_in_place(|| self.extractor.extract_special(&file_name, entry.metadata(), 0)) self.extractor
} .extract_special(&file_name, entry.metadata(), 0)
}),
(DirEntryAttribute::Fifo, _) => { (DirEntryAttribute::Fifo, _) => {
bail!("catalog fifo {:?} not a fifo in the archive", self.path()); bail!("catalog fifo {:?} not a fifo in the archive", self.path());
} }
(DirEntryAttribute::Socket, pxar::EntryKind::Socket) => { (DirEntryAttribute::Socket, pxar::EntryKind::Socket) => block_in_place(|| {
block_in_place(|| self.extractor.extract_special(&file_name, entry.metadata(), 0)) self.extractor
} .extract_special(&file_name, entry.metadata(), 0)
}),
(DirEntryAttribute::Socket, _) => { (DirEntryAttribute::Socket, _) => {
bail!( bail!(
"catalog socket {:?} not a socket in the archive", "catalog socket {:?} not a socket in the archive",
@ -1277,6 +1279,9 @@ impl<'a> ExtractorState<'a> {
); );
} }
} }
block_in_place(|| self.extractor.extract_special(file_name, metadata, device.to_dev_t())) block_in_place(|| {
self.extractor
.extract_special(file_name, metadata, device.to_dev_t())
})
} }
} }

View File

@ -1,8 +1,8 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::Error;
use bytes::BytesMut; use bytes::BytesMut;
use anyhow::{Error};
use futures::ready; use futures::ready;
use futures::stream::{Stream, TryStream}; use futures::stream::{Stream, TryStream};
@ -18,7 +18,12 @@ pub struct ChunkStream<S: Unpin> {
impl<S: Unpin> ChunkStream<S> { impl<S: Unpin> ChunkStream<S> {
pub fn new(input: S, chunk_size: Option<usize>) -> Self { pub fn new(input: S, chunk_size: Option<usize>) -> Self {
Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0} Self {
input,
chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
buffer: BytesMut::new(),
scan_pos: 0,
}
} }
} }
@ -30,7 +35,6 @@ where
S::Ok: AsRef<[u8]>, S::Ok: AsRef<[u8]>,
S::Error: Into<Error>, S::Error: Into<Error>,
{ {
type Item = Result<BytesMut, Error>; type Item = Result<BytesMut, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@ -82,7 +86,11 @@ pub struct FixedChunkStream<S: Unpin> {
impl<S: Unpin> FixedChunkStream<S> { impl<S: Unpin> FixedChunkStream<S> {
pub fn new(input: S, chunk_size: usize) -> Self { pub fn new(input: S, chunk_size: usize) -> Self {
Self { input, chunk_size, buffer: BytesMut::new() } Self {
input,
chunk_size,
buffer: BytesMut::new(),
}
} }
} }
@ -95,7 +103,10 @@ where
{ {
type Item = Result<BytesMut, S::Error>; type Item = Result<BytesMut, S::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<BytesMut, S::Error>>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<BytesMut, S::Error>>> {
let this = self.get_mut(); let this = self.get_mut();
loop { loop {
if this.buffer.len() >= this.chunk_size { if this.buffer.len() >= this.chunk_size {

View File

@ -4,26 +4,29 @@ use std::time::Duration;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use futures::*; use futures::*;
use http::Uri;
use http::header::HeaderValue; use http::header::HeaderValue;
use http::Uri;
use http::{Request, Response}; use http::{Request, Response};
use hyper::Body;
use hyper::client::{Client, HttpConnector}; use hyper::client::{Client, HttpConnector};
use openssl::{ssl::{SslConnector, SslMethod}, x509::X509StoreContextRef}; use hyper::Body;
use serde_json::{json, Value}; use openssl::{
ssl::{SslConnector, SslMethod},
x509::X509StoreContextRef,
};
use percent_encoding::percent_encode; use percent_encoding::percent_encode;
use serde_json::{json, Value};
use xdg::BaseDirectories; use xdg::BaseDirectories;
use proxmox_sys::linux::tty;
use proxmox_sys::fs::{file_get_json, replace_file, CreateOptions};
use proxmox_router::HttpError; use proxmox_router::HttpError;
use proxmox_sys::fs::{file_get_json, replace_file, CreateOptions};
use proxmox_sys::linux::tty;
use proxmox_async::broadcast_future::BroadcastFuture;
use proxmox_http::client::{HttpsConnector, RateLimiter}; use proxmox_http::client::{HttpsConnector, RateLimiter};
use proxmox_http::uri::build_authority; use proxmox_http::uri::build_authority;
use proxmox_async::broadcast_future::BroadcastFuture;
use pbs_api_types::{Authid, Userid, RateLimitConfig};
use pbs_api_types::percent_encoding::DEFAULT_ENCODE_SET; use pbs_api_types::percent_encoding::DEFAULT_ENCODE_SET;
use pbs_api_types::{Authid, RateLimitConfig, Userid};
use pbs_tools::json::json_object_to_query; use pbs_tools::json::json_object_to_query;
use pbs_tools::ticket; use pbs_tools::ticket;
@ -53,7 +56,6 @@ pub struct HttpClientOptions {
} }
impl HttpClientOptions { impl HttpClientOptions {
pub fn new_interactive(password: Option<String>, fingerprint: Option<String>) -> Self { pub fn new_interactive(password: Option<String>, fingerprint: Option<String>) -> Self {
Self { Self {
password, password,
@ -144,7 +146,6 @@ pub struct HttpClient {
/// Delete stored ticket data (logout) /// Delete stored ticket data (logout)
pub fn delete_ticket_info(prefix: &str, server: &str, username: &Userid) -> Result<(), Error> { pub fn delete_ticket_info(prefix: &str, server: &str, username: &Userid) -> Result<(), Error> {
let base = BaseDirectories::with_prefix(prefix)?; let base = BaseDirectories::with_prefix(prefix)?;
// usually /run/user/<uid>/... // usually /run/user/<uid>/...
@ -158,13 +159,17 @@ pub fn delete_ticket_info(prefix: &str, server: &str, username: &Userid) -> Resu
map.remove(username.as_str()); map.remove(username.as_str());
} }
replace_file(path, data.to_string().as_bytes(), CreateOptions::new().perm(mode), false)?; replace_file(
path,
data.to_string().as_bytes(),
CreateOptions::new().perm(mode),
false,
)?;
Ok(()) Ok(())
} }
fn store_fingerprint(prefix: &str, server: &str, fingerprint: &str) -> Result<(), Error> { fn store_fingerprint(prefix: &str, server: &str, fingerprint: &str) -> Result<(), Error> {
let base = BaseDirectories::with_prefix(prefix)?; let base = BaseDirectories::with_prefix(prefix)?;
// usually ~/.config/<prefix>/fingerprints // usually ~/.config/<prefix>/fingerprints
@ -206,7 +211,6 @@ fn store_fingerprint(prefix: &str, server: &str, fingerprint: &str) -> Result<()
} }
fn load_fingerprint(prefix: &str, server: &str) -> Option<String> { fn load_fingerprint(prefix: &str, server: &str) -> Option<String> {
let base = BaseDirectories::with_prefix(prefix).ok()?; let base = BaseDirectories::with_prefix(prefix).ok()?;
// usually ~/.config/<prefix>/fingerprints // usually ~/.config/<prefix>/fingerprints
@ -224,8 +228,13 @@ fn load_fingerprint(prefix: &str, server: &str) -> Option<String> {
None None
} }
fn store_ticket_info(prefix: &str, server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> { fn store_ticket_info(
prefix: &str,
server: &str,
username: &str,
ticket: &str,
token: &str,
) -> Result<(), Error> {
let base = BaseDirectories::with_prefix(prefix)?; let base = BaseDirectories::with_prefix(prefix)?;
// usually /run/user/<uid>/... // usually /run/user/<uid>/...
@ -255,7 +264,12 @@ fn store_ticket_info(prefix: &str, server: &str, username: &str, ticket: &str, t
} }
} }
replace_file(path, new_data.to_string().as_bytes(), CreateOptions::new().perm(mode), false)?; replace_file(
path,
new_data.to_string().as_bytes(),
CreateOptions::new().perm(mode),
false,
)?;
Ok(()) Ok(())
} }
@ -300,7 +314,6 @@ impl HttpClient {
auth_id: &Authid, auth_id: &Authid,
mut options: HttpClientOptions, mut options: HttpClientOptions,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let verified_fingerprint = Arc::new(Mutex::new(None)); let verified_fingerprint = Arc::new(Mutex::new(None));
let mut expected_fingerprint = options.fingerprint.take(); let mut expected_fingerprint = options.fingerprint.take();
@ -320,25 +333,32 @@ impl HttpClient {
let interactive = options.interactive; let interactive = options.interactive;
let fingerprint_cache = options.fingerprint_cache; let fingerprint_cache = options.fingerprint_cache;
let prefix = options.prefix.clone(); let prefix = options.prefix.clone();
ssl_connector_builder.set_verify_callback(openssl::ssl::SslVerifyMode::PEER, move |valid, ctx| { ssl_connector_builder.set_verify_callback(
match Self::verify_callback(valid, ctx, expected_fingerprint.as_ref(), interactive) { openssl::ssl::SslVerifyMode::PEER,
move |valid, ctx| match Self::verify_callback(
valid,
ctx,
expected_fingerprint.as_ref(),
interactive,
) {
Ok(None) => true, Ok(None) => true,
Ok(Some(fingerprint)) => { Ok(Some(fingerprint)) => {
if fingerprint_cache && prefix.is_some() { if fingerprint_cache && prefix.is_some() {
if let Err(err) = store_fingerprint( if let Err(err) =
prefix.as_ref().unwrap(), &server, &fingerprint) { store_fingerprint(prefix.as_ref().unwrap(), &server, &fingerprint)
{
eprintln!("{}", err); eprintln!("{}", err);
} }
} }
*verified_fingerprint.lock().unwrap() = Some(fingerprint); *verified_fingerprint.lock().unwrap() = Some(fingerprint);
true true
}, }
Err(err) => { Err(err) => {
eprintln!("certificate validation failed - {}", err); eprintln!("certificate validation failed - {}", err);
false false
}, }
} },
}); );
} else { } else {
ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE);
} }
@ -348,25 +368,31 @@ impl HttpClient {
httpc.enforce_http(false); // we want https... httpc.enforce_http(false); // we want https...
httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0))); httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0)));
let mut https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); let mut https = HttpsConnector::with_connector(
httpc,
ssl_connector_builder.build(),
PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
);
if let Some(rate_in) = options.limit.rate_in { if let Some(rate_in) = options.limit.rate_in {
let burst_in = options.limit.burst_in.unwrap_or(rate_in).as_u64(); let burst_in = options.limit.burst_in.unwrap_or(rate_in).as_u64();
https.set_read_limiter(Some(Arc::new(Mutex::new( https.set_read_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(
RateLimiter::new(rate_in.as_u64(), burst_in) rate_in.as_u64(),
)))); burst_in,
)))));
} }
if let Some(rate_out) = options.limit.rate_out { if let Some(rate_out) = options.limit.rate_out {
let burst_out = options.limit.burst_out.unwrap_or(rate_out).as_u64(); let burst_out = options.limit.burst_out.unwrap_or(rate_out).as_u64();
https.set_write_limiter(Some(Arc::new(Mutex::new( https.set_write_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(
RateLimiter::new(rate_out.as_u64(), burst_out) rate_out.as_u64(),
)))); burst_out,
)))));
} }
let client = Client::builder() let client = Client::builder()
//.http2_initial_stream_window_size( (1 << 31) - 2) //.http2_initial_stream_window_size( (1 << 31) - 2)
//.http2_initial_connection_window_size( (1 << 31) - 2) //.http2_initial_connection_window_size( (1 << 31) - 2)
.build::<_, Body>(https); .build::<_, Body>(https);
let password = options.password.take(); let password = options.password.take();
@ -404,18 +430,32 @@ impl HttpClient {
let renewal_future = async move { let renewal_future = async move {
loop { loop {
tokio::time::sleep(Duration::new(60*15, 0)).await; // 15 minutes tokio::time::sleep(Duration::new(60 * 15, 0)).await; // 15 minutes
let (auth_id, ticket) = { let (auth_id, ticket) = {
let authinfo = auth2.read().unwrap().clone(); let authinfo = auth2.read().unwrap().clone();
(authinfo.auth_id, authinfo.ticket) (authinfo.auth_id, authinfo.ticket)
}; };
match Self::credentials(client2.clone(), server2.clone(), port, auth_id.user().clone(), ticket).await { match Self::credentials(
client2.clone(),
server2.clone(),
port,
auth_id.user().clone(),
ticket,
)
.await
{
Ok(auth) => { Ok(auth) => {
if use_ticket_cache && prefix2.is_some() { if use_ticket_cache && prefix2.is_some() {
let _ = store_ticket_info(prefix2.as_ref().unwrap(), &server2, &auth.auth_id.to_string(), &auth.ticket, &auth.token); let _ = store_ticket_info(
prefix2.as_ref().unwrap(),
&server2,
&auth.auth_id.to_string(),
&auth.ticket,
&auth.token,
);
} }
*auth2.write().unwrap() = auth; *auth2.write().unwrap() = auth;
}, }
Err(err) => { Err(err) => {
eprintln!("re-authentication failed: {}", err); eprintln!("re-authentication failed: {}", err);
return; return;
@ -432,14 +472,21 @@ impl HttpClient {
port, port,
auth_id.user().clone(), auth_id.user().clone(),
password, password,
).map_ok({ )
.map_ok({
let server = server.to_string(); let server = server.to_string();
let prefix = options.prefix.clone(); let prefix = options.prefix.clone();
let authinfo = auth.clone(); let authinfo = auth.clone();
move |auth| { move |auth| {
if use_ticket_cache && prefix.is_some() { if use_ticket_cache && prefix.is_some() {
let _ = store_ticket_info(prefix.as_ref().unwrap(), &server, &auth.auth_id.to_string(), &auth.ticket, &auth.token); let _ = store_ticket_info(
prefix.as_ref().unwrap(),
&server,
&auth.auth_id.to_string(),
&auth.ticket,
&auth.token,
);
} }
*authinfo.write().unwrap() = auth; *authinfo.write().unwrap() = auth;
tokio::spawn(renewal_future); tokio::spawn(renewal_future);
@ -502,7 +549,6 @@ impl HttpClient {
expected_fingerprint: Option<&String>, expected_fingerprint: Option<&String>,
interactive: bool, interactive: bool,
) -> Result<Option<String>, Error> { ) -> Result<Option<String>, Error> {
if openssl_valid { if openssl_valid {
return Ok(None); return Ok(None);
} }
@ -513,15 +559,21 @@ impl HttpClient {
}; };
let depth = ctx.error_depth(); let depth = ctx.error_depth();
if depth != 0 { bail!("context depth != 0") } if depth != 0 {
bail!("context depth != 0")
}
let fp = match cert.digest(openssl::hash::MessageDigest::sha256()) { let fp = match cert.digest(openssl::hash::MessageDigest::sha256()) {
Ok(fp) => fp, Ok(fp) => fp,
Err(err) => bail!("failed to calculate certificate FP - {}", err), // should not happen Err(err) => bail!("failed to calculate certificate FP - {}", err), // should not happen
}; };
let fp_string = hex::encode(&fp); let fp_string = hex::encode(&fp);
let fp_string = fp_string.as_bytes().chunks(2).map(|v| std::str::from_utf8(v).unwrap()) let fp_string = fp_string
.collect::<Vec<&str>>().join(":"); .as_bytes()
.chunks(2)
.map(|v| std::str::from_utf8(v).unwrap())
.collect::<Vec<&str>>()
.join(":");
if let Some(expected_fingerprint) = expected_fingerprint { if let Some(expected_fingerprint) = expected_fingerprint {
let expected_fingerprint = expected_fingerprint.to_lowercase(); let expected_fingerprint = expected_fingerprint.to_lowercase();
@ -561,76 +613,70 @@ impl HttpClient {
} }
pub async fn request(&self, mut req: Request<Body>) -> Result<Value, Error> { pub async fn request(&self, mut req: Request<Body>) -> Result<Value, Error> {
let client = self.client.clone(); let client = self.client.clone();
let auth = self.login().await?; let auth = self.login().await?;
if auth.auth_id.is_token() { if auth.auth_id.is_token() {
let enc_api_token = format!("PBSAPIToken {}:{}", auth.auth_id, percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); let enc_api_token = format!(
req.headers_mut().insert("Authorization", HeaderValue::from_str(&enc_api_token).unwrap()); "PBSAPIToken {}:{}",
auth.auth_id,
percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)
);
req.headers_mut().insert(
"Authorization",
HeaderValue::from_str(&enc_api_token).unwrap(),
);
} else { } else {
let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); let enc_ticket = format!(
req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); "PBSAuthCookie={}",
req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap()); percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)
);
req.headers_mut()
.insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
req.headers_mut().insert(
"CSRFPreventionToken",
HeaderValue::from_str(&auth.token).unwrap(),
);
} }
Self::api_request(client, req).await Self::api_request(client, req).await
} }
pub async fn get( pub async fn get(&self, path: &str, data: Option<Value>) -> Result<Value, Error> {
&self,
path: &str,
data: Option<Value>,
) -> Result<Value, Error> {
let req = Self::request_builder(&self.server, self.port, "GET", path, data)?; let req = Self::request_builder(&self.server, self.port, "GET", path, data)?;
self.request(req).await self.request(req).await
} }
pub async fn delete( pub async fn delete(&self, path: &str, data: Option<Value>) -> Result<Value, Error> {
&self,
path: &str,
data: Option<Value>,
) -> Result<Value, Error> {
let req = Self::request_builder(&self.server, self.port, "DELETE", path, data)?; let req = Self::request_builder(&self.server, self.port, "DELETE", path, data)?;
self.request(req).await self.request(req).await
} }
pub async fn post( pub async fn post(&self, path: &str, data: Option<Value>) -> Result<Value, Error> {
&self,
path: &str,
data: Option<Value>,
) -> Result<Value, Error> {
let req = Self::request_builder(&self.server, self.port, "POST", path, data)?; let req = Self::request_builder(&self.server, self.port, "POST", path, data)?;
self.request(req).await self.request(req).await
} }
pub async fn put( pub async fn put(&self, path: &str, data: Option<Value>) -> Result<Value, Error> {
&self,
path: &str,
data: Option<Value>,
) -> Result<Value, Error> {
let req = Self::request_builder(&self.server, self.port, "PUT", path, data)?; let req = Self::request_builder(&self.server, self.port, "PUT", path, data)?;
self.request(req).await self.request(req).await
} }
pub async fn download( pub async fn download(&self, path: &str, output: &mut (dyn Write + Send)) -> Result<(), Error> {
&self,
path: &str,
output: &mut (dyn Write + Send),
) -> Result<(), Error> {
let mut req = Self::request_builder(&self.server, self.port, "GET", path, None)?; let mut req = Self::request_builder(&self.server, self.port, "GET", path, None)?;
let client = self.client.clone(); let client = self.client.clone();
let auth = self.login().await?; let auth = self.login().await?;
let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); let enc_ticket = format!(
req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); "PBSAuthCookie={}",
percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)
);
req.headers_mut()
.insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
let resp = tokio::time::timeout( let resp = tokio::time::timeout(HTTP_TIMEOUT, client.request(req))
HTTP_TIMEOUT,
client.request(req)
)
.await .await
.map_err(|_| format_err!("http download request timed out"))??; .map_err(|_| format_err!("http download request timed out"))??;
let status = resp.status(); let status = resp.status();
@ -657,7 +703,6 @@ impl HttpClient {
path: &str, path: &str,
data: Option<Value>, data: Option<Value>,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
let query = match data { let query = match data {
Some(data) => Some(json_object_to_query(data)?), Some(data) => Some(json_object_to_query(data)?),
None => None, None => None,
@ -669,7 +714,8 @@ impl HttpClient {
.uri(url) .uri(url)
.header("User-Agent", "proxmox-backup-client/1.0") .header("User-Agent", "proxmox-backup-client/1.0")
.header("Content-Type", content_type) .header("Content-Type", content_type)
.body(body).unwrap(); .body(body)
.unwrap();
self.request(req).await self.request(req).await
} }
@ -679,25 +725,36 @@ impl HttpClient {
mut req: Request<Body>, mut req: Request<Body>,
protocol_name: String, protocol_name: String,
) -> Result<(H2Client, futures::future::AbortHandle), Error> { ) -> Result<(H2Client, futures::future::AbortHandle), Error> {
let client = self.client.clone(); let client = self.client.clone();
let auth = self.login().await?; let auth = self.login().await?;
if auth.auth_id.is_token() { if auth.auth_id.is_token() {
let enc_api_token = format!("PBSAPIToken {}:{}", auth.auth_id, percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); let enc_api_token = format!(
req.headers_mut().insert("Authorization", HeaderValue::from_str(&enc_api_token).unwrap()); "PBSAPIToken {}:{}",
auth.auth_id,
percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)
);
req.headers_mut().insert(
"Authorization",
HeaderValue::from_str(&enc_api_token).unwrap(),
);
} else { } else {
let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); let enc_ticket = format!(
req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); "PBSAuthCookie={}",
req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap()); percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)
);
req.headers_mut()
.insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
req.headers_mut().insert(
"CSRFPreventionToken",
HeaderValue::from_str(&auth.token).unwrap(),
);
} }
req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap()); req.headers_mut()
.insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
let resp = tokio::time::timeout( let resp = tokio::time::timeout(HTTP_TIMEOUT, client.request(req))
HTTP_TIMEOUT,
client.request(req)
)
.await .await
.map_err(|_| format_err!("http upgrade request timed out"))??; .map_err(|_| format_err!("http upgrade request timed out"))??;
let status = resp.status(); let status = resp.status();
@ -714,12 +771,11 @@ impl HttpClient {
let (h2, connection) = h2::client::Builder::new() let (h2, connection) = h2::client::Builder::new()
.initial_connection_window_size(max_window_size) .initial_connection_window_size(max_window_size)
.initial_window_size(max_window_size) .initial_window_size(max_window_size)
.max_frame_size(4*1024*1024) .max_frame_size(4 * 1024 * 1024)
.handshake(upgraded) .handshake(upgraded)
.await?; .await?;
let connection = connection let connection = connection.map_err(|_| eprintln!("HTTP/2.0 connection failed"));
.map_err(|_| eprintln!("HTTP/2.0 connection failed"));
let (connection, abort) = futures::future::abortable(connection); let (connection, abort) = futures::future::abortable(connection);
// A cancellable future returns an Option which is None when cancelled and // A cancellable future returns an Option which is None when cancelled and
@ -743,12 +799,21 @@ impl HttpClient {
password: String, password: String,
) -> Result<AuthInfo, Error> { ) -> Result<AuthInfo, Error> {
let data = json!({ "username": username, "password": password }); let data = json!({ "username": username, "password": password });
let req = Self::request_builder(&server, port, "POST", "/api2/json/access/ticket", Some(data))?; let req = Self::request_builder(
&server,
port,
"POST",
"/api2/json/access/ticket",
Some(data),
)?;
let cred = Self::api_request(client, req).await?; let cred = Self::api_request(client, req).await?;
let auth = AuthInfo { let auth = AuthInfo {
auth_id: cred["data"]["username"].as_str().unwrap().parse()?, auth_id: cred["data"]["username"].as_str().unwrap().parse()?,
ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(), ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(), token: cred["data"]["CSRFPreventionToken"]
.as_str()
.unwrap()
.to_owned(),
}; };
Ok(auth) Ok(auth)
@ -773,17 +838,14 @@ impl HttpClient {
async fn api_request( async fn api_request(
client: Client<HttpsConnector>, client: Client<HttpsConnector>,
req: Request<Body> req: Request<Body>,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
Self::api_response( Self::api_response(
tokio::time::timeout( tokio::time::timeout(HTTP_TIMEOUT, client.request(req))
HTTP_TIMEOUT,
client.request(req)
)
.await .await
.map_err(|_| format_err!("http request timed out"))?? .map_err(|_| format_err!("http request timed out"))??,
).await )
.await
} }
// Read-only access to server property // Read-only access to server property
@ -795,7 +857,13 @@ impl HttpClient {
self.port self.port
} }
pub fn request_builder(server: &str, port: u16, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> { pub fn request_builder(
server: &str,
port: u16,
method: &str,
path: &str,
data: Option<Value>,
) -> Result<Request<Body>, Error> {
if let Some(data) = data { if let Some(data) = data {
if method == "POST" { if method == "POST" {
let url = build_uri(server, port, path, None)?; let url = build_uri(server, port, path, None)?;
@ -813,7 +881,10 @@ impl HttpClient {
.method(method) .method(method)
.uri(url) .uri(url)
.header("User-Agent", "proxmox-backup-client/1.0") .header("User-Agent", "proxmox-backup-client/1.0")
.header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded") .header(
hyper::header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
)
.body(Body::empty())?; .body(Body::empty())?;
Ok(request) Ok(request)
} }
@ -823,7 +894,10 @@ impl HttpClient {
.method(method) .method(method)
.uri(url) .uri(url)
.header("User-Agent", "proxmox-backup-client/1.0") .header("User-Agent", "proxmox-backup-client/1.0")
.header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded") .header(
hyper::header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
)
.body(Body::empty())?; .body(Body::empty())?;
Ok(request) Ok(request)
@ -837,41 +911,27 @@ impl Drop for HttpClient {
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct H2Client { pub struct H2Client {
h2: h2::client::SendRequest<bytes::Bytes>, h2: h2::client::SendRequest<bytes::Bytes>,
} }
impl H2Client { impl H2Client {
pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self { pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
Self { h2 } Self { h2 }
} }
pub async fn get( pub async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
&self,
path: &str,
param: Option<Value>
) -> Result<Value, Error> {
let req = Self::request_builder("localhost", "GET", path, param, None).unwrap(); let req = Self::request_builder("localhost", "GET", path, param, None).unwrap();
self.request(req).await self.request(req).await
} }
pub async fn put( pub async fn put(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
&self,
path: &str,
param: Option<Value>
) -> Result<Value, Error> {
let req = Self::request_builder("localhost", "PUT", path, param, None).unwrap(); let req = Self::request_builder("localhost", "PUT", path, param, None).unwrap();
self.request(req).await self.request(req).await
} }
pub async fn post( pub async fn post(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
&self,
path: &str,
param: Option<Value>
) -> Result<Value, Error> {
let req = Self::request_builder("localhost", "POST", path, param, None).unwrap(); let req = Self::request_builder("localhost", "POST", path, param, None).unwrap();
self.request(req).await self.request(req).await
} }
@ -912,7 +972,8 @@ impl H2Client {
content_type: &str, content_type: &str,
data: Vec<u8>, data: Vec<u8>,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
let request = Self::request_builder("localhost", method, path, param, Some(content_type)).unwrap(); let request =
Self::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
let mut send_request = self.h2.clone().ready().await?; let mut send_request = self.h2.clone().ready().await?;
@ -926,17 +987,9 @@ impl H2Client {
.await .await
} }
async fn request( async fn request(&self, request: Request<()>) -> Result<Value, Error> {
&self,
request: Request<()>,
) -> Result<Value, Error> {
self.send_request(request, None) self.send_request(request, None)
.and_then(move |response| { .and_then(move |response| response.map_err(Error::from).and_then(Self::h2api_response))
response
.map_err(Error::from)
.and_then(Self::h2api_response)
})
.await .await
} }
@ -945,8 +998,8 @@ impl H2Client {
request: Request<()>, request: Request<()>,
data: Option<bytes::Bytes>, data: Option<bytes::Bytes>,
) -> impl Future<Output = Result<h2::client::ResponseFuture, Error>> { ) -> impl Future<Output = Result<h2::client::ResponseFuture, Error>> {
self.h2
self.h2.clone() .clone()
.ready() .ready()
.map_err(Error::from) .map_err(Error::from)
.and_then(move |mut send_request| async move { .and_then(move |mut send_request| async move {
@ -961,9 +1014,7 @@ impl H2Client {
}) })
} }
pub async fn h2api_response( pub async fn h2api_response(response: Response<h2::RecvStream>) -> Result<Value, Error> {
response: Response<h2::RecvStream>,
) -> Result<Value, Error> {
let status = response.status(); let status = response.status();
let (_head, mut body) = response.into_parts(); let (_head, mut body) = response.into_parts();
@ -1013,7 +1064,10 @@ impl H2Client {
let query = json_object_to_query(param)?; let query = json_object_to_query(param)?;
// We detected problem with hyper around 6000 characters - so we try to keep on the safe side // We detected problem with hyper around 6000 characters - so we try to keep on the safe side
if query.len() > 4096 { if query.len() > 4096 {
bail!("h2 query data too large ({} bytes) - please encode data inside body", query.len()); bail!(
"h2 query data too large ({} bytes) - please encode data inside body",
query.len()
);
} }
Some(query) Some(query)
} }

View File

@ -1,4 +1,4 @@
use std::collections::{HashSet, HashMap}; use std::collections::{HashMap, HashSet};
use std::ffi::{CStr, CString, OsStr}; use std::ffi::{CStr, CString, OsStr};
use std::fmt; use std::fmt;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
@ -8,29 +8,29 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use futures::future::BoxFuture;
use futures::FutureExt;
use nix::dir::Dir; use nix::dir::Dir;
use nix::errno::Errno; use nix::errno::Errno;
use nix::fcntl::OFlag; use nix::fcntl::OFlag;
use nix::sys::stat::{FileStat, Mode}; use nix::sys::stat::{FileStat, Mode};
use futures::future::BoxFuture;
use futures::FutureExt;
use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag}; use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
use pxar::encoder::{LinkOffset, SeqWrite};
use pxar::Metadata; use pxar::Metadata;
use pxar::encoder::{SeqWrite, LinkOffset};
use proxmox_sys::error::SysError;
use proxmox_sys::fd::RawFdNum;
use proxmox_sys::fd::Fd;
use proxmox_sys::fs::{self, acl, xattr};
use proxmox_io::vec; use proxmox_io::vec;
use proxmox_lang::c_str; use proxmox_lang::c_str;
use proxmox_sys::error::SysError;
use proxmox_sys::fd::Fd;
use proxmox_sys::fd::RawFdNum;
use proxmox_sys::fs::{self, acl, xattr};
use pbs_datastore::catalog::BackupCatalogWriter; use pbs_datastore::catalog::BackupCatalogWriter;
use crate::pxar::metadata::errno_is_unsupported; use crate::pxar::metadata::errno_is_unsupported;
use crate::pxar::Flags;
use crate::pxar::tools::assert_single_path_component; use crate::pxar::tools::assert_single_path_component;
use crate::pxar::Flags;
/// Pxar options for creating a pxar archive/stream /// Pxar options for creating a pxar archive/stream
#[derive(Default, Clone)] #[derive(Default, Clone)]
@ -47,7 +47,6 @@ pub struct PxarCreateOptions {
pub verbose: bool, pub verbose: bool,
} }
fn detect_fs_type(fd: RawFd) -> Result<i64, Error> { fn detect_fs_type(fd: RawFd) -> Result<i64, Error> {
let mut fs_stat = std::mem::MaybeUninit::uninit(); let mut fs_stat = std::mem::MaybeUninit::uninit();
let res = unsafe { libc::fstatfs(fd, fs_stat.as_mut_ptr()) }; let res = unsafe { libc::fstatfs(fd, fs_stat.as_mut_ptr()) };
@ -229,7 +228,9 @@ where
file_copy_buffer: vec::undefined(4 * 1024 * 1024), file_copy_buffer: vec::undefined(4 * 1024 * 1024),
}; };
archiver.archive_dir_contents(&mut encoder, source_dir, true).await?; archiver
.archive_dir_contents(&mut encoder, source_dir, true)
.await?;
encoder.finish().await?; encoder.finish().await?;
Ok(()) Ok(())
} }
@ -285,13 +286,15 @@ impl Archiver {
let file_name = file_entry.name.to_bytes(); let file_name = file_entry.name.to_bytes();
if is_root && file_name == b".pxarexclude-cli" { if is_root && file_name == b".pxarexclude-cli" {
self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count).await?; self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
.await?;
continue; continue;
} }
(self.callback)(&file_entry.path)?; (self.callback)(&file_entry.path)?;
self.path = file_entry.path; self.path = file_entry.path;
self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat).await self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
.await
.map_err(|err| self.wrap_err(err))?; .map_err(|err| self.wrap_err(err))?;
} }
self.path = old_path; self.path = old_path;
@ -299,7 +302,8 @@ impl Archiver {
self.patterns.truncate(old_patterns_count); self.patterns.truncate(old_patterns_count);
Ok(()) Ok(())
}.boxed() }
.boxed()
} }
/// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`. /// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`.
@ -332,7 +336,11 @@ impl Archiver {
Ok(None) Ok(None)
} }
Err(nix::Error::Sys(Errno::EACCES)) => { Err(nix::Error::Sys(Errno::EACCES)) => {
writeln!(self.errors, "failed to open file: {:?}: access denied", file_name)?; writeln!(
self.errors,
"failed to open file: {:?}: access denied",
file_name
)?;
Ok(None) Ok(None)
} }
Err(nix::Error::Sys(Errno::EPERM)) if !noatime.is_empty() => { Err(nix::Error::Sys(Errno::EPERM)) if !noatime.is_empty() => {
@ -341,7 +349,7 @@ impl Archiver {
continue; continue;
} }
Err(other) => Err(Error::from(other)), Err(other) => Err(Error::from(other)),
} };
} }
} }
@ -365,8 +373,7 @@ impl Archiver {
let _ = writeln!( let _ = writeln!(
self.errors, self.errors,
"ignoring .pxarexclude after read error in {:?}: {}", "ignoring .pxarexclude after read error in {:?}: {}",
self.path, self.path, err,
err,
); );
self.patterns.truncate(old_pattern_count); self.patterns.truncate(old_pattern_count);
return Ok(()); return Ok(());
@ -422,13 +429,18 @@ impl Archiver {
) -> Result<(), Error> { ) -> Result<(), Error> {
let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]); let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]);
if let Some(ref catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().add_file(file_name, content.len() as u64, 0)?; catalog
.lock()
.unwrap()
.add_file(file_name, content.len() as u64, 0)?;
} }
let mut metadata = Metadata::default(); let mut metadata = Metadata::default();
metadata.stat.mode = pxar::format::mode::IFREG | 0o600; metadata.stat.mode = pxar::format::mode::IFREG | 0o600;
let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64).await?; let mut file = encoder
.create_file(&metadata, ".pxarexclude-cli", content.len() as u64)
.await?;
file.write_all(&content).await?; file.write_all(&content).await?;
Ok(()) Ok(())
@ -481,13 +493,16 @@ impl Archiver {
self.entry_counter += 1; self.entry_counter += 1;
if self.entry_counter > self.entry_limit { if self.entry_counter > self.entry_limit {
bail!("exceeded allowed number of file entries (> {})",self.entry_limit); bail!(
"exceeded allowed number of file entries (> {})",
self.entry_limit
);
} }
file_list.push(FileListEntry { file_list.push(FileListEntry {
name: file_name, name: file_name,
path: full_path, path: full_path,
stat stat,
}); });
} }
@ -497,7 +512,11 @@ impl Archiver {
} }
fn report_vanished_file(&mut self) -> Result<(), Error> { fn report_vanished_file(&mut self) -> Result<(), Error> {
writeln!(self.errors, "warning: file vanished while reading: {:?}", self.path)?; writeln!(
self.errors,
"warning: file vanished while reading: {:?}",
self.path
)?;
Ok(()) Ok(())
} }
@ -547,7 +566,13 @@ impl Archiver {
None => return Ok(()), None => return Ok(()),
}; };
let metadata = get_metadata(fd.as_raw_fd(), stat, self.flags(), self.fs_magic, &mut self.fs_feature_flags)?; let metadata = get_metadata(
fd.as_raw_fd(),
stat,
self.flags(),
self.fs_magic,
&mut self.fs_feature_flags,
)?;
let match_path = PathBuf::from("/").join(self.path.clone()); let match_path = PathBuf::from("/").join(self.path.clone());
if self if self
@ -580,14 +605,19 @@ impl Archiver {
let file_size = stat.st_size as u64; let file_size = stat.st_size as u64;
if let Some(ref catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().add_file(c_file_name, file_size, stat.st_mtime)?; catalog
.lock()
.unwrap()
.add_file(c_file_name, file_size, stat.st_mtime)?;
} }
let offset: LinkOffset = let offset: LinkOffset = self
self.add_regular_file(encoder, fd, file_name, &metadata, file_size).await?; .add_regular_file(encoder, fd, file_name, &metadata, file_size)
.await?;
if stat.st_nlink > 1 { if stat.st_nlink > 1 {
self.hardlinks.insert(link_info, (self.path.clone(), offset)); self.hardlinks
.insert(link_info, (self.path.clone(), offset));
} }
Ok(()) Ok(())
@ -598,7 +628,9 @@ impl Archiver {
if let Some(ref catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().start_directory(c_file_name)?; catalog.lock().unwrap().start_directory(c_file_name)?;
} }
let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat).await; let result = self
.add_directory(encoder, dir, c_file_name, &metadata, stat)
.await;
if let Some(ref catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().end_directory()?; catalog.lock().unwrap().end_directory()?;
} }
@ -749,15 +781,23 @@ impl Archiver {
metadata: &Metadata, metadata: &Metadata,
stat: &FileStat, stat: &FileStat,
) -> Result<(), Error> { ) -> Result<(), Error> {
Ok(encoder.add_device( Ok(encoder
metadata, .add_device(
file_name, metadata,
pxar::format::Device::from_dev_t(stat.st_rdev), file_name,
).await?) pxar::format::Device::from_dev_t(stat.st_rdev),
)
.await?)
} }
} }
fn get_metadata(fd: RawFd, stat: &FileStat, flags: Flags, fs_magic: i64, fs_feature_flags: &mut Flags) -> Result<Metadata, Error> { fn get_metadata(
fd: RawFd,
stat: &FileStat,
flags: Flags,
fs_magic: i64,
fs_feature_flags: &mut Flags,
) -> Result<Metadata, Error> {
// required for some of these // required for some of these
let proc_path = Path::new("/proc/self/fd/").join(fd.to_string()); let proc_path = Path::new("/proc/self/fd/").join(fd.to_string());
@ -779,7 +819,12 @@ fn get_metadata(fd: RawFd, stat: &FileStat, flags: Flags, fs_magic: i64, fs_feat
Ok(meta) Ok(meta)
} }
fn get_fcaps(meta: &mut Metadata, fd: RawFd, flags: Flags, fs_feature_flags: &mut Flags) -> Result<(), Error> { fn get_fcaps(
meta: &mut Metadata,
fd: RawFd,
flags: Flags,
fs_feature_flags: &mut Flags,
) -> Result<(), Error> {
if !flags.contains(Flags::WITH_FCAPS) { if !flags.contains(Flags::WITH_FCAPS) {
return Ok(()); return Ok(());
} }
@ -815,7 +860,7 @@ fn get_xattr_fcaps_acl(
Err(Errno::EOPNOTSUPP) => { Err(Errno::EOPNOTSUPP) => {
fs_feature_flags.remove(Flags::WITH_XATTRS); fs_feature_flags.remove(Flags::WITH_XATTRS);
return Ok(()); return Ok(());
}, }
Err(Errno::EBADF) => return Ok(()), // symlinks Err(Errno::EBADF) => return Ok(()), // symlinks
Err(err) => bail!("failed to read xattrs: {}", err), Err(err) => bail!("failed to read xattrs: {}", err),
}; };
@ -932,7 +977,12 @@ fn get_quota_project_id(
Ok(()) Ok(())
} }
fn get_acl(metadata: &mut Metadata, proc_path: &Path, flags: Flags, fs_feature_flags: &mut Flags) -> Result<(), Error> { fn get_acl(
metadata: &mut Metadata,
proc_path: &Path,
flags: Flags,
fs_feature_flags: &mut Flags,
) -> Result<(), Error> {
if !flags.contains(Flags::WITH_ACL) { if !flags.contains(Flags::WITH_ACL) {
return Ok(()); return Ok(());
} }

View File

@ -151,24 +151,35 @@ impl Default for Flags {
} }
} }
// form /usr/include/linux/fs.h #[rustfmt::skip]
const FS_APPEND_FL: c_long = 0x0000_0020; mod fs_flags {
const FS_NOATIME_FL: c_long = 0x0000_0080; use libc::c_long;
const FS_COMPR_FL: c_long = 0x0000_0004; // form /usr/include/linux/fs.h
const FS_NOCOW_FL: c_long = 0x0080_0000; pub const FS_APPEND_FL: c_long = 0x0000_0020;
const FS_NODUMP_FL: c_long = 0x0000_0040; pub const FS_NOATIME_FL: c_long = 0x0000_0080;
const FS_DIRSYNC_FL: c_long = 0x0001_0000; pub const FS_COMPR_FL: c_long = 0x0000_0004;
const FS_IMMUTABLE_FL: c_long = 0x0000_0010; pub const FS_NOCOW_FL: c_long = 0x0080_0000;
const FS_SYNC_FL: c_long = 0x0000_0008; pub const FS_NODUMP_FL: c_long = 0x0000_0040;
const FS_NOCOMP_FL: c_long = 0x0000_0400; pub const FS_DIRSYNC_FL: c_long = 0x0001_0000;
const FS_PROJINHERIT_FL: c_long = 0x2000_0000; pub const FS_IMMUTABLE_FL: c_long = 0x0000_0010;
pub const FS_SYNC_FL: c_long = 0x0000_0008;
pub const FS_NOCOMP_FL: c_long = 0x0000_0400;
pub const FS_PROJINHERIT_FL: c_long = 0x2000_0000;
pub(crate) const INITIAL_FS_FLAGS: c_long = // from /usr/include/linux/msdos_fs.h
FS_NOATIME_FL pub const ATTR_HIDDEN: u32 = 2;
| FS_COMPR_FL pub const ATTR_SYS: u32 = 4;
| FS_NOCOW_FL pub const ATTR_ARCH: u32 = 32;
| FS_NOCOMP_FL
| FS_PROJINHERIT_FL; pub(crate) const INITIAL_FS_FLAGS: c_long =
FS_NOATIME_FL
| FS_COMPR_FL
| FS_NOCOW_FL
| FS_NOCOMP_FL
| FS_PROJINHERIT_FL;
}
use fs_flags::*; // for code formating/rusfmt
#[rustfmt::skip] #[rustfmt::skip]
const CHATTR_MAP: [(Flags, c_long); 10] = [ const CHATTR_MAP: [(Flags, c_long); 10] = [
@ -184,11 +195,6 @@ const CHATTR_MAP: [(Flags, c_long); 10] = [
( Flags::WITH_FLAG_PROJINHERIT, FS_PROJINHERIT_FL ), ( Flags::WITH_FLAG_PROJINHERIT, FS_PROJINHERIT_FL ),
]; ];
// from /usr/include/linux/msdos_fs.h
const ATTR_HIDDEN: u32 = 2;
const ATTR_SYS: u32 = 4;
const ATTR_ARCH: u32 = 32;
#[rustfmt::skip] #[rustfmt::skip]
const FAT_ATTR_MAP: [(Flags, u32); 3] = [ const FAT_ATTR_MAP: [(Flags, u32); 3] = [
( Flags::WITH_FLAG_HIDDEN, ATTR_HIDDEN ), ( Flags::WITH_FLAG_HIDDEN, ATTR_HIDDEN ),
@ -258,121 +264,117 @@ impl Flags {
use proxmox_sys::linux::magic::*; use proxmox_sys::linux::magic::*;
match magic { match magic {
MSDOS_SUPER_MAGIC => { MSDOS_SUPER_MAGIC => {
Flags::WITH_2SEC_TIME | Flags::WITH_2SEC_TIME | Flags::WITH_READ_ONLY | Flags::WITH_FAT_ATTRS
Flags::WITH_READ_ONLY | }
Flags::WITH_FAT_ATTRS
},
EXT4_SUPER_MAGIC => { EXT4_SUPER_MAGIC => {
Flags::WITH_2SEC_TIME | Flags::WITH_2SEC_TIME
Flags::WITH_READ_ONLY | | Flags::WITH_READ_ONLY
Flags::WITH_PERMISSIONS | | Flags::WITH_PERMISSIONS
Flags::WITH_SYMLINKS | | Flags::WITH_SYMLINKS
Flags::WITH_DEVICE_NODES | | Flags::WITH_DEVICE_NODES
Flags::WITH_FIFOS | | Flags::WITH_FIFOS
Flags::WITH_SOCKETS | | Flags::WITH_SOCKETS
Flags::WITH_FLAG_APPEND | | Flags::WITH_FLAG_APPEND
Flags::WITH_FLAG_NOATIME | | Flags::WITH_FLAG_NOATIME
Flags::WITH_FLAG_NODUMP | | Flags::WITH_FLAG_NODUMP
Flags::WITH_FLAG_DIRSYNC | | Flags::WITH_FLAG_DIRSYNC
Flags::WITH_FLAG_IMMUTABLE | | Flags::WITH_FLAG_IMMUTABLE
Flags::WITH_FLAG_SYNC | | Flags::WITH_FLAG_SYNC
Flags::WITH_XATTRS | | Flags::WITH_XATTRS
Flags::WITH_ACL | | Flags::WITH_ACL
Flags::WITH_SELINUX | | Flags::WITH_SELINUX
Flags::WITH_FCAPS | | Flags::WITH_FCAPS
Flags::WITH_QUOTA_PROJID | Flags::WITH_QUOTA_PROJID
}, }
XFS_SUPER_MAGIC => { XFS_SUPER_MAGIC => {
Flags::WITH_2SEC_TIME | Flags::WITH_2SEC_TIME
Flags::WITH_READ_ONLY | | Flags::WITH_READ_ONLY
Flags::WITH_PERMISSIONS | | Flags::WITH_PERMISSIONS
Flags::WITH_SYMLINKS | | Flags::WITH_SYMLINKS
Flags::WITH_DEVICE_NODES | | Flags::WITH_DEVICE_NODES
Flags::WITH_FIFOS | | Flags::WITH_FIFOS
Flags::WITH_SOCKETS | | Flags::WITH_SOCKETS
Flags::WITH_FLAG_APPEND | | Flags::WITH_FLAG_APPEND
Flags::WITH_FLAG_NOATIME | | Flags::WITH_FLAG_NOATIME
Flags::WITH_FLAG_NODUMP | | Flags::WITH_FLAG_NODUMP
Flags::WITH_FLAG_IMMUTABLE | | Flags::WITH_FLAG_IMMUTABLE
Flags::WITH_FLAG_SYNC | | Flags::WITH_FLAG_SYNC
Flags::WITH_XATTRS | | Flags::WITH_XATTRS
Flags::WITH_ACL | | Flags::WITH_ACL
Flags::WITH_SELINUX | | Flags::WITH_SELINUX
Flags::WITH_FCAPS | | Flags::WITH_FCAPS
Flags::WITH_QUOTA_PROJID | Flags::WITH_QUOTA_PROJID
}, }
ZFS_SUPER_MAGIC => { ZFS_SUPER_MAGIC => {
Flags::WITH_2SEC_TIME | Flags::WITH_2SEC_TIME
Flags::WITH_READ_ONLY | | Flags::WITH_READ_ONLY
Flags::WITH_PERMISSIONS | | Flags::WITH_PERMISSIONS
Flags::WITH_SYMLINKS | | Flags::WITH_SYMLINKS
Flags::WITH_DEVICE_NODES | | Flags::WITH_DEVICE_NODES
Flags::WITH_FIFOS | | Flags::WITH_FIFOS
Flags::WITH_SOCKETS | | Flags::WITH_SOCKETS
Flags::WITH_FLAG_APPEND | | Flags::WITH_FLAG_APPEND
Flags::WITH_FLAG_NOATIME | | Flags::WITH_FLAG_NOATIME
Flags::WITH_FLAG_NODUMP | | Flags::WITH_FLAG_NODUMP
Flags::WITH_FLAG_DIRSYNC | | Flags::WITH_FLAG_DIRSYNC
Flags::WITH_FLAG_IMMUTABLE | | Flags::WITH_FLAG_IMMUTABLE
Flags::WITH_FLAG_SYNC | | Flags::WITH_FLAG_SYNC
Flags::WITH_XATTRS | | Flags::WITH_XATTRS
Flags::WITH_ACL | | Flags::WITH_ACL
Flags::WITH_SELINUX | | Flags::WITH_SELINUX
Flags::WITH_FCAPS | | Flags::WITH_FCAPS
Flags::WITH_QUOTA_PROJID | Flags::WITH_QUOTA_PROJID
}, }
BTRFS_SUPER_MAGIC => { BTRFS_SUPER_MAGIC => {
Flags::WITH_2SEC_TIME | Flags::WITH_2SEC_TIME
Flags::WITH_READ_ONLY | | Flags::WITH_READ_ONLY
Flags::WITH_PERMISSIONS | | Flags::WITH_PERMISSIONS
Flags::WITH_SYMLINKS | | Flags::WITH_SYMLINKS
Flags::WITH_DEVICE_NODES | | Flags::WITH_DEVICE_NODES
Flags::WITH_FIFOS | | Flags::WITH_FIFOS
Flags::WITH_SOCKETS | | Flags::WITH_SOCKETS
Flags::WITH_FLAG_APPEND | | Flags::WITH_FLAG_APPEND
Flags::WITH_FLAG_NOATIME | | Flags::WITH_FLAG_NOATIME
Flags::WITH_FLAG_COMPR | | Flags::WITH_FLAG_COMPR
Flags::WITH_FLAG_NOCOW | | Flags::WITH_FLAG_NOCOW
Flags::WITH_FLAG_NODUMP | | Flags::WITH_FLAG_NODUMP
Flags::WITH_FLAG_DIRSYNC | | Flags::WITH_FLAG_DIRSYNC
Flags::WITH_FLAG_IMMUTABLE | | Flags::WITH_FLAG_IMMUTABLE
Flags::WITH_FLAG_SYNC | | Flags::WITH_FLAG_SYNC
Flags::WITH_FLAG_NOCOMP | | Flags::WITH_FLAG_NOCOMP
Flags::WITH_XATTRS | | Flags::WITH_XATTRS
Flags::WITH_ACL | | Flags::WITH_ACL
Flags::WITH_SELINUX | | Flags::WITH_SELINUX
Flags::WITH_SUBVOLUME | | Flags::WITH_SUBVOLUME
Flags::WITH_SUBVOLUME_RO | | Flags::WITH_SUBVOLUME_RO
Flags::WITH_FCAPS | Flags::WITH_FCAPS
}, }
TMPFS_MAGIC => { TMPFS_MAGIC => {
Flags::WITH_2SEC_TIME | Flags::WITH_2SEC_TIME
Flags::WITH_READ_ONLY | | Flags::WITH_READ_ONLY
Flags::WITH_PERMISSIONS | | Flags::WITH_PERMISSIONS
Flags::WITH_SYMLINKS | | Flags::WITH_SYMLINKS
Flags::WITH_DEVICE_NODES | | Flags::WITH_DEVICE_NODES
Flags::WITH_FIFOS | | Flags::WITH_FIFOS
Flags::WITH_SOCKETS | | Flags::WITH_SOCKETS
Flags::WITH_ACL | | Flags::WITH_ACL
Flags::WITH_SELINUX | Flags::WITH_SELINUX
}, }
// FUSE mounts are special as the supported feature set // FUSE mounts are special as the supported feature set
// is not clear a priori. // is not clear a priori.
FUSE_SUPER_MAGIC => { FUSE_SUPER_MAGIC => Flags::WITH_FUSE,
Flags::WITH_FUSE
},
_ => { _ => {
Flags::WITH_2SEC_TIME | Flags::WITH_2SEC_TIME
Flags::WITH_READ_ONLY | | Flags::WITH_READ_ONLY
Flags::WITH_PERMISSIONS | | Flags::WITH_PERMISSIONS
Flags::WITH_SYMLINKS | | Flags::WITH_SYMLINKS
Flags::WITH_DEVICE_NODES | | Flags::WITH_DEVICE_NODES
Flags::WITH_FIFOS | | Flags::WITH_FIFOS
Flags::WITH_SOCKETS | | Flags::WITH_SOCKETS
Flags::WITH_XATTRS | | Flags::WITH_XATTRS
Flags::WITH_ACL | | Flags::WITH_ACL
Flags::WITH_FCAPS | Flags::WITH_FCAPS
}, }
} }
} }
} }

View File

@ -503,7 +503,9 @@ impl SessionImpl {
async fn getattr(&self, inode: u64) -> Result<libc::stat, Error> { async fn getattr(&self, inode: u64) -> Result<libc::stat, Error> {
let entry = unsafe { let entry = unsafe {
self.accessor.open_file_at_range(&self.get_lookup(inode)?.entry_range_info).await? self.accessor
.open_file_at_range(&self.get_lookup(inode)?.entry_range_info)
.await?
}; };
to_stat(inode, &entry) to_stat(inode, &entry)
} }
@ -584,11 +586,7 @@ impl SessionImpl {
async fn listxattrs(&self, inode: u64) -> Result<Vec<pxar::format::XAttr>, Error> { async fn listxattrs(&self, inode: u64) -> Result<Vec<pxar::format::XAttr>, Error> {
let lookup = self.get_lookup(inode)?; let lookup = self.get_lookup(inode)?;
let metadata = self let metadata = self.open_entry(&lookup).await?.into_entry().into_metadata();
.open_entry(&lookup)
.await?
.into_entry()
.into_metadata();
let mut xattrs = metadata.xattrs; let mut xattrs = metadata.xattrs;

View File

@ -358,7 +358,10 @@ fn apply_quota_project_id(flags: Flags, fd: RawFd, metadata: &Metadata) -> Resul
} }
pub(crate) fn errno_is_unsupported(errno: Errno) -> bool { pub(crate) fn errno_is_unsupported(errno: Errno) -> bool {
matches!(errno, Errno::ENOTTY | Errno::ENOSYS | Errno::EBADF | Errno::EOPNOTSUPP | Errno::EINVAL) matches!(
errno,
Errno::ENOTTY | Errno::ENOSYS | Errno::EBADF | Errno::EOPNOTSUPP | Errno::EINVAL
)
} }
fn apply_chattr(fd: RawFd, chattr: libc::c_long, mask: libc::c_long) -> Result<(), Error> { fn apply_chattr(fd: RawFd, chattr: libc::c_long, mask: libc::c_long) -> Result<(), Error> {

View File

@ -8,7 +8,7 @@ use std::path::Path;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use nix::sys::stat::Mode; use nix::sys::stat::Mode;
use pxar::{mode, Entry, EntryKind, Metadata, format::StatxTimestamp}; use pxar::{format::StatxTimestamp, mode, Entry, EntryKind, Metadata};
/// Get the file permissions as `nix::Mode` /// Get the file permissions as `nix::Mode`
pub fn perms_from_metadata(meta: &Metadata) -> Result<Mode, Error> { pub fn perms_from_metadata(meta: &Metadata) -> Result<Mode, Error> {

View File

@ -6,8 +6,8 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::{format_err, Error}; use anyhow::{format_err, Error};
use futures::future::{AbortHandle, Abortable};
use futures::stream::Stream; use futures::stream::Stream;
use futures::future::{Abortable, AbortHandle};
use nix::dir::Dir; use nix::dir::Dir;
use nix::fcntl::OFlag; use nix::fcntl::OFlag;
use nix::sys::stat::Mode; use nix::sys::stat::Mode;
@ -68,7 +68,9 @@ impl PxarBackupStream {
}, },
Some(catalog), Some(catalog),
options, options,
).await { )
.await
{
let mut error = error2.lock().unwrap(); let mut error = error2.lock().unwrap();
*error = Some(err.to_string()); *error = Some(err.to_string());
} }
@ -92,11 +94,7 @@ impl PxarBackupStream {
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
Self::new( Self::new(dir, catalog, options)
dir,
catalog,
options,
)
} }
} }

View File

@ -1,5 +1,5 @@
use std::future::Future;
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -7,11 +7,11 @@ use anyhow::{bail, Error};
use proxmox_async::runtime::block_on; use proxmox_async::runtime::block_on;
use pbs_tools::crypt_config::CryptConfig;
use pbs_api_types::CryptMode; use pbs_api_types::CryptMode;
use pbs_datastore::data_blob::DataBlob; use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::read_chunk::ReadChunk;
use pbs_datastore::read_chunk::AsyncReadChunk; use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::read_chunk::ReadChunk;
use pbs_tools::crypt_config::CryptConfig;
use super::BackupReader; use super::BackupReader;
@ -49,24 +49,20 @@ impl RemoteChunkReader {
pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> { pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024); let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024);
self.client self.client.download_chunk(digest, &mut chunk_data).await?;
.download_chunk(digest, &mut chunk_data)
.await?;
let chunk = DataBlob::load_from_reader(&mut &chunk_data[..])?; let chunk = DataBlob::load_from_reader(&mut &chunk_data[..])?;
match self.crypt_mode { match self.crypt_mode {
CryptMode::Encrypt => { CryptMode::Encrypt => match chunk.crypt_mode()? {
match chunk.crypt_mode()? { CryptMode::Encrypt => Ok(chunk),
CryptMode::Encrypt => Ok(chunk), CryptMode::SignOnly | CryptMode::None => {
CryptMode::SignOnly | CryptMode::None => bail!("Index and chunk CryptMode don't match."), bail!("Index and chunk CryptMode don't match.")
} }
}, },
CryptMode::SignOnly | CryptMode::None => { CryptMode::SignOnly | CryptMode::None => match chunk.crypt_mode()? {
match chunk.crypt_mode()? { CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."),
CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."), CryptMode::SignOnly | CryptMode::None => Ok(chunk),
CryptMode::SignOnly | CryptMode::None => Ok(chunk),
}
}, },
} }
} }
@ -114,7 +110,8 @@ impl AsyncReadChunk for RemoteChunkReader {
let chunk = Self::read_raw_chunk(self, digest).await?; let chunk = Self::read_raw_chunk(self, digest).await?;
let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?; let raw_data =
chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
let use_cache = self.cache_hint.contains_key(digest); let use_cache = self.cache_hint.contains_key(digest);
if use_cache { if use_cache {

View File

@ -1,9 +1,12 @@
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use anyhow::{bail, Error}; use anyhow::{bail, Error};
use futures::*;
use serde_json::{json, Value}; use serde_json::{json, Value};
use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::unix::{signal, SignalKind};
use futures::*;
use proxmox_router::cli::format_and_print_result; use proxmox_router::cli::format_and_print_result;
@ -22,7 +25,6 @@ pub async fn display_task_log(
upid_str: &str, upid_str: &str,
strip_date: bool, strip_date: bool,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut signal_stream = signal(SignalKind::interrupt())?; let mut signal_stream = signal(SignalKind::interrupt())?;
let abort_count = Arc::new(AtomicUsize::new(0)); let abort_count = Arc::new(AtomicUsize::new(0));
let abort_count2 = Arc::clone(&abort_count); let abort_count2 = Arc::clone(&abort_count);
@ -40,21 +42,25 @@ pub async fn display_task_log(
}; };
let request_future = async move { let request_future = async move {
let mut start = 1; let mut start = 1;
let limit = 500; let limit = 500;
loop { loop {
let abort = abort_count.load(Ordering::Relaxed); let abort = abort_count.load(Ordering::Relaxed);
if abort > 0 { if abort > 0 {
let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str)); let path = format!(
"api2/json/nodes/localhost/tasks/{}",
percent_encode_component(upid_str)
);
let _ = client.delete(&path, None).await?; let _ = client.delete(&path, None).await?;
} }
let param = json!({ "start": start, "limit": limit, "test-status": true }); let param = json!({ "start": start, "limit": limit, "test-status": true });
let path = format!("api2/json/nodes/localhost/tasks/{}/log", percent_encode_component(upid_str)); let path = format!(
"api2/json/nodes/localhost/tasks/{}/log",
percent_encode_component(upid_str)
);
let result = client.get(&path, Some(param)).await?; let result = client.get(&path, Some(param)).await?;
let active = result["active"].as_bool().unwrap(); let active = result["active"].as_bool().unwrap();
@ -66,7 +72,9 @@ pub async fn display_task_log(
for item in data { for item in data {
let n = item["n"].as_u64().unwrap(); let n = item["n"].as_u64().unwrap();
let t = item["t"].as_str().unwrap(); let t = item["t"].as_str().unwrap();
if n != start { bail!("got wrong line number in response data ({} != {}", n, start); } if n != start {
bail!("got wrong line number in response data ({} != {}", n, start);
}
if strip_date && t.len() > 27 && &t[25..27] == ": " { if strip_date && t.len() > 27 && &t[25..27] == ": " {
let line = &t[27..]; let line = &t[27..];
println!("{}", line); println!("{}", line);
@ -83,14 +91,18 @@ pub async fn display_task_log(
break; break;
} }
} else if lines != limit { } else if lines != limit {
bail!("got wrong number of lines from server ({} != {})", lines, limit); bail!(
"got wrong number of lines from server ({} != {})",
lines,
limit
);
} }
} }
Ok(()) Ok(())
}; };
futures::select!{ futures::select! {
request = request_future.fuse() => request?, request = request_future.fuse() => request?,
abort = abort_future.fuse() => abort?, abort = abort_future.fuse() => abort?,
}; };

View File

@ -1,14 +1,14 @@
use std::convert::TryFrom; use std::convert::TryFrom;
use std::path::PathBuf;
use std::os::unix::io::{FromRawFd, RawFd};
use std::io::Read; use std::io::Read;
use std::os::unix::io::{FromRawFd, RawFd};
use std::path::PathBuf;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use serde_json::Value; use serde_json::Value;
use proxmox_sys::linux::tty;
use proxmox_sys::fs::file_get_contents;
use proxmox_schema::*; use proxmox_schema::*;
use proxmox_sys::fs::file_get_contents;
use proxmox_sys::linux::tty;
use pbs_api_types::CryptMode; use pbs_api_types::CryptMode;
@ -102,11 +102,12 @@ fn do_crypto_parameters(param: &Value, keep_keyfd_open: bool) -> Result<CryptoPa
let key_fd = match param.get("keyfd") { let key_fd = match param.get("keyfd") {
Some(Value::Number(key_fd)) => Some( Some(Value::Number(key_fd)) => Some(
RawFd::try_from(key_fd RawFd::try_from(
.as_i64() key_fd
.ok_or_else(|| format_err!("bad key fd: {:?}", key_fd))? .as_i64()
.ok_or_else(|| format_err!("bad key fd: {:?}", key_fd))?,
) )
.map_err(|err| format_err!("bad key fd: {:?}: {}", key_fd, err))? .map_err(|err| format_err!("bad key fd: {:?}: {}", key_fd, err))?,
), ),
Some(_) => bail!("bad --keyfd parameter type"), Some(_) => bail!("bad --keyfd parameter type"),
None => None, None => None,
@ -120,11 +121,12 @@ fn do_crypto_parameters(param: &Value, keep_keyfd_open: bool) -> Result<CryptoPa
let master_pubkey_fd = match param.get("master-pubkey-fd") { let master_pubkey_fd = match param.get("master-pubkey-fd") {
Some(Value::Number(key_fd)) => Some( Some(Value::Number(key_fd)) => Some(
RawFd::try_from(key_fd RawFd::try_from(
.as_i64() key_fd
.ok_or_else(|| format_err!("bad master public key fd: {:?}", key_fd))? .as_i64()
.ok_or_else(|| format_err!("bad master public key fd: {:?}", key_fd))?,
) )
.map_err(|err| format_err!("bad public master key fd: {:?}: {}", key_fd, err))? .map_err(|err| format_err!("bad public master key fd: {:?}: {}", key_fd, err))?,
), ),
Some(_) => bail!("bad --master-pubkey-fd parameter type"), Some(_) => bail!("bad --master-pubkey-fd parameter type"),
None => None, None => None,
@ -151,7 +153,9 @@ fn do_crypto_parameters(param: &Value, keep_keyfd_open: bool) -> Result<CryptoPa
if keep_keyfd_open { if keep_keyfd_open {
// don't close fd if requested, and try to reset seek position // don't close fd if requested, and try to reset seek position
std::mem::forget(input); std::mem::forget(input);
unsafe { libc::lseek(fd, 0, libc::SEEK_SET); } unsafe {
libc::lseek(fd, 0, libc::SEEK_SET);
}
} }
Some(KeyWithSource::from_fd(data)) Some(KeyWithSource::from_fd(data))
} }
@ -373,14 +377,14 @@ fn create_testdir(name: &str) -> Result<String, Error> {
// WARNING: there must only be one test for crypto_parameters as the default key handling is not // WARNING: there must only be one test for crypto_parameters as the default key handling is not
// safe w.r.t. concurrency // safe w.r.t. concurrency
fn test_crypto_parameters_handling() -> Result<(), Error> { fn test_crypto_parameters_handling() -> Result<(), Error> {
use serde_json::json;
use proxmox_sys::fs::{replace_file, CreateOptions}; use proxmox_sys::fs::{replace_file, CreateOptions};
use serde_json::json;
let some_key = vec![1;1]; let some_key = vec![1; 1];
let default_key = vec![2;1]; let default_key = vec![2; 1];
let some_master_key = vec![3;1]; let some_master_key = vec![3; 1];
let default_master_key = vec![4;1]; let default_master_key = vec![4; 1];
let testdir = create_testdir("key_source")?; let testdir = create_testdir("key_source")?;
@ -441,14 +445,19 @@ fn test_crypto_parameters_handling() -> Result<(), Error> {
}; };
replace_file(&keypath, &some_key, CreateOptions::default(), false)?; replace_file(&keypath, &some_key, CreateOptions::default(), false)?;
replace_file(&master_keypath, &some_master_key, CreateOptions::default(), false)?; replace_file(
&master_keypath,
&some_master_key,
CreateOptions::default(),
false,
)?;
// no params, no default key == no key // no params, no default key == no key
let res = crypto_parameters(&json!({})); let res = crypto_parameters(&json!({}));
assert_eq!(res.unwrap(), no_key_res); assert_eq!(res.unwrap(), no_key_res);
// keyfile param == key from keyfile // keyfile param == key from keyfile
let res = crypto_parameters(&json!({"keyfile": keypath})); let res = crypto_parameters(&json!({ "keyfile": keypath }));
assert_eq!(res.unwrap(), some_key_res); assert_eq!(res.unwrap(), some_key_res);
// crypt mode none == no key // crypt mode none == no key
@ -469,13 +478,19 @@ fn test_crypto_parameters_handling() -> Result<(), Error> {
assert_eq!(res.unwrap(), some_key_res); assert_eq!(res.unwrap(), some_key_res);
// invalid keyfile parameter always errors // invalid keyfile parameter always errors
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath})).is_err()); assert!(crypto_parameters(&json!({ "keyfile": invalid_keypath })).is_err());
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "none"})).is_err()); assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "none"})).is_err());
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "sign-only"})).is_err()); assert!(
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "encrypt"})).is_err()); crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "sign-only"})).is_err()
);
assert!(
crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "encrypt"})).is_err()
);
// now set a default key // now set a default key
unsafe { set_test_encryption_key(Ok(Some(default_key))); } unsafe {
set_test_encryption_key(Ok(Some(default_key)));
}
// and repeat // and repeat
@ -484,7 +499,7 @@ fn test_crypto_parameters_handling() -> Result<(), Error> {
assert_eq!(res.unwrap(), default_key_res); assert_eq!(res.unwrap(), default_key_res);
// keyfile param == key from keyfile // keyfile param == key from keyfile
let res = crypto_parameters(&json!({"keyfile": keypath})); let res = crypto_parameters(&json!({ "keyfile": keypath }));
assert_eq!(res.unwrap(), some_key_res); assert_eq!(res.unwrap(), some_key_res);
// crypt mode none == no key // crypt mode none == no key
@ -507,13 +522,19 @@ fn test_crypto_parameters_handling() -> Result<(), Error> {
assert_eq!(res.unwrap(), some_key_res); assert_eq!(res.unwrap(), some_key_res);
// invalid keyfile parameter always errors // invalid keyfile parameter always errors
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath})).is_err()); assert!(crypto_parameters(&json!({ "keyfile": invalid_keypath })).is_err());
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "none"})).is_err()); assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "none"})).is_err());
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "sign-only"})).is_err()); assert!(
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "encrypt"})).is_err()); crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "sign-only"})).is_err()
);
assert!(
crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "encrypt"})).is_err()
);
// now make default key retrieval error // now make default key retrieval error
unsafe { set_test_encryption_key(Err(format_err!("test error"))); } unsafe {
set_test_encryption_key(Err(format_err!("test error")));
}
// and repeat // and repeat
@ -521,7 +542,7 @@ fn test_crypto_parameters_handling() -> Result<(), Error> {
assert!(crypto_parameters(&json!({})).is_err()); assert!(crypto_parameters(&json!({})).is_err());
// keyfile param == key from keyfile // keyfile param == key from keyfile
let res = crypto_parameters(&json!({"keyfile": keypath})); let res = crypto_parameters(&json!({ "keyfile": keypath }));
assert_eq!(res.unwrap(), some_key_res); assert_eq!(res.unwrap(), some_key_res);
// crypt mode none == no key // crypt mode none == no key
@ -542,18 +563,26 @@ fn test_crypto_parameters_handling() -> Result<(), Error> {
assert_eq!(res.unwrap(), some_key_res); assert_eq!(res.unwrap(), some_key_res);
// invalid keyfile parameter always errors // invalid keyfile parameter always errors
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath})).is_err()); assert!(crypto_parameters(&json!({ "keyfile": invalid_keypath })).is_err());
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "none"})).is_err()); assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "none"})).is_err());
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "sign-only"})).is_err()); assert!(
assert!(crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "encrypt"})).is_err()); crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "sign-only"})).is_err()
);
assert!(
crypto_parameters(&json!({"keyfile": invalid_keypath, "crypt-mode": "encrypt"})).is_err()
);
// now remove default key again // now remove default key again
unsafe { set_test_encryption_key(Ok(None)); } unsafe {
set_test_encryption_key(Ok(None));
}
// set a default master key // set a default master key
unsafe { set_test_default_master_pubkey(Ok(Some(default_master_key))); } unsafe {
set_test_default_master_pubkey(Ok(Some(default_master_key)));
}
// and use an explicit master key // and use an explicit master key
assert!(crypto_parameters(&json!({"master-pubkey-file": master_keypath})).is_err()); assert!(crypto_parameters(&json!({ "master-pubkey-file": master_keypath })).is_err());
// just a default == no key // just a default == no key
let res = crypto_parameters(&json!({})); let res = crypto_parameters(&json!({}));
assert_eq!(res.unwrap(), no_key_res); assert_eq!(res.unwrap(), no_key_res);
@ -562,35 +591,55 @@ fn test_crypto_parameters_handling() -> Result<(), Error> {
let res = crypto_parameters(&json!({"keyfile": keypath, "master-pubkey-file": master_keypath})); let res = crypto_parameters(&json!({"keyfile": keypath, "master-pubkey-file": master_keypath}));
assert_eq!(res.unwrap(), some_key_some_master_res); assert_eq!(res.unwrap(), some_key_some_master_res);
// same with fallback to default master key // same with fallback to default master key
let res = crypto_parameters(&json!({"keyfile": keypath})); let res = crypto_parameters(&json!({ "keyfile": keypath }));
assert_eq!(res.unwrap(), some_key_default_master_res); assert_eq!(res.unwrap(), some_key_default_master_res);
// crypt mode none == error // crypt mode none == error
assert!(crypto_parameters(&json!({"crypt-mode": "none", "master-pubkey-file": master_keypath})).is_err()); assert!(crypto_parameters(
&json!({"crypt-mode": "none", "master-pubkey-file": master_keypath})
)
.is_err());
// with just default master key == no key // with just default master key == no key
let res = crypto_parameters(&json!({"crypt-mode": "none"})); let res = crypto_parameters(&json!({"crypt-mode": "none"}));
assert_eq!(res.unwrap(), no_key_res); assert_eq!(res.unwrap(), no_key_res);
// crypt mode encrypt without enc key == error // crypt mode encrypt without enc key == error
assert!(crypto_parameters(&json!({"crypt-mode": "encrypt", "master-pubkey-file": master_keypath})).is_err()); assert!(crypto_parameters(
&json!({"crypt-mode": "encrypt", "master-pubkey-file": master_keypath})
)
.is_err());
assert!(crypto_parameters(&json!({"crypt-mode": "encrypt"})).is_err()); assert!(crypto_parameters(&json!({"crypt-mode": "encrypt"})).is_err());
// crypt mode none with explicit key == Error // crypt mode none with explicit key == Error
assert!(crypto_parameters(&json!({"crypt-mode": "none", "keyfile": keypath, "master-pubkey-file": master_keypath})).is_err()); assert!(crypto_parameters(
&json!({"crypt-mode": "none", "keyfile": keypath, "master-pubkey-file": master_keypath})
)
.is_err());
assert!(crypto_parameters(&json!({"crypt-mode": "none", "keyfile": keypath})).is_err()); assert!(crypto_parameters(&json!({"crypt-mode": "none", "keyfile": keypath})).is_err());
// crypt mode encrypt with keyfile == key from keyfile with correct mode // crypt mode encrypt with keyfile == key from keyfile with correct mode
let res = crypto_parameters(&json!({"crypt-mode": "encrypt", "keyfile": keypath, "master-pubkey-file": master_keypath})); let res = crypto_parameters(
&json!({"crypt-mode": "encrypt", "keyfile": keypath, "master-pubkey-file": master_keypath}),
);
assert_eq!(res.unwrap(), some_key_some_master_res); assert_eq!(res.unwrap(), some_key_some_master_res);
let res = crypto_parameters(&json!({"crypt-mode": "encrypt", "keyfile": keypath})); let res = crypto_parameters(&json!({"crypt-mode": "encrypt", "keyfile": keypath}));
assert_eq!(res.unwrap(), some_key_default_master_res); assert_eq!(res.unwrap(), some_key_default_master_res);
// invalid master keyfile parameter always errors when a key is passed, even with a valid // invalid master keyfile parameter always errors when a key is passed, even with a valid
// default master key // default master key
assert!(crypto_parameters(&json!({"keyfile": keypath, "master-pubkey-file": invalid_keypath})).is_err()); assert!(
assert!(crypto_parameters(&json!({"keyfile": keypath, "master-pubkey-file": invalid_keypath,"crypt-mode": "none"})).is_err()); crypto_parameters(&json!({"keyfile": keypath, "master-pubkey-file": invalid_keypath}))
.is_err()
);
assert!(crypto_parameters(
&json!({"keyfile": keypath, "master-pubkey-file": invalid_keypath,"crypt-mode": "none"})
)
.is_err());
assert!(crypto_parameters(&json!({"keyfile": keypath, "master-pubkey-file": invalid_keypath,"crypt-mode": "sign-only"})).is_err()); assert!(crypto_parameters(&json!({"keyfile": keypath, "master-pubkey-file": invalid_keypath,"crypt-mode": "sign-only"})).is_err());
assert!(crypto_parameters(&json!({"keyfile": keypath, "master-pubkey-file": invalid_keypath,"crypt-mode": "encrypt"})).is_err()); assert!(crypto_parameters(
&json!({"keyfile": keypath, "master-pubkey-file": invalid_keypath,"crypt-mode": "encrypt"})
)
.is_err());
Ok(()) Ok(())
} }

View File

@ -1,20 +1,20 @@
//! Shared tools useful for common CLI clients. //! Shared tools useful for common CLI clients.
use std::collections::HashMap; use std::collections::HashMap;
use std::env::VarError::{NotPresent, NotUnicode};
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader};
use std::os::unix::io::FromRawFd; use std::os::unix::io::FromRawFd;
use std::env::VarError::{NotUnicode, NotPresent};
use std::io::{BufReader, BufRead};
use std::process::Command; use std::process::Command;
use anyhow::{bail, format_err, Context, Error}; use anyhow::{bail, format_err, Context, Error};
use serde_json::{json, Value}; use serde_json::{json, Value};
use xdg::BaseDirectories; use xdg::BaseDirectories;
use proxmox_schema::*;
use proxmox_router::cli::{complete_file_name, shellword_split}; use proxmox_router::cli::{complete_file_name, shellword_split};
use proxmox_schema::*;
use proxmox_sys::fs::file_get_json; use proxmox_sys::fs::file_get_json;
use pbs_api_types::{BACKUP_REPO_URL, Authid, RateLimitConfig, UserWithTokens}; use pbs_api_types::{Authid, RateLimitConfig, UserWithTokens, BACKUP_REPO_URL};
use pbs_datastore::BackupDir; use pbs_datastore::BackupDir;
use pbs_tools::json::json_object_to_query; use pbs_tools::json::json_object_to_query;
@ -48,7 +48,6 @@ pub const CHUNK_SIZE_SCHEMA: Schema = IntegerSchema::new("Chunk size in KB. Must
/// ///
/// Only return the first line of data (without CRLF). /// Only return the first line of data (without CRLF).
pub fn get_secret_from_env(base_name: &str) -> Result<Option<String>, Error> { pub fn get_secret_from_env(base_name: &str) -> Result<Option<String>, Error> {
let firstline = |data: String| -> String { let firstline = |data: String| -> String {
match data.lines().next() { match data.lines().next() {
Some(line) => line.to_string(), Some(line) => line.to_string(),
@ -68,19 +67,24 @@ pub fn get_secret_from_env(base_name: &str) -> Result<Option<String>, Error> {
match std::env::var(base_name) { match std::env::var(base_name) {
Ok(p) => return Ok(Some(firstline(p))), Ok(p) => return Ok(Some(firstline(p))),
Err(NotUnicode(_)) => bail!(format!("{} contains bad characters", base_name)), Err(NotUnicode(_)) => bail!(format!("{} contains bad characters", base_name)),
Err(NotPresent) => {}, Err(NotPresent) => {}
}; };
let env_name = format!("{}_FD", base_name); let env_name = format!("{}_FD", base_name);
match std::env::var(&env_name) { match std::env::var(&env_name) {
Ok(fd_str) => { Ok(fd_str) => {
let fd: i32 = fd_str.parse() let fd: i32 = fd_str.parse().map_err(|err| {
.map_err(|err| format_err!("unable to parse file descriptor in ENV({}): {}", env_name, err))?; format_err!(
"unable to parse file descriptor in ENV({}): {}",
env_name,
err
)
})?;
let mut file = unsafe { File::from_raw_fd(fd) }; let mut file = unsafe { File::from_raw_fd(fd) };
return Ok(Some(firstline_file(&mut file)?)); return Ok(Some(firstline_file(&mut file)?));
} }
Err(NotUnicode(_)) => bail!(format!("{} contains bad characters", env_name)), Err(NotUnicode(_)) => bail!(format!("{} contains bad characters", env_name)),
Err(NotPresent) => {}, Err(NotPresent) => {}
} }
let env_name = format!("{}_FILE", base_name); let env_name = format!("{}_FILE", base_name);
@ -91,7 +95,7 @@ pub fn get_secret_from_env(base_name: &str) -> Result<Option<String>, Error> {
return Ok(Some(firstline_file(&mut file)?)); return Ok(Some(firstline_file(&mut file)?));
} }
Err(NotUnicode(_)) => bail!(format!("{} contains bad characters", env_name)), Err(NotUnicode(_)) => bail!(format!("{} contains bad characters", env_name)),
Err(NotPresent) => {}, Err(NotPresent) => {}
} }
let env_name = format!("{}_CMD", base_name); let env_name = format!("{}_CMD", base_name);
@ -104,7 +108,7 @@ pub fn get_secret_from_env(base_name: &str) -> Result<Option<String>, Error> {
return Ok(Some(firstline(output))); return Ok(Some(firstline(output)));
} }
Err(NotUnicode(_)) => bail!(format!("{} contains bad characters", env_name)), Err(NotUnicode(_)) => bail!(format!("{} contains bad characters", env_name)),
Err(NotPresent) => {}, Err(NotPresent) => {}
} }
Ok(None) Ok(None)
@ -157,21 +161,18 @@ fn connect_do(
let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok(); let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok();
let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?; let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?;
let options = HttpClientOptions::new_interactive(password, fingerprint) let options = HttpClientOptions::new_interactive(password, fingerprint).rate_limit(rate_limit);
.rate_limit(rate_limit);
HttpClient::new(server, port, auth_id, options) HttpClient::new(server, port, auth_id, options)
} }
/// like get, but simply ignore errors and return Null instead /// like get, but simply ignore errors and return Null instead
pub async fn try_get(repo: &BackupRepository, url: &str) -> Value { pub async fn try_get(repo: &BackupRepository, url: &str) -> Value {
let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok(); let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok();
let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD).unwrap_or(None); let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD).unwrap_or(None);
// ticket cache, but no questions asked // ticket cache, but no questions asked
let options = HttpClientOptions::new_interactive(password, fingerprint) let options = HttpClientOptions::new_interactive(password, fingerprint).interactive(false);
.interactive(false);
let client = match HttpClient::new(repo.host(), repo.port(), repo.auth_id(), options) { let client = match HttpClient::new(repo.host(), repo.port(), repo.auth_id(), options) {
Ok(v) => v, Ok(v) => v,
@ -196,7 +197,6 @@ pub fn complete_backup_group(_arg: &str, param: &HashMap<String, String>) -> Vec
} }
pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<String> { pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<String> {
let mut result = vec![]; let mut result = vec![];
let repo = match extract_repository_from_map(param) { let repo = match extract_repository_from_map(param) {
@ -225,8 +225,10 @@ pub fn complete_group_or_snapshot(arg: &str, param: &HashMap<String, String>) ->
proxmox_async::runtime::main(async { complete_group_or_snapshot_do(arg, param).await }) proxmox_async::runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
} }
pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, String>) -> Vec<String> { pub async fn complete_group_or_snapshot_do(
arg: &str,
param: &HashMap<String, String>,
) -> Vec<String> {
if arg.matches('/').count() < 2 { if arg.matches('/').count() < 2 {
let groups = complete_backup_group_do(param).await; let groups = complete_backup_group_do(param).await;
let mut result = vec![]; let mut result = vec![];
@ -245,7 +247,6 @@ pub fn complete_backup_snapshot(_arg: &str, param: &HashMap<String, String>) ->
} }
pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec<String> { pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec<String> {
let mut result = vec![]; let mut result = vec![];
let repo = match extract_repository_from_map(param) { let repo = match extract_repository_from_map(param) {
@ -259,9 +260,11 @@ pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec
if let Some(list) = data.as_array() { if let Some(list) = data.as_array() {
for item in list { for item in list {
if let (Some(backup_id), Some(backup_type), Some(backup_time)) = if let (Some(backup_id), Some(backup_type), Some(backup_time)) = (
(item["backup-id"].as_str(), item["backup-type"].as_str(), item["backup-time"].as_i64()) item["backup-id"].as_str(),
{ item["backup-type"].as_str(),
item["backup-time"].as_i64(),
) {
if let Ok(snapshot) = BackupDir::new(backup_type, backup_id, backup_time) { if let Ok(snapshot) = BackupDir::new(backup_type, backup_id, backup_time) {
result.push(snapshot.relative_path().to_str().unwrap().to_owned()); result.push(snapshot.relative_path().to_str().unwrap().to_owned());
} }
@ -277,7 +280,6 @@ pub fn complete_server_file_name(_arg: &str, param: &HashMap<String, String>) ->
} }
pub async fn complete_server_file_name_do(param: &HashMap<String, String>) -> Vec<String> { pub async fn complete_server_file_name_do(param: &HashMap<String, String>) -> Vec<String> {
let mut result = vec![]; let mut result = vec![];
let repo = match extract_repository_from_map(param) { let repo = match extract_repository_from_map(param) {
@ -286,12 +288,10 @@ pub async fn complete_server_file_name_do(param: &HashMap<String, String>) -> Ve
}; };
let snapshot: BackupDir = match param.get("snapshot") { let snapshot: BackupDir = match param.get("snapshot") {
Some(path) => { Some(path) => match path.parse() {
match path.parse() { Ok(v) => v,
Ok(v) => v, _ => return result,
_ => return result, },
}
}
_ => return result, _ => return result,
}; };
@ -299,7 +299,8 @@ pub async fn complete_server_file_name_do(param: &HashMap<String, String>) -> Ve
"backup-type": snapshot.group().backup_type(), "backup-type": snapshot.group().backup_type(),
"backup-id": snapshot.group().backup_id(), "backup-id": snapshot.group().backup_id(),
"backup-time": snapshot.backup_time(), "backup-time": snapshot.backup_time(),
})).unwrap(); }))
.unwrap();
let path = format!("api2/json/admin/datastore/{}/files?{}", repo.store(), query); let path = format!("api2/json/admin/datastore/{}/files?{}", repo.store(), query);
@ -350,14 +351,15 @@ pub fn complete_img_archive_name(arg: &str, param: &HashMap<String, String>) ->
} }
pub fn complete_chunk_size(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> { pub fn complete_chunk_size(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> {
let mut result = vec![]; let mut result = vec![];
let mut size = 64; let mut size = 64;
loop { loop {
result.push(size.to_string()); result.push(size.to_string());
size *= 2; size *= 2;
if size > 4096 { break; } if size > 4096 {
break;
}
} }
result result
@ -368,7 +370,6 @@ pub fn complete_auth_id(_arg: &str, param: &HashMap<String, String>) -> Vec<Stri
} }
pub async fn complete_auth_id_do(param: &HashMap<String, String>) -> Vec<String> { pub async fn complete_auth_id_do(param: &HashMap<String, String>) -> Vec<String> {
let mut result = vec![]; let mut result = vec![];
let repo = match extract_repository_from_map(param) { let repo = match extract_repository_from_map(param) {