move more tools for the client into subcrates

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller
2021-07-15 12:15:50 +02:00
parent 9eb784076c
commit 4805edc4ec
29 changed files with 115 additions and 102 deletions

View File

@ -11,6 +11,7 @@ use proxmox::api::{api, Permission, RpcEnvironment};
use proxmox::{http_err, list_subdirs_api_method};
use proxmox::{identity, sortable};
use pbs_tools::auth::private_auth_key;
use pbs_tools::ticket::{self, Empty, Ticket};
use crate::api2::types::*;

View File

@ -14,6 +14,7 @@ use proxmox::tools::fs::open_file_locked;
use proxmox_openid::{OpenIdAuthenticator, OpenIdConfig};
use pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M;
use pbs_tools::auth::private_auth_key;
use pbs_tools::ticket::Ticket;
use crate::server::ticket::ApiTicket;

View File

@ -12,6 +12,7 @@ use proxmox::api::{api, Permission, Router, RpcEnvironment};
use proxmox::list_subdirs_api_method;
use pbs_buildcfg::configdir;
use pbs_tools::cert;
use crate::acme::AcmeClient;
use crate::api2::types::Authid;
@ -20,7 +21,6 @@ use crate::api2::types::AcmeDomain;
use crate::config::acl::PRIV_SYS_MODIFY;
use crate::config::node::NodeConfig;
use crate::server::WorkerTask;
use crate::tools::cert;
pub const ROUTER: Router = Router::new()
.get(&list_subdirs_api_method!(SUBDIRS))

View File

@ -20,6 +20,7 @@ use proxmox::list_subdirs_api_method;
use proxmox_http::websocket::WebSocket;
use proxmox::{identity, sortable};
use pbs_tools::auth::private_auth_key;
use pbs_tools::ticket::{self, Empty, Ticket};
use crate::api2::types::*;
@ -121,7 +122,7 @@ async fn termproxy(
let ticket = Ticket::new(ticket::TERM_PREFIX, &Empty)?
.sign(
crate::auth_helpers::private_auth_key(),
private_auth_key(),
Some(&tools::ticket::term_aad(&userid, &path, port)),
)?;

View File

@ -8,9 +8,10 @@ use proxmox::sys::linux::procfs;
use proxmox::api::{api, ApiMethod, Router, RpcEnvironment, Permission};
use pbs_tools::cert::CertInfo;
use crate::api2::types::*;
use crate::config::acl::{PRIV_SYS_AUDIT, PRIV_SYS_POWER_MANAGEMENT};
use crate::tools::cert::CertInfo;
impl std::convert::From<procfs::ProcFsCPUInfo> for NodeCpuInformation {
fn from(info: procfs::ProcFsCPUInfo) -> Self {

View File

@ -1,18 +1,16 @@
use std::path::PathBuf;
use anyhow::{bail, format_err, Error};
use lazy_static::lazy_static;
use openssl::rsa::{Rsa};
use openssl::pkey::{PKey, Public, Private};
use openssl::pkey::{PKey, Public};
use openssl::rsa::Rsa;
use openssl::sha;
use std::path::PathBuf;
use proxmox::tools::fs::{file_get_contents, replace_file, CreateOptions};
use proxmox::try_block;
use pbs_buildcfg::configdir;
use crate::api2::types::Userid;
use pbs_api_types::Userid;
fn compute_csrf_secret_digest(
timestamp: i64,
@ -155,24 +153,6 @@ pub fn csrf_secret() -> &'static [u8] {
&SECRET
}
fn load_private_auth_key() -> Result<PKey<Private>, Error> {
let pem = file_get_contents(configdir!("/authkey.key"))?;
let rsa = Rsa::private_key_from_pem(&pem)?;
let key = PKey::from_rsa(rsa)?;
Ok(key)
}
pub fn private_auth_key() -> &'static PKey<Private> {
lazy_static! {
static ref KEY: PKey<Private> = load_private_auth_key().unwrap();
}
&KEY
}
fn load_public_auth_key() -> Result<PKey<Public>, Error> {
let pem = file_get_contents(configdir!("/authkey.pub"))?;

View File

@ -4,8 +4,8 @@ use futures::*;
use proxmox::try_block;
use proxmox::api::RpcEnvironmentType;
//use proxmox_backup::tools;
//use proxmox_backup::api_schema::config::*;
use pbs_tools::auth::private_auth_key;
use proxmox_backup::server::{
self,
auth::default_api_auth,

View File

@ -28,12 +28,9 @@ use proxmox::{
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
use pbs_datastore::catalog::BackupCatalogWriter;
use pbs_tools::sync::StdChannelWriter;
use pbs_tools::tokio::TokioWriterAdapter;
use proxmox_backup::tools::{
self,
StdChannelWriter,
TokioWriterAdapter,
};
use proxmox_backup::api2::types::*;
use proxmox_backup::api2::version;
use proxmox_backup::client::*;
@ -64,6 +61,7 @@ use proxmox_backup::backup::{
Shell,
PruneOptions,
};
use proxmox_backup::tools;
mod proxmox_backup_client;
use proxmox_backup_client::*;

View File

@ -6,6 +6,8 @@ use serde_json::{json, Value};
use proxmox::api::{api, cli::*, RpcEnvironment};
use pbs_tools::percent_encoding::percent_encode_component;
use proxmox_backup::tools;
use proxmox_backup::config;
use proxmox_backup::api2::{self, types::* };
@ -188,7 +190,7 @@ async fn task_stop(param: Value) -> Result<Value, Error> {
let mut client = connect_to_localhost()?;
let path = format!("api2/json/nodes/localhost/tasks/{}", tools::percent_encode_component(upid_str));
let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str));
let _ = client.delete(&path, None).await?;
Ok(Value::Null)

View File

@ -3,6 +3,8 @@ use serde_json::{json, Value};
use proxmox::api::{api, cli::*};
use pbs_tools::percent_encoding::percent_encode_component;
use proxmox_backup::tools;
use proxmox_backup::client::*;
@ -125,7 +127,7 @@ async fn task_stop(param: Value) -> Result<Value, Error> {
let mut client = connect(&repo)?;
let path = format!("api2/json/nodes/localhost/tasks/{}", tools::percent_encode_component(upid_str));
let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str));
let _ = client.delete(&path, None).await?;
Ok(Value::Null)

View File

@ -2,9 +2,10 @@ use anyhow::{bail, Error};
use proxmox::api::{api, cli::*};
use pbs_tools::cert::CertInfo;
use proxmox_backup::config;
use proxmox_backup::auth_helpers::*;
use proxmox_backup::tools::cert::CertInfo;
#[api]
/// Display node certificate information.

View File

@ -14,10 +14,15 @@ use tokio_stream::wrappers::ReceiverStream;
use proxmox::tools::digest_to_hex;
use pbs_datastore::{CATALOG_NAME, CryptConfig};
use pbs_datastore::data_blob::{ChunkInfo, DataBlob, DataChunkBuilder};
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
use pbs_tools::format::HumanByte;
use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
use crate::backup::*;
use super::{H2Client, HttpClient};
@ -283,7 +288,7 @@ impl BackupWriter {
if let Some(manifest) = options.previous_manifest {
// try, but ignore errors
match archive_type(archive_name) {
match ArchiveType::from_path(archive_name) {
Ok(ArchiveType::FixedIndex) => {
let _ = self
.download_previous_fixed_index(

View File

@ -24,15 +24,13 @@ use proxmox_http::client::HttpsConnector;
use proxmox_http::uri::build_authority;
use pbs_api_types::{Authid, Userid};
use pbs_tools::broadcast_future::BroadcastFuture;
use pbs_tools::json::json_object_to_query;
use pbs_tools::ticket;
use pbs_tools::percent_encoding::DEFAULT_ENCODE_SET;
use super::pipe_to_stream::PipeToSendStream;
use crate::tools::{
BroadcastFuture,
DEFAULT_ENCODE_SET,
PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
};
use super::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
/// Timeout used for several HTTP operations that are expected to finish quickly but may block in
/// certain error conditions. Keep it generous, to avoid false-positive under high load.

View File

@ -7,11 +7,8 @@ use anyhow::Error;
use pbs_api_types::{Authid, Userid};
use pbs_tools::ticket::Ticket;
use crate::{
tools::cert::CertInfo,
auth_helpers::private_auth_key,
};
use pbs_tools::cert::CertInfo;
use pbs_tools::auth::private_auth_key;
mod merge_known_chunks;
pub mod pipe_to_stream;
@ -43,6 +40,8 @@ pub use backup_repo::*;
mod backup_specification;
pub use backup_specification::*;
pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 120;
/// Connect to localhost:8007 as root@pam
///
/// This automatically creates a ticket if run as 'root' user.

View File

@ -13,11 +13,8 @@ use nix::fcntl::OFlag;
use nix::sys::stat::Mode;
use pbs_datastore::catalog::CatalogWriter;
use crate::tools::{
StdChannelWriter,
TokioWriterAdapter,
};
use pbs_tools::sync::StdChannelWriter;
use pbs_tools::tokio::TokioWriterAdapter;
/// Stream implementation to encode and upload .pxar archives.
///

View File

@ -7,15 +7,10 @@ use futures::*;
use proxmox::api::cli::format_and_print_result;
use super::HttpClient;
use crate::{
server::{
worker_is_active_local,
UPID,
},
tools,
};
use pbs_tools::percent_encoding::percent_encode_component;
use super::HttpClient;
use crate::server::{UPID, worker_is_active_local};
/// Display task log on console
///
@ -54,13 +49,13 @@ pub async fn display_task_log(
let abort = abort_count.load(Ordering::Relaxed);
if abort > 0 {
let path = format!("api2/json/nodes/localhost/tasks/{}", tools::percent_encode_component(upid_str));
let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str));
let _ = client.delete(&path, None).await?;
}
let param = json!({ "start": start, "limit": limit, "test-status": true });
let path = format!("api2/json/nodes/localhost/tasks/{}/log", tools::percent_encode_component(upid_str));
let path = format!("api2/json/nodes/localhost/tasks/{}/log", percent_encode_component(upid_str));
let result = client.get(&path, Some(param)).await?;
let active = result["active"].as_bool().unwrap();

View File

@ -1,10 +1,8 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{bail, format_err, Error};
use futures::*;
use core::task::Context;
use std::pin::Pin;
use std::task::Poll;
use http::Uri;
use http::{Request, Response};
use hyper::client::connect::{Connected, Connection};

View File

@ -1,180 +0,0 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use anyhow::{format_err, Error};
use futures::future::{FutureExt, TryFutureExt};
use tokio::sync::oneshot;
/// Broadcast results to registered listeners using asnyc oneshot channels
#[derive(Default)]
pub struct BroadcastData<T> {
result: Option<Result<T, String>>,
listeners: Vec<oneshot::Sender<Result<T, Error>>>,
}
impl <T: Clone> BroadcastData<T> {
pub fn new() -> Self {
Self {
result: None,
listeners: vec![],
}
}
pub fn notify_listeners(&mut self, result: Result<T, String>) {
self.result = Some(result.clone());
loop {
match self.listeners.pop() {
None => { break; },
Some(ch) => {
match &result {
Ok(result) => { let _ = ch.send(Ok(result.clone())); },
Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); },
}
},
}
}
}
pub fn listen(&mut self) -> impl Future<Output = Result<T, Error>> {
use futures::future::{ok, Either};
match &self.result {
None => {},
Some(Ok(result)) => return Either::Left(ok(result.clone())),
Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))),
}
let (tx, rx) = oneshot::channel::<Result<T, Error>>();
self.listeners.push(tx);
Either::Right(rx
.map(|res| match res {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(e),
Err(e) => Err(Error::from(e)),
})
)
}
}
type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
struct BroadCastFutureBinding<T> {
broadcast: BroadcastData<T>,
future: Option<SourceFuture<T>>,
}
/// Broadcast future results to registered listeners
pub struct BroadcastFuture<T> {
inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
}
impl<T: Clone + Send + 'static> BroadcastFuture<T> {
/// Create instance for specified source future.
///
/// The result of the future is sent to all registered listeners.
pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
let inner = BroadCastFutureBinding {
broadcast: BroadcastData::new(),
future: Some(Pin::from(source)),
};
Self { inner: Arc::new(Mutex::new(inner)) }
}
/// Creates a new instance with a oneshot channel as trigger
pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
let (tx, rx) = oneshot::channel::<Result<T, Error>>();
let rx = rx
.map_err(Error::from)
.and_then(futures::future::ready);
(Self::new(Box::new(rx)), tx)
}
fn notify_listeners(
inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
result: Result<T, String>,
) {
let mut data = inner.lock().unwrap();
data.broadcast.notify_listeners(result);
}
fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
let mut data = inner.lock().unwrap();
if let Some(source) = data.future.take() {
let inner1 = inner.clone();
let task = source.map(move |value| {
match value {
Ok(value) => Self::notify_listeners(inner1, Ok(value)),
Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
}
});
tokio::spawn(task);
}
data.broadcast.listen()
}
/// Register a listener
pub fn listen(&self) -> impl Future<Output = Result<T, Error>> {
let inner2 = self.inner.clone();
async move { Self::spawn(inner2).await }
}
}
#[test]
fn test_broadcast_future() {
use std::sync::atomic::{AtomicUsize, Ordering};
static CHECKSUM: AtomicUsize = AtomicUsize::new(0);
let (sender, trigger) = BroadcastFuture::new_oneshot();
let receiver1 = sender.listen()
.map_ok(|res| {
CHECKSUM.fetch_add(res, Ordering::SeqCst);
})
.map_err(|err| { panic!("got error {}", err); })
.map(|_| ());
let receiver2 = sender.listen()
.map_ok(|res| {
CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
})
.map_err(|err| { panic!("got error {}", err); })
.map(|_| ());
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let r1 = tokio::spawn(receiver1);
let r2 = tokio::spawn(receiver2);
trigger.send(Ok(1)).unwrap();
let _ = r1.await;
let _ = r2.await;
});
let result = CHECKSUM.load(Ordering::SeqCst);
assert_eq!(result, 3);
// the result stays available until the BroadcastFuture is dropped
rt.block_on(sender.listen()
.map_ok(|res| {
CHECKSUM.fetch_add(res*4, Ordering::SeqCst);
})
.map_err(|err| { panic!("got error {}", err); })
.map(|_| ()));
let result = CHECKSUM.load(Ordering::SeqCst);
assert_eq!(result, 7);
}

View File

@ -1,104 +0,0 @@
use std::path::PathBuf;
use std::mem::MaybeUninit;
use anyhow::{bail, format_err, Error};
use foreign_types::ForeignTypeRef;
use openssl::x509::{X509, GeneralName};
use openssl::stack::Stack;
use openssl::pkey::{Public, PKey};
use pbs_buildcfg::configdir;
// C type:
#[allow(non_camel_case_types)]
type ASN1_TIME = <openssl::asn1::Asn1TimeRef as ForeignTypeRef>::CType;
extern "C" {
fn ASN1_TIME_to_tm(s: *const ASN1_TIME, tm: *mut libc::tm) -> libc::c_int;
}
fn asn1_time_to_unix(time: &openssl::asn1::Asn1TimeRef) -> Result<i64, Error> {
let mut c_tm = MaybeUninit::<libc::tm>::uninit();
let rc = unsafe { ASN1_TIME_to_tm(time.as_ptr(), c_tm.as_mut_ptr()) };
if rc != 1 {
bail!("failed to parse ASN1 time");
}
let mut c_tm = unsafe { c_tm.assume_init() };
proxmox::tools::time::timegm(&mut c_tm)
}
pub struct CertInfo {
x509: X509,
}
fn x509name_to_string(name: &openssl::x509::X509NameRef) -> Result<String, Error> {
let mut parts = Vec::new();
for entry in name.entries() {
parts.push(format!("{} = {}", entry.object().nid().short_name()?, entry.data().as_utf8()?));
}
Ok(parts.join(", "))
}
impl CertInfo {
pub fn new() -> Result<Self, Error> {
Self::from_path(PathBuf::from(configdir!("/proxy.pem")))
}
pub fn from_path(path: PathBuf) -> Result<Self, Error> {
Self::from_pem(&proxmox::tools::fs::file_get_contents(&path)?)
.map_err(|err| format_err!("failed to load certificate from {:?} - {}", path, err))
}
pub fn from_pem(cert_pem: &[u8]) -> Result<Self, Error> {
let x509 = openssl::x509::X509::from_pem(&cert_pem)?;
Ok(Self{
x509
})
}
pub fn subject_alt_names(&self) -> Option<Stack<GeneralName>> {
self.x509.subject_alt_names()
}
pub fn subject_name(&self) -> Result<String, Error> {
Ok(x509name_to_string(self.x509.subject_name())?)
}
pub fn issuer_name(&self) -> Result<String, Error> {
Ok(x509name_to_string(self.x509.issuer_name())?)
}
pub fn fingerprint(&self) -> Result<String, Error> {
let fp = self.x509.digest(openssl::hash::MessageDigest::sha256())?;
let fp_string = proxmox::tools::digest_to_hex(&fp);
let fp_string = fp_string.as_bytes().chunks(2).map(|v| std::str::from_utf8(v).unwrap())
.collect::<Vec<&str>>().join(":");
Ok(fp_string)
}
pub fn public_key(&self) -> Result<PKey<Public>, Error> {
let pubkey = self.x509.public_key()?;
Ok(pubkey)
}
pub fn not_before(&self) -> &openssl::asn1::Asn1TimeRef {
self.x509.not_before()
}
pub fn not_after(&self) -> &openssl::asn1::Asn1TimeRef {
self.x509.not_after()
}
pub fn not_before_unix(&self) -> Result<i64, Error> {
asn1_time_to_unix(&self.not_before())
}
pub fn not_after_unix(&self) -> Result<i64, Error> {
asn1_time_to_unix(&self.not_after())
}
/// Check if the certificate is expired at or after a specific unix epoch.
pub fn is_expired_after_epoch(&self, epoch: i64) -> Result<bool, Error> {
Ok(self.not_after_unix()? < epoch)
}
}

View File

@ -12,7 +12,6 @@ use std::path::Path;
use anyhow::{bail, format_err, Error};
use serde_json::Value;
use openssl::hash::{hash, DigestBytes, MessageDigest};
use percent_encoding::{utf8_percent_encode, AsciiSet};
pub use proxmox::tools::fd::Fd;
use proxmox::tools::fs::{create_path, CreateOptions};
@ -33,7 +32,6 @@ pub use pbs_tools::process_locker::{
pub mod acl;
pub mod apt;
pub mod async_io;
pub mod cert;
pub mod compression;
pub mod config;
pub mod cpio;
@ -67,17 +65,10 @@ pub use wrapped_reader_stream::{AsyncReaderStream, StdChannelStream, WrappedRead
mod async_channel_writer;
pub use async_channel_writer::AsyncChannelWriter;
mod std_channel_writer;
pub use std_channel_writer::StdChannelWriter;
mod tokio_writer_adapter;
pub use tokio_writer_adapter::TokioWriterAdapter;
mod file_logger;
pub use file_logger::{FileLogger, FileLogOptions};
mod broadcast_future;
pub use broadcast_future::{BroadcastData, BroadcastFuture};
pub use pbs_tools::broadcast_future::{BroadcastData, BroadcastFuture};
/// The `BufferedRead` trait provides a single function
/// `buffered_read`. It returns a reference to an internal buffer. The
@ -235,11 +226,6 @@ pub fn extract_cookie(cookie: &str, cookie_name: &str) -> Option<String> {
None
}
/// percent encode a url component
pub fn percent_encode_component(comp: &str) -> String {
utf8_percent_encode(comp, percent_encoding::NON_ALPHANUMERIC).to_string()
}
/// Detect modified configuration files
///
/// This function fails with a reasonable error message if checksums do not match.
@ -355,22 +341,6 @@ pub fn pbs_simple_http(proxy_config: Option<ProxyConfig>) -> SimpleHttp {
SimpleHttp::with_options(options)
}
/// This used to be: `SIMPLE_ENCODE_SET` plus space, `"`, `#`, `<`, `>`, backtick, `?`, `{`, `}`
pub const DEFAULT_ENCODE_SET: &AsciiSet = &percent_encoding::CONTROLS // 0..1f and 7e
// The SIMPLE_ENCODE_SET adds space and anything >= 0x7e (7e itself is already included above)
.add(0x20)
.add(0x7f)
// the DEFAULT_ENCODE_SET added:
.add(b' ')
.add(b'"')
.add(b'#')
.add(b'<')
.add(b'>')
.add(b'`')
.add(b'?')
.add(b'{')
.add(b'}');
/// Get an iterator over lines of a file, skipping empty lines and comments (lines starting with a
/// `#`).
pub fn file_get_non_comment_lines<P: AsRef<Path>>(

View File

@ -1,28 +0,0 @@
use std::io::Write;
use std::sync::mpsc::SyncSender;
use anyhow::{Error};
/// Wrapper around SyncSender, which implements Write
///
/// Each write in translated into a send(Vec<u8>).
pub struct StdChannelWriter(SyncSender<Result<Vec<u8>, Error>>);
impl StdChannelWriter {
pub fn new(sender: SyncSender<Result<Vec<u8>, Error>>) -> Self {
Self(sender)
}
}
impl Write for StdChannelWriter {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
self.0
.send(Ok(buf.to_vec()))
.map_err(proxmox::sys::error::io_err_other)
.and(Ok(buf.len()))
}
fn flush(&mut self) -> Result<(), std::io::Error> {
Ok(())
}
}

View File

@ -1,26 +0,0 @@
use std::io::Write;
use tokio::task::block_in_place;
/// Wrapper around a writer which implements Write
///
/// wraps each write with a 'block_in_place' so that
/// any (blocking) writer can be safely used in async context in a
/// tokio runtime
pub struct TokioWriterAdapter<W: Write>(W);
impl<W: Write> TokioWriterAdapter<W> {
pub fn new(writer: W) -> Self {
Self(writer)
}
}
impl<W: Write> Write for TokioWriterAdapter<W> {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
block_in_place(|| self.0.write(buf))
}
fn flush(&mut self) -> Result<(), std::io::Error> {
block_in_place(|| self.0.flush())
}
}