split out pbs-runtime module

These are mostly tokio specific "hacks" or "workarounds" we
only really need/want in our binaries without pulling it in
via our library crates.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2021-07-06 12:08:44 +02:00
parent 01fd2447b2
commit d420962fbc
26 changed files with 50 additions and 37 deletions

View File

@ -22,6 +22,7 @@ exclude = [ "build", "debian", "tests/catar_data/test_symlink/symlink1"]
[workspace] [workspace]
members = [ members = [
"pbs-buildcfg", "pbs-buildcfg",
"pbs-runtime",
] ]
[lib] [lib]
@ -91,7 +92,8 @@ proxmox-http = { version = "0.2.1", features = [ "client", "http-helpers", "webs
#proxmox-http = { version = "0.2.0", path = "../proxmox/proxmox-http", features = [ "client", "http-helpers", "websocket" ] } #proxmox-http = { version = "0.2.0", path = "../proxmox/proxmox-http", features = [ "client", "http-helpers", "websocket" ] }
proxmox-openid = "0.6.0" proxmox-openid = "0.6.0"
pbs-buildcfg = { path = "pbs-buildcfg", version = "0.1" } pbs-buildcfg = { path = "pbs-buildcfg" }
pbs-runtime = { path = "pbs-runtime" }
[features] [features]
default = [] default = []

View File

@ -31,7 +31,8 @@ RESTORE_BIN := \
proxmox-restore-daemon proxmox-restore-daemon
SUBCRATES := \ SUBCRATES := \
pbs-buildcfg pbs-buildcfg \
pbs-runtime
ifeq ($(BUILD_MODE), release) ifeq ($(BUILD_MODE), release)
CARGO_BUILD_ARGS += --release CARGO_BUILD_ARGS += --release

11
pbs-runtime/Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
[package]
name = "pbs-runtime"
version = "0.1.0"
authors = ["Proxmox Support Team <support@proxmox.com>"]
edition = "2018"
description = "tokio runtime related helpers required for binaries"
[dependencies]
lazy_static = "1.4"
pin-utils = "0.1.0"
tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }

View File

@ -218,7 +218,7 @@ async move {
}; };
if benchmark { if benchmark {
env.log("benchmark finished successfully"); env.log("benchmark finished successfully");
tools::runtime::block_in_place(|| env.remove_backup())?; pbs_runtime::block_in_place(|| env.remove_backup())?;
return Ok(()); return Ok(());
} }
@ -246,13 +246,13 @@ async move {
(Ok(_), Err(err)) => { (Ok(_), Err(err)) => {
env.log(format!("backup ended and finish failed: {}", err)); env.log(format!("backup ended and finish failed: {}", err));
env.log("removing unfinished backup"); env.log("removing unfinished backup");
tools::runtime::block_in_place(|| env.remove_backup())?; pbs_runtime::block_in_place(|| env.remove_backup())?;
Err(err) Err(err)
}, },
(Err(err), Err(_)) => { (Err(err), Err(_)) => {
env.log(format!("backup failed: {}", err)); env.log(format!("backup failed: {}", err));
env.log("removing failed backup"); env.log("removing failed backup");
tools::runtime::block_in_place(|| env.remove_backup())?; pbs_runtime::block_in_place(|| env.remove_backup())?;
Err(err) Err(err)
}, },
} }

View File

@ -61,7 +61,7 @@ impl Future for UploadChunk {
let (is_duplicate, compressed_size) = match proxmox::try_block! { let (is_duplicate, compressed_size) = match proxmox::try_block! {
let mut chunk = DataBlob::from_raw(raw_data)?; let mut chunk = DataBlob::from_raw(raw_data)?;
tools::runtime::block_in_place(|| { pbs_runtime::block_in_place(|| {
chunk.verify_unencrypted(this.size as usize, &this.digest)?; chunk.verify_unencrypted(this.size as usize, &this.digest)?;
// always comput CRC at server side // always comput CRC at server side

View File

@ -236,7 +236,7 @@ fn apt_get_changelog(
let changelog_url = &pkg_info[0].change_log_url; let changelog_url = &pkg_info[0].change_log_url;
// FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it // FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it
if changelog_url.starts_with("http://download.proxmox.com/") { if changelog_url.starts_with("http://download.proxmox.com/") {
let changelog = crate::tools::runtime::block_on(client.get_string(changelog_url, None)) let changelog = pbs_runtime::block_on(client.get_string(changelog_url, None))
.map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?; .map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
Ok(json!(changelog)) Ok(json!(changelog))
@ -260,7 +260,7 @@ fn apt_get_changelog(
auth_header.insert("Authorization".to_owned(), auth_header.insert("Authorization".to_owned(),
format!("Basic {}", base64::encode(format!("{}:{}", key, id)))); format!("Basic {}", base64::encode(format!("{}:{}", key, id))));
let changelog = crate::tools::runtime::block_on(client.get_string(changelog_url, Some(&auth_header))) let changelog = pbs_runtime::block_on(client.get_string(changelog_url, Some(&auth_header)))
.map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?; .map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
Ok(json!(changelog)) Ok(json!(changelog))

View File

@ -322,7 +322,7 @@ fn download_chunk(
env.debug(format!("download chunk {:?}", path)); env.debug(format!("download chunk {:?}", path));
let data = tools::runtime::block_in_place(|| std::fs::read(path)) let data = pbs_runtime::block_in_place(|| std::fs::read(path))
.map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?; .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
let body = Body::from(data); let body = Body::from(data);

View File

@ -21,7 +21,7 @@ use pxar::{EntryKind, Metadata};
use crate::backup::catalog::{self, DirEntryAttribute}; use crate::backup::catalog::{self, DirEntryAttribute};
use crate::pxar::fuse::{Accessor, FileEntry}; use crate::pxar::fuse::{Accessor, FileEntry};
use crate::pxar::Flags; use crate::pxar::Flags;
use crate::tools::runtime::block_in_place; use pbs_runtime::block_in_place;
use crate::tools::ControlFlow; use crate::tools::ControlFlow;
type CatalogReader = crate::backup::CatalogReader<std::fs::File>; type CatalogReader = crate::backup::CatalogReader<std::fs::File>;

View File

@ -18,7 +18,7 @@ use proxmox_backup::config;
fn main() { fn main() {
proxmox_backup::tools::setup_safe_path_env(); proxmox_backup::tools::setup_safe_path_env();
if let Err(err) = proxmox_backup::tools::runtime::main(run()) { if let Err(err) = pbs_runtime::main(run()) {
eprintln!("Error: {}", err); eprintln!("Error: {}", err);
std::process::exit(-1); std::process::exit(-1);
} }

View File

@ -1494,6 +1494,6 @@ fn main() {
let rpcenv = CliEnvironment::new(); let rpcenv = CliEnvironment::new();
run_cli_command(cmd_def, rpcenv, Some(|future| { run_cli_command(cmd_def, rpcenv, Some(|future| {
proxmox_backup::tools::runtime::main(future) pbs_runtime::main(future)
})); }));
} }

View File

@ -397,7 +397,7 @@ fn main() {
let mut rpcenv = CliEnvironment::new(); let mut rpcenv = CliEnvironment::new();
rpcenv.set_auth_id(Some(String::from("root@pam"))); rpcenv.set_auth_id(Some(String::from("root@pam")));
proxmox_backup::tools::runtime::main(run_async_cli_command(cmd_def, rpcenv)); pbs_runtime::main(run_async_cli_command(cmd_def, rpcenv));
} }
// shell completion helper // shell completion helper
@ -408,7 +408,7 @@ pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String
let _ = proxmox::try_block!({ let _ = proxmox::try_block!({
let remote = param.get("remote").ok_or_else(|| format_err!("no remote"))?; let remote = param.get("remote").ok_or_else(|| format_err!("no remote"))?;
let data = crate::tools::runtime::block_on(async move { let data = pbs_runtime::block_on(async move {
crate::api2::config::remote::scan_remote_datastores(remote.clone()).await crate::api2::config::remote::scan_remote_datastores(remote.clone()).await
})?; })?;

View File

@ -65,7 +65,7 @@ fn main() -> Result<(), Error> {
bail!("proxy not running as backup user or group (got uid {} gid {})", running_uid, running_gid); bail!("proxy not running as backup user or group (got uid {} gid {})", running_uid, running_gid);
} }
proxmox_backup::tools::runtime::main(run()) pbs_runtime::main(run())
} }
async fn run() -> Result<(), Error> { async fn run() -> Result<(), Error> {
@ -700,7 +700,7 @@ async fn schedule_task_log_rotate() {
if logrotate.rotate(max_size, None, Some(max_files))? { if logrotate.rotate(max_size, None, Some(max_files))? {
println!("rotated access log, telling daemons to re-open log file"); println!("rotated access log, telling daemons to re-open log file");
proxmox_backup::tools::runtime::block_on(command_reopen_logfiles())?; pbs_runtime::block_on(command_reopen_logfiles())?;
worker.log("API access log was rotated".to_string()); worker.log("API access log was rotated".to_string());
} else { } else {
worker.log("API access log was not rotated".to_string()); worker.log("API access log was not rotated".to_string());
@ -787,7 +787,7 @@ async fn generate_host_stats(save: bool) {
use proxmox_backup::config::datastore; use proxmox_backup::config::datastore;
proxmox_backup::tools::runtime::block_in_place(move || { pbs_runtime::block_in_place(move || {
match read_proc_stat() { match read_proc_stat() {
Ok(stat) => { Ok(stat) => {

View File

@ -91,7 +91,7 @@ fn main() {
let mut rpcenv = CliEnvironment::new(); let mut rpcenv = CliEnvironment::new();
rpcenv.set_auth_id(Some(String::from("root@pam"))); rpcenv.set_auth_id(Some(String::from("root@pam")));
if let Err(err) = proxmox_backup::tools::runtime::main(do_update(&mut rpcenv)) { if let Err(err) = pbs_runtime::main(do_update(&mut rpcenv)) {
eprintln!("error during update: {}", err); eprintln!("error during update: {}", err);
std::process::exit(1); std::process::exit(1);
} }

View File

@ -473,6 +473,6 @@ fn main() {
run_cli_command( run_cli_command(
cmd_def, cmd_def,
rpcenv, rpcenv,
Some(|future| proxmox_backup::tools::runtime::main(future)), Some(|future| pbs_runtime::main(future)),
); );
} }

View File

@ -62,7 +62,7 @@ fn main() -> Result<(), Error> {
info!("disk scan complete, starting main runtime..."); info!("disk scan complete, starting main runtime...");
proxmox_backup::tools::runtime::main(run()) pbs_runtime::main(run())
} }
async fn run() -> Result<(), Error> { async fn run() -> Result<(), Error> {

View File

@ -1101,5 +1101,5 @@ fn main() {
let mut rpcenv = CliEnvironment::new(); let mut rpcenv = CliEnvironment::new();
rpcenv.set_auth_id(Some(String::from("root@pam"))); rpcenv.set_auth_id(Some(String::from("root@pam")));
proxmox_backup::tools::runtime::main(run_async_cli_command(cmd_def, rpcenv)); pbs_runtime::main(run_async_cli_command(cmd_def, rpcenv));
} }

View File

@ -139,7 +139,7 @@ fn mount(
if verbose { if verbose {
// This will stay in foreground with debug output enabled as None is // This will stay in foreground with debug output enabled as None is
// passed for the RawFd. // passed for the RawFd.
return proxmox_backup::tools::runtime::main(mount_do(param, None)); return pbs_runtime::main(mount_do(param, None));
} }
// Process should be daemonized. // Process should be daemonized.
@ -155,7 +155,7 @@ fn mount(
Ok(ForkResult::Child) => { Ok(ForkResult::Child) => {
drop(pr); drop(pr);
nix::unistd::setsid().unwrap(); nix::unistd::setsid().unwrap();
proxmox_backup::tools::runtime::main(mount_do(param, Some(pw))) pbs_runtime::main(mount_do(param, Some(pw)))
} }
Err(_) => bail!("failed to daemonize process"), Err(_) => bail!("failed to daemonize process"),
} }

View File

@ -107,7 +107,7 @@ pub async fn try_get(repo: &BackupRepository, url: &str) -> Value {
} }
pub fn complete_backup_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> { pub fn complete_backup_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
proxmox_backup::tools::runtime::main(async { complete_backup_group_do(param).await }) pbs_runtime::main(async { complete_backup_group_do(param).await })
} }
pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<String> { pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<String> {
@ -137,7 +137,7 @@ pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<St
} }
pub fn complete_group_or_snapshot(arg: &str, param: &HashMap<String, String>) -> Vec<String> { pub fn complete_group_or_snapshot(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
proxmox_backup::tools::runtime::main(async { complete_group_or_snapshot_do(arg, param).await }) pbs_runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
} }
pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, String>) -> Vec<String> { pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
@ -156,7 +156,7 @@ pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, St
} }
pub fn complete_backup_snapshot(_arg: &str, param: &HashMap<String, String>) -> Vec<String> { pub fn complete_backup_snapshot(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
proxmox_backup::tools::runtime::main(async { complete_backup_snapshot_do(param).await }) pbs_runtime::main(async { complete_backup_snapshot_do(param).await })
} }
pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec<String> { pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec<String> {
@ -188,7 +188,7 @@ pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec
} }
pub fn complete_server_file_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> { pub fn complete_server_file_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
proxmox_backup::tools::runtime::main(async { complete_server_file_name_do(param).await }) pbs_runtime::main(async { complete_server_file_name_do(param).await })
} }
pub async fn complete_server_file_name_do(param: &HashMap<String, String>) -> Vec<String> { pub async fn complete_server_file_name_do(param: &HashMap<String, String>) -> Vec<String> {
@ -279,7 +279,7 @@ pub fn complete_chunk_size(_arg: &str, _param: &HashMap<String, String>) -> Vec<
} }
pub fn complete_auth_id(_arg: &str, param: &HashMap<String, String>) -> Vec<String> { pub fn complete_auth_id(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
proxmox_backup::tools::runtime::main(async { complete_auth_id_do(param).await }) pbs_runtime::main(async { complete_auth_id_do(param).await })
} }
pub async fn complete_auth_id_do(param: &HashMap<String, String>) -> Vec<String> { pub async fn complete_auth_id_do(param: &HashMap<String, String>) -> Vec<String> {

View File

@ -491,6 +491,6 @@ fn main() {
let rpcenv = CliEnvironment::new(); let rpcenv = CliEnvironment::new();
run_cli_command(cmd_def, rpcenv, Some(|future| { run_cli_command(cmd_def, rpcenv, Some(|future| {
proxmox_backup::tools::runtime::main(future) pbs_runtime::main(future)
})); }));
} }

View File

@ -73,7 +73,7 @@ async fn pull_index_chunks<I: IndexFile>(
let verify_and_write_channel = verify_and_write_channel.clone(); let verify_and_write_channel = verify_and_write_channel.clone();
Ok::<_, Error>(async move { Ok::<_, Error>(async move {
let chunk_exists = crate::tools::runtime::block_in_place(|| { let chunk_exists = pbs_runtime::block_in_place(|| {
target.cond_touch_chunk(&info.digest, false) target.cond_touch_chunk(&info.digest, false)
})?; })?;
if chunk_exists { if chunk_exists {
@ -85,7 +85,7 @@ async fn pull_index_chunks<I: IndexFile>(
let raw_size = chunk.raw_size() as usize; let raw_size = chunk.raw_size() as usize;
// decode, verify and write in a separate threads to maximize throughput // decode, verify and write in a separate threads to maximize throughput
crate::tools::runtime::block_in_place(|| { pbs_runtime::block_in_place(|| {
verify_and_write_channel.send((chunk, info.digest, info.size())) verify_and_write_channel.send((chunk, info.digest, info.size()))
})?; })?;

View File

@ -113,7 +113,7 @@ impl Stream for PxarBackupStream {
} }
} }
match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) { match pbs_runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
Ok(data) => Poll::Ready(Some(data)), Ok(data) => Poll::Ready(Some(data)),
Err(_) => { Err(_) => {
let error = self.error.lock().unwrap(); let error = self.error.lock().unwrap();

View File

@ -7,7 +7,7 @@ use anyhow::{bail, Error};
use super::BackupReader; use super::BackupReader;
use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk}; use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk};
use crate::tools::runtime::block_on; use pbs_runtime::block_on;
/// Read chunks from remote host using ``BackupReader`` /// Read chunks from remote host using ``BackupReader``
#[derive(Clone)] #[derive(Clone)]

View File

@ -47,7 +47,6 @@ pub mod loopdev;
pub mod lru_cache; pub mod lru_cache;
pub mod async_lru_cache; pub mod async_lru_cache;
pub mod nom; pub mod nom;
pub mod runtime;
pub mod serde_filter; pub mod serde_filter;
pub mod statistics; pub mod statistics;
pub mod subscription; pub mod subscription;

View File

@ -228,7 +228,7 @@ pub fn check_subscription(key: String, server_id: String) -> Result<Subscription
let now = proxmox::tools::time::epoch_i64(); let now = proxmox::tools::time::epoch_i64();
let (response, challenge) = tools::runtime::block_on(register_subscription(&key, &server_id, now)) let (response, challenge) = pbs_runtime::block_on(register_subscription(&key, &server_id, now))
.map_err(|err| format_err!("Error checking subscription: {}", err))?; .map_err(|err| format_err!("Error checking subscription: {}", err))?;
parse_register_response(&response, key, server_id, now, &challenge) parse_register_response(&response, key, server_id, now, &challenge)

View File

@ -7,7 +7,7 @@ use tokio::io::{AsyncRead, ReadBuf};
use futures::ready; use futures::ready;
use futures::stream::Stream; use futures::stream::Stream;
use crate::tools::runtime::block_in_place; use pbs_runtime::block_in_place;
/// Wrapper struct to convert a Reader into a Stream /// Wrapper struct to convert a Reader into a Stream
pub struct WrappedReaderStream<R: Read + Unpin> { pub struct WrappedReaderStream<R: Read + Unpin> {
@ -108,7 +108,7 @@ mod test {
#[test] #[test]
fn test_wrapped_stream_reader() -> Result<(), Error> { fn test_wrapped_stream_reader() -> Result<(), Error> {
crate::tools::runtime::main(async { pbs_runtime::main(async {
run_wrapped_stream_reader_test().await run_wrapped_stream_reader_test().await
}) })
} }