move proxmox_restore_daemon code into extra crate
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
committed by
Thomas Lamprecht
parent
6fbf0acc76
commit
6523588c8d
36
proxmox-restore-daemon/Cargo.toml
Normal file
36
proxmox-restore-daemon/Cargo.toml
Normal file
@ -0,0 +1,36 @@
|
||||
[package]
|
||||
name = "proxmox-restore-daemon"
|
||||
version = "0.1.0"
|
||||
authors = ["Proxmox Support Team <support@proxmox.com>"]
|
||||
edition = "2018"
|
||||
description = "Proxmox Restore Daemon"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
base64 = "0.12"
|
||||
env_logger = "0.7"
|
||||
futures = "0.3"
|
||||
http = "0.2"
|
||||
hyper = { version = "0.14", features = [ "full" ] }
|
||||
lazy_static = "1.4"
|
||||
libc = "0.2"
|
||||
log = "0.4"
|
||||
nix = "0.19.1"
|
||||
regex = "1.2"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
tokio = { version = "1.6", features = ["parking_lot", "sync"] }
|
||||
tokio-stream = "0.1.0"
|
||||
tokio-util = { version = "0.6", features = [ "codec", "io" ] }
|
||||
|
||||
pathpatterns = "0.1.2"
|
||||
pxar = { version = "0.10.1", features = [ "tokio-io" ] }
|
||||
|
||||
proxmox = { version = "0.13.3", features = [ "router", "sortable-macro" ] }
|
||||
|
||||
pbs-api-types = { path = "../pbs-api-types" }
|
||||
pbs-runtime = { path = "../pbs-runtime" }
|
||||
pbs-tools = { path = "../pbs-tools" }
|
||||
pbs-datastore = { path = "../pbs-datastore" }
|
||||
proxmox-rest-server = { path = "../proxmox-rest-server" }
|
||||
pbs-client = { path = "../pbs-client" }
|
173
proxmox-restore-daemon/src/main.rs
Normal file
173
proxmox-restore-daemon/src/main.rs
Normal file
@ -0,0 +1,173 @@
|
||||
///! Daemon binary to run inside a micro-VM for secure single file restore of disk images
|
||||
use std::fs::File;
|
||||
use std::io::prelude::*;
|
||||
use std::os::unix::{
|
||||
io::{FromRawFd, RawFd},
|
||||
net,
|
||||
};
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
use lazy_static::lazy_static;
|
||||
use log::{error, info};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use http::request::Parts;
|
||||
use http::Response;
|
||||
use hyper::{Body, StatusCode};
|
||||
use hyper::header;
|
||||
|
||||
use proxmox::api::RpcEnvironmentType;
|
||||
|
||||
use pbs_client::DEFAULT_VSOCK_PORT;
|
||||
use proxmox_rest_server::{ApiConfig, RestServer};
|
||||
|
||||
mod proxmox_restore_daemon;
|
||||
use proxmox_restore_daemon::*;
|
||||
|
||||
/// Maximum amount of pending requests. If saturated, virtio-vsock returns ETIMEDOUT immediately.
|
||||
/// We should never have more than a few requests in queue, so use a low number.
|
||||
pub const MAX_PENDING: usize = 32;
|
||||
|
||||
/// Will be present in base initramfs
|
||||
pub const VM_DETECT_FILE: &str = "/restore-vm-marker";
|
||||
|
||||
lazy_static! {
|
||||
/// The current disks state. Use for accessing data on the attached snapshots.
|
||||
pub static ref DISK_STATE: Arc<Mutex<DiskState>> = {
|
||||
Arc::new(Mutex::new(DiskState::scan().unwrap()))
|
||||
};
|
||||
}
|
||||
|
||||
/// This is expected to be run by 'proxmox-file-restore' within a mini-VM
|
||||
fn main() -> Result<(), Error> {
|
||||
if !Path::new(VM_DETECT_FILE).exists() {
|
||||
bail!(
|
||||
"This binary is not supposed to be run manually, use 'proxmox-file-restore' instead."
|
||||
);
|
||||
}
|
||||
|
||||
// don't have a real syslog (and no persistance), so use env_logger to print to a log file (via
|
||||
// stdout to a serial terminal attached by QEMU)
|
||||
env_logger::from_env(env_logger::Env::default().default_filter_or("info"))
|
||||
.write_style(env_logger::WriteStyle::Never)
|
||||
.format_timestamp_millis()
|
||||
.init();
|
||||
|
||||
info!("setup basic system environment...");
|
||||
setup_system_env().map_err(|err| format_err!("system environment setup failed: {}", err))?;
|
||||
|
||||
// scan all attached disks now, before starting the API
|
||||
// this will panic and stop the VM if anything goes wrong
|
||||
info!("scanning all disks...");
|
||||
{
|
||||
let _disk_state = DISK_STATE.lock().unwrap();
|
||||
}
|
||||
|
||||
info!("disk scan complete, starting main runtime...");
|
||||
|
||||
pbs_runtime::main(run())
|
||||
}
|
||||
|
||||
/// ensure we have our /run dirs, system users and stuff like that setup
|
||||
fn setup_system_env() -> Result<(), Error> {
|
||||
// the API may save some stuff there, e.g., the memcon tracking file
|
||||
// we do not care much, but it's way less headache to just create it
|
||||
std::fs::create_dir_all("/run/proxmox-backup")?;
|
||||
|
||||
// we now ensure that all lock files are owned by the backup user, and as we reuse the
|
||||
// specialized REST module from pbs api/daemon we have some checks there for user/acl stuff
|
||||
// that gets locked, and thus needs the backup system user to work.
|
||||
std::fs::create_dir_all("/etc")?;
|
||||
let mut passwd = File::create("/etc/passwd")?;
|
||||
writeln!(passwd, "root:x:0:0:root:/root:/bin/sh")?;
|
||||
writeln!(passwd, "backup:x:34:34:backup:/var/backups:/usr/sbin/nologin")?;
|
||||
|
||||
let mut group = File::create("/etc/group")?;
|
||||
writeln!(group, "root:x:0:")?;
|
||||
writeln!(group, "backup:x:34:")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_index(
|
||||
_auth_id: Option<String>,
|
||||
_language: Option<String>,
|
||||
_api: &ApiConfig,
|
||||
_parts: Parts,
|
||||
) -> Response<Body> {
|
||||
|
||||
let index = "<center><h1>Proxmox Backup Restore Daemon/h1></center>";
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "text/html")
|
||||
.body(index.into())
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn run() -> Result<(), Error> {
|
||||
watchdog_init();
|
||||
|
||||
let auth_config = Arc::new(
|
||||
auth::ticket_auth().map_err(|err| format_err!("reading ticket file failed: {}", err))?,
|
||||
);
|
||||
let config = ApiConfig::new("", &ROUTER, RpcEnvironmentType::PUBLIC, auth_config, get_index)?;
|
||||
let rest_server = RestServer::new(config);
|
||||
|
||||
let vsock_fd = get_vsock_fd()?;
|
||||
let connections = accept_vsock_connections(vsock_fd);
|
||||
let receiver_stream = ReceiverStream::new(connections);
|
||||
let acceptor = hyper::server::accept::from_stream(receiver_stream);
|
||||
|
||||
hyper::Server::builder(acceptor).serve(rest_server).await?;
|
||||
|
||||
bail!("hyper server exited");
|
||||
}
|
||||
|
||||
fn accept_vsock_connections(
|
||||
vsock_fd: RawFd,
|
||||
) -> mpsc::Receiver<Result<tokio::net::UnixStream, Error>> {
|
||||
use nix::sys::socket::*;
|
||||
let (sender, receiver) = mpsc::channel(MAX_PENDING);
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let stream: Result<tokio::net::UnixStream, Error> = tokio::task::block_in_place(|| {
|
||||
// we need to accept manually, as UnixListener aborts if socket type != AF_UNIX ...
|
||||
let client_fd = accept(vsock_fd)?;
|
||||
let stream = unsafe { net::UnixStream::from_raw_fd(client_fd) };
|
||||
stream.set_nonblocking(true)?;
|
||||
tokio::net::UnixStream::from_std(stream).map_err(|err| err.into())
|
||||
});
|
||||
|
||||
match stream {
|
||||
Ok(stream) => {
|
||||
if sender.send(Ok(stream)).await.is_err() {
|
||||
error!("connection accept channel was closed");
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("error accepting vsock connetion: {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
receiver
|
||||
}
|
||||
|
||||
fn get_vsock_fd() -> Result<RawFd, Error> {
|
||||
use nix::sys::socket::*;
|
||||
let sock_fd = socket(
|
||||
AddressFamily::Vsock,
|
||||
SockType::Stream,
|
||||
SockFlag::empty(),
|
||||
None,
|
||||
)?;
|
||||
let sock_addr = VsockAddr::new(libc::VMADDR_CID_ANY, DEFAULT_VSOCK_PORT as u32);
|
||||
bind(sock_fd, &SockAddr::Vsock(sock_addr))?;
|
||||
listen(sock_fd, MAX_PENDING)?;
|
||||
Ok(sock_fd)
|
||||
}
|
388
proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
Normal file
388
proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
Normal file
@ -0,0 +1,388 @@
|
||||
///! File-restore API running inside the restore VM
|
||||
use std::ffi::OsStr;
|
||||
use std::fs;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::{bail, Error};
|
||||
use futures::FutureExt;
|
||||
use hyper::http::request::Parts;
|
||||
use hyper::{header, Body, Response, StatusCode};
|
||||
use log::error;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use pathpatterns::{MatchEntry, MatchPattern, MatchType, Pattern};
|
||||
use proxmox::api::{
|
||||
api, schema::*, ApiHandler, ApiMethod, ApiResponseFuture, Permission, Router, RpcEnvironment,
|
||||
SubdirMap,
|
||||
};
|
||||
use proxmox::{identity, list_subdirs_api_method, sortable};
|
||||
|
||||
use pbs_api_types::file_restore::RestoreDaemonStatus;
|
||||
use pbs_client::pxar::{create_archive, Flags, PxarCreateOptions, ENCODER_MAX_ENTRIES};
|
||||
use pbs_datastore::catalog::{ArchiveEntry, DirEntryAttribute};
|
||||
use pbs_tools::fs::read_subdir;
|
||||
use pbs_tools::json::required_string_param;
|
||||
use pbs_tools::zip::zip_directory;
|
||||
|
||||
use pxar::encoder::aio::TokioWriter;
|
||||
|
||||
use super::{disk::ResolveResult, watchdog_remaining, watchdog_inhibit, watchdog_ping};
|
||||
|
||||
// NOTE: All API endpoints must have Permission::Superuser, as the configs for authentication do
|
||||
// not exist within the restore VM. Safety is guaranteed by checking a ticket via a custom ApiAuth.
|
||||
|
||||
const SUBDIRS: SubdirMap = &[
|
||||
("extract", &Router::new().get(&API_METHOD_EXTRACT)),
|
||||
("list", &Router::new().get(&API_METHOD_LIST)),
|
||||
("status", &Router::new().get(&API_METHOD_STATUS)),
|
||||
("stop", &Router::new().get(&API_METHOD_STOP)),
|
||||
];
|
||||
|
||||
pub const ROUTER: Router = Router::new()
|
||||
.get(&list_subdirs_api_method!(SUBDIRS))
|
||||
.subdirs(SUBDIRS);
|
||||
|
||||
static DOWNLOAD_SEM: Semaphore = Semaphore::const_new(8);
|
||||
|
||||
fn read_uptime() -> Result<f32, Error> {
|
||||
let uptime = fs::read_to_string("/proc/uptime")?;
|
||||
// unwrap the Option, if /proc/uptime is empty we have bigger problems
|
||||
Ok(uptime.split_ascii_whitespace().next().unwrap().parse()?)
|
||||
}
|
||||
|
||||
#[api(
|
||||
input: {
|
||||
properties: {
|
||||
"keep-timeout": {
|
||||
type: bool,
|
||||
description: "If true, do not reset the watchdog timer on this API call.",
|
||||
default: false,
|
||||
optional: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
access: {
|
||||
description: "Permissions are handled outside restore VM. This call can be made without a ticket, but keep-timeout is always assumed 'true' then.",
|
||||
permission: &Permission::World,
|
||||
},
|
||||
returns: {
|
||||
type: RestoreDaemonStatus,
|
||||
}
|
||||
)]
|
||||
/// General status information
|
||||
fn status(rpcenv: &mut dyn RpcEnvironment, keep_timeout: bool) -> Result<RestoreDaemonStatus, Error> {
|
||||
if !keep_timeout && rpcenv.get_auth_id().is_some() {
|
||||
watchdog_ping();
|
||||
}
|
||||
Ok(RestoreDaemonStatus {
|
||||
uptime: read_uptime()? as i64,
|
||||
timeout: watchdog_remaining(),
|
||||
})
|
||||
}
|
||||
|
||||
#[api(
|
||||
access: {
|
||||
description: "Permissions are handled outside restore VM.",
|
||||
permission: &Permission::Superuser,
|
||||
},
|
||||
)]
|
||||
/// Stop the restore VM immediately, this will never return if successful
|
||||
fn stop() {
|
||||
use nix::sys::reboot;
|
||||
println!("/stop called, shutting down");
|
||||
let err = reboot::reboot(reboot::RebootMode::RB_POWER_OFF).unwrap_err();
|
||||
println!("'reboot' syscall failed: {}", err);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
fn get_dir_entry(path: &Path) -> Result<DirEntryAttribute, Error> {
|
||||
use nix::sys::stat;
|
||||
|
||||
let stat = stat::stat(path)?;
|
||||
Ok(match stat.st_mode & libc::S_IFMT {
|
||||
libc::S_IFREG => DirEntryAttribute::File {
|
||||
size: stat.st_size as u64,
|
||||
mtime: stat.st_mtime,
|
||||
},
|
||||
libc::S_IFDIR => DirEntryAttribute::Directory { start: 0 },
|
||||
_ => bail!("unsupported file type: {}", stat.st_mode),
|
||||
})
|
||||
}
|
||||
|
||||
#[api(
|
||||
input: {
|
||||
properties: {
|
||||
"path": {
|
||||
type: String,
|
||||
description: "base64-encoded path to list files and directories under",
|
||||
},
|
||||
},
|
||||
},
|
||||
access: {
|
||||
description: "Permissions are handled outside restore VM.",
|
||||
permission: &Permission::Superuser,
|
||||
},
|
||||
)]
|
||||
/// List file details for given file or a list of files and directories under the given path if it
|
||||
/// points to a directory.
|
||||
fn list(
|
||||
path: String,
|
||||
_info: &ApiMethod,
|
||||
_rpcenv: &mut dyn RpcEnvironment,
|
||||
) -> Result<Vec<ArchiveEntry>, Error> {
|
||||
watchdog_ping();
|
||||
|
||||
let mut res = Vec::new();
|
||||
|
||||
let param_path = base64::decode(path)?;
|
||||
let mut path = param_path.clone();
|
||||
if let Some(b'/') = path.last() {
|
||||
path.pop();
|
||||
}
|
||||
let path_str = OsStr::from_bytes(&path[..]);
|
||||
let param_path_buf = Path::new(path_str);
|
||||
|
||||
let mut disk_state = crate::DISK_STATE.lock().unwrap();
|
||||
let query_result = disk_state.resolve(¶m_path_buf)?;
|
||||
|
||||
match query_result {
|
||||
ResolveResult::Path(vm_path) => {
|
||||
let root_entry = get_dir_entry(&vm_path)?;
|
||||
match root_entry {
|
||||
DirEntryAttribute::File { .. } => {
|
||||
// list on file, return details
|
||||
res.push(ArchiveEntry::new(¶m_path, Some(&root_entry)));
|
||||
}
|
||||
DirEntryAttribute::Directory { .. } => {
|
||||
// list on directory, return all contained files/dirs
|
||||
for f in read_subdir(libc::AT_FDCWD, &vm_path)? {
|
||||
if let Ok(f) = f {
|
||||
let name = f.file_name().to_bytes();
|
||||
let path = &Path::new(OsStr::from_bytes(name));
|
||||
if path.components().count() == 1 {
|
||||
// ignore '.' and '..'
|
||||
match path.components().next().unwrap() {
|
||||
std::path::Component::CurDir
|
||||
| std::path::Component::ParentDir => continue,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let mut full_vm_path = PathBuf::new();
|
||||
full_vm_path.push(&vm_path);
|
||||
full_vm_path.push(path);
|
||||
let mut full_path = PathBuf::new();
|
||||
full_path.push(param_path_buf);
|
||||
full_path.push(path);
|
||||
|
||||
let entry = get_dir_entry(&full_vm_path);
|
||||
if let Ok(entry) = entry {
|
||||
res.push(ArchiveEntry::new(
|
||||
full_path.as_os_str().as_bytes(),
|
||||
Some(&entry),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
ResolveResult::BucketTypes(types) => {
|
||||
for t in types {
|
||||
let mut t_path = path.clone();
|
||||
t_path.push(b'/');
|
||||
t_path.extend(t.as_bytes());
|
||||
res.push(ArchiveEntry::new(
|
||||
&t_path[..],
|
||||
None,
|
||||
));
|
||||
}
|
||||
}
|
||||
ResolveResult::BucketComponents(comps) => {
|
||||
for c in comps {
|
||||
let mut c_path = path.clone();
|
||||
c_path.push(b'/');
|
||||
c_path.extend(c.0.as_bytes());
|
||||
res.push(ArchiveEntry::new_with_size(
|
||||
&c_path[..],
|
||||
// this marks the beginning of a filesystem, i.e. '/', so this is a Directory
|
||||
Some(&DirEntryAttribute::Directory { start: 0 }),
|
||||
c.1,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
#[sortable]
|
||||
pub const API_METHOD_EXTRACT: ApiMethod = ApiMethod::new(
|
||||
&ApiHandler::AsyncHttp(&extract),
|
||||
&ObjectSchema::new(
|
||||
"Extract a file or directory from the VM as a pxar archive.",
|
||||
&sorted!([
|
||||
(
|
||||
"path",
|
||||
false,
|
||||
&StringSchema::new("base64-encoded path to list files and directories under")
|
||||
.schema()
|
||||
),
|
||||
(
|
||||
"pxar",
|
||||
true,
|
||||
&BooleanSchema::new(concat!(
|
||||
"if true, return a pxar archive, otherwise either the ",
|
||||
"file content or the directory as a zip file"
|
||||
))
|
||||
.default(true)
|
||||
.schema()
|
||||
)
|
||||
]),
|
||||
),
|
||||
)
|
||||
.access(None, &Permission::Superuser);
|
||||
|
||||
fn extract(
|
||||
_parts: Parts,
|
||||
_req_body: Body,
|
||||
param: Value,
|
||||
_info: &ApiMethod,
|
||||
_rpcenv: Box<dyn RpcEnvironment>,
|
||||
) -> ApiResponseFuture {
|
||||
// download can take longer than watchdog timeout, inhibit until done
|
||||
let _inhibitor = watchdog_inhibit();
|
||||
async move {
|
||||
let _inhibitor = _inhibitor;
|
||||
|
||||
let _permit = match DOWNLOAD_SEM.try_acquire() {
|
||||
Ok(permit) => permit,
|
||||
Err(_) => bail!("maximum concurrent download limit reached, please wait for another restore to finish before attempting a new one"),
|
||||
};
|
||||
|
||||
let path = required_string_param(¶m, "path")?;
|
||||
let mut path = base64::decode(path)?;
|
||||
if let Some(b'/') = path.last() {
|
||||
path.pop();
|
||||
}
|
||||
let path = Path::new(OsStr::from_bytes(&path[..]));
|
||||
|
||||
let pxar = param["pxar"].as_bool().unwrap_or(true);
|
||||
|
||||
let query_result = {
|
||||
let mut disk_state = crate::DISK_STATE.lock().unwrap();
|
||||
disk_state.resolve(&path)?
|
||||
};
|
||||
|
||||
let vm_path = match query_result {
|
||||
ResolveResult::Path(vm_path) => vm_path,
|
||||
_ => bail!("invalid path, cannot restore meta-directory: {:?}", path),
|
||||
};
|
||||
|
||||
// check here so we can return a real error message, failing in the async task will stop
|
||||
// the transfer, but not return a useful message
|
||||
if !vm_path.exists() {
|
||||
bail!("file or directory {:?} does not exist", path);
|
||||
}
|
||||
|
||||
let (mut writer, reader) = tokio::io::duplex(1024 * 64);
|
||||
|
||||
if pxar {
|
||||
tokio::spawn(async move {
|
||||
let _inhibitor = _inhibitor;
|
||||
let _permit = _permit;
|
||||
let result = async move {
|
||||
// pxar always expects a directory as it's root, so to accommodate files as
|
||||
// well we encode the parent dir with a filter only matching the target instead
|
||||
let mut patterns = vec![MatchEntry::new(
|
||||
MatchPattern::Pattern(Pattern::path(b"*").unwrap()),
|
||||
MatchType::Exclude,
|
||||
)];
|
||||
|
||||
let name = match vm_path.file_name() {
|
||||
Some(name) => name,
|
||||
None => bail!("no file name found for path: {:?}", vm_path),
|
||||
};
|
||||
|
||||
if vm_path.is_dir() {
|
||||
let mut pat = name.as_bytes().to_vec();
|
||||
patterns.push(MatchEntry::new(
|
||||
MatchPattern::Pattern(Pattern::path(pat.clone())?),
|
||||
MatchType::Include,
|
||||
));
|
||||
pat.extend(b"/**/*".iter());
|
||||
patterns.push(MatchEntry::new(
|
||||
MatchPattern::Pattern(Pattern::path(pat)?),
|
||||
MatchType::Include,
|
||||
));
|
||||
} else {
|
||||
patterns.push(MatchEntry::new(
|
||||
MatchPattern::Literal(name.as_bytes().to_vec()),
|
||||
MatchType::Include,
|
||||
));
|
||||
}
|
||||
|
||||
let dir_path = vm_path.parent().unwrap_or_else(|| Path::new("/"));
|
||||
let dir = nix::dir::Dir::open(
|
||||
dir_path,
|
||||
nix::fcntl::OFlag::O_NOFOLLOW,
|
||||
nix::sys::stat::Mode::empty(),
|
||||
)?;
|
||||
|
||||
let options = PxarCreateOptions {
|
||||
entries_max: ENCODER_MAX_ENTRIES,
|
||||
device_set: None,
|
||||
patterns,
|
||||
verbose: false,
|
||||
skip_lost_and_found: false,
|
||||
};
|
||||
|
||||
let pxar_writer = TokioWriter::new(writer);
|
||||
create_archive(dir, pxar_writer, Flags::DEFAULT, |_| Ok(()), None, options)
|
||||
.await
|
||||
}
|
||||
.await;
|
||||
if let Err(err) = result {
|
||||
error!("pxar streaming task failed - {}", err);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
tokio::spawn(async move {
|
||||
let _inhibitor = _inhibitor;
|
||||
let _permit = _permit;
|
||||
let result = async move {
|
||||
if vm_path.is_dir() {
|
||||
zip_directory(&mut writer, &vm_path).await?;
|
||||
Ok(())
|
||||
} else if vm_path.is_file() {
|
||||
let mut file = tokio::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(vm_path)
|
||||
.await?;
|
||||
tokio::io::copy(&mut file, &mut writer).await?;
|
||||
Ok(())
|
||||
} else {
|
||||
bail!("invalid entry type for path: {:?}", vm_path);
|
||||
}
|
||||
}
|
||||
.await;
|
||||
if let Err(err) = result {
|
||||
error!("file or dir streaming task failed - {}", err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let stream = tokio_util::io::ReaderStream::new(reader);
|
||||
|
||||
let body = Body::wrap_stream(stream);
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||
.body(body)
|
||||
.unwrap())
|
||||
}
|
||||
.boxed()
|
||||
}
|
54
proxmox-restore-daemon/src/proxmox_restore_daemon/auth.rs
Normal file
54
proxmox-restore-daemon/src/proxmox_restore_daemon/auth.rs
Normal file
@ -0,0 +1,54 @@
|
||||
//! Authentication via a static ticket file
|
||||
use std::fs::File;
|
||||
use std::io::prelude::*;
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
|
||||
use proxmox::api::UserInformation;
|
||||
|
||||
use proxmox_rest_server::{ApiAuth, AuthError};
|
||||
|
||||
const TICKET_FILE: &str = "/ticket";
|
||||
|
||||
struct SimpleUserInformation {}
|
||||
|
||||
impl UserInformation for SimpleUserInformation {
|
||||
fn is_superuser(&self, userid: &str) -> bool {
|
||||
userid == "root@pam"
|
||||
}
|
||||
fn is_group_member(&self, _userid: &str, _group: &str) -> bool { false }
|
||||
fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 { 0 }
|
||||
}
|
||||
|
||||
pub struct StaticAuth {
|
||||
ticket: String,
|
||||
}
|
||||
|
||||
impl ApiAuth for StaticAuth {
|
||||
fn check_auth(
|
||||
&self,
|
||||
headers: &http::HeaderMap,
|
||||
_method: &hyper::Method,
|
||||
) -> Result<(String, Box<dyn UserInformation + Send + Sync>), AuthError> {
|
||||
match headers.get(hyper::header::AUTHORIZATION) {
|
||||
Some(header) if header.to_str().unwrap_or("") == &self.ticket => {
|
||||
Ok((String::from("root@pam"), Box::new(SimpleUserInformation {})))
|
||||
}
|
||||
_ => {
|
||||
return Err(AuthError::Generic(format_err!(
|
||||
"invalid file restore ticket provided"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ticket_auth() -> Result<StaticAuth, Error> {
|
||||
let mut ticket_file = File::open(TICKET_FILE)?;
|
||||
let mut ticket = String::new();
|
||||
let len = ticket_file.read_to_string(&mut ticket)?;
|
||||
if len <= 0 {
|
||||
bail!("invalid ticket: cannot be empty");
|
||||
}
|
||||
Ok(StaticAuth { ticket })
|
||||
}
|
801
proxmox-restore-daemon/src/proxmox_restore_daemon/disk.rs
Normal file
801
proxmox-restore-daemon/src/proxmox_restore_daemon/disk.rs
Normal file
@ -0,0 +1,801 @@
|
||||
//! Low-level disk (image) access functions for file restore VMs.
|
||||
use std::collections::HashMap;
|
||||
use std::fs::{create_dir_all, File};
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::path::{Component, Path, PathBuf};
|
||||
use std::process::Command;
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
use lazy_static::lazy_static;
|
||||
use log::{info, warn};
|
||||
|
||||
use proxmox::const_regex;
|
||||
use proxmox::tools::fs;
|
||||
|
||||
use pbs_api_types::BLOCKDEVICE_NAME_REGEX;
|
||||
use pbs_tools::run_command;
|
||||
|
||||
const_regex! {
|
||||
VIRTIO_PART_REGEX = r"^vd[a-z]+(\d+)$";
|
||||
ZPOOL_POOL_NAME_REGEX = r"^ {3}pool: (.*)$";
|
||||
ZPOOL_IMPORT_DISK_REGEX = r"^\t {2,4}(vd[a-z]+(?:\d+)?)\s+ONLINE$";
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref FS_OPT_MAP: HashMap<&'static str, &'static str> = {
|
||||
let mut m = HashMap::new();
|
||||
|
||||
// otherwise ext complains about mounting read-only
|
||||
m.insert("ext2", "noload");
|
||||
m.insert("ext3", "noload");
|
||||
m.insert("ext4", "noload");
|
||||
|
||||
m.insert("xfs", "norecovery");
|
||||
|
||||
// ufs2 is used as default since FreeBSD 5.0 released in 2003, so let's assume that
|
||||
// whatever the user is trying to restore is not using anything older...
|
||||
m.insert("ufs", "ufstype=ufs2");
|
||||
|
||||
m.insert("ntfs", "utf8");
|
||||
|
||||
m
|
||||
};
|
||||
}
|
||||
|
||||
pub enum ResolveResult {
|
||||
Path(PathBuf),
|
||||
BucketTypes(Vec<&'static str>),
|
||||
BucketComponents(Vec<(String, Option<u64>)>),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct PartitionBucketData {
|
||||
dev_node: String,
|
||||
number: i32,
|
||||
mountpoint: Option<PathBuf>,
|
||||
size: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ZFSBucketData {
|
||||
name: String,
|
||||
mountpoint: Option<PathBuf>,
|
||||
size: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct LVMBucketData {
|
||||
vg_name: String,
|
||||
lv_name: String,
|
||||
mountpoint: Option<PathBuf>,
|
||||
size: u64,
|
||||
}
|
||||
|
||||
/// A "Bucket" represents a mapping found on a disk, e.g. a partition, a zfs dataset or an LV. A
|
||||
/// uniquely identifying path to a file then consists of four components:
|
||||
/// "/disk/bucket/component/path"
|
||||
/// where
|
||||
/// disk: fidx file name
|
||||
/// bucket: bucket type
|
||||
/// component: identifier of the specific bucket
|
||||
/// path: relative path of the file on the filesystem indicated by the other parts, may contain
|
||||
/// more subdirectories
|
||||
/// e.g.: "/drive-scsi0/part/0/etc/passwd"
|
||||
#[derive(Clone)]
|
||||
enum Bucket {
|
||||
Partition(PartitionBucketData),
|
||||
RawFs(PartitionBucketData),
|
||||
ZPool(ZFSBucketData),
|
||||
LVM(LVMBucketData),
|
||||
}
|
||||
|
||||
impl Bucket {
|
||||
fn filter_mut<'a, A: AsRef<str>, B: AsRef<str>>(
|
||||
haystack: &'a mut Vec<Bucket>,
|
||||
ty: A,
|
||||
comp: &[B],
|
||||
) -> Option<&'a mut Bucket> {
|
||||
let ty = ty.as_ref();
|
||||
haystack.iter_mut().find(|b| match b {
|
||||
Bucket::Partition(data) => {
|
||||
if let Some(comp) = comp.get(0) {
|
||||
ty == "part" && comp.as_ref().parse::<i32>().unwrap() == data.number
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
Bucket::RawFs(_) => ty == "raw",
|
||||
Bucket::ZPool(data) => {
|
||||
if let Some(ref comp) = comp.get(0) {
|
||||
ty == "zpool" && comp.as_ref() == &data.name
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
Bucket::LVM(data) => {
|
||||
if let (Some(ref vg), Some(ref lv)) = (comp.get(0), comp.get(1)) {
|
||||
ty == "lvm" && vg.as_ref() == &data.vg_name && lv.as_ref() == &data.lv_name
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn type_string(&self) -> &'static str {
|
||||
match self {
|
||||
Bucket::Partition(_) => "part",
|
||||
Bucket::RawFs(_) => "raw",
|
||||
Bucket::ZPool(_) => "zpool",
|
||||
Bucket::LVM(_) => "lvm",
|
||||
}
|
||||
}
|
||||
|
||||
fn component_string(&self, idx: usize) -> Result<String, Error> {
|
||||
let max_depth = Self::component_depth(self.type_string())?;
|
||||
if idx >= max_depth {
|
||||
bail!(
|
||||
"internal error: component index out of range {}/{} ({})",
|
||||
idx,
|
||||
max_depth,
|
||||
self.type_string()
|
||||
);
|
||||
}
|
||||
Ok(match self {
|
||||
Bucket::Partition(data) => data.number.to_string(),
|
||||
Bucket::RawFs(_) => "raw".to_owned(),
|
||||
Bucket::ZPool(data) => data.name.clone(),
|
||||
Bucket::LVM(data) => {
|
||||
if idx == 0 {
|
||||
data.vg_name.clone()
|
||||
} else {
|
||||
data.lv_name.clone()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn component_depth(type_string: &str) -> Result<usize, Error> {
|
||||
Ok(match type_string {
|
||||
"part" => 1,
|
||||
"raw" => 0,
|
||||
"zpool" => 1,
|
||||
"lvm" => 2,
|
||||
_ => bail!("invalid bucket type for component depth: {}", type_string),
|
||||
})
|
||||
}
|
||||
|
||||
fn size(&self, idx: usize) -> Option<u64> {
|
||||
match self {
|
||||
Bucket::Partition(data) | Bucket::RawFs(data) => Some(data.size),
|
||||
Bucket::ZPool(data) => data.size,
|
||||
Bucket::LVM(data) => {
|
||||
if idx == 1 {
|
||||
Some(data.size)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Functions related to the local filesystem. This mostly exists so we can use 'supported_fs' in
|
||||
/// try_mount while a Bucket is still mutably borrowed from DiskState.
|
||||
struct Filesystems {
|
||||
supported_fs: Vec<String>,
|
||||
}
|
||||
|
||||
impl Filesystems {
|
||||
fn scan() -> Result<Self, Error> {
|
||||
// detect kernel supported filesystems
|
||||
let mut supported_fs = Vec::new();
|
||||
for f in BufReader::new(File::open("/proc/filesystems")?)
|
||||
.lines()
|
||||
.filter_map(Result::ok)
|
||||
{
|
||||
// ZFS is treated specially, don't attempt to do a regular mount with it
|
||||
let f = f.trim();
|
||||
if !f.starts_with("nodev") && f != "zfs" {
|
||||
supported_fs.push(f.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
info!("Supported FS: {}", supported_fs.join(", "));
|
||||
|
||||
Ok(Self { supported_fs })
|
||||
}
|
||||
|
||||
fn ensure_mounted(&self, bucket: &mut Bucket) -> Result<PathBuf, Error> {
|
||||
match bucket {
|
||||
Bucket::Partition(data) | Bucket::RawFs(data) => {
|
||||
// regular data partition à la "/dev/vdxN" or FS directly on a disk
|
||||
if let Some(mp) = &data.mountpoint {
|
||||
return Ok(mp.clone());
|
||||
}
|
||||
|
||||
let mp = format!("/mnt{}/", data.dev_node);
|
||||
self.try_mount(&data.dev_node, &mp)?;
|
||||
let mp = PathBuf::from(mp);
|
||||
data.mountpoint = Some(mp.clone());
|
||||
Ok(mp)
|
||||
}
|
||||
Bucket::ZPool(data) => {
|
||||
if let Some(mp) = &data.mountpoint {
|
||||
return Ok(mp.clone());
|
||||
}
|
||||
|
||||
let mntpath = format!("/mnt/zpool/{}", &data.name);
|
||||
create_dir_all(&mntpath)?;
|
||||
|
||||
// call ZFS tools to import and mount the pool with the root mount at 'mntpath'
|
||||
let mut cmd = Command::new("/sbin/zpool");
|
||||
cmd.args(
|
||||
[
|
||||
"import",
|
||||
"-f",
|
||||
"-o",
|
||||
"readonly=on",
|
||||
"-d",
|
||||
"/dev",
|
||||
"-R",
|
||||
&mntpath,
|
||||
&data.name,
|
||||
]
|
||||
.iter(),
|
||||
);
|
||||
if let Err(msg) = run_command(cmd, None) {
|
||||
// ignore double import, this may happen if a previous attempt failed further
|
||||
// down below - this way we can at least try again
|
||||
if !msg
|
||||
.to_string()
|
||||
.contains("a pool with that name already exists")
|
||||
{
|
||||
return Err(msg);
|
||||
}
|
||||
}
|
||||
|
||||
// 'mount -a' simply mounts all datasets that haven't been automounted, which
|
||||
// should only be ones that we've imported just now
|
||||
let mut cmd = Command::new("/sbin/zfs");
|
||||
cmd.args(["mount", "-a"].iter());
|
||||
run_command(cmd, None)?;
|
||||
|
||||
// detect any datasets with 'legacy' mountpoints
|
||||
let mut cmd = Command::new("/sbin/zfs");
|
||||
cmd.args(["list", "-Hpro", "name,mountpoint", &data.name].iter());
|
||||
let mps = run_command(cmd, None)?;
|
||||
for subvol in mps.lines() {
|
||||
let subvol = subvol.splitn(2, '\t').collect::<Vec<&str>>();
|
||||
if subvol.len() != 2 {
|
||||
continue;
|
||||
}
|
||||
let name = subvol[0];
|
||||
let mp = subvol[1];
|
||||
|
||||
if mp == "legacy" {
|
||||
let mut newmp = PathBuf::from(format!(
|
||||
"{}/legacy-{}",
|
||||
&mntpath,
|
||||
name.replace('/', "_")
|
||||
));
|
||||
let mut i = 1;
|
||||
while newmp.exists() {
|
||||
newmp.set_extension(i.to_string());
|
||||
i += 1;
|
||||
}
|
||||
create_dir_all(&newmp)?;
|
||||
self.do_mount(Some(name), newmp.to_string_lossy().as_ref(), "zfs")?;
|
||||
}
|
||||
}
|
||||
|
||||
// Now that we have imported the pool, we can also query the size
|
||||
let mut cmd = Command::new("/sbin/zpool");
|
||||
cmd.args(["list", "-o", "size", "-Hp", &data.name].iter());
|
||||
let size = run_command(cmd, None)?;
|
||||
if let Ok(size) = size.trim().parse::<u64>() {
|
||||
data.size = Some(size);
|
||||
}
|
||||
|
||||
let mp = PathBuf::from(mntpath);
|
||||
data.mountpoint = Some(mp.clone());
|
||||
Ok(mp)
|
||||
}
|
||||
Bucket::LVM(data) => {
|
||||
if let Some(mp) = &data.mountpoint {
|
||||
return Ok(mp.clone());
|
||||
}
|
||||
|
||||
let mntpath = format!("/mnt/lvm/{}/{}", &data.vg_name, &data.lv_name);
|
||||
create_dir_all(&mntpath)?;
|
||||
|
||||
let mapper_path = format!(
|
||||
"/dev/mapper/{}-{}",
|
||||
&data.vg_name.replace('-', "--"),
|
||||
&data.lv_name.replace('-', "--")
|
||||
);
|
||||
self.try_mount(&mapper_path, &mntpath)?;
|
||||
|
||||
let mp = PathBuf::from(mntpath);
|
||||
data.mountpoint = Some(mp.clone());
|
||||
Ok(mp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn try_mount(&self, source: &str, target: &str) -> Result<(), Error> {
|
||||
create_dir_all(target)?;
|
||||
|
||||
// try all supported fs until one works - this is the way Busybox's 'mount' does it too:
|
||||
// https://git.busybox.net/busybox/tree/util-linux/mount.c?id=808d93c0eca49e0b22056e23d965f0d967433fbb#n2152
|
||||
// note that ZFS is intentionally left out (see scan())
|
||||
for fs in &self.supported_fs {
|
||||
let fs: &str = fs.as_ref();
|
||||
match self.do_mount(Some(source), target, fs) {
|
||||
Ok(()) => {
|
||||
info!("mounting '{}' succeeded, fstype: '{}'", source, fs);
|
||||
return Ok(());
|
||||
}
|
||||
Err(nix::Error::Sys(nix::errno::Errno::EINVAL)) => {}
|
||||
Err(nix::Error::Sys(nix::errno::Errno::EBUSY)) => return Ok(()),
|
||||
Err(err) => {
|
||||
warn!("mount error on '{}' ({}) - {}", source, fs, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bail!("all mounts failed or no supported file system")
|
||||
}
|
||||
|
||||
fn do_mount(&self, source: Option<&str>, target: &str, fs: &str) -> Result<(), nix::Error> {
|
||||
use nix::mount::*;
|
||||
let flags =
|
||||
MsFlags::MS_RDONLY | MsFlags::MS_NOEXEC | MsFlags::MS_NOSUID | MsFlags::MS_NODEV;
|
||||
let opts = FS_OPT_MAP.get(fs).copied();
|
||||
mount(source, target, Some(fs), flags, opts)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DiskState {
|
||||
filesystems: Filesystems,
|
||||
disk_map: HashMap<String, Vec<Bucket>>,
|
||||
}
|
||||
|
||||
impl DiskState {
|
||||
/// Scan all disks for supported buckets.
|
||||
pub fn scan() -> Result<Self, Error> {
|
||||
let filesystems = Filesystems::scan()?;
|
||||
|
||||
let mut disk_map = HashMap::new();
|
||||
let mut drive_info = HashMap::new();
|
||||
|
||||
// create mapping for virtio drives and .fidx files (via serial description)
|
||||
// note: disks::DiskManager relies on udev, which we don't have
|
||||
for entry in pbs_tools::fs::scan_subdir(
|
||||
libc::AT_FDCWD,
|
||||
"/sys/block",
|
||||
&BLOCKDEVICE_NAME_REGEX,
|
||||
)?
|
||||
.filter_map(Result::ok)
|
||||
{
|
||||
let name = unsafe { entry.file_name_utf8_unchecked() };
|
||||
if !name.starts_with("vd") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let sys_path: &str = &format!("/sys/block/{}", name);
|
||||
|
||||
let serial = fs::file_read_string(&format!("{}/serial", sys_path));
|
||||
let fidx = match serial {
|
||||
Ok(serial) => serial,
|
||||
Err(err) => {
|
||||
warn!("disk '{}': could not read serial file - {}", name, err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
drive_info.insert(name.to_owned(), fidx.clone());
|
||||
|
||||
// attempt to mount device directly
|
||||
let dev_node = format!("/dev/{}", name);
|
||||
let size = Self::make_dev_node(&dev_node, &sys_path)?;
|
||||
let mut dfs_bucket = Bucket::RawFs(PartitionBucketData {
|
||||
dev_node: dev_node.clone(),
|
||||
number: 0,
|
||||
mountpoint: None,
|
||||
size,
|
||||
});
|
||||
if let Ok(_) = filesystems.ensure_mounted(&mut dfs_bucket) {
|
||||
// mount succeeded, add bucket and skip any other checks for the disk
|
||||
info!(
|
||||
"drive '{}' ('{}', '{}') contains fs directly ({}B)",
|
||||
name, fidx, dev_node, size
|
||||
);
|
||||
disk_map.insert(fidx, vec![dfs_bucket]);
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut parts = Vec::new();
|
||||
for entry in pbs_tools::fs::scan_subdir(
|
||||
libc::AT_FDCWD,
|
||||
sys_path,
|
||||
&VIRTIO_PART_REGEX,
|
||||
)?
|
||||
.filter_map(Result::ok)
|
||||
{
|
||||
let part_name = unsafe { entry.file_name_utf8_unchecked() };
|
||||
let dev_node = format!("/dev/{}", part_name);
|
||||
let part_path = format!("/sys/block/{}/{}", name, part_name);
|
||||
|
||||
// create partition device node for further use
|
||||
let size = Self::make_dev_node(&dev_node, &part_path)?;
|
||||
|
||||
let number = fs::file_read_firstline(&format!("{}/partition", part_path))?
|
||||
.trim()
|
||||
.parse::<i32>()?;
|
||||
|
||||
info!(
|
||||
"drive '{}' ('{}'): found partition '{}' ({}, {}B)",
|
||||
name, fidx, dev_node, number, size
|
||||
);
|
||||
|
||||
let bucket = Bucket::Partition(PartitionBucketData {
|
||||
dev_node,
|
||||
mountpoint: None,
|
||||
number,
|
||||
size,
|
||||
});
|
||||
|
||||
parts.push(bucket);
|
||||
|
||||
drive_info.insert(part_name.to_owned(), fidx.clone());
|
||||
}
|
||||
|
||||
disk_map.insert(fidx, parts);
|
||||
}
|
||||
|
||||
// After the above, every valid disk should have a device node in /dev, so we can query all
|
||||
// of them for zpools
|
||||
let mut cmd = Command::new("/sbin/zpool");
|
||||
cmd.args(["import", "-d", "/dev"].iter());
|
||||
let result = run_command(cmd, None).unwrap();
|
||||
for (pool, disks) in Self::parse_zpool_import(&result) {
|
||||
let mut bucket = Bucket::ZPool(ZFSBucketData {
|
||||
name: pool.clone(),
|
||||
size: None,
|
||||
mountpoint: None,
|
||||
});
|
||||
|
||||
// anything more than 5 disks we assume to take too long to mount, so we don't
|
||||
// automatically - this means that no size can be reported
|
||||
if disks.len() <= 5 {
|
||||
let mp = filesystems.ensure_mounted(&mut bucket);
|
||||
info!(
|
||||
"zpool '{}' (on: {:?}) auto-mounted at '{:?}' (size: {:?})",
|
||||
&pool,
|
||||
&disks,
|
||||
mp,
|
||||
bucket.size(0)
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
"zpool '{}' (on: {:?}) auto-mount skipped, too many disks",
|
||||
&pool, &disks
|
||||
);
|
||||
}
|
||||
|
||||
for disk in disks {
|
||||
if let Some(fidx) = drive_info.get(&disk) {
|
||||
match disk_map.get_mut(fidx) {
|
||||
Some(v) => v.push(bucket.clone()),
|
||||
None => {
|
||||
disk_map.insert(fidx.to_owned(), vec![bucket.clone()]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Self::scan_lvm(&mut disk_map, &drive_info)?;
|
||||
|
||||
Ok(Self {
|
||||
filesystems,
|
||||
disk_map,
|
||||
})
|
||||
}
|
||||
|
||||
/// scan for LVM volumes and create device nodes for them to later mount on demand
|
||||
fn scan_lvm(
|
||||
disk_map: &mut HashMap<String, Vec<Bucket>>,
|
||||
drive_info: &HashMap<String, String>,
|
||||
) -> Result<(), Error> {
|
||||
// first get mapping between devices and vgs
|
||||
let mut pv_map: HashMap<String, Vec<String>> = HashMap::new();
|
||||
let mut cmd = Command::new("/sbin/pvs");
|
||||
cmd.args(["-o", "pv_name,vg_name", "--reportformat", "json"].iter());
|
||||
let result = run_command(cmd, None).unwrap();
|
||||
let result: serde_json::Value = serde_json::from_str(&result)?;
|
||||
if let Some(result) = result["report"][0]["pv"].as_array() {
|
||||
for pv in result {
|
||||
let vg_name = pv["vg_name"].as_str().unwrap();
|
||||
if vg_name.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let pv_name = pv["pv_name"].as_str().unwrap();
|
||||
// remove '/dev/' part
|
||||
let pv_name = &pv_name[pv_name.rfind('/').map(|i| i + 1).unwrap_or(0)..];
|
||||
if let Some(fidx) = drive_info.get(pv_name) {
|
||||
info!("LVM: found VG '{}' on '{}' ({})", vg_name, pv_name, fidx);
|
||||
match pv_map.get_mut(vg_name) {
|
||||
Some(list) => list.push(fidx.to_owned()),
|
||||
None => {
|
||||
pv_map.insert(vg_name.to_owned(), vec![fidx.to_owned()]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mknodes = || {
|
||||
let mut cmd = Command::new("/sbin/vgscan");
|
||||
cmd.arg("--mknodes");
|
||||
if let Err(err) = run_command(cmd, None) {
|
||||
warn!("LVM: 'vgscan --mknodes' failed: {}", err);
|
||||
}
|
||||
};
|
||||
|
||||
// then scan for LVs and assign their buckets to the correct disks
|
||||
let mut cmd = Command::new("/sbin/lvs");
|
||||
cmd.args(
|
||||
[
|
||||
"-o",
|
||||
"vg_name,lv_name,lv_size,metadata_lv",
|
||||
"--units",
|
||||
"B",
|
||||
"--reportformat",
|
||||
"json",
|
||||
]
|
||||
.iter(),
|
||||
);
|
||||
let result = run_command(cmd, None).unwrap();
|
||||
let result: serde_json::Value = serde_json::from_str(&result)?;
|
||||
let mut thinpools = Vec::new();
|
||||
if let Some(result) = result["report"][0]["lv"].as_array() {
|
||||
// first, look for thin-pools
|
||||
for lv in result {
|
||||
let metadata = lv["metadata_lv"].as_str().unwrap_or_default();
|
||||
if !metadata.is_empty() {
|
||||
// this is a thin-pool, activate the metadata LV
|
||||
let vg_name = lv["vg_name"].as_str().unwrap();
|
||||
let metadata = metadata.trim_matches(&['[', ']'][..]);
|
||||
info!("LVM: attempting to activate thinpool '{}'", metadata);
|
||||
let mut cmd = Command::new("/sbin/lvchange");
|
||||
cmd.args(["-ay", "-y", &format!("{}/{}", vg_name, metadata)].iter());
|
||||
if let Err(err) = run_command(cmd, None) {
|
||||
// not critical, will simply mean its children can't be loaded
|
||||
warn!("LVM: activating thinpool failed: {}", err);
|
||||
} else {
|
||||
thinpools.push((vg_name, metadata));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// now give the metadata LVs a device node
|
||||
mknodes();
|
||||
|
||||
// cannot leave the metadata LV active, otherwise child-LVs won't activate
|
||||
for (vg_name, metadata) in thinpools {
|
||||
let mut cmd = Command::new("/sbin/lvchange");
|
||||
cmd.args(["-an", "-y", &format!("{}/{}", vg_name, metadata)].iter());
|
||||
let _ = run_command(cmd, None);
|
||||
}
|
||||
|
||||
for lv in result {
|
||||
let lv_name = lv["lv_name"].as_str().unwrap();
|
||||
let vg_name = lv["vg_name"].as_str().unwrap();
|
||||
let metadata = lv["metadata_lv"].as_str().unwrap_or_default();
|
||||
if lv_name.is_empty() || vg_name.is_empty() || !metadata.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let lv_size = lv["lv_size"].as_str().unwrap();
|
||||
// lv_size is in bytes with a capital 'B' at the end
|
||||
let lv_size = lv_size[..lv_size.len() - 1].parse::<u64>().unwrap_or(0);
|
||||
|
||||
let bucket = Bucket::LVM(LVMBucketData {
|
||||
vg_name: vg_name.to_owned(),
|
||||
lv_name: lv_name.to_owned(),
|
||||
size: lv_size,
|
||||
mountpoint: None,
|
||||
});
|
||||
|
||||
// activate the LV so 'vgscan' can create a node later - this may fail, and if it
|
||||
// does, we ignore it and continue
|
||||
let mut cmd = Command::new("/sbin/lvchange");
|
||||
cmd.args(["-ay", &format!("{}/{}", vg_name, lv_name)].iter());
|
||||
if let Err(err) = run_command(cmd, None) {
|
||||
warn!(
|
||||
"LVM: LV '{}' on '{}' ({}B) failed to activate: {}",
|
||||
lv_name, vg_name, lv_size, err
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
info!(
|
||||
"LVM: found LV '{}' on '{}' ({}B)",
|
||||
lv_name, vg_name, lv_size
|
||||
);
|
||||
|
||||
if let Some(drives) = pv_map.get(vg_name) {
|
||||
for fidx in drives {
|
||||
match disk_map.get_mut(fidx) {
|
||||
Some(v) => v.push(bucket.clone()),
|
||||
None => {
|
||||
disk_map.insert(fidx.to_owned(), vec![bucket.clone()]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// now that we've imported and activated all LV's, we let vgscan create the dev nodes
|
||||
mknodes();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Given a path like "/drive-scsi0.img.fidx/part/0/etc/passwd", this will mount the first
|
||||
/// partition of 'drive-scsi0' on-demand (i.e. if not already mounted) and return a path
|
||||
/// pointing to the requested file locally, e.g. "/mnt/vda1/etc/passwd", which can be used to
|
||||
/// read the file. Given a partial path, i.e. only "/drive-scsi0.img.fidx" or
|
||||
/// "/drive-scsi0.img.fidx/part", it will return a list of available bucket types or bucket
|
||||
/// components respectively
|
||||
pub fn resolve(&mut self, path: &Path) -> Result<ResolveResult, Error> {
|
||||
let mut cmp = path.components().peekable();
|
||||
match cmp.peek() {
|
||||
Some(Component::RootDir) | Some(Component::CurDir) => {
|
||||
cmp.next();
|
||||
}
|
||||
None => bail!("empty path cannot be resolved to file location"),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let req_fidx = match cmp.next() {
|
||||
Some(Component::Normal(x)) => x.to_string_lossy(),
|
||||
_ => bail!("no or invalid image in path"),
|
||||
};
|
||||
|
||||
let buckets = match self.disk_map.get_mut(
|
||||
req_fidx
|
||||
.strip_suffix(".img.fidx")
|
||||
.unwrap_or_else(|| req_fidx.as_ref()),
|
||||
) {
|
||||
Some(x) => x,
|
||||
None => bail!("given image '{}' not found", req_fidx),
|
||||
};
|
||||
|
||||
let bucket_type = match cmp.next() {
|
||||
Some(Component::Normal(x)) => x.to_string_lossy(),
|
||||
Some(c) => bail!("invalid bucket in path: {:?}", c),
|
||||
None => {
|
||||
// list bucket types available
|
||||
let mut types = buckets
|
||||
.iter()
|
||||
.map(|b| b.type_string())
|
||||
.collect::<Vec<&'static str>>();
|
||||
// dedup requires duplicates to be consecutive, which is the case - see scan()
|
||||
types.dedup();
|
||||
return Ok(ResolveResult::BucketTypes(types));
|
||||
}
|
||||
};
|
||||
|
||||
let mut components = Vec::new();
|
||||
let component_count = Bucket::component_depth(&bucket_type)?;
|
||||
|
||||
while components.len() < component_count {
|
||||
let component = match cmp.next() {
|
||||
Some(Component::Normal(x)) => x.to_string_lossy(),
|
||||
Some(c) => bail!("invalid bucket component in path: {:?}", c),
|
||||
None => {
|
||||
// list bucket components available at this level
|
||||
let mut comps = buckets
|
||||
.iter()
|
||||
.filter_map(|b| {
|
||||
if b.type_string() != bucket_type {
|
||||
return None;
|
||||
}
|
||||
match b.component_string(components.len()) {
|
||||
Ok(cs) => Some((cs.to_owned(), b.size(components.len()))),
|
||||
Err(_) => None,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<(String, Option<u64>)>>();
|
||||
comps.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
comps.dedup();
|
||||
return Ok(ResolveResult::BucketComponents(comps));
|
||||
}
|
||||
};
|
||||
|
||||
components.push(component);
|
||||
}
|
||||
|
||||
let mut bucket = match Bucket::filter_mut(buckets, &bucket_type, &components) {
|
||||
Some(bucket) => bucket,
|
||||
None => bail!(
|
||||
"bucket/component path not found: {}/{}/{}",
|
||||
req_fidx,
|
||||
bucket_type,
|
||||
components.join("/")
|
||||
),
|
||||
};
|
||||
|
||||
// bucket found, check mount
|
||||
let mountpoint = self
|
||||
.filesystems
|
||||
.ensure_mounted(&mut bucket)
|
||||
.map_err(|err| {
|
||||
format_err!(
|
||||
"mounting '{}/{}/{}' failed: {}",
|
||||
req_fidx,
|
||||
bucket_type,
|
||||
components.join("/"),
|
||||
err
|
||||
)
|
||||
})?;
|
||||
|
||||
let mut local_path = PathBuf::new();
|
||||
local_path.push(mountpoint);
|
||||
for rem in cmp {
|
||||
local_path.push(rem);
|
||||
}
|
||||
|
||||
Ok(ResolveResult::Path(local_path))
|
||||
}
|
||||
|
||||
fn make_dev_node(devnode: &str, sys_path: &str) -> Result<u64, Error> {
|
||||
let dev_num_str = fs::file_read_firstline(&format!("{}/dev", sys_path))?;
|
||||
let (major, minor) = dev_num_str.split_at(dev_num_str.find(':').unwrap());
|
||||
Self::mknod_blk(&devnode, major.parse()?, minor[1..].trim_end().parse()?)?;
|
||||
|
||||
// this *always* contains the number of 512-byte sectors, regardless of the true
|
||||
// blocksize of this disk - which should always be 512 here anyway
|
||||
let size = fs::file_read_firstline(&format!("{}/size", sys_path))?
|
||||
.trim()
|
||||
.parse::<u64>()?
|
||||
* 512;
|
||||
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
fn mknod_blk(path: &str, maj: u64, min: u64) -> Result<(), Error> {
|
||||
use nix::sys::stat;
|
||||
let dev = stat::makedev(maj, min);
|
||||
stat::mknod(path, stat::SFlag::S_IFBLK, stat::Mode::S_IRWXU, dev)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse_zpool_import(data: &str) -> Vec<(String, Vec<String>)> {
|
||||
let mut ret = Vec::new();
|
||||
let mut disks = Vec::new();
|
||||
let mut cur = "".to_string();
|
||||
for line in data.lines() {
|
||||
if let Some(groups) = (ZPOOL_POOL_NAME_REGEX.regex_obj)().captures(line) {
|
||||
if let Some(name) = groups.get(1) {
|
||||
if !disks.is_empty() {
|
||||
ret.push((cur, disks.clone()));
|
||||
}
|
||||
disks.clear();
|
||||
cur = name.as_str().to_owned();
|
||||
}
|
||||
} else if let Some(groups) = (ZPOOL_IMPORT_DISK_REGEX.regex_obj)().captures(line) {
|
||||
if let Some(disk) = groups.get(1) {
|
||||
disks.push(disk.as_str().to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
if !disks.is_empty() && !cur.is_empty() {
|
||||
ret.push((cur, disks));
|
||||
}
|
||||
ret
|
||||
}
|
||||
}
|
11
proxmox-restore-daemon/src/proxmox_restore_daemon/mod.rs
Normal file
11
proxmox-restore-daemon/src/proxmox_restore_daemon/mod.rs
Normal file
@ -0,0 +1,11 @@
|
||||
///! File restore VM related functionality
|
||||
mod api;
|
||||
pub use api::*;
|
||||
|
||||
pub mod auth;
|
||||
|
||||
mod watchdog;
|
||||
pub use watchdog::*;
|
||||
|
||||
mod disk;
|
||||
pub use disk::*;
|
@ -0,0 +1,64 @@
|
||||
//! Tokio-based watchdog that shuts down the VM if not pinged for TIMEOUT
|
||||
use std::sync::atomic::{AtomicI64, Ordering};
|
||||
|
||||
use proxmox::tools::time::epoch_i64;
|
||||
|
||||
const TIMEOUT: i64 = 600; // seconds
|
||||
static TRIGGERED: AtomicI64 = AtomicI64::new(0);
|
||||
static INHIBITORS: AtomicI64 = AtomicI64::new(0);
|
||||
|
||||
pub struct WatchdogInhibitor {}
|
||||
|
||||
fn handle_expired() -> ! {
|
||||
use nix::sys::reboot;
|
||||
println!("watchdog expired, shutting down");
|
||||
let err = reboot::reboot(reboot::RebootMode::RB_POWER_OFF).unwrap_err();
|
||||
println!("'reboot' syscall failed: {}", err);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
async fn watchdog_loop() {
|
||||
use tokio::time::{sleep, Duration};
|
||||
loop {
|
||||
let remaining = watchdog_remaining();
|
||||
if remaining <= 0 {
|
||||
handle_expired();
|
||||
}
|
||||
sleep(Duration::from_secs(remaining as u64)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize watchdog
|
||||
pub fn watchdog_init() {
|
||||
watchdog_ping();
|
||||
tokio::spawn(watchdog_loop());
|
||||
}
|
||||
|
||||
/// Trigger watchdog keepalive
|
||||
pub fn watchdog_ping() {
|
||||
TRIGGERED.fetch_max(epoch_i64(), Ordering::AcqRel);
|
||||
}
|
||||
|
||||
/// Returns the remaining time before watchdog expiry in seconds
|
||||
pub fn watchdog_remaining() -> i64 {
|
||||
if INHIBITORS.load(Ordering::Acquire) > 0 {
|
||||
TIMEOUT
|
||||
} else {
|
||||
TIMEOUT - (epoch_i64() - TRIGGERED.load(Ordering::Acquire))
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an object that inhibts watchdog expiry for its lifetime, it will issue a ping on Drop
|
||||
pub fn watchdog_inhibit() -> WatchdogInhibitor {
|
||||
let prev = INHIBITORS.fetch_add(1, Ordering::AcqRel);
|
||||
log::info!("Inhibit added: {}", prev + 1);
|
||||
WatchdogInhibitor {}
|
||||
}
|
||||
|
||||
impl Drop for WatchdogInhibitor {
|
||||
fn drop(&mut self) {
|
||||
watchdog_ping();
|
||||
let prev = INHIBITORS.fetch_sub(1, Ordering::AcqRel);
|
||||
log::info!("Inhibit dropped: {}", prev - 1);
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user