From a0172d766bd187c503503f238ddb69ae40ed076b Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Sun, 14 Nov 2021 17:20:55 +0100 Subject: [PATCH] traffic-controls: add API/CLI to show current traffic Signed-off-by: Dietmar Maurer --- src/api2/admin/mod.rs | 2 + src/bin/proxmox-backup-proxy.rs | 26 ++++++-- .../proxmox_backup_manager/traffic_control.rs | 35 +++++++++++ src/cached_traffic_control.rs | 60 +++++++++++++++++++ src/lib.rs | 2 +- 5 files changed, 120 insertions(+), 5 deletions(-) diff --git a/src/api2/admin/mod.rs b/src/api2/admin/mod.rs index b5214c21..4667355a 100644 --- a/src/api2/admin/mod.rs +++ b/src/api2/admin/mod.rs @@ -6,10 +6,12 @@ use proxmox_router::list_subdirs_api_method; pub mod datastore; pub mod sync; pub mod verify; +pub mod traffic_control; const SUBDIRS: SubdirMap = &[ ("datastore", &datastore::ROUTER), ("sync", &sync::ROUTER), + ("traffic-control", &traffic_control::ROUTER), ("verify", &verify::ROUTER) ]; diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 0fc61ed5..4c9f1b80 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -35,6 +35,7 @@ use proxmox_backup::rrd_cache::{ initialize_rrd_cache, rrd_update_gauge, rrd_update_derive, rrd_sync_journal, }; use proxmox_backup::{ + TRAFFIC_CONTROL_CACHE, server::{ auth::check_pbs_auth, jobstate::{ @@ -71,7 +72,6 @@ use proxmox_backup::api2::pull::do_sync_job; use proxmox_backup::api2::tape::backup::do_tape_backup_job; use proxmox_backup::server::do_verification_job; use proxmox_backup::server::do_prune_job; -use proxmox_backup::TrafficControlCache; fn main() -> Result<(), Error> { proxmox_backup::tools::setup_safe_path_env(); @@ -329,6 +329,7 @@ async fn run() -> Result<(), Error> { start_task_scheduler(); start_stat_generator(); + start_traffic_control_updater(); server.await?; log::info!("server shutting down, waiting for active workers to complete"); @@ -465,6 +466,13 @@ fn start_task_scheduler() { tokio::spawn(task.map(|_| ())); } +fn start_traffic_control_updater() { + let abort_future = proxmox_rest_server::shutdown_future(); + let future = Box::pin(run_traffic_control_updater()); + let task = futures::future::select(future, abort_future); + tokio::spawn(task.map(|_| ())); +} + use std::time::{SystemTime, Instant, Duration, UNIX_EPOCH}; fn next_minute() -> Result { @@ -1086,9 +1094,19 @@ fn gather_disk_stats(disk_manager: Arc, path: &Path, rrd_prefix: &st // Test WITH // proxmox-backup-client restore vm/201/2021-10-22T09:55:56Z drive-scsi0.img img1.img --repository localhost:store2 -lazy_static::lazy_static!{ - static ref TRAFFIC_CONTROL_CACHE: Arc> = - Arc::new(Mutex::new(TrafficControlCache::new())); +async fn run_traffic_control_updater() { + + loop { + let delay_target = Instant::now() + Duration::from_secs(1); + + { + let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap(); + cache.compute_current_rates(); + } + + tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await; + } + } fn lookup_rate_limiter( diff --git a/src/bin/proxmox_backup_manager/traffic_control.rs b/src/bin/proxmox_backup_manager/traffic_control.rs index 03679621..47d599db 100644 --- a/src/bin/proxmox_backup_manager/traffic_control.rs +++ b/src/bin/proxmox_backup_manager/traffic_control.rs @@ -7,6 +7,7 @@ use proxmox_schema::api; use pbs_api_types::TRAFFIC_CONTROL_ID_SCHEMA; use proxmox_backup::api2; +use proxmox_backup::client_helpers::connect_to_localhost; #[api( @@ -75,10 +76,44 @@ fn show_traffic_control(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result Ok(Value::Null) } +#[api( + input: { + properties: { + "output-format": { + schema: OUTPUT_FORMAT, + optional: true, + }, + } + } +)] +/// Show current traffic for all rules. +async fn show_current_traffic(param: Value) -> Result { + + let output_format = get_output_format(¶m); + + let client = connect_to_localhost()?; + + let mut result = client.get(&"api2/json/admin/traffic-control", Some(param)).await?; + + let mut data = result["data"].take(); + + let info = &api2::admin::traffic_control::API_METHOD_SHOW_CURRENT_TRAFFIC; + + let options = default_table_format_options() + .column(ColumnConfig::new("name")) + .column(ColumnConfig::new("rate-in")) + .column(ColumnConfig::new("rate-out")); + + format_and_print_result_full(&mut data, &info.returns, &output_format, &options); + + Ok(Value::Null) +} + pub fn traffic_control_commands() -> CommandLineInterface { let cmd_def = CliCommandMap::new() .insert("list", CliCommand::new(&API_METHOD_LIST_TRAFFIC_CONTROLS)) + .insert("traffic", CliCommand::new(&API_METHOD_SHOW_CURRENT_TRAFFIC)) .insert( "show", CliCommand::new(&API_METHOD_SHOW_TRAFFIC_CONTROL) diff --git a/src/cached_traffic_control.rs b/src/cached_traffic_control.rs index df2ba2f8..426ebba7 100644 --- a/src/cached_traffic_control.rs +++ b/src/cached_traffic_control.rs @@ -2,6 +2,8 @@ use std::sync::{Arc, Mutex}; use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::time::Instant; +use std::convert::TryInto; use anyhow::Error; use cidr::IpInet; @@ -18,14 +20,28 @@ use pbs_config::ConfigVersionCache; use super::SharedRateLimiter; +lazy_static::lazy_static!{ + pub static ref TRAFFIC_CONTROL_CACHE: Arc> = + Arc::new(Mutex::new(TrafficControlCache::new())); +} + struct ParsedTcRule { config: TrafficControlRule, // original rule config networks: Vec, // parsed networks timeframe: Vec, // parsed timeframe } +pub struct TrafficStat { + pub traffic_in: u64, + pub rate_in: u64, + pub traffic_out: u64, + pub rate_out: u64, +} + pub struct TrafficControlCache { use_shared_memory: bool, + last_rate_compute: Instant, + current_rate_map: HashMap, last_update: i64, last_traffic_control_generation: usize, rules: Vec, @@ -111,6 +127,8 @@ impl TrafficControlCache { last_traffic_control_generation: 0, last_update: 0, use_utc: false, + last_rate_compute: Instant::now(), + current_rate_map: HashMap::new(), } } @@ -148,6 +166,48 @@ impl TrafficControlCache { self.update_config(&config) } + pub fn compute_current_rates(&mut self) { + + let elapsed = self.last_rate_compute.elapsed().as_micros(); + if elapsed < 200_000 { return } // not enough data + + let mut new_rate_map = HashMap::new(); + + for (rule, (read_limit, write_limit)) in self.limiter_map.iter() { + let traffic_in = read_limit.as_ref().map(|l| l.traffic()).unwrap_or(0); + let traffic_out = write_limit.as_ref().map(|l| l.traffic()).unwrap_or(0); + + let traffic_diff_in; + let traffic_diff_out; + + if let Some(stat) = self.current_rate_map.get(rule) { + traffic_diff_in = traffic_in.saturating_sub(stat.traffic_in); + traffic_diff_out = traffic_out.saturating_sub(stat.traffic_out); + } else { + traffic_diff_in = 0; + traffic_diff_out = 0; + } + + let rate_in = ((traffic_diff_in as u128) * 1_000_000) / elapsed; + let rate_out = ((traffic_diff_out as u128) * 1_000_000) / elapsed; + + let stat = TrafficStat { + traffic_in, + traffic_out, + rate_in: rate_in.try_into().unwrap_or(u64::MAX), + rate_out: rate_out.try_into().unwrap_or(u64::MAX), + }; + new_rate_map.insert(rule.clone(), stat); + } + + self.current_rate_map = new_rate_map; + + self.last_rate_compute = Instant::now() + } + + pub fn current_rate_map(&self) -> &HashMap { + &self.current_rate_map + } fn update_config(&mut self, config: &SectionConfigData) -> Result<(), Error> { self.limiter_map.retain(|key, _value| config.sections.contains_key(key)); diff --git a/src/lib.rs b/src/lib.rs index 8e5b4d37..98020b58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,7 +37,7 @@ mod shared_rate_limiter; pub use shared_rate_limiter::SharedRateLimiter; mod cached_traffic_control; -pub use cached_traffic_control::TrafficControlCache; +pub use cached_traffic_control::{TrafficControlCache, TRAFFIC_CONTROL_CACHE}; /// Get the server's certificate info (from `proxy.pem`).