split proxmox-file-restore into its own crate

This also moves a couple of required utilities such as
logrotate and some file descriptor methods to pbs-tools.

Note that the logrotate usage and run-dir handling should be
improved to work as a regular user, as this *should* (IMHO)
be a regular unprivileged command (including running
qemu, given the kvm privileges...)

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
Wolfgang Bumiller
2021-09-01 12:21:51 +02:00
parent e5f9b7f79e
commit 6c76aa434d
23 changed files with 182 additions and 79 deletions
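
As context for the moved helpers, a hedged sketch of the relocated file
descriptor utility as it is consumed below (pbs_tools::fd::fd_change_cloexec);
the body is an assumption about its implementation, not part of this diff:

use std::os::unix::io::RawFd;

use anyhow::{format_err, Error};
use nix::fcntl::{fcntl, FcntlArg, FdFlag};

/// Toggle FD_CLOEXEC on an open file descriptor (sketch; presumably wraps fcntl(2)).
pub fn fd_change_cloexec(fd: RawFd, on: bool) -> Result<(), Error> {
    // read the current fd flags, flip the close-on-exec bit, write them back
    let mut flags = FdFlag::from_bits(fcntl(fd, FcntlArg::F_GETFD)?)
        .ok_or_else(|| format_err!("unhandled file flags"))?;
    flags.set(FdFlag::FD_CLOEXEC, on);
    fcntl(fd, FcntlArg::F_SETFD(flags))?;
    Ok(())
}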

proxmox-file-restore/Cargo.toml
@@ -0,0 +1,27 @@
[package]
name = "proxmox-file-restore"
version = "0.1.0"
authors = ["Proxmox Support Team <support@proxmox.com>"]
edition = "2018"
[dependencies]
anyhow = "1.0"
base64 = "0.12"
futures = "0.3"
libc = "0.2"
nix = "0.19.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.6", features = [ "io-std", "rt", "rt-multi-thread", "time" ] }
pxar = { version = "0.10.1", features = [ "tokio-io" ] }
proxmox = { version = "0.13.0", features = [ "api-macro", "cli" ] }
pbs-api-types = { path = "../pbs-api-types" }
pbs-buildcfg = { path = "../pbs-buildcfg" }
pbs-client = { path = "../pbs-client" }
pbs-datastore = { path = "../pbs-datastore" }
pbs-runtime = { path = "../pbs-runtime" }
pbs-systemd = { path = "../pbs-systemd" }
pbs-tools = { path = "../pbs-tools" }

proxmox-file-restore/src/block_driver.rs
@@ -0,0 +1,208 @@
//! Abstraction layer over different methods of accessing a block backup
use std::collections::HashMap;
use std::future::Future;
use std::hash::BuildHasher;
use std::pin::Pin;
use anyhow::{bail, Error};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use proxmox::api::{api, cli::*};
use pbs_client::BackupRepository;
use pbs_datastore::backup_info::BackupDir;
use pbs_datastore::catalog::ArchiveEntry;
use pbs_datastore::manifest::BackupManifest;
use super::block_driver_qemu::QemuBlockDriver;
/// Contains details about a snapshot that is to be accessed by block file restore
pub struct SnapRestoreDetails {
pub repo: BackupRepository,
pub snapshot: BackupDir,
pub manifest: BackupManifest,
pub keyfile: Option<String>,
}
/// Return value of a BlockRestoreDriver.status() call, 'id' must be valid for .stop(id)
pub struct DriverStatus {
pub id: String,
pub data: Value,
}
pub type Async<R> = Pin<Box<dyn Future<Output = R> + Send>>;
/// An abstract implementation for retrieving data out of a block file backup
pub trait BlockRestoreDriver {
/// List `ArchiveEntry` items for the given image file and path
fn data_list(
&self,
details: SnapRestoreDetails,
img_file: String,
path: Vec<u8>,
) -> Async<Result<Vec<ArchiveEntry>, Error>>;
/// pxar=true:
/// Attempt to create a pxar archive of the given file path and return a reader instance for it
/// pxar=false:
/// Attempt to read the file or folder at the given path and return the file content or a zip
/// file as a stream
fn data_extract(
&self,
details: SnapRestoreDetails,
img_file: String,
path: Vec<u8>,
pxar: bool,
) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>>;
/// Return status of all running/mapped images; the result value is (id, extra data), where id
/// must match the ones returned from list()
fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>>;
/// Stop/Close a running restore method
fn stop(&self, id: String) -> Async<Result<(), Error>>;
/// Returned ids must be prefixed with the driver type so that they cannot collide between
/// drivers; the returned values must be passable to stop()
fn list(&self) -> Vec<String>;
}
#[api()]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)]
pub enum BlockDriverType {
/// Uses a small QEMU/KVM virtual machine to map images securely. Requires PVE-patched QEMU.
Qemu,
}
impl BlockDriverType {
fn resolve(&self) -> impl BlockRestoreDriver {
match self {
BlockDriverType::Qemu => QemuBlockDriver {},
}
}
}
const DEFAULT_DRIVER: BlockDriverType = BlockDriverType::Qemu;
const ALL_DRIVERS: &[BlockDriverType] = &[BlockDriverType::Qemu];
pub async fn data_list(
driver: Option<BlockDriverType>,
details: SnapRestoreDetails,
img_file: String,
path: Vec<u8>,
) -> Result<Vec<ArchiveEntry>, Error> {
let driver = driver.unwrap_or(DEFAULT_DRIVER).resolve();
driver.data_list(details, img_file, path).await
}
pub async fn data_extract(
driver: Option<BlockDriverType>,
details: SnapRestoreDetails,
img_file: String,
path: Vec<u8>,
pxar: bool,
) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>, Error> {
let driver = driver.unwrap_or(DEFAULT_DRIVER).resolve();
driver.data_extract(details, img_file, path, pxar).await
}
#[api(
input: {
properties: {
"driver": {
type: BlockDriverType,
optional: true,
},
"output-format": {
schema: OUTPUT_FORMAT,
optional: true,
},
},
},
)]
/// Retrieve status information about currently running/mapped restore images
pub async fn status(driver: Option<BlockDriverType>, param: Value) -> Result<(), Error> {
let output_format = get_output_format(&param);
let text = output_format == "text";
let mut ret = json!({});
for dt in ALL_DRIVERS {
if driver.is_some() && &driver.unwrap() != dt {
continue;
}
let drv_name = format!("{:?}", dt);
let drv = dt.resolve();
match drv.status().await {
Ok(data) if data.is_empty() => {
if text {
println!("{}: no mappings", drv_name);
} else {
ret[drv_name] = json!({});
}
}
Ok(data) => {
if text {
println!("{}:", &drv_name);
}
ret[&drv_name]["ids"] = json!({});
for status in data {
if text {
println!("{} \t({})", status.id, status.data);
} else {
ret[&drv_name]["ids"][status.id] = status.data;
}
}
}
Err(err) => {
if text {
eprintln!("error getting status from driver '{}' - {}", drv_name, err);
} else {
ret[drv_name] = json!({ "error": format!("{}", err) });
}
}
}
}
if !text {
format_and_print_result(&ret, &output_format);
}
Ok(())
}
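// For illustration (not part of this commit): with --output-format=json the
// loop above produces a structure like this (names and numbers invented):
//
// {
//   "Qemu": {
//     "ids": {
//       "qemu_root@pam@localhost:store/vm/100/2021-08-31T12:00:00Z": {
//         "pid": 12345,
//         "cid": 42
//       }
//     }
//   }
// }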
#[api(
input: {
properties: {
"name": {
type: String,
description: "The name of the VM to stop.",
},
},
},
)]
/// Immediately stop/unmap a given image. Not typically necessary, as VMs will stop themselves
/// after a timer anyway.
pub async fn stop(name: String) -> Result<(), Error> {
for drv in ALL_DRIVERS.iter().map(BlockDriverType::resolve) {
if drv.list().contains(&name) {
return drv.stop(name).await;
}
}
bail!("no mapping with name '{}' found", name);
}
/// Autocompletion handler for block mappings
pub fn complete_block_driver_ids<S: BuildHasher>(
_arg: &str,
_param: &HashMap<String, String, S>,
) -> Vec<String> {
ALL_DRIVERS
.iter()
.map(BlockDriverType::resolve)
.flat_map(|d| d.list())
.collect()
}
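
For illustration (not part of this commit): a minimal driver satisfying
BlockRestoreDriver, showing the boxed-future pattern behind the Async alias;
the NullDriver name is invented for this sketch:

use futures::FutureExt; // for .boxed(), as used by the real QEMU driver

struct NullDriver;

impl BlockRestoreDriver for NullDriver {
    fn data_list(
        &self,
        _details: SnapRestoreDetails,
        _img_file: String,
        _path: Vec<u8>,
    ) -> Async<Result<Vec<ArchiveEntry>, Error>> {
        // an immediately-ready future, pinned and boxed to fit the alias
        async move { Ok(Vec::new()) }.boxed()
    }

    fn data_extract(
        &self,
        _details: SnapRestoreDetails,
        _img_file: String,
        _path: Vec<u8>,
        _pxar: bool,
    ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> {
        async move {
            Ok(Box::new(tokio::io::empty()) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
        }
        .boxed()
    }

    fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> {
        async move { Ok(Vec::new()) }.boxed()
    }

    fn stop(&self, id: String) -> Async<Result<(), Error>> {
        async move { bail!("no mapping with id '{}'", id) }.boxed()
    }

    fn list(&self) -> Vec<String> {
        Vec::new()
    }
}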

proxmox-file-restore/src/block_driver_qemu.rs
@@ -0,0 +1,333 @@
//! Block file access via a small QEMU restore VM using the PBS block driver in QEMU
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{prelude::*, SeekFrom};
use anyhow::{bail, Error};
use futures::FutureExt;
use serde::{Deserialize, Serialize};
use serde_json::json;
use proxmox::tools::fs::lock_file;
use pbs_client::{DEFAULT_VSOCK_PORT, BackupRepository, VsockClient};
use pbs_datastore::backup_info::BackupDir;
use pbs_datastore::catalog::ArchiveEntry;
use super::block_driver::*;
use crate::get_user_run_dir;
const RESTORE_VM_MAP: &str = "restore-vm-map.json";
pub struct QemuBlockDriver {}
#[derive(Clone, Hash, Serialize, Deserialize)]
struct VMState {
pid: i32,
cid: i32,
ticket: String,
}
struct VMStateMap {
map: HashMap<String, VMState>,
file: File,
}
impl VMStateMap {
fn open_file_raw(write: bool) -> Result<File, Error> {
use std::os::unix::fs::OpenOptionsExt;
let mut path = get_user_run_dir()?;
path.push(RESTORE_VM_MAP);
OpenOptions::new()
.read(true)
.write(write)
.create(write)
.mode(0o600)
.open(path)
.map_err(Error::from)
}
/// Acquire a lock on the state map and retrieve a deserialized version
fn load() -> Result<Self, Error> {
let mut file = Self::open_file_raw(true)?;
lock_file(&mut file, true, Some(std::time::Duration::from_secs(120)))?;
let map = serde_json::from_reader(&file).unwrap_or_default();
Ok(Self { map, file })
}
/// Load a read-only copy of the current VM map. Only use for informational purposes, like
/// shell auto-completion; for anything requiring consistency use load()!
fn load_read_only() -> Result<HashMap<String, VMState>, Error> {
let file = Self::open_file_raw(false)?;
Ok(serde_json::from_reader(&file).unwrap_or_default())
}
/// Write back a potentially modified state map, consuming the held lock
fn write(mut self) -> Result<(), Error> {
self.file.seek(SeekFrom::Start(0))?;
self.file.set_len(0)?;
serde_json::to_writer(self.file, &self.map)?;
// drop ourselves including file lock
Ok(())
}
/// Return the map, but drop the lock immediately
fn read_only(self) -> HashMap<String, VMState> {
self.map
}
}
fn make_name(repo: &BackupRepository, snap: &BackupDir) -> String {
let full = format!("qemu_{}/{}", repo, snap);
pbs_systemd::escape_unit(&full, false)
}
/// Remove non-responsive VMs from the given map; returns 'true' if the map was modified
async fn cleanup_map(map: &mut HashMap<String, VMState>) -> bool {
let mut to_remove = Vec::new();
for (name, state) in map.iter() {
let client = VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
let res = client
.get("api2/json/status", Some(json!({"keep-timeout": true})))
.await;
if res.is_err() {
// VM is not reachable, remove from map and inform user
to_remove.push(name.clone());
eprintln!(
"VM '{}' (pid: {}, cid: {}) was not reachable, removing from map",
name, state.pid, state.cid
);
let _ = super::qemu_helper::try_kill_vm(state.pid);
}
}
for tr in &to_remove {
map.remove(tr);
}
!to_remove.is_empty()
}
fn new_ticket() -> String {
proxmox::tools::Uuid::generate().to_string()
}
async fn ensure_running(details: &SnapRestoreDetails) -> Result<VsockClient, Error> {
let name = make_name(&details.repo, &details.snapshot);
let mut state = VMStateMap::load()?;
cleanup_map(&mut state.map).await;
let new_cid;
let vms = match state.map.get(&name) {
Some(vm) => {
let client = VsockClient::new(vm.cid, DEFAULT_VSOCK_PORT, Some(vm.ticket.clone()));
let res = client.get("api2/json/status", None).await;
match res {
Ok(_) => {
// VM is running and we just reset its timeout, nothing to do
return Ok(client);
}
Err(err) => {
eprintln!("stale VM detected, restarting ({})", err);
// VM is dead, restart
let _ = super::qemu_helper::try_kill_vm(vm.pid);
let vms = start_vm(vm.cid, details).await?;
new_cid = vms.cid;
state.map.insert(name, vms.clone());
vms
}
}
}
None => {
let mut cid = state
.map
.iter()
.map(|v| v.1.cid)
.max()
.unwrap_or(0)
.wrapping_add(1);
// offset cid by user id, to avoid unnecessary retries
let running_uid = nix::unistd::Uid::current();
cid = cid.wrapping_add(running_uid.as_raw() as i32);
// some low CIDs have special meaning, start at 10 to avoid them
cid = cid.max(10);
let vms = start_vm(cid, details).await?;
new_cid = vms.cid;
state.map.insert(name, vms.clone());
vms
}
};
state.write()?;
Ok(VsockClient::new(
new_cid,
DEFAULT_VSOCK_PORT,
Some(vms.ticket.clone()),
))
}
async fn start_vm(cid_request: i32, details: &SnapRestoreDetails) -> Result<VMState, Error> {
let ticket = new_ticket();
let files = details
.manifest
.files()
.iter()
.map(|file| file.filename.clone())
.filter(|name| name.ends_with(".img.fidx"));
let (pid, cid) =
super::qemu_helper::start_vm((cid_request.abs() & 0xFFFF) as u16, details, files, &ticket)
.await?;
Ok(VMState { pid, cid, ticket })
}
impl BlockRestoreDriver for QemuBlockDriver {
fn data_list(
&self,
details: SnapRestoreDetails,
img_file: String,
mut path: Vec<u8>,
) -> Async<Result<Vec<ArchiveEntry>, Error>> {
async move {
let client = ensure_running(&details).await?;
if !path.is_empty() && path[0] != b'/' {
path.insert(0, b'/');
}
let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
let mut result = client
.get("api2/json/list", Some(json!({ "path": path })))
.await?;
serde_json::from_value(result["data"].take()).map_err(|err| err.into())
}
.boxed()
}
fn data_extract(
&self,
details: SnapRestoreDetails,
img_file: String,
mut path: Vec<u8>,
pxar: bool,
) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> {
async move {
let client = ensure_running(&details).await?;
if !path.is_empty() && path[0] != b'/' {
path.insert(0, b'/');
}
let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
let (mut tx, rx) = tokio::io::duplex(1024 * 4096);
tokio::spawn(async move {
if let Err(err) = client
.download(
"api2/json/extract",
Some(json!({ "path": path, "pxar": pxar })),
&mut tx,
)
.await
{
eprintln!("reading file extraction stream failed - {}", err);
std::process::exit(1);
}
});
Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
}
.boxed()
}
fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> {
async move {
let mut state_map = VMStateMap::load()?;
let modified = cleanup_map(&mut state_map.map).await;
let map = if modified {
let m = state_map.map.clone();
state_map.write()?;
m
} else {
state_map.read_only()
};
let mut result = Vec::new();
for (n, s) in map.iter() {
let client = VsockClient::new(s.cid, DEFAULT_VSOCK_PORT, Some(s.ticket.clone()));
let resp = client
.get("api2/json/status", Some(json!({"keep-timeout": true})))
.await;
let name = pbs_systemd::unescape_unit(n)
.unwrap_or_else(|_| "<invalid name>".to_owned());
let mut extra = json!({"pid": s.pid, "cid": s.cid});
match resp {
Ok(status) => match status["data"].as_object() {
Some(map) => {
for (k, v) in map.iter() {
extra[k] = v.clone();
}
}
None => {
let err = format!(
"invalid JSON received from /status call: {}",
status.to_string()
);
extra["error"] = json!(err);
}
},
Err(err) => {
let err = format!("error during /status API call: {}", err);
extra["error"] = json!(err);
}
}
result.push(DriverStatus {
id: name,
data: extra,
});
}
Ok(result)
}
.boxed()
}
fn stop(&self, id: String) -> Async<Result<(), Error>> {
async move {
let name = pbs_systemd::escape_unit(&id, false);
let mut map = VMStateMap::load()?;
let map_mod = cleanup_map(&mut map.map).await;
match map.map.get(&name) {
Some(state) => {
let client =
VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
// ignore errors, this either fails because:
// * the VM is unreachable/dead, in which case we don't want it in the map
// * the call was successful and the connection reset when the VM stopped
let _ = client.get("api2/json/stop", None).await;
map.map.remove(&name);
map.write()?;
}
None => {
if map_mod {
map.write()?;
}
bail!("VM with name '{}' not found", name);
}
}
Ok(())
}
.boxed()
}
fn list(&self) -> Vec<String> {
match VMStateMap::load_read_only() {
Ok(state) => state
.iter()
.filter_map(|(name, _)| pbs_systemd::unescape_unit(&name).ok())
.collect(),
Err(_) => Vec::new(),
}
}
}
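
For illustration (not part of this commit): the state handling above follows a
load/mutate/write pattern, where VMStateMap::load() acquires the file lock and
write() consumes it, so concurrent proxmox-file-restore invocations serialize
on the per-user map file:

let mut state = VMStateMap::load()?;      // opens the map file and takes the lock
state.map.retain(|_, vm| vm.pid > 0);     // hypothetical mutation, done under the lock
state.write()?;                           // rewrites the file, then drops file and lock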

proxmox-file-restore/src/cpio.rs
@@ -0,0 +1,74 @@
//! Provides a very basic "newc" format cpio encoder.
//! See 'man 5 cpio' for format details, as well as:
//! https://www.kernel.org/doc/html/latest/driver-api/early-userspace/buffer-format.html
//! This does not provide full support for the format, only what is needed to include files in an
//! initramfs intended for a Linux kernel.
use std::ffi::{CString, CStr};
use anyhow::{bail, Error};
use tokio::io::{copy, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
/// Write a cpio file entry to an AsyncWrite.
pub async fn append_file<W: AsyncWrite + Unpin, R: AsyncRead + Unpin>(
mut target: W,
content: R,
name: &CStr,
inode: u16,
mode: u16,
uid: u16,
gid: u16,
// negative mtimes are generally valid, but cpio defines all fields as unsigned
mtime: u64,
// c_filesize has 8 bytes, but the man page claims that 4 GB files are the maximum, so let's be safe
size: u32,
) -> Result<(), Error> {
let name = name.to_bytes_with_nul();
target.write_all(b"070701").await?; // c_magic
print_cpio_hex(&mut target, inode as u64).await?; // c_ino
print_cpio_hex(&mut target, mode as u64).await?; // c_mode
print_cpio_hex(&mut target, uid as u64).await?; // c_uid
print_cpio_hex(&mut target, gid as u64).await?; // c_gid
print_cpio_hex(&mut target, 0).await?; // c_nlink
print_cpio_hex(&mut target, mtime as u64).await?; // c_mtime
print_cpio_hex(&mut target, size as u64).await?; // c_filesize
print_cpio_hex(&mut target, 0).await?; // c_devmajor
print_cpio_hex(&mut target, 0).await?; // c_devminor
print_cpio_hex(&mut target, 0).await?; // c_rdevmajor
print_cpio_hex(&mut target, 0).await?; // c_rdevminor
print_cpio_hex(&mut target, name.len() as u64).await?; // c_namesize
print_cpio_hex(&mut target, 0).await?; // c_check (ignored for newc)
target.write_all(name).await?;
let header_size = 6 + 8*13 + name.len();
let mut name_pad = header_size;
while name_pad & 3 != 0 {
target.write_u8(0).await?;
name_pad += 1;
}
let mut content = content.take(size as u64);
let copied = copy(&mut content, &mut target).await?;
if copied < size as u64 {
bail!("cpio: not enough data, or size to big - encoding invalid");
}
let mut data_pad = copied;
while data_pad & 3 != 0 {
target.write_u8(0).await?;
data_pad += 1;
}
Ok(())
}
/// Write the TRAILER!!! file to an AsyncWrite, signifying the end of a cpio archive. Note that you
/// can immediately append more files afterwards to create a concatenated archive; the kernel, for
/// example, will merge these upon loading an initramfs.
pub async fn append_trailer<W: AsyncWrite + Unpin>(target: W) -> Result<(), Error> {
let name = CString::new("TRAILER!!!").unwrap();
append_file(target, tokio::io::empty(), &name, 0, 0, 0, 0, 0, 0).await
}
async fn print_cpio_hex<W: AsyncWrite + Unpin>(target: &mut W, value: u64) -> Result<(), Error> {
target.write_all(format!("{:08x}", value).as_bytes()).await.map_err(|e| e.into())
}
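
For illustration (not part of this commit): a minimal single-file archive
written to an in-memory buffer; tokio implements AsyncWrite for Vec<u8> and
AsyncRead for &[u8], so no filesystem access is needed:

use std::ffi::CString;

async fn example_archive() -> Result<Vec<u8>, anyhow::Error> {
    let mut out = Vec::new();
    let data = b"secret";
    let name = CString::new("ticket").unwrap();
    append_file(
        &mut out,
        &data[..],
        &name,
        0,                              // inode
        (libc::S_IFREG | 0o400) as u16, // regular file, mode 0400
        0,                              // uid
        0,                              // gid
        0,                              // mtime
        data.len() as u32,              // size
    )
    .await?;
    append_trailer(&mut out).await?;
    Ok(out) // a complete newc archive, usable as an initramfs fragment
}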

proxmox-file-restore/src/main.rs
@@ -0,0 +1,511 @@
use std::ffi::OsStr;
use std::os::unix::ffi::OsStrExt;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{bail, format_err, Error};
use serde_json::{json, Value};
use proxmox::api::{
api,
cli::{
default_table_format_options, format_and_print_result_full, get_output_format,
run_cli_command, CliCommand, CliCommandMap, CliEnvironment, ColumnConfig, OUTPUT_FORMAT,
},
};
use proxmox::tools::fs::{create_path, CreateOptions};
use pxar::accessor::aio::Accessor;
use pxar::decoder::aio::Decoder;
use pbs_api_types::CryptMode;
use pbs_datastore::{CryptConfig, CATALOG_NAME};
use pbs_datastore::backup_info::BackupDir;
use pbs_datastore::catalog::{ArchiveEntry, CatalogReader, DirEntryAttribute};
use pbs_datastore::dynamic_index::{BufferedDynamicReader, LocalDynamicReadAt};
use pbs_datastore::index::IndexFile;
use pbs_datastore::key_derivation::decrypt_key;
use pbs_client::{BackupReader, RemoteChunkReader};
use pbs_client::pxar::{create_zip, extract_sub_dir, extract_sub_dir_seq};
use pbs_client::tools::{
complete_group_or_snapshot, complete_repository, connect, extract_repository_from_value,
key_source::{
crypto_parameters_keep_fd, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
KEYFILE_SCHEMA,
},
REPO_URL_SCHEMA,
};
pub mod block_driver;
pub use block_driver::*;
pub mod cpio;
mod qemu_helper;
mod block_driver_qemu;
enum ExtractPath {
ListArchives,
Pxar(String, Vec<u8>),
VM(String, Vec<u8>),
}
fn parse_path(path: String, base64: bool) -> Result<ExtractPath, Error> {
let mut bytes = if base64 {
base64::decode(&path)
.map_err(|err| format_err!("Failed base64-decoding path '{}' - {}", path, err))?
} else {
path.into_bytes()
};
if bytes == b"/" {
return Ok(ExtractPath::ListArchives);
}
while !bytes.is_empty() && bytes[0] == b'/' {
bytes.remove(0);
}
let (file, path) = {
let slash_pos = bytes.iter().position(|c| *c == b'/').unwrap_or(bytes.len());
let path = bytes.split_off(slash_pos);
let file = String::from_utf8(bytes)?;
(file, path)
};
if file.ends_with(".pxar.didx") {
Ok(ExtractPath::Pxar(file, path))
} else if file.ends_with(".img.fidx") {
Ok(ExtractPath::VM(file, path))
} else {
bail!("'{}' is not supported for file-restore", file);
}
}
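// For illustration (not part of this commit): how parse_path maps input to
// ExtractPath variants (archive names invented for the example):
//
//   "/"                          -> ExtractPath::ListArchives
//   "root.pxar.didx/etc/hosts"   -> ExtractPath::Pxar("root.pxar.didx", b"/etc/hosts")
//   "drive-scsi0.img.fidx/etc"   -> ExtractPath::VM("drive-scsi0.img.fidx", b"/etc")
//   "data.txt"                   -> error: "'data.txt' is not supported for file-restore"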
fn keyfile_path(param: &Value) -> Option<String> {
if let Some(Value::String(keyfile)) = param.get("keyfile") {
return Some(keyfile.to_owned());
}
if let Some(Value::Number(keyfd)) = param.get("keyfd") {
return Some(format!("/dev/fd/{}", keyfd));
}
None
}
#[api(
input: {
properties: {
repository: {
schema: REPO_URL_SCHEMA,
optional: true,
},
snapshot: {
type: String,
description: "Group/Snapshot path.",
},
"path": {
description: "Path to restore. Directories will be restored as .zip files.",
type: String,
},
"base64": {
type: Boolean,
description: "If set, 'path' will be interpreted as base64 encoded.",
optional: true,
default: false,
},
keyfile: {
schema: KEYFILE_SCHEMA,
optional: true,
},
"keyfd": {
schema: KEYFD_SCHEMA,
optional: true,
},
"crypt-mode": {
type: CryptMode,
optional: true,
},
"driver": {
type: BlockDriverType,
optional: true,
},
"output-format": {
schema: OUTPUT_FORMAT,
optional: true,
},
}
},
returns: {
description: "A list of elements under the given path",
type: Array,
items: {
type: ArchiveEntry,
}
}
)]
/// List a directory from a backup snapshot.
async fn list(
snapshot: String,
path: String,
base64: bool,
param: Value,
) -> Result<(), Error> {
let repo = extract_repository_from_value(&param)?;
let snapshot: BackupDir = snapshot.parse()?;
let path = parse_path(path, base64)?;
let keyfile = keyfile_path(&param);
let crypto = crypto_parameters_keep_fd(&param)?;
let crypt_config = match crypto.enc_key {
None => None,
Some(ref key) => {
let (key, _, _) =
decrypt_key(&key.key, &get_encryption_key_password).map_err(|err| {
eprintln!("{}", format_key_source(&key.source, "encryption"));
err
})?;
Some(Arc::new(CryptConfig::new(key)?))
}
};
let client = connect(&repo)?;
let client = BackupReader::start(
client,
crypt_config.clone(),
repo.store(),
&snapshot.group().backup_type(),
&snapshot.group().backup_id(),
snapshot.backup_time(),
true,
)
.await?;
let (manifest, _) = client.download_manifest().await?;
manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref))?;
let result = match path {
ExtractPath::ListArchives => {
let mut entries = vec![];
for file in manifest.files() {
if !file.filename.ends_with(".pxar.didx") && !file.filename.ends_with(".img.fidx") {
continue;
}
let path = format!("/{}", file.filename);
let attr = if file.filename.ends_with(".pxar.didx") {
// a pxar file is a file archive, so its root is also a directory root
Some(&DirEntryAttribute::Directory { start: 0 })
} else {
None
};
entries.push(ArchiveEntry::new_with_size(path.as_bytes(), attr, Some(file.size)));
}
Ok(entries)
}
ExtractPath::Pxar(file, mut path) => {
let index = client
.download_dynamic_index(&manifest, CATALOG_NAME)
.await?;
let most_used = index.find_most_used_chunks(8);
let file_info = manifest.lookup_file_info(&CATALOG_NAME)?;
let chunk_reader = RemoteChunkReader::new(
client.clone(),
crypt_config,
file_info.chunk_crypt_mode(),
most_used,
);
let reader = BufferedDynamicReader::new(index, chunk_reader);
let mut catalog_reader = CatalogReader::new(reader);
let mut fullpath = file.into_bytes();
fullpath.append(&mut path);
catalog_reader.list_dir_contents(&fullpath)
}
ExtractPath::VM(file, path) => {
let details = SnapRestoreDetails {
manifest,
repo,
snapshot,
keyfile,
};
let driver: Option<BlockDriverType> = match param.get("driver") {
Some(drv) => Some(serde_json::from_value(drv.clone())?),
None => None,
};
data_list(driver, details, file, path).await
}
}?;
let options = default_table_format_options()
.sortby("type", false)
.sortby("text", false)
.column(ColumnConfig::new("type"))
.column(ColumnConfig::new("text").header("name"))
.column(ColumnConfig::new("mtime").header("last modified"))
.column(ColumnConfig::new("size"));
let output_format = get_output_format(&param);
format_and_print_result_full(
&mut json!(result),
&API_METHOD_LIST.returns,
&output_format,
&options,
);
Ok(())
}
#[api(
input: {
properties: {
repository: {
schema: REPO_URL_SCHEMA,
optional: true,
},
snapshot: {
type: String,
description: "Group/Snapshot path.",
},
"path": {
description: "Path to restore. Directories will be restored as .zip files if extracted to stdout.",
type: String,
},
"base64": {
type: Boolean,
description: "If set, 'path' will be interpreted as base64 encoded.",
optional: true,
default: false,
},
target: {
type: String,
optional: true,
description: "Target directory path. Use '-' to write to standard output.",
},
keyfile: {
schema: KEYFILE_SCHEMA,
optional: true,
},
"keyfd": {
schema: KEYFD_SCHEMA,
optional: true,
},
"crypt-mode": {
type: CryptMode,
optional: true,
},
verbose: {
type: Boolean,
description: "Print verbose information",
optional: true,
default: false,
},
"driver": {
type: BlockDriverType,
optional: true,
},
}
}
)]
/// Restore files from a backup snapshot.
async fn extract(
snapshot: String,
path: String,
base64: bool,
target: Option<String>,
verbose: bool,
param: Value,
) -> Result<(), Error> {
let repo = extract_repository_from_value(&param)?;
let snapshot: BackupDir = snapshot.parse()?;
let orig_path = path;
let path = parse_path(orig_path.clone(), base64)?;
let target = match target {
Some(target) if target == "-" => None,
Some(target) => Some(PathBuf::from(target)),
None => Some(std::env::current_dir()?),
};
let keyfile = keyfile_path(&param);
let crypto = crypto_parameters_keep_fd(&param)?;
let crypt_config = match crypto.enc_key {
None => None,
Some(ref key) => {
let (key, _, _) =
decrypt_key(&key.key, &get_encryption_key_password).map_err(|err| {
eprintln!("{}", format_key_source(&key.source, "encryption"));
err
})?;
Some(Arc::new(CryptConfig::new(key)?))
}
};
let client = connect(&repo)?;
let client = BackupReader::start(
client,
crypt_config.clone(),
repo.store(),
&snapshot.group().backup_type(),
&snapshot.group().backup_id(),
snapshot.backup_time(),
true,
)
.await?;
let (manifest, _) = client.download_manifest().await?;
match path {
ExtractPath::Pxar(archive_name, path) => {
let file_info = manifest.lookup_file_info(&archive_name)?;
let index = client
.download_dynamic_index(&manifest, &archive_name)
.await?;
let most_used = index.find_most_used_chunks(8);
let chunk_reader = RemoteChunkReader::new(
client.clone(),
crypt_config,
file_info.chunk_crypt_mode(),
most_used,
);
let reader = BufferedDynamicReader::new(index, chunk_reader);
let archive_size = reader.archive_size();
let reader = LocalDynamicReadAt::new(reader);
let decoder = Accessor::new(reader, archive_size).await?;
extract_to_target(decoder, &path, target, verbose).await?;
}
ExtractPath::VM(file, path) => {
let details = SnapRestoreDetails {
manifest,
repo,
snapshot,
keyfile,
};
let driver: Option<BlockDriverType> = match param.get("driver") {
Some(drv) => Some(serde_json::from_value(drv.clone())?),
None => None,
};
if let Some(mut target) = target {
let reader = data_extract(driver, details, file, path.clone(), true).await?;
let decoder = Decoder::from_tokio(reader).await?;
extract_sub_dir_seq(&target, decoder, verbose).await?;
// we extracted a .pxarexclude-cli file auto-generated by the VM when encoding the
// archive, this file is of no use for the user, so try to remove it
target.push(".pxarexclude-cli");
std::fs::remove_file(target).map_err(|e| {
format_err!("unable to remove temporary .pxarexclude-cli file - {}", e)
})?;
} else {
let mut reader = data_extract(driver, details, file, path.clone(), false).await?;
tokio::io::copy(&mut reader, &mut tokio::io::stdout()).await?;
}
}
_ => {
bail!("cannot extract '{}'", orig_path);
}
}
Ok(())
}
async fn extract_to_target<T>(
decoder: Accessor<T>,
path: &[u8],
target: Option<PathBuf>,
verbose: bool,
) -> Result<(), Error>
where
T: pxar::accessor::ReadAt + Clone + Send + Sync + Unpin + 'static,
{
let path = if path.is_empty() { b"/" } else { path };
let root = decoder.open_root().await?;
let file = root
.lookup(OsStr::from_bytes(path))
.await?
.ok_or_else(|| format_err!("error opening '{:?}'", path))?;
if let Some(target) = target {
extract_sub_dir(target, decoder, OsStr::from_bytes(path), verbose).await?;
} else {
match file.kind() {
pxar::EntryKind::File { .. } => {
tokio::io::copy(&mut file.contents().await?, &mut tokio::io::stdout()).await?;
}
_ => {
create_zip(
tokio::io::stdout(),
decoder,
OsStr::from_bytes(path),
verbose,
)
.await?;
}
}
}
Ok(())
}
fn main() {
let list_cmd_def = CliCommand::new(&API_METHOD_LIST)
.arg_param(&["snapshot", "path"])
.completion_cb("repository", complete_repository)
.completion_cb("snapshot", complete_group_or_snapshot);
let restore_cmd_def = CliCommand::new(&API_METHOD_EXTRACT)
.arg_param(&["snapshot", "path", "target"])
.completion_cb("repository", complete_repository)
.completion_cb("snapshot", complete_group_or_snapshot)
.completion_cb("target", pbs_tools::fs::complete_file_name);
let status_cmd_def = CliCommand::new(&API_METHOD_STATUS);
let stop_cmd_def = CliCommand::new(&API_METHOD_STOP)
.arg_param(&["name"])
.completion_cb("name", complete_block_driver_ids);
let cmd_def = CliCommandMap::new()
.insert("list", list_cmd_def)
.insert("extract", restore_cmd_def)
.insert("status", status_cmd_def)
.insert("stop", stop_cmd_def);
let rpcenv = CliEnvironment::new();
run_cli_command(
cmd_def,
rpcenv,
Some(|future| pbs_runtime::main(future)),
);
}
/// Returns a runtime dir owned by the current user.
/// Note that XDG_RUNTIME_DIR is not always available, especially for non-login users like
/// "www-data", so we use a custom one in /run/proxmox-backup/<uid> instead.
pub fn get_user_run_dir() -> Result<std::path::PathBuf, Error> {
let uid = nix::unistd::Uid::current();
let mut path: std::path::PathBuf = pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR.into();
path.push(uid.to_string());
create_run_dir()?;
std::fs::create_dir_all(&path)?;
Ok(path)
}
/// FIXME: proxmox-file-restore should not depend on this!
fn create_run_dir() -> Result<(), Error> {
let backup_user = backup_user()?;
let opts = CreateOptions::new()
.owner(backup_user.uid)
.group(backup_user.gid);
let _: bool = create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), None, Some(opts))?;
Ok(())
}
/// Return User info for the 'backup' user (``getpwnam_r(3)``)
pub fn backup_user() -> Result<nix::unistd::User, Error> {
pbs_tools::sys::query_user(pbs_buildcfg::BACKUP_USER_NAME)?
.ok_or_else(|| format_err!("Unable to lookup '{}' user.", pbs_buildcfg::BACKUP_USER_NAME))
}

proxmox-file-restore/src/qemu_helper.rs
@@ -0,0 +1,329 @@
//! Helper to start a QEMU VM for single file restore.
use std::fs::{File, OpenOptions};
use std::io::prelude::*;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::{bail, format_err, Error};
use tokio::time;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use proxmox::tools::fs::{create_path, file_read_string, make_tmp_file, CreateOptions};
use pbs_client::{VsockClient, DEFAULT_VSOCK_PORT};
use crate::{cpio, backup_user};
use super::SnapRestoreDetails;
const PBS_VM_NAME: &str = "pbs-restore-vm";
const MAX_CID_TRIES: u64 = 32;
fn create_restore_log_dir() -> Result<String, Error> {
let logpath = format!("{}/file-restore", pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR);
proxmox::try_block!({
let backup_user = backup_user()?;
let opts = CreateOptions::new()
.owner(backup_user.uid)
.group(backup_user.gid);
let opts_root = CreateOptions::new()
.owner(nix::unistd::ROOT)
.group(nix::unistd::Gid::from_raw(0));
create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts))?;
create_path(&logpath, None, Some(opts_root))?;
Ok(())
})
.map_err(|err: Error| format_err!("unable to create file-restore log dir - {}", err))?;
Ok(logpath)
}
fn validate_img_existence(debug: bool) -> Result<(), Error> {
let kernel = PathBuf::from(pbs_buildcfg::PROXMOX_BACKUP_KERNEL_FN);
let initramfs = PathBuf::from(if debug {
pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_DBG_FN
} else {
pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_FN
});
if !kernel.exists() || !initramfs.exists() {
bail!("cannot run file-restore VM: package 'proxmox-backup-restore-image' is not (correctly) installed");
}
Ok(())
}
pub fn try_kill_vm(pid: i32) -> Result<(), Error> {
let pid = Pid::from_raw(pid);
if let Ok(()) = kill(pid, None) {
// process is running (and we could kill it), check if it is actually ours
// (if it errors assume we raced with the process's death and ignore it)
if let Ok(cmdline) = file_read_string(format!("/proc/{}/cmdline", pid)) {
if cmdline.split('\0').any(|a| a == PBS_VM_NAME) {
// yes, it's ours, kill it brutally with SIGKILL, no reason to take
// any chances - in this state it's most likely broken anyway
if let Err(err) = kill(pid, Signal::SIGKILL) {
bail!(
"reaping broken VM (pid {}) with SIGKILL failed: {}",
pid,
err
);
}
}
}
}
Ok(())
}
async fn create_temp_initramfs(ticket: &str, debug: bool) -> Result<(File, String), Error> {
use std::ffi::CString;
use tokio::fs::File;
let (tmp_file, tmp_path) =
make_tmp_file("/tmp/file-restore-qemu.initramfs.tmp", CreateOptions::new())?;
nix::unistd::unlink(&tmp_path)?;
pbs_tools::fd::fd_change_cloexec(tmp_file.as_raw_fd(), false)?;
let initramfs = if debug {
pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_DBG_FN
} else {
pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_FN
};
let mut f = File::from_std(tmp_file);
let mut base = File::open(initramfs).await?;
tokio::io::copy(&mut base, &mut f).await?;
let name = CString::new("ticket").unwrap();
cpio::append_file(
&mut f,
ticket.as_bytes(),
&name,
0,
(libc::S_IFREG | 0o400) as u16,
0,
0,
0,
ticket.len() as u32,
)
.await?;
cpio::append_trailer(&mut f).await?;
let tmp_file = f.into_std().await;
let path = format!("/dev/fd/{}", &tmp_file.as_raw_fd());
Ok((tmp_file, path))
}
pub async fn start_vm(
// u16 so we can do wrapping_add without going too high
mut cid: u16,
details: &SnapRestoreDetails,
files: impl Iterator<Item = String>,
ticket: &str,
) -> Result<(i32, i32), Error> {
if std::env::var("PBS_PASSWORD").is_err() {
bail!("environment variable PBS_PASSWORD has to be set for QEMU VM restore");
}
let debug = if let Ok(val) = std::env::var("PBS_QEMU_DEBUG") {
!val.is_empty()
} else {
false
};
validate_img_existence(debug)?;
let pid;
let (mut pid_file, pid_path) = make_tmp_file("/tmp/file-restore-qemu.pid.tmp", CreateOptions::new())?;
nix::unistd::unlink(&pid_path)?;
pbs_tools::fd::fd_change_cloexec(pid_file.as_raw_fd(), false)?;
let (_ramfs_pid, ramfs_path) = create_temp_initramfs(ticket, debug).await?;
let logpath = create_restore_log_dir()?;
let logfile = &format!("{}/qemu.log", logpath);
let mut logrotate = pbs_tools::logrotate::LogRotate::new(logfile, false)
.ok_or_else(|| format_err!("could not get QEMU log file names"))?;
if let Err(err) = logrotate.do_rotate(CreateOptions::default(), Some(16)) {
eprintln!("warning: logrotate for QEMU log file failed - {}", err);
}
let mut logfd = OpenOptions::new()
.append(true)
.create_new(true)
.open(logfile)?;
pbs_tools::fd::fd_change_cloexec(logfd.as_raw_fd(), false)?;
// preface log file with start timestamp so one can see how long QEMU took to start
writeln!(logfd, "[{}] PBS file restore VM log", {
let now = proxmox::tools::time::epoch_i64();
proxmox::tools::time::epoch_to_rfc3339(now)?
},)?;
let base_args = [
"-chardev",
&format!(
"file,id=log,path=/dev/null,logfile=/dev/fd/{},logappend=on",
logfd.as_raw_fd()
),
"-serial",
"chardev:log",
"-vnc",
"none",
"-enable-kvm",
"-kernel",
pbs_buildcfg::PROXMOX_BACKUP_KERNEL_FN,
"-initrd",
&ramfs_path,
"-append",
&format!(
"{} panic=1 zfs_arc_min=0 zfs_arc_max=0",
if debug { "debug" } else { "quiet" }
),
"-daemonize",
"-pidfile",
&format!("/dev/fd/{}", pid_file.as_raw_fd()),
"-name",
PBS_VM_NAME,
];
// Generate drive arguments for all fidx files in backup snapshot
let mut drives = Vec::new();
let mut id = 0;
for file in files {
if !file.ends_with(".img.fidx") {
continue;
}
drives.push("-drive".to_owned());
let keyfile = if let Some(ref keyfile) = details.keyfile {
format!(",,keyfile={}", keyfile)
} else {
"".to_owned()
};
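// QEMU's option parser treats ",," as an escaped literal comma, so the doubled
// commas keep the whole "pbs:..." specification together as a single file= value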
drives.push(format!(
"file=pbs:repository={},,snapshot={},,archive={}{},read-only=on,if=none,id=drive{}",
details.repo, details.snapshot, file, keyfile, id
));
// a PCI bus can only support 32 devices, so add a new one every 32
let bus = (id / 32) + 2;
if id % 32 == 0 {
drives.push("-device".to_owned());
drives.push(format!("pci-bridge,id=bridge{},chassis_nr={}", bus, bus));
}
drives.push("-device".to_owned());
// drive serial is used by VM to map .fidx files to /dev paths
let serial = file.strip_suffix(".img.fidx").unwrap_or(&file);
drives.push(format!(
"virtio-blk-pci,drive=drive{},serial={},bus=bridge{}",
id, serial, bus
));
id += 1;
}
let ram = if debug {
1024
} else {
// add more RAM if many drives are given
match id {
f if f < 10 => 192,
f if f < 20 => 256,
_ => 384,
}
};
// Try starting QEMU in a loop to retry if we fail because of a bad 'cid' value
let mut attempts = 0;
loop {
let mut qemu_cmd = std::process::Command::new("qemu-system-x86_64");
qemu_cmd.args(base_args.iter());
qemu_cmd.arg("-m");
qemu_cmd.arg(ram.to_string());
qemu_cmd.args(&drives);
qemu_cmd.arg("-device");
qemu_cmd.arg(format!(
"vhost-vsock-pci,guest-cid={},disable-legacy=on",
cid
));
if debug {
let debug_args = [
"-chardev",
&format!(
"socket,id=debugser,path=/run/proxmox-backup/file-restore-serial-{}.sock,server,nowait",
cid
),
"-serial",
"chardev:debugser",
];
qemu_cmd.args(debug_args.iter());
}
qemu_cmd.stdout(std::process::Stdio::null());
qemu_cmd.stderr(std::process::Stdio::piped());
let res = tokio::task::block_in_place(|| qemu_cmd.spawn()?.wait_with_output())?;
if res.status.success() {
// at this point QEMU is already daemonized and running, so if anything fails we
// technically leave behind a zombie-VM... this shouldn't matter, as it will stop
// itself soon enough (timer), and the following operations are unlikely to fail
let mut pidstr = String::new();
pid_file.read_to_string(&mut pidstr)?;
pid = pidstr.trim_end().parse().map_err(|err| {
format_err!("cannot parse PID returned by QEMU ('{}'): {}", &pidstr, err)
})?;
break;
} else {
let out = String::from_utf8_lossy(&res.stderr);
if out.contains("unable to set guest cid: Address already in use") {
attempts += 1;
if attempts >= MAX_CID_TRIES {
bail!("CID '{}' in use, but max attempts reached, aborting", cid);
}
// CID in use, try next higher one
eprintln!("CID '{}' in use by other VM, attempting next one", cid);
// skip special-meaning low values
cid = cid.wrapping_add(1).max(10);
} else {
eprint!("{}", out);
bail!("Starting VM failed. See output above for more information.");
}
}
}
// QEMU has started successfully, now wait for virtio socket to become ready
let pid_t = Pid::from_raw(pid);
for _ in 0..60 {
let client = VsockClient::new(cid as i32, DEFAULT_VSOCK_PORT, Some(ticket.to_owned()));
if let Ok(Ok(_)) =
time::timeout(Duration::from_secs(2), client.get("api2/json/status", None)).await
{
if debug {
eprintln!(
"Connect to '/run/proxmox-backup/file-restore-serial-{}.sock' for shell access",
cid
)
}
return Ok((pid, cid as i32));
}
if kill(pid_t, None).is_err() { // check if QEMU process exited in between
bail!("VM exited before connection could be established");
}
time::sleep(Duration::from_millis(200)).await;
}
// start failed
if let Err(err) = try_kill_vm(pid) {
eprintln!("killing failed VM failed: {}", err);
}
bail!("starting VM timed out");
}