Compare commits

22 Commits

| SHA1 |
|---|
| c002d48b0c |
| 15998ed12a |
| 9d8ab62769 |
| 3526a76ef3 |
| b9e0fcbdcd |
| a7188b3a75 |
| b6c06dce9d |
| 4adf47b606 |
| 4d0dc29951 |
| 1011fb552b |
| 2fd2d29281 |
| 9104152a83 |
| 02a58862dd |
| 26153589ba |
| 17b3e4451f |
| a2072cc346 |
| fea23d0323 |
| 71e83e1b1f |
| 28570d19a6 |
| 1369bcdbba |
| 5e4d81e957 |
| 0f4721f305 |
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "proxmox-backup"
-version = "1.1.2"
+version = "1.1.3"
 authors = [
     "Dietmar Maurer <dietmar@proxmox.com>",
     "Dominik Csapak <d.csapak@proxmox.com>",
debian/changelog (vendored, 17 changes)
@@ -1,3 +1,20 @@
+rust-proxmox-backup (1.1.3-1) unstable; urgency=medium
+
+  * tape restore: improve datastore locking when GC runs at the same time
+
+  * tape restore: always do quick chunk verification
+
+  * tape: improve compatibility with some changers
+
+  * tape: work-around missing format command on LTO-4 drives, fall back to
+    slower rewind erase
+
+  * fix #3393: pxar: allow and save the 'security.NTACL' extended attribute
+
+  * file-restore: support encrypted VM backups
+
+ -- Proxmox Support Team <support@proxmox.com>  Thu, 22 Apr 2021 20:14:58 +0200
+
 rust-proxmox-backup (1.1.2-1) unstable; urgency=medium

   * backup verify: always re-check if we can skip a chunk in the actual verify
@@ -48,7 +48,7 @@ pub fn list_dir_content<R: Read + Seek>(
         let mut components = path.clone();
         components.push(b'/');
         components.extend(&direntry.name);
-        let mut entry = ArchiveEntry::new(&components, &direntry.attr);
+        let mut entry = ArchiveEntry::new(&components, Some(&direntry.attr));
         if let DirEntryAttribute::File { size, mtime } = direntry.attr {
             entry.size = size.into();
             entry.mtime = mtime.into();
@@ -7,7 +7,7 @@ use proxmox::api::{api, RpcEnvironment, RpcEnvironmentType, Permission};
 use proxmox::api::router::{Router, SubdirMap};

 use crate::server::WorkerTask;
-use crate::tools::{apt, http, subscription};
+use crate::tools::{apt, http::SimpleHttp, subscription};

 use crate::config::acl::{PRIV_SYS_AUDIT, PRIV_SYS_MODIFY};
 use crate::api2::types::{Authid, APTUpdateInfo, NODE_SCHEMA, UPID_SCHEMA};

@@ -194,10 +194,12 @@ fn apt_get_changelog(
         bail!("Package '{}' not found", name);
     }

+    let mut client = SimpleHttp::new(None); // TODO: pass proxy_config
+
     let changelog_url = &pkg_info[0].change_log_url;
     // FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it
     if changelog_url.starts_with("http://download.proxmox.com/") {
-        let changelog = crate::tools::runtime::block_on(http::get_string(changelog_url, None))
+        let changelog = crate::tools::runtime::block_on(client.get_string(changelog_url, None))
             .map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
         Ok(json!(changelog))

@@ -221,7 +223,7 @@ fn apt_get_changelog(
         auth_header.insert("Authorization".to_owned(),
             format!("Basic {}", base64::encode(format!("{}:{}", key, id))));

-        let changelog = crate::tools::runtime::block_on(http::get_string(changelog_url, Some(&auth_header)))
+        let changelog = crate::tools::runtime::block_on(client.get_string(changelog_url, Some(&auth_header)))
            .map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
        Ok(json!(changelog))
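Note: the free functions http::get_string/http::post become methods on a client object so a proxy configuration can later be injected in one place. A hedged usage sketch (the URL is a placeholder; assumes an async context):

    async fn fetch_changelog_example() -> Result<String, anyhow::Error> {
        // None = no proxy configured yet (the TODO above); SimpleHttp is
        // defined in src/tools/http.rs further down in this diff.
        let mut client = crate::tools::http::SimpleHttp::new(None);
        client.get_string("http://download.proxmox.com/changelog-example", None).await
    }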
@@ -1,6 +1,7 @@
 use std::panic::UnwindSafe;
 use std::path::Path;
 use std::sync::Arc;
+use std::collections::HashMap;

 use anyhow::{bail, format_err, Error};
 use serde_json::Value;

@@ -1334,7 +1335,8 @@ pub fn catalog_media(
         drive.rewind()?;
         drive.read_label()?; // skip over labels - we already read them above

-        restore_media(&worker, &mut drive, &media_id, None, verbose)?;
+        let mut checked_chunks = HashMap::new();
+        restore_media(&worker, &mut drive, &media_id, None, &mut checked_chunks, verbose)?;

         Ok(())
     },
@@ -305,11 +305,17 @@ pub fn restore(
         task_log!(worker, "Encryption key fingerprint: {}", fingerprint);
     }
     task_log!(worker, "Pool: {}", pool);
-    task_log!(worker, "Datastore(s):");
-    store_map
-        .used_datastores()
-        .iter()
-        .for_each(|store| task_log!(worker, "\t{}", store));
+    task_log!(
+        worker,
+        "Datastore(s): {}",
+        store_map
+            .used_datastores()
+            .into_iter()
+            .map(String::from)
+            .collect::<Vec<String>>()
+            .join(", "),
+    );
+
     task_log!(worker, "Drive: {}", drive);
     task_log!(
         worker,
@@ -320,6 +326,17 @@ pub fn restore(
             .join(";")
     );

+    let mut datastore_locks = Vec::new();
+    for store_name in store_map.used_datastores() {
+        // explicitly create a shared lock to prevent GC on newly created chunks
+        if let Some(store) = store_map.get_datastore(store_name) {
+            let shared_store_lock = store.try_shared_chunk_store_lock()?;
+            datastore_locks.push(shared_store_lock);
+        }
+    }
+
+    let mut checked_chunks_map = HashMap::new();
+
     for media_id in media_id_list.iter() {
         request_and_restore_media(
             &worker,

@@ -327,12 +344,15 @@ pub fn restore(
             &drive_config,
             &drive,
             &store_map,
+            &mut checked_chunks_map,
             &auth_id,
             &notify_user,
             &owner,
         )?;
     }

+    drop(datastore_locks);
+
     task_log!(worker, "Restore mediaset '{}' done", media_set);

     if let Err(err) = set_tape_device_state(&drive, "") {
@@ -358,6 +378,7 @@ pub fn request_and_restore_media(
     drive_config: &SectionConfigData,
     drive_name: &str,
     store_map: &DataStoreMap,
+    checked_chunks_map: &mut HashMap<String, HashSet<[u8;32]>>,
     authid: &Authid,
     notify_user: &Option<Userid>,
     owner: &Option<Authid>,

@@ -399,6 +420,7 @@ pub fn request_and_restore_media(
         &mut drive,
         &info,
         Some((&store_map, restore_owner)),
+        checked_chunks_map,
         false,
     )
 }

@@ -411,6 +433,7 @@ pub fn restore_media(
     drive: &mut Box<dyn TapeDriver>,
     media_id: &MediaId,
     target: Option<(&DataStoreMap, &Authid)>,
+    checked_chunks_map: &mut HashMap<String, HashSet<[u8;32]>>,
     verbose: bool,
 ) -> Result<(), Error> {
@@ -434,7 +457,7 @@ pub fn restore_media(
             Ok(reader) => reader,
         };

-        restore_archive(worker, reader, current_file_number, target, &mut catalog, verbose)?;
+        restore_archive(worker, reader, current_file_number, target, &mut catalog, checked_chunks_map, verbose)?;
     }

     MediaCatalog::finish_temporary_database(status_path, &media_id.label.uuid, true)?;

@@ -448,6 +471,7 @@ fn restore_archive<'a>(
     current_file_number: u64,
     target: Option<(&DataStoreMap, &Authid)>,
     catalog: &mut MediaCatalog,
+    checked_chunks_map: &mut HashMap<String, HashSet<[u8;32]>>,
     verbose: bool,
 ) -> Result<(), Error> {
     let header: MediaContentHeader = unsafe { reader.read_le_value()? };
@@ -473,6 +497,8 @@ fn restore_archive<'a>(
             let datastore_name = archive_header.store;
             let snapshot = archive_header.snapshot;

+            let checked_chunks = checked_chunks_map.entry(datastore_name.clone()).or_insert(HashSet::new());
+
             task_log!(worker, "File {}: snapshot archive {}:{}", current_file_number, datastore_name, snapshot);

             let backup_dir: BackupDir = snapshot.parse()?;

@@ -499,7 +525,7 @@ fn restore_archive<'a>(
             if is_new {
                 task_log!(worker, "restore snapshot {}", backup_dir);

-                match restore_snapshot_archive(worker, reader, &path) {
+                match restore_snapshot_archive(worker, reader, &path, &datastore, checked_chunks) {
                     Err(err) => {
                         std::fs::remove_dir_all(&path)?;
                         bail!("restore snapshot {} failed - {}", backup_dir, err);
@@ -548,7 +574,11 @@ fn restore_archive<'a>(
                 .and_then(|t| t.0.get_datastore(&source_datastore));

             if datastore.is_some() || target.is_none() {
-                if let Some(chunks) = restore_chunk_archive(worker, reader, datastore, verbose)? {
+                let checked_chunks = checked_chunks_map
+                    .entry(datastore.map(|d| d.name()).unwrap_or("_unused_").to_string())
+                    .or_insert(HashSet::new());
+
+                if let Some(chunks) = restore_chunk_archive(worker, reader, datastore, checked_chunks, verbose)? {
                     catalog.start_chunk_archive(
                         Uuid::from(header.uuid),
                         current_file_number,
@@ -590,6 +620,7 @@ fn restore_chunk_archive<'a>(
     worker: &WorkerTask,
     reader: Box<dyn 'a + TapeRead>,
     datastore: Option<&DataStore>,
+    checked_chunks: &mut HashSet<[u8;32]>,
     verbose: bool,
 ) -> Result<Option<Vec<[u8;32]>>, Error> {

@@ -637,6 +668,7 @@ fn restore_chunk_archive<'a>(
                 } else if verbose {
                     task_log!(worker, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest));
                 }
+                checked_chunks.insert(digest.clone());
             } else if verbose {
                 task_log!(worker, "Found chunk: {}", proxmox::tools::digest_to_hex(&digest));
             }
@@ -650,10 +682,12 @@ fn restore_snapshot_archive<'a>(
     worker: &WorkerTask,
     reader: Box<dyn 'a + TapeRead>,
     snapshot_path: &Path,
+    datastore: &DataStore,
+    checked_chunks: &mut HashSet<[u8;32]>,
 ) -> Result<bool, Error> {

     let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
-    match try_restore_snapshot_archive(worker, &mut decoder, snapshot_path) {
+    match try_restore_snapshot_archive(worker, &mut decoder, snapshot_path, datastore, checked_chunks) {
         Ok(()) => Ok(true),
         Err(err) => {
             let reader = decoder.input();
@@ -678,6 +712,8 @@ fn try_restore_snapshot_archive<R: pxar::decoder::SeqRead>(
     worker: &WorkerTask,
     decoder: &mut pxar::decoder::sync::Decoder<R>,
     snapshot_path: &Path,
+    datastore: &DataStore,
+    checked_chunks: &mut HashSet<[u8;32]>,
 ) -> Result<(), Error> {

     let _root = match decoder.next() {
@@ -768,11 +804,13 @@ fn try_restore_snapshot_archive<R: pxar::decoder::SeqRead>(
                 let index = DynamicIndexReader::open(&archive_path)?;
                 let (csum, size) = index.compute_csum();
                 manifest.verify_file(&item.filename, &csum, size)?;
+                datastore.fast_index_verification(&index, checked_chunks)?;
             }
             ArchiveType::FixedIndex => {
                 let index = FixedIndexReader::open(&archive_path)?;
                 let (csum, size) = index.compute_csum();
                 manifest.verify_file(&item.filename, &csum, size)?;
+                datastore.fast_index_verification(&index, checked_chunks)?;
             }
             ArchiveType::Blob => {
                 let mut tmpfile = std::fs::File::open(&archive_path)?;
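Note: the checked_chunks_map threaded through these functions memoizes, per target datastore, which chunk digests were already verified during this restore run, so chunks repeated across archives are only stat'ed once. A minimal sketch of the pattern (check_once and the stat callback are illustrative names, not part of the diff):

    use std::collections::{HashMap, HashSet};

    fn check_once(
        checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
        store: &str,
        digest: [u8; 32],
        stat_chunk: impl Fn(&[u8; 32]) -> Result<(), std::io::Error>,
    ) -> Result<(), std::io::Error> {
        let checked = checked_chunks_map.entry(store.to_string()).or_insert_with(HashSet::new);
        if checked.contains(&digest) {
            return Ok(()); // already verified during this restore run
        }
        stat_chunk(&digest)?; // quick check: does the chunk exist on disk?
        checked.insert(digest);
        Ok(())
    }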
@@ -1354,19 +1354,22 @@ pub struct ArchiveEntry {
 }

 impl ArchiveEntry {
-    pub fn new(filepath: &[u8], entry_type: &DirEntryAttribute) -> Self {
+    pub fn new(filepath: &[u8], entry_type: Option<&DirEntryAttribute>) -> Self {
         Self {
             filepath: base64::encode(filepath),
             text: String::from_utf8_lossy(filepath.split(|x| *x == b'/').last().unwrap())
                 .to_string(),
-            entry_type: CatalogEntryType::from(entry_type).to_string(),
-            leaf: !matches!(entry_type, DirEntryAttribute::Directory { .. }),
+            entry_type: match entry_type {
+                Some(entry_type) => CatalogEntryType::from(entry_type).to_string(),
+                None => "v".to_owned(),
+            },
+            leaf: !matches!(entry_type, None | Some(DirEntryAttribute::Directory { .. })),
             size: match entry_type {
-                DirEntryAttribute::File { size, .. } => Some(*size),
+                Some(DirEntryAttribute::File { size, .. }) => Some(*size),
                 _ => None
             },
             mtime: match entry_type {
-                DirEntryAttribute::File { mtime, .. } => Some(*mtime),
+                Some(DirEntryAttribute::File { mtime, .. }) => Some(*mtime),
                 _ => None
             },
         }
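Note: passing None now produces a "virtual" entry (serialized entry_type "v"). A hedged usage sketch (the path is illustrative):

    fn virtual_entry_example() {
        // None marks a virtual entry, e.g. a raw disk image whose contents
        // are only reachable through the restore VM, not the catalog itself.
        let entry = ArchiveEntry::new(b"/disk.img.fidx", None);
        assert_eq!(entry.entry_type, "v");
        assert!(!entry.leaf); // virtual entries can still be descended into
    }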
@@ -153,6 +153,34 @@ impl DataStore {
         Ok(out)
     }

+    /// Fast index verification - only check if chunks exist
+    pub fn fast_index_verification(
+        &self,
+        index: &dyn IndexFile,
+        checked: &mut HashSet<[u8;32]>,
+    ) -> Result<(), Error> {
+
+        for pos in 0..index.index_count() {
+            let info = index.chunk_info(pos).unwrap();
+            if checked.contains(&info.digest) {
+                continue;
+            }
+
+            self.stat_chunk(&info.digest)
+                .map_err(|err| {
+                    format_err!(
+                        "fast_index_verification error, stat_chunk {} failed - {}",
+                        proxmox::tools::digest_to_hex(&info.digest),
+                        err,
+                    )
+                })?;
+
+            checked.insert(info.digest);
+        }
+
+        Ok(())
+    }
+
     pub fn name(&self) -> &str {
         self.chunk_store.name()
     }

@@ -786,4 +814,3 @@ impl DataStore {
         self.verify_new
     }
 }
-
@@ -30,7 +30,7 @@ pub mod proxmox_client_tools;
 use proxmox_client_tools::{
     complete_group_or_snapshot, complete_repository, connect, extract_repository_from_value,
     key_source::{
-        crypto_parameters, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
+        crypto_parameters_keep_fd, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
         KEYFILE_SCHEMA,
     },
     REPO_URL_SCHEMA,

@@ -56,7 +56,7 @@ fn parse_path(path: String, base64: bool) -> Result<ExtractPath, Error> {
         return Ok(ExtractPath::ListArchives);
     }

-    while bytes.len() > 0 && bytes[0] == b'/' {
+    while !bytes.is_empty() && bytes[0] == b'/' {
         bytes.remove(0);
     }
@@ -76,6 +76,18 @@ fn parse_path(path: String, base64: bool) -> Result<ExtractPath, Error> {
     }
 }

+fn keyfile_path(param: &Value) -> Option<String> {
+    if let Some(Value::String(keyfile)) = param.get("keyfile") {
+        return Some(keyfile.to_owned());
+    }
+
+    if let Some(Value::Number(keyfd)) = param.get("keyfd") {
+        return Some(format!("/dev/fd/{}", keyfd));
+    }
+
+    None
+}
+
 #[api(
     input: {
         properties: {
@@ -138,7 +150,8 @@ async fn list(
     let snapshot: BackupDir = snapshot.parse()?;
     let path = parse_path(path, base64)?;

-    let crypto = crypto_parameters(&param)?;
+    let keyfile = keyfile_path(&param);
+    let crypto = crypto_parameters_keep_fd(&param)?;
     let crypt_config = match crypto.enc_key {
         None => None,
         Some(ref key) => {
@@ -170,14 +183,17 @@ async fn list(
         ExtractPath::ListArchives => {
             let mut entries = vec![];
             for file in manifest.files() {
-                match file.filename.rsplitn(2, '.').next().unwrap() {
-                    "didx" => {}
-                    "fidx" => {}
-                    _ => continue, // ignore all non fidx/didx
+                if !file.filename.ends_with(".pxar.didx") && !file.filename.ends_with(".img.fidx") {
+                    continue;
                 }
                 let path = format!("/{}", file.filename);
-                let attr = DirEntryAttribute::Directory { start: 0 };
-                entries.push(ArchiveEntry::new(path.as_bytes(), &attr));
+                let attr = if file.filename.ends_with(".pxar.didx") {
+                    // a pxar file is a file archive, so its root is also a directory root
+                    Some(&DirEntryAttribute::Directory { start: 0 })
+                } else {
+                    None
+                };
+                entries.push(ArchiveEntry::new(path.as_bytes(), attr));
             }

             Ok(entries)
@@ -207,6 +223,7 @@ async fn list(
         manifest,
         repo,
         snapshot,
+        keyfile,
     };
     let driver: Option<BlockDriverType> = match param.get("driver") {
         Some(drv) => Some(serde_json::from_value(drv.clone())?),
@@ -306,7 +323,8 @@ async fn extract(
         None => Some(std::env::current_dir()?),
     };

-    let crypto = crypto_parameters(&param)?;
+    let keyfile = keyfile_path(&param);
+    let crypto = crypto_parameters_keep_fd(&param)?;
     let crypt_config = match crypto.enc_key {
         None => None,
         Some(ref key) => {
@@ -357,6 +375,7 @@ async fn extract(
         manifest,
         repo,
         snapshot,
+        keyfile,
     };
     let driver: Option<BlockDriverType> = match param.get("driver") {
         Some(drv) => Some(serde_json::from_value(drv.clone())?),
@@ -396,14 +415,16 @@ async fn extract_to_target<T>(
 where
     T: pxar::accessor::ReadAt + Clone + Send + Sync + Unpin + 'static,
 {
+    let path = if path.is_empty() { b"/" } else { path };
+
     let root = decoder.open_root().await?;
     let file = root
-        .lookup(OsStr::from_bytes(&path))
+        .lookup(OsStr::from_bytes(path))
         .await?
        .ok_or_else(|| format_err!("error opening '{:?}'", path))?;

     if let Some(target) = target {
-        extract_sub_dir(target, decoder, OsStr::from_bytes(&path), verbose).await?;
+        extract_sub_dir(target, decoder, OsStr::from_bytes(path), verbose).await?;
     } else {
         match file.kind() {
             pxar::EntryKind::File { .. } => {
@@ -413,7 +434,7 @@ where
                     create_zip(
                         tokio::io::stdout(),
                         decoder,
-                        OsStr::from_bytes(&path),
+                        OsStr::from_bytes(path),
                         verbose,
                     )
                     .await?;
@@ -86,6 +86,14 @@ pub struct CryptoParams {
 }

+pub fn crypto_parameters(param: &Value) -> Result<CryptoParams, Error> {
+    do_crypto_parameters(param, false)
+}
+
+pub fn crypto_parameters_keep_fd(param: &Value) -> Result<CryptoParams, Error> {
+    do_crypto_parameters(param, true)
+}
+
-pub fn crypto_parameters(param: &Value) -> Result<CryptoParams, Error> {
+fn do_crypto_parameters(param: &Value, keep_keyfd_open: bool) -> Result<CryptoParams, Error> {
     let keyfile = match param.get("keyfile") {
         Some(Value::String(keyfile)) => Some(keyfile),
         Some(_) => bail!("bad --keyfile parameter type"),

@@ -135,11 +143,16 @@
             file_get_contents(keyfile)?,
         )),
         (None, Some(fd)) => {
-            let input = unsafe { std::fs::File::from_raw_fd(fd) };
+            let mut input = unsafe { std::fs::File::from_raw_fd(fd) };
             let mut data = Vec::new();
-            let _len: usize = { input }.read_to_end(&mut data).map_err(|err| {
+            let _len: usize = input.read_to_end(&mut data).map_err(|err| {
                 format_err!("error reading encryption key from fd {}: {}", fd, err)
             })?;
+            if keep_keyfd_open {
+                // don't close fd if requested, and try to reset seek position
+                std::mem::forget(input);
+                unsafe { libc::lseek(fd, 0, libc::SEEK_SET); }
+            }
             Some(KeyWithSource::from_fd(data))
         }
     };
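Note: the keep_keyfd_open path works by leaking the File so the descriptor stays valid for a later reader (the QEMU helper re-opens it via /dev/fd/N, see keyfile_path above). A minimal sketch of that trick, under the assumption that the descriptor is seekable:

    use std::fs::File;
    use std::io::Read;
    use std::os::unix::io::FromRawFd;

    // Sketch only: `fd` is assumed to come from --keyfd and to be seekable.
    fn read_key_keep_open(fd: std::os::unix::io::RawFd) -> std::io::Result<Vec<u8>> {
        let mut input = unsafe { File::from_raw_fd(fd) };
        let mut data = Vec::new();
        input.read_to_end(&mut data)?;
        std::mem::forget(input); // skip Drop, so the fd is not closed
        unsafe { libc::lseek(fd, 0, libc::SEEK_SET); } // best-effort rewind for the next reader
        Ok(data)
    }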
@@ -21,6 +21,7 @@ pub struct SnapRestoreDetails {
     pub repo: BackupRepository,
     pub snapshot: BackupDir,
     pub manifest: BackupManifest,
+    pub keyfile: Option<String>,
 }

 /// Return value of a BlockRestoreDriver.status() call, 'id' must be valid for .stop(id)
@@ -94,7 +94,7 @@ async fn cleanup_map(map: &mut HashMap<String, VMState>) -> bool {
         if res.is_err() {
             // VM is not reachable, remove from map and inform user
             to_remove.push(name.clone());
-            println!(
+            eprintln!(
                 "VM '{}' (pid: {}, cid: {}) was not reachable, removing from map",
                 name, state.pid, state.cid
             );

@@ -129,7 +129,7 @@ async fn ensure_running(details: &SnapRestoreDetails) -> Result<VsockClient, Err
                 return Ok(client);
             }
             Err(err) => {
-                println!("stale VM detected, restarting ({})", err);
+                eprintln!("stale VM detected, restarting ({})", err);
                 // VM is dead, restart
                 let vms = start_vm(vm.cid, details).await?;
                 new_cid = vms.cid;
@@ -127,9 +127,6 @@ pub async fn start_vm(
     if let Err(_) = std::env::var("PBS_PASSWORD") {
         bail!("environment variable PBS_PASSWORD has to be set for QEMU VM restore");
     }
-    if let Err(_) = std::env::var("PBS_FINGERPRINT") {
-        bail!("environment variable PBS_FINGERPRINT has to be set for QEMU VM restore");
-    }

     let pid;
     let (pid_fd, pid_path) = make_tmp_file("/tmp/file-restore-qemu.pid.tmp", CreateOptions::new())?;

@@ -193,9 +190,14 @@ pub async fn start_vm(
             continue;
         }
         drives.push("-drive".to_owned());
+        let keyfile = if let Some(ref keyfile) = details.keyfile {
+            format!(",,keyfile={}", keyfile)
+        } else {
+            "".to_owned()
+        };
         drives.push(format!(
-            "file=pbs:repository={},,snapshot={},,archive={},read-only=on,if=none,id=drive{}",
-            details.repo, details.snapshot, file, id
+            "file=pbs:repository={},,snapshot={},,archive={}{},read-only=on,if=none,id=drive{}",
+            details.repo, details.snapshot, file, keyfile, id
         ));
         drives.push("-device".to_owned());
         // drive serial is used by VM to map .fidx files to /dev paths
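Note: the doubled commas in the pbs drive string are QEMU option escaping: a literal comma inside an option value must be written as ",,", otherwise QEMU splits the value at each comma. An illustrative helper (hypothetical name, not in the diff):

    fn qemu_escape_commas(value: &str) -> String {
        value.replace(',', ",,")
    }

    fn main() {
        // ",,keyfile=..." above relies on exactly this escaping rule.
        assert_eq!(qemu_escape_commas("a,b"), "a,,b");
    }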
@@ -148,7 +148,7 @@ fn list(
     match root_entry {
         DirEntryAttribute::File { .. } => {
             // list on file, return details
-            res.push(ArchiveEntry::new(&param_path, &root_entry));
+            res.push(ArchiveEntry::new(&param_path, Some(&root_entry)));
         }
         DirEntryAttribute::Directory { .. } => {
             // list on directory, return all contained files/dirs

@@ -176,7 +176,7 @@ fn list(
                 if let Ok(entry) = entry {
                     res.push(ArchiveEntry::new(
                         full_path.as_os_str().as_bytes(),
-                        &entry,
+                        Some(&entry),
                     ));
                 }
             }

@@ -192,7 +192,7 @@ fn list(
                     t_path.extend(t.as_bytes());
                     res.push(ArchiveEntry::new(
                         &t_path[..],
-                        &DirEntryAttribute::Directory { start: 0 },
+                        None,
                     ));
                 }
             }

@@ -203,7 +203,8 @@ fn list(
                 c_path.extend(c.as_bytes());
                 res.push(ArchiveEntry::new(
                     &c_path[..],
-                    &DirEntryAttribute::Directory { start: 0 },
+                    // this marks the beginning of a filesystem, i.e. '/', so this is a Directory
+                    Some(&DirEntryAttribute::Directory { start: 0 }),
                 ));
             }
         }
@@ -593,6 +593,8 @@ fn decode_element_status_page(
             break;
         }

+        let len_before = reader.len();
+
         match subhead.element_type_code {
             1 => {
                 let desc: TransportDescriptor = unsafe { reader.read_be_value()? };

@@ -693,6 +695,19 @@ fn decode_element_status_page(
             }
             code => bail!("got unknown element type code {}", code),
         }
+
+        // we have to consume the whole descriptor size, else
+        // our position in the reader is not correct
+        let len_after = reader.len();
+        let have_read = len_before - len_after;
+        let desc_len = subhead.descriptor_length as usize;
+        if desc_len > have_read {
+            let mut left_to_read = desc_len - have_read;
+            if left_to_read > len_after {
+                left_to_read = len_after; // reader has not enough data?
+            }
+            let _ = reader.read_exact_allocated(left_to_read)?;
+        }
     }
 }
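Note: this is the changer-compatibility fix from the changelog: some changers report descriptors longer than the structure being parsed, so the surplus bytes must be skipped to keep the reader aligned on the next descriptor. A generic sketch of the same pattern over std::io::Read (names are illustrative):

    use std::io::Read;

    fn skip_descriptor_padding<R: Read>(
        reader: &mut R,
        descriptor_len: usize, // length the device claims
        have_read: usize,      // bytes actually consumed while decoding
    ) -> std::io::Result<()> {
        if descriptor_len > have_read {
            let surplus = (descriptor_len - have_read) as u64;
            // drain the surplus so the next descriptor starts at the right offset
            std::io::copy(&mut reader.take(surplus), &mut std::io::sink())?;
        }
        Ok(())
    }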
@@ -20,6 +20,9 @@ pub use tape_alert_flags::*;
 mod mam;
 pub use mam::*;

+mod report_density;
+pub use report_density::*;
+
 use proxmox::{
     sys::error::SysResult,
     tools::io::{ReadExt, WriteExt},

@@ -103,6 +106,7 @@ pub struct LtoTapeStatus {
 pub struct SgTape {
     file: File,
     info: InquiryInfo,
+    density_code: u8, // drive type
     encryption_key_loaded: bool,
 }

@@ -120,9 +124,13 @@ impl SgTape {
         if info.peripheral_type != 1 {
             bail!("not a tape device (peripheral_type = {})", info.peripheral_type);
         }
+
+        let density_code = report_density(&mut file)?;
+
         Ok(Self {
             file,
             info,
+            density_code,
             encryption_key_loaded: false,
         })
     }

@@ -193,14 +201,18 @@ impl SgTape {
         let mut sg_raw = SgRaw::new(&mut self.file, 16)?;
         sg_raw.set_timeout(Self::SCSI_TAPE_DEFAULT_TIMEOUT);
         let mut cmd = Vec::new();

-        cmd.extend(&[0x04, 0, 0, 0, 0, 0]);
-
-        sg_raw.do_command(&cmd)
-            .map_err(|err| format_err!("erase failed - {}", err))?;
+        if self.density_code >= 0x58 { // FORMAT requires LTO5 or newer
+            cmd.extend(&[0x04, 0, 0, 0, 0, 0]);
+            sg_raw.do_command(&cmd)?;
+            if !fast {
+                self.erase_media(false)?; // overwrite everything
+            }
+        } else {
+            // try rewind/erase instead
+            self.rewind()?;
+            self.erase_media(fast)?
+        }

         Ok(())
     }
src/tape/drive/lto/sg_tape/report_density.rs (new file, 69 lines)
@@ -0,0 +1,69 @@
+use anyhow::{bail, format_err, Error};
+use std::io::Read;
+use endian_trait::Endian;
+use std::os::unix::io::AsRawFd;
+
+use proxmox::tools::io::ReadExt;
+
+use crate::tools::sgutils2::SgRaw;
+
+#[repr(C, packed)]
+#[derive(Endian)]
+struct DensityDescriptorBlock {
+    primary_density_code: u8,
+    secondary_density_code: u8,
+    flags2: u8,
+    reserved: [u8; 2],
+    bits_per_mm: [u8; 3],
+    media_width: u16,
+    tracks: u16,
+    capacity: u32,
+    organization: [u8; 8],
+    density_name: [u8; 8],
+    description: [u8; 20],
+}
+
+// Returns the maximum supported drive density code
+pub fn report_density<F: AsRawFd>(file: &mut F) -> Result<u8, Error> {
+    let alloc_len: u16 = 8192;
+    let mut sg_raw = SgRaw::new(file, alloc_len as usize)?;
+
+    let mut cmd = Vec::new();
+    cmd.extend(&[0x44, 0, 0, 0, 0, 0, 0]); // REPORT DENSITY SUPPORT (MEDIA=0)
+    cmd.extend(&alloc_len.to_be_bytes()); // alloc len
+    cmd.push(0u8); // control byte
+
+    let data = sg_raw.do_command(&cmd)
+        .map_err(|err| format_err!("report density failed - {}", err))?;
+
+    let mut max_density = 0u8;
+
+    proxmox::try_block!({
+        let mut reader = &data[..];
+
+        let page_len: u16 = unsafe { reader.read_be_value()? };
+        let page_len = page_len as usize;
+
+        if (page_len + 2) > data.len() {
+            bail!("invalid page length {} {}", page_len + 2, data.len());
+        } else {
+            // Note: Quantum hh7 returns the allocation_length instead of real data_len
+            reader = &data[2..page_len+2];
+        }
+
+        let mut reserved = [0u8; 2];
+        reader.read_exact(&mut reserved)?;
+
+        loop {
+            if reader.is_empty() { break; }
+            let block: DensityDescriptorBlock = unsafe { reader.read_be_value()? };
+            if block.primary_density_code > max_density {
+                max_density = block.primary_density_code;
+            }
+        }
+
+        Ok(())
+
+    }).map_err(|err| format_err!("decode report density failed - {}", err))?;
+
+    Ok(max_density)
+}
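Note: the 0x58 threshold used in format_media above is the primary density code reported by LTO-5 drives; older generations (e.g. LTO-4, 0x46) lack the FORMAT command and fall back to rewind/erase. A hedged reference sketch (mapping taken from common vendor documentation, an assumption rather than part of this diff):

    fn lto_generation(density_code: u8) -> Option<u8> {
        match density_code {
            0x40 => Some(1), // LTO-1
            0x42 => Some(2),
            0x44 => Some(3),
            0x46 => Some(4),
            0x58 => Some(5), // first generation with FORMAT support
            0x5a => Some(6),
            0x5c => Some(7),
            0x5e => Some(8),
            _ => None,
        }
    }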
@@ -1,4 +1,4 @@
-//! Generic AsyncRead/AsyncWrite utilities.
+//! AsyncRead/AsyncWrite utilities.

 use std::io;
 use std::os::unix::io::{AsRawFd, RawFd};

@@ -9,52 +9,92 @@ use futures::stream::{Stream, TryStream};
 use futures::ready;
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
 use tokio::net::TcpListener;
-use hyper::client::connect::Connection;
+use tokio_openssl::SslStream;
+use hyper::client::connect::{Connection, Connected};

-pub enum EitherStream<L, R> {
-    Left(L),
-    Right(R),
+/// Asynchronous stream, possibly encrypted and proxied
+///
+/// Useful for HTTP client implementations using hyper.
+pub enum MaybeTlsStream<S> {
+    Normal(S),
+    Proxied(S),
+    Secured(SslStream<S>),
 }

-impl<L: AsyncRead + Unpin, R: AsyncRead + Unpin> AsyncRead for EitherStream<L, R> {
+impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for MaybeTlsStream<S> {
     fn poll_read(
         self: Pin<&mut Self>,
         cx: &mut Context,
         buf: &mut ReadBuf,
     ) -> Poll<Result<(), io::Error>> {
         match self.get_mut() {
-            EitherStream::Left(ref mut s) => {
+            MaybeTlsStream::Normal(ref mut s) => {
                 Pin::new(s).poll_read(cx, buf)
             }
-            EitherStream::Right(ref mut s) => {
+            MaybeTlsStream::Proxied(ref mut s) => {
                 Pin::new(s).poll_read(cx, buf)
             }
+            MaybeTlsStream::Secured(ref mut s) => {
+                Pin::new(s).poll_read(cx, buf)
+            }
         }
     }
 }

-impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L, R> {
+impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for MaybeTlsStream<S> {
     fn poll_write(
         self: Pin<&mut Self>,
         cx: &mut Context,
         buf: &[u8],
     ) -> Poll<Result<usize, io::Error>> {
         match self.get_mut() {
-            EitherStream::Left(ref mut s) => {
+            MaybeTlsStream::Normal(ref mut s) => {
                 Pin::new(s).poll_write(cx, buf)
             }
-            EitherStream::Right(ref mut s) => {
+            MaybeTlsStream::Proxied(ref mut s) => {
                 Pin::new(s).poll_write(cx, buf)
             }
+            MaybeTlsStream::Secured(ref mut s) => {
+                Pin::new(s).poll_write(cx, buf)
+            }
         }
     }

+    fn poll_write_vectored(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        bufs: &[io::IoSlice<'_>],
+    ) -> Poll<Result<usize, io::Error>> {
+        match self.get_mut() {
+            MaybeTlsStream::Normal(ref mut s) => {
+                Pin::new(s).poll_write_vectored(cx, bufs)
+            }
+            MaybeTlsStream::Proxied(ref mut s) => {
+                Pin::new(s).poll_write_vectored(cx, bufs)
+            }
+            MaybeTlsStream::Secured(ref mut s) => {
+                Pin::new(s).poll_write_vectored(cx, bufs)
+            }
+        }
+    }
+
+    fn is_write_vectored(&self) -> bool {
+        match self {
+            MaybeTlsStream::Normal(s) => s.is_write_vectored(),
+            MaybeTlsStream::Proxied(s) => s.is_write_vectored(),
+            MaybeTlsStream::Secured(s) => s.is_write_vectored(),
+        }
+    }
+
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
         match self.get_mut() {
-            EitherStream::Left(ref mut s) => {
+            MaybeTlsStream::Normal(ref mut s) => {
                 Pin::new(s).poll_flush(cx)
             }
-            EitherStream::Right(ref mut s) => {
+            MaybeTlsStream::Proxied(ref mut s) => {
                 Pin::new(s).poll_flush(cx)
             }
+            MaybeTlsStream::Secured(ref mut s) => {
+                Pin::new(s).poll_flush(cx)
+            }
         }

@@ -62,25 +102,27 @@ impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L

     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
         match self.get_mut() {
-            EitherStream::Left(ref mut s) => {
+            MaybeTlsStream::Normal(ref mut s) => {
                 Pin::new(s).poll_shutdown(cx)
             }
-            EitherStream::Right(ref mut s) => {
+            MaybeTlsStream::Proxied(ref mut s) => {
                 Pin::new(s).poll_shutdown(cx)
             }
+            MaybeTlsStream::Secured(ref mut s) => {
+                Pin::new(s).poll_shutdown(cx)
+            }
         }
     }
 }

-// we need this for crate::client::http_client:
-impl Connection for EitherStream<
-    tokio::net::TcpStream,
-    Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
-> {
-    fn connected(&self) -> hyper::client::connect::Connected {
+// we need this for the hyper http client
+impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for MaybeTlsStream<S> {
+    fn connected(&self) -> Connected {
         match self {
-            EitherStream::Left(s) => s.connected(),
-            EitherStream::Right(s) => s.get_ref().connected(),
+            MaybeTlsStream::Normal(s) => s.connected(),
+            MaybeTlsStream::Proxied(s) => s.connected().proxy(true),
+            MaybeTlsStream::Secured(s) => s.get_ref().connected(),
         }
     }
 }
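Note: the Proxied variant exists so hyper can be told the connection runs through a plain HTTP proxy; Connected::proxy(true) switches hyper to absolute-form request targets. A minimal sketch of that marking, assuming hyper 0.14's connect API:

    use hyper::client::connect::Connected;

    // `proxied` corresponds to the MaybeTlsStream::Proxied case above.
    fn mark_connection(connected: Connected, proxied: bool) -> Connected {
        if proxied { connected.proxy(true) } else { connected }
    }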
@@ -1,68 +1,76 @@
 use anyhow::{Error, format_err, bail};
 use lazy_static::lazy_static;
 use std::task::{Context, Poll};
 use std::os::unix::io::AsRawFd;
 use std::collections::HashMap;
 use std::pin::Pin;
+use std::sync::Arc;

 use hyper::{Uri, Body};
 use hyper::client::{Client, HttpConnector};
 use http::{Request, Response};
 use openssl::ssl::{SslConnector, SslMethod};
 use futures::*;
+use tokio::{
+    io::{
+        AsyncRead,
+        AsyncReadExt,
+        AsyncWriteExt,
+    },
+    net::TcpStream,
+};
+use tokio_openssl::SslStream;

 use crate::tools::{
-    async_io::EitherStream,
+    async_io::MaybeTlsStream,
     socket::{
         set_tcp_keepalive,
         PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
     },
 };

-lazy_static! {
-    static ref HTTP_CLIENT: Client<HttpsConnector, Body> = {
-        let connector = SslConnector::builder(SslMethod::tls()).unwrap().build();
-        let httpc = HttpConnector::new();
-        let https = HttpsConnector::with_connector(httpc, connector);
-        Client::builder().build(https)
-    };
+/// HTTP Proxy Configuration
+#[derive(Clone)]
+pub struct ProxyConfig {
+    pub host: String,
+    pub port: u16,
+    pub force_connect: bool,
 }

-pub async fn get_string(uri: &str, extra_headers: Option<&HashMap<String, String>>) -> Result<String, Error> {
-    let mut request = Request::builder()
-        .method("GET")
-        .uri(uri)
-        .header("User-Agent", "proxmox-backup-client/1.0");
-
-    if let Some(hs) = extra_headers {
-        for (h, v) in hs.iter() {
-            request = request.header(h, v);
-        }
-    }
-
-    let request = request.body(Body::empty())?;
-
-    let res = HTTP_CLIENT.request(request).await?;
-
-    let status = res.status();
-    if !status.is_success() {
-        bail!("Got bad status '{}' from server", status)
-    }
-
-    response_body_string(res).await
+/// Asynchronous HTTP client implementation
+pub struct SimpleHttp {
+    client: Client<HttpsConnector, Body>,
 }

-pub async fn response_body_string(res: Response<Body>) -> Result<String, Error> {
-    let buf = hyper::body::to_bytes(res).await?;
-    String::from_utf8(buf.to_vec())
-        .map_err(|err| format_err!("Error converting HTTP result data: {}", err))
-}
+impl SimpleHttp {

-pub async fn post(
+    pub fn new(proxy_config: Option<ProxyConfig>) -> Self {
+        let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build();
+        Self::with_ssl_connector(ssl_connector, proxy_config)
+    }
+
+    pub fn with_ssl_connector(ssl_connector: SslConnector, proxy_config: Option<ProxyConfig>) -> Self {
+        let connector = HttpConnector::new();
+        let mut https = HttpsConnector::with_connector(connector, ssl_connector);
+        if let Some(proxy_config) = proxy_config {
+            https.set_proxy(proxy_config);
+        }
+        let client = Client::builder().build(https);
+        Self { client }
+    }
+
+    pub async fn request(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
+        self.client.request(request)
+            .map_err(Error::from)
+            .await
+    }
+
+    pub async fn post(
+        &mut self,
         uri: &str,
         body: Option<String>,
         content_type: Option<&str>,
     ) -> Result<Response<Body>, Error> {

         let body = if let Some(body) = body {
             Body::from(body)
         } else {
@@ -77,76 +85,222 @@ pub async fn post(
             .header(hyper::header::CONTENT_TYPE, content_type)
             .body(body)?;

-    HTTP_CLIENT.request(request)
+        self.client.request(request)
             .map_err(Error::from)
             .await
     }
+
+    pub async fn get_string(
+        &mut self,
+        uri: &str,
+        extra_headers: Option<&HashMap<String, String>>,
+    ) -> Result<String, Error> {
+
+        let mut request = Request::builder()
+            .method("GET")
+            .uri(uri)
+            .header("User-Agent", "proxmox-backup-client/1.0");
+
+        if let Some(hs) = extra_headers {
+            for (h, v) in hs.iter() {
+                request = request.header(h, v);
+            }
+        }
+
+        let request = request.body(Body::empty())?;
+
+        let res = self.client.request(request).await?;
+
+        let status = res.status();
+        if !status.is_success() {
+            bail!("Got bad status '{}' from server", status)
+        }
+
+        Self::response_body_string(res).await
+    }
+
+    pub async fn response_body_string(res: Response<Body>) -> Result<String, Error> {
+        let buf = hyper::body::to_bytes(res).await?;
+        String::from_utf8(buf.to_vec())
+            .map_err(|err| format_err!("Error converting HTTP result data: {}", err))
+    }
+}

 #[derive(Clone)]
 pub struct HttpsConnector {
-    http: HttpConnector,
-    ssl_connector: std::sync::Arc<SslConnector>,
+    connector: HttpConnector,
+    ssl_connector: Arc<SslConnector>,
+    proxy: Option<ProxyConfig>,
 }

 impl HttpsConnector {
-    pub fn with_connector(mut http: HttpConnector, ssl_connector: SslConnector) -> Self {
-        http.enforce_http(false);
+    pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector) -> Self {
+        connector.enforce_http(false);
         Self {
-            http,
-            ssl_connector: std::sync::Arc::new(ssl_connector),
+            connector,
+            ssl_connector: Arc::new(ssl_connector),
+            proxy: None,
         }
     }

+    pub fn set_proxy(&mut self, proxy: ProxyConfig) {
+        self.proxy = Some(proxy);
+    }
+
+    async fn secure_stream(
+        tcp_stream: TcpStream,
+        ssl_connector: &SslConnector,
+        host: &str,
+    ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+        let config = ssl_connector.configure()?;
+        let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+        Pin::new(&mut conn).connect().await?;
+        Ok(MaybeTlsStream::Secured(conn))
+    }
+
+    async fn read_connect_response<R: AsyncRead + Unpin>(
+        stream: &mut R,
+    ) -> Result<String, Error> {
+
+        let mut data: Vec<u8> = Vec::new();
+        let mut buffer = [0u8; 256];
+        const END_MARK: &[u8; 4] = b"\r\n\r\n";
+
+        'outer: loop {
+            let n = stream.read(&mut buffer[..]).await?;
+            if n == 0 { break; }
+            let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 };
+            data.extend(&buffer[..n]);
+            if data.len() >= END_MARK.len() {
+                if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) {
+                    if pos != data.len() - END_MARK.len() {
+                        bail!("unexpected data after connect response");
+                    }
+                    break 'outer;
+                }
+            }
+            if data.len() > 1024*32 { // max 32K (randomly chosen limit)
+                bail!("too many bytes");
+            }
+        }
+
+        let response = String::from_utf8_lossy(&data);
+
+        match response.split("\r\n").next() {
+            Some(status) => Ok(status.to_owned()),
+            None => bail!("missing newline"),
+        }
+    }
+
+    async fn parse_connect_status<R: AsyncRead + Unpin>(
+        stream: &mut R,
+    ) -> Result<(), Error> {
+
+        let status_str = Self::read_connect_response(stream).await
+            .map_err(|err| format_err!("invalid connect response: {}", err))?;
+
+        if !(status_str.starts_with("HTTP/1.1 200") || status_str.starts_with("HTTP/1.0 200")) {
+            bail!("proxy connect failed - invalid status: {}", status_str)
+        }
+
+        Ok(())
+    }
 }

-type MaybeTlsStream = EitherStream<
-    tokio::net::TcpStream,
-    Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
->;
-
 impl hyper::service::Service<Uri> for HttpsConnector {
-    type Response = MaybeTlsStream;
+    type Response = MaybeTlsStream<TcpStream>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
     type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

-    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        // This connector is always ready, but others might not be.
-        Poll::Ready(Ok(()))
+    fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.connector
+            .poll_ready(ctx)
+            .map_err(|err| err.into())
     }

     fn call(&mut self, dst: Uri) -> Self::Future {
-        let mut this = self.clone();
-        async move {
-            let is_https = dst
-                .scheme()
-                .ok_or_else(|| format_err!("missing URL scheme"))?
-                == "https";
-            let host = dst
-                .host()
-                .ok_or_else(|| format_err!("missing hostname in destination url?"))?
-                .to_string();
-
-            let config = this.ssl_connector.configure();
+        let mut connector = self.connector.clone();
+        let ssl_connector = Arc::clone(&self.ssl_connector);
+        let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS);
+        let host = match dst.host() {
+            Some(host) => host.to_owned(),
+            None => {
+                return futures::future::err(format_err!("missing URL scheme")).boxed();
+            }
+        };
+        let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
+
+        if let Some(ref proxy) = self.proxy {
+
+            let use_connect = is_https || proxy.force_connect;
+
+            let proxy_url = format!("{}:{}", proxy.host, proxy.port);
+            let proxy_uri = match Uri::builder()
+                .scheme("http")
+                .authority(proxy_url.as_str())
+                .path_and_query("/")
+                .build()
+            {
+                Ok(uri) => uri,
+                Err(err) => return futures::future::err(err.into()).boxed(),
+            };
+
+            if use_connect {
+                async move {
+                    let mut tcp_stream = connector
+                        .call(proxy_uri)
+                        .await
+                        .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
+
+                    let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
+                    let connect_request = format!(
+                        "CONNECT {0}:{1} HTTP/1.1\r\n\
+                        Host: {0}:{1}\r\n\r\n",
+                        host, port,
+                    );
+
+                    tcp_stream.write_all(connect_request.as_bytes()).await?;
+                    tcp_stream.flush().await?;
+
+                    Self::parse_connect_status(&mut tcp_stream).await?;
+
+                    if is_https {
+                        Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+                    } else {
+                        Ok(MaybeTlsStream::Normal(tcp_stream))
+                    }
+                }.boxed()
+            } else {
+                async move {
+                    let tcp_stream = connector
+                        .call(proxy_uri)
+                        .await
+                        .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
+
+                    let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
+                    Ok(MaybeTlsStream::Proxied(tcp_stream))
+                }.boxed()
+            }
+        } else {
             async move {
                 let dst_str = dst.to_string(); // for error messages
-                let conn = this
-                    .http
+                let tcp_stream = connector
                     .call(dst)
                     .await
                     .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;

-                let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+                let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);

                 if is_https {
-                    let conn: tokio_openssl::SslStream<tokio::net::TcpStream> = tokio_openssl::SslStream::new(config?.into_ssl(&host)?, conn)?;
-                    let mut conn = Box::pin(conn);
-                    conn.as_mut().connect().await?;
-                    Ok(MaybeTlsStream::Right(conn))
+                    Self::secure_stream(tcp_stream, &ssl_connector, &host).await
                 } else {
-                    Ok(MaybeTlsStream::Left(conn))
+                    Ok(MaybeTlsStream::Normal(tcp_stream))
                 }
             }.boxed()
+        }
     }
 }
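Note: a hedged usage sketch of the new client with a proxy (host and port are placeholders; assumes an async context):

    async fn example() -> Result<String, anyhow::Error> {
        let mut client = SimpleHttp::new(Some(ProxyConfig {
            host: "proxy.example.com".to_string(), // placeholder
            port: 3128,                            // placeholder
            force_connect: false, // plain-http requests go through the proxy without CONNECT
        }));
        client.get_string("https://example.com/", None).await
    }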
@@ -6,8 +6,7 @@ use regex::Regex;

 use proxmox::api::api;

-use crate::tools;
-use crate::tools::http;
+use crate::tools::{self, http::SimpleHttp};
 use proxmox::tools::fs::{replace_file, CreateOptions};

 /// How long the local key is valid for in between remote checks

@@ -102,10 +101,13 @@ async fn register_subscription(
         "ip": "localhost",
         "check_token": challenge,
     });

+    let mut client = SimpleHttp::new(None); // TODO: pass proxy_config
+
     let uri = "https://shop.maurer-it.com/modules/servers/licensing/verify.php";
     let query = tools::json_object_to_query(params)?;
-    let response = http::post(uri, Some(query), Some("application/x-www-form-urlencoded")).await?;
-    let body = http::response_body_string(response).await?;
+    let response = client.post(uri, Some(query), Some("application/x-www-form-urlencoded")).await?;
+    let body = SimpleHttp::response_body_string(response).await?;

     Ok((body, challenge))
 }
@@ -170,6 +170,10 @@ pub fn is_valid_xattr_name(c_name: &CStr) -> bool {
     if name.starts_with(b"user.") || name.starts_with(b"trusted.") {
         return true;
     }
+    // samba saves windows ACLs there
+    if name == b"security.NTACL" {
+        return true;
+    }
     is_security_capability(c_name)
 }
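Note: a small test sketch for the new allow-list entry (test name and module are illustrative):

    #[cfg(test)]
    mod tests {
        use super::*;
        use std::ffi::CString;

        #[test]
        fn ntacl_xattr_is_allowed() {
            // security.NTACL is where Samba stores Windows ACLs; it must now
            // pass the xattr name filter so pxar can archive and restore it.
            let name = CString::new("security.NTACL").unwrap();
            assert!(is_valid_xattr_name(&name));
        }
    }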