proxmox-backup/src/server/command_socket.rs
Thomas Lamprecht f3df613cb7 server: add CommandoSocket where multiple users can register commands
This is a preparatory step to replace the task control socket with it
and provide a "reopen log file" command for the rest server.

Kept it simple by disallowing registration of new commands after the
socket has been spawned; this avoids the need for locking.

If we really need that, we can always wrap it in an Arc<RwLock<..>> or
something like that, or, even nicer, register at compile time.

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
2020-11-02 19:32:22 +01:00

216 lines
7.1 KiB
Rust

use anyhow::{bail, format_err, Error};
use std::collections::HashMap;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::sync::Arc;
use futures::*;
use tokio::net::UnixListener;
use serde_json::Value;
use nix::sys::socket;
/// Listens on a Unix Socket to handle simple command asynchronously
pub fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error>
where
P: Into<PathBuf>,
F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
{
let path: PathBuf = path.into();
let backup_user = crate::backup::backup_user()?;
let backup_gid = backup_user.gid.as_raw();
let mut socket = UnixListener::bind(&path)?;
let func = Arc::new(func);
let control_future = async move {
loop {
let (conn, _addr) = match socket.accept().await {
Ok(data) => data,
Err(err) => {
eprintln!("failed to accept on control socket {:?}: {}", path, err);
continue;
}
};
let opt = socket::sockopt::PeerCredentials {};
let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
Ok(cred) => cred,
Err(err) => {
eprintln!("no permissions - unable to read peer credential - {}", err);
continue;
}
};
// check permissions (same gid, root user, or backup group)
let mygid = unsafe { libc::getgid() };
if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == backup_gid) {
eprintln!("no permissions for {:?}", cred);
continue;
}
let (rx, mut tx) = tokio::io::split(conn);
let abort_future = super::last_worker_future().map(|_| ());
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
let func = Arc::clone(&func);
let path = path.clone();
tokio::spawn(futures::future::select(
async move {
let mut rx = tokio::io::BufReader::new(rx);
let mut line = String::new();
loop {
line.clear();
match rx.read_line({ line.clear(); &mut line }).await {
Ok(0) => break,
Ok(_) => (),
Err(err) => {
eprintln!("control socket {:?} read error: {}", path, err);
return;
}
}
let response = match line.parse::<Value>() {
Ok(param) => match func(param) {
Ok(res) => format!("OK: {}\n", res),
Err(err) => format!("ERROR: {}\n", err),
}
Err(err) => format!("ERROR: {}\n", err),
};
if let Err(err) = tx.write_all(response.as_bytes()).await {
eprintln!("control socket {:?} write response error: {}", path, err);
return;
}
}
}.boxed(),
abort_future,
).map(|_| ()));
}
}.boxed();
let abort_future = super::last_worker_future().map_err(|_| {});
let task = futures::future::select(
control_future,
abort_future,
).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
Ok(task)
}
/// Sends one JSON command to the control socket at `path` and returns the decoded reply.
///
/// `params` is written as a single line of JSON, then `AsyncWriteExt::shutdown` is invoked on
/// the stream and exactly one response line is read back. A line of the form `OK: <json>`
/// yields the parsed JSON value.
///
/// # Errors
///
/// * connecting, writing or reading fails,
/// * the peer closes the connection without sending a line (`no response`),
/// * the reply is `ERROR: <message>` - the message becomes the returned error,
/// * the reply has neither prefix, or the `OK: ` payload is not valid JSON.
pub async fn send_command<P>(
    path: P,
    params: Value
) -> Result<Value, Error>
    where P: Into<PathBuf>,
{
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt};

    let path: PathBuf = path.into();

    let mut conn = tokio::net::UnixStream::connect(path)
        .await
        .map_err(|err| format_err!("control socket connect failed - {}", err))?;

    let mut command_string = params.to_string();
    command_string.push('\n');

    conn.write_all(command_string.as_bytes()).await?;
    AsyncWriteExt::shutdown(&mut conn).await?;

    let mut rx = tokio::io::BufReader::new(conn);
    let mut data = String::new();
    if rx.read_line(&mut data).await? == 0 {
        bail!("no response");
    }

    // The trailing newline is protocol framing, not part of the payload; keep it out of the
    // error messages handed back to the caller.
    let reply = data.strip_suffix('\n').unwrap_or(&data);

    if let Some(payload) = reply.strip_prefix("OK: ") {
        payload
            .parse::<Value>()
            .map_err(|err| format_err!("unable to parse json response - {}", err))
    } else if let Some(message) = reply.strip_prefix("ERROR: ") {
        bail!("{}", message);
    } else {
        bail!("unable to parse response: {}", reply);
    }
}
/// A callback for a specific commando socket.
pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
/// Tooling to get a single control command socket where one can register multiple commands
/// dynamically.
///
/// Commands are added with `register_command()`; since `spawn()` consumes the value, no
/// further commands can be registered once the socket is running.
///
/// You need to call `spawn()` to make the socket active.
pub struct CommandoSocket {
/// Path the control socket gets created at by `spawn()`.
socket: PathBuf,
/// Registered handlers, keyed by the command name clients send in the `command` field.
commands: HashMap<String, CommandoSocketFn>,
}
impl CommandoSocket {
    /// Creates a command socket for `path` with no commands registered.
    ///
    /// Nothing is bound or listened on until [`spawn`](Self::spawn) is called.
    pub fn new<P>(path: P) -> Self
        where P: Into<PathBuf>,
    {
        CommandoSocket {
            socket: path.into(),
            commands: HashMap::new(),
        }
    }

    /// Spawn the socket and consume self, meaning you cannot register commands anymore after
    /// calling this.
    ///
    /// Each request must be a JSON object with a string field `command` naming a registered
    /// command; the optional field `args` is handed to that command's handler as-is.
    ///
    /// The serving future is handed to `tokio::spawn`, so this has to be called from within a
    /// Tokio runtime.
    ///
    /// # Errors
    ///
    /// Fails if `create_control_socket` cannot set up the socket.
    pub fn spawn(self) -> Result<(), Error> {
        // Take `self` apart so the path and the command table can each be moved to where they
        // are needed without cloning.
        let CommandoSocket { socket, commands } = self;

        let control_future = create_control_socket(socket, move |param| {
            let param = param
                .as_object()
                .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;

            let command = match param.get("command") {
                Some(Value::String(command)) => command.as_str(),
                None => bail!("no command"),
                _ => bail!("unable to parse command"),
            };

            let handler = commands
                .get(command)
                .ok_or_else(|| format_err!("got unknown command '{}'", command))?;

            handler(param.get("args"))
        })?;

        tokio::spawn(control_future);

        Ok(())
    }

    /// Register a new command with a callback.
    ///
    /// `handler` is invoked for every request whose `command` field equals `command`.
    ///
    /// # Errors
    ///
    /// Fails if a handler is already registered under `command`; the existing handler is kept.
    pub fn register_command<F>(
        &mut self,
        command: String,
        handler: F,
    ) -> Result<(), Error>
    where
        F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
    {
        // A single map lookup decides between "already taken" and "insert".
        match self.commands.entry(command) {
            std::collections::hash_map::Entry::Occupied(entry) => {
                bail!("command '{}' already exists!", entry.key())
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert(Box::new(handler));
                Ok(())
            }
        }
    }
}