pbs-client: add option to use the new RateLimiter

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
This commit is contained in:
Dietmar Maurer 2021-11-03 13:52:13 +01:00
parent 68fd9ca6d6
commit 2419dc0de9
4 changed files with 44 additions and 7 deletions

View File

@ -97,7 +97,7 @@ pathpatterns = "0.1.2"
pxar = { version = "0.10.1", features = [ "tokio-io" ] } pxar = { version = "0.10.1", features = [ "tokio-io" ] }
proxmox = { version = "0.15.0", features = [ "sortable-macro" ] } proxmox = { version = "0.15.0", features = [ "sortable-macro" ] }
proxmox-http = { version = "0.5.0", features = [ "client", "http-helpers", "websocket" ] } proxmox-http = { version = "0.5.2", features = [ "client", "http-helpers", "websocket" ] }
proxmox-io = "1" proxmox-io = "1"
proxmox-lang = "1" proxmox-lang = "1"
proxmox-router = { version = "1.1", features = [ "cli" ] } proxmox-router = { version = "1.1", features = [ "cli" ] }

View File

@ -30,7 +30,7 @@ xdg = "2.2"
pathpatterns = "0.1.2" pathpatterns = "0.1.2"
proxmox = "0.15.0" proxmox = "0.15.0"
proxmox-fuse = "0.1.1" proxmox-fuse = "0.1.1"
proxmox-http = { version = "0.5.0", features = [ "client", "http-helpers", "websocket" ] } proxmox-http = { version = "0.5.2", features = [ "client", "http-helpers", "websocket" ] }
proxmox-io = { version = "1", features = [ "tokio" ] } proxmox-io = { version = "1", features = [ "tokio" ] }
proxmox-lang = "1" proxmox-lang = "1"
proxmox-router = { version = "1.1", features = [ "cli" ] } proxmox-router = { version = "1.1", features = [ "cli" ] }

View File

@ -20,7 +20,7 @@ use proxmox::{
}; };
use proxmox_router::HttpError; use proxmox_router::HttpError;
use proxmox_http::client::HttpsConnector; use proxmox_http::client::{HttpsConnector, RateLimiter};
use proxmox_http::uri::build_authority; use proxmox_http::uri::build_authority;
use pbs_api_types::{Authid, Userid}; use pbs_api_types::{Authid, Userid};
@ -51,6 +51,8 @@ pub struct HttpClientOptions {
ticket_cache: bool, ticket_cache: bool,
fingerprint_cache: bool, fingerprint_cache: bool,
verify_cert: bool, verify_cert: bool,
rate_limit: Option<u64>,
bucket_size: Option<u64>,
} }
impl HttpClientOptions { impl HttpClientOptions {
@ -109,6 +111,16 @@ impl HttpClientOptions {
self.verify_cert = verify_cert; self.verify_cert = verify_cert;
self self
} }
pub fn rate_limit(mut self, rate_limit: Option<u64>) -> Self {
self.rate_limit = rate_limit;
self
}
pub fn bucket_size(mut self, bucket_size: Option<u64>) -> Self {
self.bucket_size = bucket_size;
self
}
} }
impl Default for HttpClientOptions { impl Default for HttpClientOptions {
@ -121,6 +133,8 @@ impl Default for HttpClientOptions {
ticket_cache: false, ticket_cache: false,
fingerprint_cache: false, fingerprint_cache: false,
verify_cert: true, verify_cert: true,
rate_limit: None,
bucket_size: None,
} }
} }
} }
@ -343,7 +357,13 @@ impl HttpClient {
httpc.enforce_http(false); // we want https... httpc.enforce_http(false); // we want https...
httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0))); httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0)));
let https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); let mut https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
if let Some(rate_limit) = options.rate_limit {
let bucket_size = options.bucket_size.unwrap_or_else(|| rate_limit*3);
https.set_read_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
https.set_write_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
}
let client = Client::builder() let client = Client::builder()
//.http2_initial_stream_window_size( (1 << 31) - 2) //.http2_initial_stream_window_size( (1 << 31) - 2)

View File

@ -135,15 +135,32 @@ pub fn extract_repository_from_map(param: &HashMap<String, String>) -> Option<Ba
} }
pub fn connect(repo: &BackupRepository) -> Result<HttpClient, Error> { pub fn connect(repo: &BackupRepository) -> Result<HttpClient, Error> {
connect_do(repo.host(), repo.port(), repo.auth_id()) connect_do(repo.host(), repo.port(), repo.auth_id(), None, None)
.map_err(|err| format_err!("error building client for repository {} - {}", repo, err)) .map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
} }
fn connect_do(server: &str, port: u16, auth_id: &Authid) -> Result<HttpClient, Error> { pub fn connect_rate_limited(
repo: &BackupRepository,
rate: Option<u64>,
bucket_size: Option<u64>,
) -> Result<HttpClient, Error> {
connect_do(repo.host(), repo.port(), repo.auth_id(), rate, bucket_size)
.map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
}
fn connect_do(
server: &str,
port: u16,
auth_id: &Authid,
rate_limit: Option<u64>,
bucket_size: Option<u64>,
) -> Result<HttpClient, Error> {
let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok(); let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok();
let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?; let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?;
let options = HttpClientOptions::new_interactive(password, fingerprint); let options = HttpClientOptions::new_interactive(password, fingerprint)
.rate_limit(rate_limit)
.bucket_size(bucket_size);
HttpClient::new(server, port, auth_id, options) HttpClient::new(server, port, auth_id, options)
} }