refactor send_command

- refactor the combinators,
- make it take a `&T: Serialize` instead of a Value, and
  allow sending the raw string via `send_raw_command`.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2021-05-11 15:53:59 +02:00 committed by Thomas Lamprecht
parent a723c08715
commit 45b8a0327f
3 changed files with 42 additions and 41 deletions

View File

@ -750,15 +750,11 @@ async fn command_reopen_logfiles() -> Result<(), Error> {
// only care about the most recent daemon instance for each, proxy & api, as other older ones // only care about the most recent daemon instance for each, proxy & api, as other older ones
// should not respond to new requests anyway, but only finish their current one and then exit. // should not respond to new requests anyway, but only finish their current one and then exit.
let sock = server::our_ctrl_sock(); let sock = server::our_ctrl_sock();
let f1 = server::send_command(sock, serde_json::json!({ let f1 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
"command": "api-access-log-reopen",
}));
let pid = server::read_pid(buildcfg::PROXMOX_BACKUP_API_PID_FN)?; let pid = server::read_pid(buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
let sock = server::ctrl_sock_from_pid(pid); let sock = server::ctrl_sock_from_pid(pid);
let f2 = server::send_command(sock, serde_json::json!({ let f2 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
"command": "api-access-log-reopen",
}));
match futures::join!(f1, f2) { match futures::join!(f1, f2) {
(Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)), (Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),

View File

@ -2,11 +2,12 @@ use anyhow::{bail, format_err, Error};
use std::collections::HashMap; use std::collections::HashMap;
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use std::path::PathBuf; use std::path::{PathBuf, Path};
use std::sync::Arc; use std::sync::Arc;
use futures::*; use futures::*;
use tokio::net::UnixListener; use tokio::net::UnixListener;
use serde::Serialize;
use serde_json::Value; use serde_json::Value;
use nix::sys::socket; use nix::sys::socket;
@ -102,43 +103,47 @@ where
} }
pub async fn send_command<P>( pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
path: P, where
params: Value P: AsRef<Path>,
) -> Result<Value, Error> T: ?Sized + Serialize,
where P: Into<PathBuf>,
{ {
let path: PathBuf = path.into(); let mut command_string = serde_json::to_string(params)?;
command_string.push('\n');
send_raw_command(path.as_ref(), &command_string).await
}
tokio::net::UnixStream::connect(path) pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
where
P: AsRef<Path>,
{
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
let mut conn = tokio::net::UnixStream::connect(path)
.map_err(move |err| format_err!("control socket connect failed - {}", err)) .map_err(move |err| format_err!("control socket connect failed - {}", err))
.and_then(move |mut conn| { .await?;
let mut command_string = params.to_string(); conn.write_all(command_string.as_bytes()).await?;
command_string.push('\n'); if !command_string.as_bytes().ends_with(b"\n") {
conn.write_all(b"\n").await?;
}
async move { AsyncWriteExt::shutdown(&mut conn).await?;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; let mut rx = tokio::io::BufReader::new(conn);
let mut data = String::new();
conn.write_all(command_string.as_bytes()).await?; if rx.read_line(&mut data).await? == 0 {
AsyncWriteExt::shutdown(&mut conn).await?; bail!("no response");
let mut rx = tokio::io::BufReader::new(conn); }
let mut data = String::new(); if let Some(res) = data.strip_prefix("OK: ") {
if rx.read_line(&mut data).await? == 0 { match res.parse::<Value>() {
bail!("no response"); Ok(v) => Ok(v),
} Err(err) => bail!("unable to parse json response - {}", err),
if let Some(res) = data.strip_prefix("OK: ") { }
match res.parse::<Value>() { } else if let Some(err) = data.strip_prefix("ERROR: ") {
Ok(v) => Ok(v), bail!("{}", err);
Err(err) => bail!("unable to parse json response - {}", err), } else {
} bail!("unable to parse response: {}", data);
} else if let Some(err) = data.strip_prefix("ERROR: ") { }
bail!("{}", err);
} else {
bail!("unable to parse response: {}", data);
}
}
}).await
} }
/// A callback for a specific commando socket. /// A callback for a specific commando socket.

View File

@ -59,7 +59,7 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
"upid": upid.to_string(), "upid": upid.to_string(),
}, },
}); });
let status = super::send_command(sock, cmd).await?; let status = super::send_command(sock, &cmd).await?;
if let Some(active) = status.as_bool() { if let Some(active) = status.as_bool() {
Ok(active) Ok(active)
@ -133,7 +133,7 @@ pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
"upid": upid.to_string(), "upid": upid.to_string(),
}, },
}); });
super::send_command(sock, cmd).map_ok(|_| ()).await super::send_command(sock, &cmd).map_ok(|_| ()).await
} }
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> { fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {