//! Map a raw data reader as a loop device via FUSE use anyhow::{Error, format_err, bail}; use std::ffi::OsStr; use std::path::{Path, PathBuf}; use std::fs::{File, remove_file, read_to_string, OpenOptions}; use std::io::SeekFrom; use std::io::prelude::*; use std::collections::HashMap; use nix::unistd::Pid; use nix::sys::signal::{self, Signal}; use tokio::io::{AsyncRead, AsyncSeek, AsyncReadExt, AsyncSeekExt}; use futures::stream::{StreamExt, TryStreamExt}; use futures::channel::mpsc::{Sender, Receiver}; use proxmox::const_regex; use proxmox_fuse::{*, requests::FuseRequest}; use super::loopdev; use super::fs; const RUN_DIR: &'static str = "/run/pbs-loopdev"; const_regex! { pub LOOPDEV_REGEX = r"^loop\d+$"; } /// Represents an ongoing FUSE-session that has been mapped onto a loop device. /// Create with map_loop, then call 'main' and poll until startup_chan reports /// success. Then, daemonize or otherwise finish setup, and continue polling /// main's future until completion. pub struct FuseLoopSession { session: Option, stat: libc::stat, reader: R, fuse_path: String, pid_path: String, pub loopdev_path: String, } impl FuseLoopSession { /// Prepare for mapping the given reader as a block device node at /// /dev/loopN. Creates a temporary file for FUSE and a PID file for unmap. pub async fn map_loop>(size: u64, mut reader: R, name: P, options: &OsStr) -> Result { // attempt a single read to check if the reader is configured correctly let _ = reader.read_u8().await?; std::fs::create_dir_all(RUN_DIR)?; let mut path = PathBuf::from(RUN_DIR); path.push(name.as_ref()); let mut pid_path = path.clone(); pid_path.set_extension("pid"); // cleanup previous instance with same name // if loopdev is actually still mapped, this will do nothing and the // create_new below will fail as intended cleanup_unused_run_files(Some(name.as_ref().to_owned())); match OpenOptions::new().write(true).create_new(true).open(&path) { Ok(_) => { /* file created, continue on */ }, Err(e) => { if e.kind() == std::io::ErrorKind::AlreadyExists { bail!("the given archive is already mapped, cannot map twice"); } else { bail!("error while creating backing file ({:?}) - {}", &path, e); } }, } let session = Fuse::builder("pbs-block-dev")? .options_os(options)? .enable_read() .build()? .mount(&path)?; let loopdev_path = loopdev::get_or_create_free_dev().map_err(|err| { format_err!("loop-control GET_FREE failed - {}", err) })?; // write pidfile so unmap can later send us a signal to exit Self::write_pidfile(&pid_path)?; Ok(Self { session: Some(session), reader, stat: minimal_stat(size as i64), fuse_path: path.to_string_lossy().into_owned(), pid_path: pid_path.to_string_lossy().into_owned(), loopdev_path, }) } fn write_pidfile(path: &Path) -> Result<(), Error> { let pid = unsafe { libc::getpid() }; let mut file = File::create(path)?; write!(file, "{}", pid)?; Ok(()) } /// Runs the FUSE request loop and assigns the loop device. Will send a /// message on startup_chan once the loop device is assigned (or assignment /// fails). Send a message on abort_chan to trigger cleanup and exit FUSE. /// An error on loopdev assignment does *not* automatically close the FUSE /// handle or do cleanup, trigger abort_chan manually in case startup fails. pub async fn main( &mut self, mut startup_chan: Sender>, abort_chan: Receiver<()>, ) -> Result<(), Error> { if let None = self.session { panic!("internal error: fuse_loop::main called before ::map_loop"); } let mut session = self.session.take().unwrap().fuse(); let mut abort_chan = abort_chan.fuse(); let (loopdev_path, fuse_path) = (self.loopdev_path.clone(), self.fuse_path.clone()); tokio::task::spawn_blocking(move || { if let Err(err) = loopdev::assign(loopdev_path, fuse_path) { let _ = startup_chan.try_send(Err(format_err!("error while assigning loop device - {}", err))); } else { // device is assigned successfully, which means not only is the // loopdev ready, but FUSE is also okay, since the assignment // would have failed otherwise let _ = startup_chan.try_send(Ok(())); } }); let (loopdev_path, fuse_path, pid_path) = (self.loopdev_path.clone(), self.fuse_path.clone(), self.pid_path.clone()); let cleanup = |session: futures::stream::Fuse| { // only warn for errors on cleanup, if these fail nothing is lost if let Err(err) = loopdev::unassign(&loopdev_path) { eprintln!( "cleanup: warning: could not unassign file {} from loop device {} - {}", &fuse_path, &loopdev_path, err, ); } // force close FUSE handle before attempting to remove backing file std::mem::drop(session); if let Err(err) = remove_file(&fuse_path) { eprintln!( "cleanup: warning: could not remove temporary file {} - {}", &fuse_path, err, ); } if let Err(err) = remove_file(&pid_path) { eprintln!( "cleanup: warning: could not remove PID file {} - {}", &pid_path, err, ); } }; loop { tokio::select!{ _ = abort_chan.next() => { // aborted, do cleanup and exit break; }, req = session.try_next() => { let res = match req? { Some(Request::Lookup(req)) => { let stat = self.stat; let entry = EntryParam::simple(stat.st_ino, stat); req.reply(&entry) }, Some(Request::Getattr(req)) => { req.reply(&self.stat, std::f64::MAX) }, Some(Request::Read(req)) => { match self.reader.seek(SeekFrom::Start(req.offset)).await { Ok(_) => { let mut buf = vec![0u8; req.size]; match self.reader.read_exact(&mut buf).await { Ok(_) => { req.reply(&buf) }, Err(e) => { req.io_fail(e) } } }, Err(e) => { req.io_fail(e) } } }, Some(_) => { // only FUSE requests necessary for loop-mapping are implemented eprintln!("Unimplemented FUSE request type encountered"); Ok(()) }, None => { // FUSE connection closed break; } }; if let Err(err) = res { // error during FUSE reply, cleanup and exit cleanup(session); bail!(err); } } } } // non-error FUSE exit cleanup(session); Ok(()) } } /// Clean up leftover files as well as FUSE instances without a loop device /// connected. Best effort, never returns an error. /// If filter_name is Some("..."), only this name will be cleaned up. pub fn cleanup_unused_run_files(filter_name: Option) { if let Ok(maps) = find_all_mappings() { for (name, loopdev) in maps { if loopdev.is_none() && (filter_name.is_none() || &name == filter_name.as_ref().unwrap()) { let mut path = PathBuf::from(RUN_DIR); path.push(&name); // clean leftover FUSE instances (e.g. user called 'losetup -d' or similar) // does nothing if files are already stagnant (e.g. instance crashed etc...) if let Ok(_) = unmap_from_backing(&path) { // we have reaped some leftover instance, tell the user eprintln!( "Cleaned up dangling mapping '{}': no loop device assigned", &name ); } // remove remnant files // these we're not doing anything, so no need to inform the user let _ = remove_file(&path); path.set_extension("pid"); let _ = remove_file(&path); } } } } fn get_backing_file(loopdev: &str) -> Result { let num = loopdev.split_at(9).1.parse::().map_err(|err| format_err!("malformed loopdev path, does not end with valid number - {}", err))?; let block_path = PathBuf::from(format!("/sys/devices/virtual/block/loop{}/loop/backing_file", num)); let backing_file = read_to_string(block_path).map_err(|err| { if err.kind() == std::io::ErrorKind::NotFound { format_err!("nothing mapped to {}", loopdev) } else { format_err!("error reading backing file - {}", err) } })?; let backing_file = backing_file.trim(); if !backing_file.starts_with(RUN_DIR) { bail!( "loopdev {} is in use, but not by proxmox-backup-client (mapped to '{}')", loopdev, backing_file, ); } Ok(backing_file.to_owned()) } fn unmap_from_backing(backing_file: &Path) -> Result<(), Error> { let mut pid_path = PathBuf::from(backing_file); pid_path.set_extension("pid"); let pid_str = read_to_string(&pid_path).map_err(|err| format_err!("error reading pidfile {:?}: {}", &pid_path, err))?; let pid = pid_str.parse::().map_err(|err| format_err!("malformed PID ({}) in pidfile - {}", pid_str, err))?; // send SIGINT to trigger cleanup and exit in target process signal::kill(Pid::from_raw(pid), Signal::SIGINT)?; Ok(()) } /// Returns an Iterator over a set of currently active mappings, i.e. /// FuseLoopSession instances. Returns ("backing-file-name", Some("/dev/loopX")) /// where .1 is None when a user has manually called 'losetup -d' or similar but /// the FUSE instance is still running. pub fn find_all_mappings() -> Result)>, Error> { // get map of all /dev/loop mappings belonging to us let mut loopmap = HashMap::new(); for ent in fs::scan_subdir(libc::AT_FDCWD, Path::new("/dev/"), &LOOPDEV_REGEX)? { match ent { Ok(ent) => { let loopdev = format!("/dev/{}", ent.file_name().to_string_lossy()); match get_backing_file(&loopdev) { Ok(file) => { // insert filename only, strip RUN_DIR/ loopmap.insert(file[RUN_DIR.len()+1..].to_owned(), loopdev); }, Err(_) => {}, } }, Err(_) => {}, } } Ok(fs::read_subdir(libc::AT_FDCWD, Path::new(RUN_DIR))? .filter_map(move |ent| { match ent { Ok(ent) => { let file = ent.file_name().to_string_lossy(); if file == "." || file == ".." || file.ends_with(".pid") { None } else { let loopdev = loopmap.get(file.as_ref()).map(String::to_owned); Some((file.into_owned(), loopdev)) } }, Err(_) => None, } })) } /// Try and unmap a running proxmox-backup-client instance from the given /// /dev/loopN device pub fn unmap_loopdev>(loopdev: S) -> Result<(), Error> { let loopdev = loopdev.as_ref(); if loopdev.len() < 10 || !loopdev.starts_with("/dev/loop") { bail!("malformed loopdev path, must be in format '/dev/loopX'"); } let backing_file = get_backing_file(loopdev)?; unmap_from_backing(Path::new(&backing_file)) } /// Try and unmap a running proxmox-backup-client instance from the given name pub fn unmap_name>(name: S) -> Result<(), Error> { for (mapping, _) in find_all_mappings()? { if mapping.ends_with(name.as_ref()) { let mut path = PathBuf::from(RUN_DIR); path.push(&mapping); return unmap_from_backing(&path); } } Err(format_err!("no mapping for name '{}' found", name.as_ref())) } fn minimal_stat(size: i64) -> libc::stat { let mut stat: libc::stat = unsafe { std::mem::zeroed() }; stat.st_mode = libc::S_IFREG; stat.st_ino = 1; stat.st_nlink = 1; stat.st_size = size; stat }