implement rate limiter in shared memory
This kind of rate limiter can be used among several processes (as long as all set the same rate/burst).
This commit is contained in:
parent
d5f58006d3
commit
de21d4efdc
@ -107,6 +107,7 @@ proxmox-section-config = "1"
|
||||
proxmox-tfa = { version = "1", features = [ "u2f" ] }
|
||||
proxmox-time = "1"
|
||||
proxmox-uuid = "1"
|
||||
proxmox-shared-memory = "0.1.0"
|
||||
|
||||
proxmox-acme-rs = "0.3"
|
||||
proxmox-apt = "0.8.0"
|
||||
|
@ -16,6 +16,8 @@ use pbs_api_types::TrafficControlRule;
|
||||
|
||||
use pbs_config::ConfigVersionCache;
|
||||
|
||||
use super::SharedRateLimiter;
|
||||
|
||||
struct ParsedTcRule {
|
||||
config: TrafficControlRule, // original rule config
|
||||
networks: Vec<IpInet>, // parsed networks
|
||||
@ -23,6 +25,7 @@ struct ParsedTcRule {
|
||||
}
|
||||
|
||||
pub struct TrafficControlCache {
|
||||
use_shared_memory: bool,
|
||||
last_update: i64,
|
||||
last_traffic_control_generation: usize,
|
||||
rules: Vec<ParsedTcRule>,
|
||||
@ -85,17 +88,24 @@ fn cannonical_ip(ip: IpAddr) -> IpAddr {
|
||||
}
|
||||
|
||||
fn create_limiter(
|
||||
use_shared_memory: bool,
|
||||
name: &str,
|
||||
rate: u64,
|
||||
burst: u64,
|
||||
_direction: bool, // false => in, true => out
|
||||
) -> Result<Arc<dyn ShareableRateLimit>, Error> {
|
||||
Ok(Arc::new(Mutex::new(RateLimiter::new(rate, burst))))
|
||||
if use_shared_memory {
|
||||
let limiter = SharedRateLimiter::mmap_shmem(name, rate, burst)?;
|
||||
Ok(Arc::new(limiter))
|
||||
} else {
|
||||
Ok(Arc::new(Mutex::new(RateLimiter::new(rate, burst))))
|
||||
}
|
||||
}
|
||||
|
||||
impl TrafficControlCache {
|
||||
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
use_shared_memory: true,
|
||||
rules: Vec::new(),
|
||||
limiter_map: HashMap::new(),
|
||||
last_traffic_control_generation: 0,
|
||||
@ -162,7 +172,13 @@ impl TrafficControlCache {
|
||||
}
|
||||
None => {
|
||||
if let Some(rate_in) = rule.rate_in {
|
||||
let limiter = create_limiter(rate_in, rule.burst_in.unwrap_or(rate_in), false)?;
|
||||
let name = format!("{}.in", rule.name);
|
||||
let limiter = create_limiter(
|
||||
self.use_shared_memory,
|
||||
&name,
|
||||
rate_in,
|
||||
rule.burst_in.unwrap_or(rate_in),
|
||||
)?;
|
||||
entry.0 = Some(limiter);
|
||||
}
|
||||
}
|
||||
@ -179,7 +195,13 @@ impl TrafficControlCache {
|
||||
}
|
||||
None => {
|
||||
if let Some(rate_out) = rule.rate_out {
|
||||
let limiter = create_limiter(rate_out, rule.burst_out.unwrap_or(rate_out), true)?;
|
||||
let name = format!("{}.out", rule.name);
|
||||
let limiter = create_limiter(
|
||||
self.use_shared_memory,
|
||||
&name,
|
||||
rate_out,
|
||||
rule.burst_out.unwrap_or(rate_out),
|
||||
)?;
|
||||
entry.1 = Some(limiter);
|
||||
}
|
||||
}
|
||||
|
@ -33,9 +33,13 @@ pub mod client_helpers;
|
||||
|
||||
pub mod rrd_cache;
|
||||
|
||||
mod shared_rate_limiter;
|
||||
pub use shared_rate_limiter::SharedRateLimiter;
|
||||
|
||||
mod cached_traffic_control;
|
||||
pub use cached_traffic_control::TrafficControlCache;
|
||||
|
||||
|
||||
/// Get the server's certificate info (from `proxy.pem`).
|
||||
pub fn cert_info() -> Result<CertInfo, anyhow::Error> {
|
||||
CertInfo::from_path(PathBuf::from(configdir!("/proxy.pem")))
|
||||
|
110
src/shared_rate_limiter.rs
Normal file
110
src/shared_rate_limiter.rs
Normal file
@ -0,0 +1,110 @@
|
||||
use std::path::PathBuf;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::time::{Instant, Duration};
|
||||
|
||||
use anyhow::{bail, Error};
|
||||
use nix::sys::stat::Mode;
|
||||
|
||||
use proxmox::tools::fs::{create_path, CreateOptions};
|
||||
|
||||
use proxmox_http::client::{RateLimit, RateLimiter, ShareableRateLimit};
|
||||
use proxmox_shared_memory::{Init, SharedMemory, SharedMutex};
|
||||
use proxmox_shared_memory::{check_subtype, initialize_subtype};
|
||||
|
||||
// openssl::sha::sha256(b"Proxmox Backup SharedRateLimiter v1.0")[0..8];
|
||||
pub const PROXMOX_BACKUP_SHARED_RATE_LIMITER_MAGIC_1_0: [u8; 8] = [6, 58, 213, 96, 161, 122, 130, 117];
|
||||
|
||||
const BASE_PATH: &str = pbs_buildcfg::rundir!("/shmem/tbf");
|
||||
|
||||
// Wrap RateLimiter, so that we can provide an Init impl
|
||||
#[repr(C)]
|
||||
struct WrapLimiter(RateLimiter);
|
||||
|
||||
impl Init for WrapLimiter {
|
||||
fn initialize(this: &mut MaybeUninit<Self>) {
|
||||
// default does not matter here, because we override later
|
||||
this.write(WrapLimiter(RateLimiter::new(1_000_000, 1_000_000)));
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
struct SharedRateLimiterData {
|
||||
magic: [u8; 8],
|
||||
tbf: SharedMutex<WrapLimiter>,
|
||||
padding: [u8; 4096 - 120],
|
||||
}
|
||||
|
||||
impl Init for SharedRateLimiterData {
|
||||
fn initialize(this: &mut MaybeUninit<Self>) {
|
||||
unsafe {
|
||||
let me = &mut *this.as_mut_ptr();
|
||||
me.magic = PROXMOX_BACKUP_SHARED_RATE_LIMITER_MAGIC_1_0;
|
||||
initialize_subtype(&mut me.tbf);
|
||||
}
|
||||
}
|
||||
|
||||
fn check_type_magic(this: &MaybeUninit<Self>) -> Result<(), Error> {
|
||||
unsafe {
|
||||
let me = &*this.as_ptr();
|
||||
if me.magic != PROXMOX_BACKUP_SHARED_RATE_LIMITER_MAGIC_1_0 {
|
||||
bail!("SharedRateLimiterData: wrong magic number");
|
||||
}
|
||||
check_subtype(&me.tbf)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SharedRateLimiter {
|
||||
shmem: SharedMemory<SharedRateLimiterData>
|
||||
}
|
||||
|
||||
impl SharedRateLimiter {
|
||||
|
||||
pub fn mmap_shmem(name: &str, rate: u64, burst: u64) -> Result<Self, Error> {
|
||||
let mut path = PathBuf::from(BASE_PATH);
|
||||
|
||||
let user = pbs_config::backup_user()?;
|
||||
|
||||
let dir_opts = CreateOptions::new()
|
||||
.perm(Mode::from_bits_truncate(0o770))
|
||||
.owner(user.uid)
|
||||
.group(user.gid);
|
||||
|
||||
create_path(
|
||||
&path,
|
||||
Some(dir_opts.clone()),
|
||||
Some(dir_opts))?;
|
||||
|
||||
path.push(name);
|
||||
|
||||
let file_opts = CreateOptions::new()
|
||||
.perm(Mode::from_bits_truncate(0o660))
|
||||
.owner(user.uid)
|
||||
.group(user.gid);
|
||||
|
||||
let shmem: SharedMemory<SharedRateLimiterData> =
|
||||
SharedMemory::open(&path, file_opts)?;
|
||||
|
||||
shmem.data().tbf.lock().0.update_rate(rate, burst);
|
||||
|
||||
Ok(Self { shmem })
|
||||
}
|
||||
}
|
||||
|
||||
impl ShareableRateLimit for SharedRateLimiter {
|
||||
fn update_rate(&self, rate: u64, bucket_size: u64) {
|
||||
self.shmem.data().tbf.lock().0
|
||||
.update_rate(rate, bucket_size);
|
||||
}
|
||||
|
||||
fn average_rate(&self, current_time: Instant) -> f64 {
|
||||
self.shmem.data().tbf.lock().0
|
||||
.average_rate(current_time)
|
||||
}
|
||||
|
||||
fn register_traffic(&self, current_time: Instant, data_len: u64) -> Duration {
|
||||
self.shmem.data().tbf.lock().0
|
||||
.register_traffic(current_time, data_len)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user