diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 2403bea6..4a206309 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -1970,6 +1970,8 @@ fn main() { .insert("status", status_cmd_def) .insert("key", key::cli()) .insert("mount", mount_cmd_def()) + .insert("map", map_cmd_def()) + .insert("unmap", unmap_cmd_def()) .insert("catalog", catalog_mgmt_cli()) .insert("task", task_mgmt_cli()) .insert("version", version_cmd_def) diff --git a/src/bin/proxmox_backup_client/mount.rs b/src/bin/proxmox_backup_client/mount.rs index 54bf848e..c634674a 100644 --- a/src/bin/proxmox_backup_client/mount.rs +++ b/src/bin/proxmox_backup_client/mount.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::os::unix::io::RawFd; use std::path::Path; use std::ffi::OsStr; +use std::collections::HashMap; use anyhow::{bail, format_err, Error}; use serde_json::Value; @@ -10,6 +11,7 @@ use tokio::signal::unix::{signal, SignalKind}; use nix::unistd::{fork, ForkResult, pipe}; use futures::select; use futures::future::FutureExt; +use futures::stream::{StreamExt, TryStreamExt}; use proxmox::{sortable, identity}; use proxmox::api::{ApiHandler, ApiMethod, RpcEnvironment, schema::*, cli::*}; @@ -23,6 +25,7 @@ use proxmox_backup::backup::{ BackupDir, BackupGroup, BufferedDynamicReader, + AsyncIndexReader, }; use proxmox_backup::client::*; @@ -50,7 +53,34 @@ const API_METHOD_MOUNT: ApiMethod = ApiMethod::new( ("target", false, &StringSchema::new("Target directory path.").schema()), ("repository", true, &REPO_URL_SCHEMA), ("keyfile", true, &StringSchema::new("Path to encryption key.").schema()), - ("verbose", true, &BooleanSchema::new("Verbose output.").default(false).schema()), + ("verbose", true, &BooleanSchema::new("Verbose output and stay in foreground.").default(false).schema()), + ]), + ) +); + +#[sortable] +const API_METHOD_MAP: ApiMethod = ApiMethod::new( + &ApiHandler::Sync(&mount), + &ObjectSchema::new( + "Map a drive image from a VM backup to a local loopback device. Use 'unmap' to undo. +WARNING: Only do this with *trusted* backups!", + &sorted!([ + ("snapshot", false, &StringSchema::new("Group/Snapshot path.").schema()), + ("archive-name", false, &StringSchema::new("Backup archive name.").schema()), + ("repository", true, &REPO_URL_SCHEMA), + ("keyfile", true, &StringSchema::new("Path to encryption key.").schema()), + ("verbose", true, &BooleanSchema::new("Verbose output and stay in foreground.").default(false).schema()), + ]), + ) +); + +#[sortable] +const API_METHOD_UNMAP: ApiMethod = ApiMethod::new( + &ApiHandler::Sync(&unmap), + &ObjectSchema::new( + "Unmap a loop device mapped with 'map' and release all resources.", + &sorted!([ + ("loopdev", false, &StringSchema::new("Path to loopdev (/dev/loopX) or loop device number.").schema()), ]), ) ); @@ -65,6 +95,22 @@ pub fn mount_cmd_def() -> CliCommand { .completion_cb("target", tools::complete_file_name) } +pub fn map_cmd_def() -> CliCommand { + + CliCommand::new(&API_METHOD_MAP) + .arg_param(&["snapshot", "archive-name"]) + .completion_cb("repository", complete_repository) + .completion_cb("snapshot", complete_group_or_snapshot) + .completion_cb("archive-name", complete_pxar_archive_name) +} + +pub fn unmap_cmd_def() -> CliCommand { + + CliCommand::new(&API_METHOD_UNMAP) + .arg_param(&["loopdev"]) + .completion_cb("loopdev", tools::complete_file_name) +} + fn mount( param: Value, _info: &ApiMethod, @@ -100,9 +146,10 @@ fn mount( async fn mount_do(param: Value, pipe: Option) -> Result { let repo = extract_repository_from_value(¶m)?; let archive_name = tools::required_string_param(¶m, "archive-name")?; - let target = tools::required_string_param(¶m, "target")?; let client = connect(repo.host(), repo.port(), repo.user())?; + let target = param["target"].as_str(); + record_repository(&repo); let path = tools::required_string_param(¶m, "snapshot")?; @@ -124,9 +171,17 @@ async fn mount_do(param: Value, pipe: Option) -> Result { }; let server_archive_name = if archive_name.ends_with(".pxar") { + if let None = target { + bail!("use the 'mount' command to mount pxar archives"); + } format!("{}.didx", archive_name) + } else if archive_name.ends_with(".img") { + if let Some(_) = target { + bail!("use the 'map' command to map drive images"); + } + format!("{}.fidx", archive_name) } else { - bail!("Can only mount pxar archives."); + bail!("Can only mount/map pxar archives and drive images."); }; let client = BackupReader::start( @@ -143,25 +198,7 @@ async fn mount_do(param: Value, pipe: Option) -> Result { let file_info = manifest.lookup_file_info(&server_archive_name)?; - if server_archive_name.ends_with(".didx") { - let index = client.download_dynamic_index(&manifest, &server_archive_name).await?; - let most_used = index.find_most_used_chunks(8); - let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), most_used); - let reader = BufferedDynamicReader::new(index, chunk_reader); - let archive_size = reader.archive_size(); - let reader: proxmox_backup::pxar::fuse::Reader = - Arc::new(BufferedDynamicReadAt::new(reader)); - let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?; - let options = OsStr::new("ro,default_permissions"); - - let session = proxmox_backup::pxar::fuse::Session::mount( - decoder, - &options, - false, - Path::new(target), - ) - .map_err(|err| format_err!("pxar mount failed: {}", err))?; - + let daemonize = || -> Result<(), Error> { if let Some(pipe) = pipe { nix::unistd::chdir(Path::new("/")).unwrap(); // Finish creation of daemon by redirecting filedescriptors. @@ -182,10 +219,35 @@ async fn mount_do(param: Value, pipe: Option) -> Result { nix::unistd::close(pipe).unwrap(); } - // handle SIGINT and SIGTERM - let mut interrupt_int = signal(SignalKind::interrupt())?; - let mut interrupt_term = signal(SignalKind::terminate())?; - let mut interrupt = futures::future::select(interrupt_int.next(), interrupt_term.next()); + Ok(()) + }; + + let options = OsStr::new("ro,default_permissions"); + + // handle SIGINT and SIGTERM + let mut interrupt_int = signal(SignalKind::interrupt())?; + let mut interrupt_term = signal(SignalKind::terminate())?; + let mut interrupt = futures::future::select(interrupt_int.next(), interrupt_term.next()); + + if server_archive_name.ends_with(".didx") { + let index = client.download_dynamic_index(&manifest, &server_archive_name).await?; + let most_used = index.find_most_used_chunks(8); + let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), most_used); + let reader = BufferedDynamicReader::new(index, chunk_reader); + let archive_size = reader.archive_size(); + let reader: proxmox_backup::pxar::fuse::Reader = + Arc::new(BufferedDynamicReadAt::new(reader)); + let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?; + + let session = proxmox_backup::pxar::fuse::Session::mount( + decoder, + &options, + false, + Path::new(target.unwrap()), + ) + .map_err(|err| format_err!("pxar mount failed: {}", err))?; + + daemonize()?; select! { res = session.fuse() => res?, @@ -193,9 +255,73 @@ async fn mount_do(param: Value, pipe: Option) -> Result { // exit on interrupted } } + } else if server_archive_name.ends_with(".fidx") { + let index = client.download_fixed_index(&manifest, &server_archive_name).await?; + let size = index.index_bytes(); + let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), HashMap::new()); + let reader = AsyncIndexReader::new(index, chunk_reader); + + let mut session = tools::fuse_loop::FuseLoopSession::map_loop(size, reader, options).await?; + let loopdev = session.loopdev_path.clone(); + + let (st_send, st_recv) = futures::channel::mpsc::channel(1); + let (mut abort_send, abort_recv) = futures::channel::mpsc::channel(1); + let mut st_recv = st_recv.fuse(); + let mut session_fut = session.main(st_send, abort_recv).boxed().fuse(); + + // poll until loop file is mapped (or errors) + select! { + res = session_fut => { + bail!("FUSE session unexpectedly ended before loop file mapping"); + }, + res = st_recv.try_next() => { + if let Err(err) = res { + // init went wrong, abort now + abort_send.try_send(()).map_err(|err| + format_err!("error while sending abort signal - {}", err))?; + // ignore and keep original error cause + let _ = session_fut.await; + return Err(err); + } + } + } + + // daemonize only now to be able to print mapped loopdev or startup errors + println!("Image mapped as {}", loopdev); + daemonize()?; + + // continue polling until complete or interrupted (which also happens on unmap) + select! { + res = session_fut => res?, + _ = interrupt => { + // exit on interrupted + abort_send.try_send(()).map_err(|err| + format_err!("error while sending abort signal - {}", err))?; + session_fut.await?; + } + } + + println!("Image unmapped"); } else { - bail!("unknown archive file extension (expected .pxar)"); + bail!("unknown archive file extension (expected .pxar or .img)"); } Ok(Value::Null) } + +fn unmap( + param: Value, + _info: &ApiMethod, + _rpcenv: &mut dyn RpcEnvironment, +) -> Result { + + let mut path = tools::required_string_param(¶m, "loopdev")?.to_owned(); + + if let Ok(num) = path.parse::() { + path = format!("/dev/loop{}", num); + } + + tools::fuse_loop::unmap(path)?; + + Ok(Value::Null) +} diff --git a/src/tools.rs b/src/tools.rs index 33986b1c..5b244c02 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -33,6 +33,8 @@ pub mod statistics; pub mod systemd; pub mod nom; pub mod logrotate; +pub mod loopdev; +pub mod fuse_loop; mod parallel_handler; pub use parallel_handler::*; diff --git a/src/tools/fuse_loop.rs b/src/tools/fuse_loop.rs new file mode 100644 index 00000000..8c1e04aa --- /dev/null +++ b/src/tools/fuse_loop.rs @@ -0,0 +1,258 @@ +use anyhow::{Error, format_err, bail}; +use std::ffi::OsStr; +use std::path::{Path, PathBuf}; +use std::fs::{File, remove_file, read_to_string}; +use std::io::SeekFrom; +use std::io::prelude::*; + +use nix::unistd::{Pid, mkstemp}; +use nix::sys::signal::{self, Signal}; + +use tokio::io::{AsyncRead, AsyncSeek, AsyncReadExt, AsyncSeekExt}; +use futures::stream::{StreamExt, TryStreamExt}; +use futures::channel::mpsc::{Sender, Receiver}; + +use proxmox::try_block; +use proxmox_fuse::{*, requests::FuseRequest}; +use super::loopdev; + +const RUN_DIR: &'static str = "/run/pbs-loopdev"; + +pub struct FuseLoopSession { + session: Option, + stat: libc::stat, + reader: R, + fuse_path: String, + pid_path: String, + pub loopdev_path: String, +} + +impl FuseLoopSession { + + /// Prepare for mapping the given reader as a block device node at + /// /dev/loopN. Creates a temporary file for FUSE and a PID file for unmap. + pub async fn map_loop(size: u64, mut reader: R, options: &OsStr) + -> Result + { + // attempt a single read to check if the reader is configured correctly + let _ = reader.read_u8().await?; + + std::fs::create_dir_all(RUN_DIR)?; + let mut base_path = PathBuf::from(RUN_DIR); + base_path.push("XXXXXX"); // template for mkstemp + let (_, path) = mkstemp(&base_path)?; + let mut pid_path = path.clone(); + pid_path.set_extension("pid"); + + let res: Result<(Fuse, String), Error> = try_block!{ + let session = Fuse::builder("pbs-block-dev")? + .options_os(options)? + .enable_read() + .build()? + .mount(&path)?; + + let loopdev_path = loopdev::get_or_create_free_dev().map_err(|err| { + format_err!("loop-control GET_FREE failed - {}", err) + })?; + + // write pidfile so unmap can later send us a signal to exit + Self::write_pidfile(&pid_path)?; + + Ok((session, loopdev_path)) + }; + + match res { + Ok((session, loopdev_path)) => + Ok(Self { + session: Some(session), + reader, + stat: minimal_stat(size as i64), + fuse_path: path.to_string_lossy().into_owned(), + pid_path: pid_path.to_string_lossy().into_owned(), + loopdev_path, + }), + Err(e) => { + // best-effort temp file cleanup in case of error + let _ = remove_file(&path); + let _ = remove_file(&pid_path); + Err(e) + } + } + } + + fn write_pidfile(path: &Path) -> Result<(), Error> { + let pid = unsafe { libc::getpid() }; + let mut file = File::create(path)?; + write!(file, "{}", pid)?; + Ok(()) + } + + /// Runs the FUSE request loop and assigns the loop device. Will send a + /// message on startup_chan once the loop device is assigned (or assignment + /// fails). Send a message on abort_chan to trigger cleanup and exit FUSE. + /// An error on loopdev assignment does *not* automatically close the FUSE + /// handle or do cleanup, trigger abort_chan manually in case startup fails. + pub async fn main( + &mut self, + mut startup_chan: Sender>, + abort_chan: Receiver<()>, + ) -> Result<(), Error> { + + if let None = self.session { + panic!("internal error: fuse_loop::main called before ::map_loop"); + } + let mut session = self.session.take().unwrap().fuse(); + let mut abort_chan = abort_chan.fuse(); + + let (loopdev_path, fuse_path) = (self.loopdev_path.clone(), self.fuse_path.clone()); + tokio::task::spawn_blocking(move || { + if let Err(err) = loopdev::assign(loopdev_path, fuse_path) { + let _ = startup_chan.try_send(Err(format_err!("error while assigning loop device - {}", err))); + } else { + // device is assigned successfully, which means not only is the + // loopdev ready, but FUSE is also okay, since the assignment + // would have failed otherwise + let _ = startup_chan.try_send(Ok(())); + } + }); + + let (loopdev_path, fuse_path, pid_path) = + (self.loopdev_path.clone(), self.fuse_path.clone(), self.pid_path.clone()); + let cleanup = |session: futures::stream::Fuse| { + // only warn for errors on cleanup, if these fail nothing is lost + if let Err(err) = loopdev::unassign(&loopdev_path) { + eprintln!( + "cleanup: warning: could not unassign file {} from loop device {} - {}", + &fuse_path, + &loopdev_path, + err, + ); + } + + // force close FUSE handle before attempting to remove backing file + std::mem::drop(session); + + if let Err(err) = remove_file(&fuse_path) { + eprintln!( + "cleanup: warning: could not remove temporary file {} - {}", + &fuse_path, + err, + ); + } + if let Err(err) = remove_file(&pid_path) { + eprintln!( + "cleanup: warning: could not remove PID file {} - {}", + &pid_path, + err, + ); + } + }; + + loop { + tokio::select!{ + _ = abort_chan.next() => { + // aborted, do cleanup and exit + break; + }, + req = session.try_next() => { + let res = match req? { + Some(Request::Lookup(req)) => { + let stat = self.stat; + let entry = EntryParam::simple(stat.st_ino, stat); + req.reply(&entry) + }, + Some(Request::Getattr(req)) => { + req.reply(&self.stat, std::f64::MAX) + }, + Some(Request::Read(req)) => { + match self.reader.seek(SeekFrom::Start(req.offset)).await { + Ok(_) => { + let mut buf = vec![0u8; req.size]; + match self.reader.read_exact(&mut buf).await { + Ok(_) => { + req.reply(&buf) + }, + Err(e) => { + req.io_fail(e) + } + } + }, + Err(e) => { + req.io_fail(e) + } + } + }, + Some(_) => { + // only FUSE requests necessary for loop-mapping are implemented + eprintln!("Unimplemented FUSE request type encountered"); + Ok(()) + }, + None => { + // FUSE connection closed + break; + } + }; + if let Err(err) = res { + // error during FUSE reply, cleanup and exit + cleanup(session); + bail!(err); + } + } + } + } + + // non-error FUSE exit + cleanup(session); + Ok(()) + } +} + +/// Try and unmap a running proxmox-backup-client instance from the given +/// /dev/loopN device +pub fn unmap(loopdev: String) -> Result<(), Error> { + if loopdev.len() < 10 || !loopdev.starts_with("/dev/loop") { + bail!("malformed loopdev path, must be in format '/dev/loopX'"); + } + let num = loopdev.split_at(9).1.parse::().map_err(|err| + format_err!("malformed loopdev path, does not end with valid number - {}", err))?; + + let block_path = PathBuf::from(format!("/sys/devices/virtual/block/loop{}/loop/backing_file", num)); + let backing_file = read_to_string(block_path).map_err(|err| { + if err.kind() == std::io::ErrorKind::NotFound { + format_err!("nothing mapped to {}", loopdev) + } else { + format_err!("error reading backing file - {}", err) + } + })?; + + let backing_file = backing_file.trim(); + if !backing_file.starts_with(RUN_DIR) { + bail!( + "loopdev {} is in use, but not by proxmox-backup-client (mapped to '{}')", + loopdev, + backing_file, + ); + } + + let mut pid_path = PathBuf::from(backing_file); + pid_path.set_extension("pid"); + + let pid_str = read_to_string(&pid_path).map_err(|err| + format_err!("error reading pidfile {:?}: {}", &pid_path, err))?; + let pid = pid_str.parse::().map_err(|err| + format_err!("malformed PID ({}) in pidfile - {}", pid_str, err))?; + + // send SIGINT to trigger cleanup and exit in target process + signal::kill(Pid::from_raw(pid), Signal::SIGINT)?; + + Ok(()) +} + +fn minimal_stat(size: i64) -> libc::stat { + let mut stat: libc::stat = unsafe { std::mem::zeroed() }; + stat.st_mode = libc::S_IFREG; + stat.st_ino = 1; + stat.st_nlink = 1; + stat.st_size = size; + stat +} diff --git a/src/tools/loopdev.rs b/src/tools/loopdev.rs new file mode 100644 index 00000000..9f1d6fd1 --- /dev/null +++ b/src/tools/loopdev.rs @@ -0,0 +1,93 @@ +use anyhow::Error; +use std::fs::{File, OpenOptions}; +use std::path::Path; +use std::os::unix::io::{RawFd, AsRawFd}; + +const LOOP_CONTROL: &str = "/dev/loop-control"; +const LOOP_NAME: &str = "/dev/loop"; + +/// Implements a subset of loop device ioctls necessary to assign and release +/// a single file from a free loopdev. +mod loop_ioctl { + use nix::{ioctl_none, ioctl_write_int_bad, ioctl_write_ptr_bad}; + + const LOOP_IOCTL: u16 = 0x4C; // 'L' + const LOOP_SET_FD: u16 = 0x00; + const LOOP_CLR_FD: u16 = 0x01; + const LOOP_SET_STATUS64: u16 = 0x04; + + const LOOP_CTRL_GET_FREE: u16 = 0x82; + + ioctl_write_int_bad!(ioctl_set_fd, (LOOP_IOCTL << 8) | LOOP_SET_FD); + ioctl_none!(ioctl_clr_fd, LOOP_IOCTL, LOOP_CLR_FD); + ioctl_none!(ioctl_ctrl_get_free, LOOP_IOCTL, LOOP_CTRL_GET_FREE); + ioctl_write_ptr_bad!(ioctl_set_status64, (LOOP_IOCTL << 8) | LOOP_SET_STATUS64, LoopInfo64); + + pub const LO_FLAGS_READ_ONLY: u32 = 1; + pub const LO_FLAGS_PARTSCAN: u32 = 8; + + const LO_NAME_SIZE: usize = 64; + const LO_KEY_SIZE: usize = 32; + + #[repr(C)] + pub struct LoopInfo64 { + pub lo_device: u64, + pub lo_inode: u64, + pub lo_rdevice: u64, + pub lo_offset: u64, + pub lo_sizelimit: u64, + pub lo_number: u32, + pub lo_encrypt_type: u32, + pub lo_encrypt_key_size: u32, + pub lo_flags: u32, + pub lo_file_name: [u8; LO_NAME_SIZE], + pub lo_crypt_name: [u8; LO_NAME_SIZE], + pub lo_encrypt_key: [u8; LO_KEY_SIZE], + pub lo_init: [u64; 2], + } +} + +// ioctl helpers create public fns, do not export them outside the module +// users should use the wrapper functions below +use loop_ioctl::*; + +/// Use the GET_FREE ioctl to get or add a free loop device, of which the +/// /dev/loopN path will be returned. This is inherently racy because of the +/// delay between this and calling assign, but since assigning is atomic it +/// does not matter much and will simply cause assign to fail. +pub fn get_or_create_free_dev() -> Result { + let ctrl_file = File::open(LOOP_CONTROL)?; + let free_num = unsafe { ioctl_ctrl_get_free(ctrl_file.as_raw_fd())? }; + let loop_file_path = format!("{}{}", LOOP_NAME, free_num); + Ok(loop_file_path) +} + +fn assign_dev(fd: RawFd, backing_fd: RawFd) -> Result<(), Error> { + unsafe { ioctl_set_fd(fd, backing_fd)?; } + + // set required read-only flag and partscan for convenience + let mut info: LoopInfo64 = unsafe { std::mem::zeroed() }; + info.lo_flags = LO_FLAGS_READ_ONLY | LO_FLAGS_PARTSCAN; + unsafe { ioctl_set_status64(fd, &info)?; } + + Ok(()) +} + +/// Open the next available /dev/loopN file and assign the given path to +/// it as it's backing file in read-only mode. +pub fn assign>(loop_dev: P, backing: P) -> Result<(), Error> { + let loop_file = File::open(loop_dev)?; + let backing_file = OpenOptions::new() + .read(true) + .open(backing)?; + assign_dev(loop_file.as_raw_fd(), backing_file.as_raw_fd())?; + Ok(()) +} + +/// Unassign any file descriptors currently attached to the given +/// /dev/loopN device. +pub fn unassign>(path: P) -> Result<(), Error> { + let loop_file = File::open(path)?; + unsafe { ioctl_clr_fd(loop_file.as_raw_fd())?; } + Ok(()) +}