use RateLimitConfig for HttpClient and pull
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
This commit is contained in:
parent
6eb756bcab
commit
2d5287fbbc
|
@ -24,7 +24,7 @@ use proxmox_http::client::{HttpsConnector, RateLimiter};
|
||||||
use proxmox_http::uri::build_authority;
|
use proxmox_http::uri::build_authority;
|
||||||
use proxmox_async::broadcast_future::BroadcastFuture;
|
use proxmox_async::broadcast_future::BroadcastFuture;
|
||||||
|
|
||||||
use pbs_api_types::{Authid, Userid};
|
use pbs_api_types::{Authid, Userid, RateLimitConfig};
|
||||||
use pbs_tools::json::json_object_to_query;
|
use pbs_tools::json::json_object_to_query;
|
||||||
use pbs_tools::ticket;
|
use pbs_tools::ticket;
|
||||||
use pbs_tools::percent_encoding::DEFAULT_ENCODE_SET;
|
use pbs_tools::percent_encoding::DEFAULT_ENCODE_SET;
|
||||||
|
@ -51,8 +51,7 @@ pub struct HttpClientOptions {
|
||||||
ticket_cache: bool,
|
ticket_cache: bool,
|
||||||
fingerprint_cache: bool,
|
fingerprint_cache: bool,
|
||||||
verify_cert: bool,
|
verify_cert: bool,
|
||||||
rate_limit: Option<u64>,
|
limit: RateLimitConfig,
|
||||||
bucket_size: Option<u64>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HttpClientOptions {
|
impl HttpClientOptions {
|
||||||
|
@ -112,13 +111,8 @@ impl HttpClientOptions {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rate_limit(mut self, rate_limit: Option<u64>) -> Self {
|
pub fn rate_limit(mut self, rate_limit: RateLimitConfig) -> Self {
|
||||||
self.rate_limit = rate_limit;
|
self.limit = rate_limit;
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn bucket_size(mut self, bucket_size: Option<u64>) -> Self {
|
|
||||||
self.bucket_size = bucket_size;
|
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -133,8 +127,7 @@ impl Default for HttpClientOptions {
|
||||||
ticket_cache: false,
|
ticket_cache: false,
|
||||||
fingerprint_cache: false,
|
fingerprint_cache: false,
|
||||||
verify_cert: true,
|
verify_cert: true,
|
||||||
rate_limit: None,
|
limit: RateLimitConfig::default(), // unlimited
|
||||||
bucket_size: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -359,10 +352,18 @@ impl HttpClient {
|
||||||
httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0)));
|
httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0)));
|
||||||
let mut https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
|
let mut https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
|
||||||
|
|
||||||
if let Some(rate_limit) = options.rate_limit {
|
if let Some(rate_in) = options.limit.rate_in {
|
||||||
let bucket_size = options.bucket_size.unwrap_or_else(|| rate_limit*3);
|
let burst_in = options.limit.burst_in.unwrap_or_else(|| rate_in).as_u64();
|
||||||
https.set_read_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
|
https.set_read_limiter(Some(Arc::new(Mutex::new(
|
||||||
https.set_write_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
|
RateLimiter::new(rate_in.as_u64(), burst_in)
|
||||||
|
))));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(rate_out) = options.limit.rate_out {
|
||||||
|
let burst_out = options.limit.burst_out.unwrap_or_else(|| rate_out).as_u64();
|
||||||
|
https.set_write_limiter(Some(Arc::new(Mutex::new(
|
||||||
|
RateLimiter::new(rate_out.as_u64(), burst_out)
|
||||||
|
))));
|
||||||
}
|
}
|
||||||
|
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
|
|
|
@ -14,7 +14,7 @@ use proxmox_schema::*;
|
||||||
use proxmox_router::cli::{complete_file_name, shellword_split};
|
use proxmox_router::cli::{complete_file_name, shellword_split};
|
||||||
use proxmox::tools::fs::file_get_json;
|
use proxmox::tools::fs::file_get_json;
|
||||||
|
|
||||||
use pbs_api_types::{BACKUP_REPO_URL, Authid, UserWithTokens};
|
use pbs_api_types::{BACKUP_REPO_URL, Authid, RateLimitConfig, UserWithTokens};
|
||||||
use pbs_datastore::BackupDir;
|
use pbs_datastore::BackupDir;
|
||||||
use pbs_tools::json::json_object_to_query;
|
use pbs_tools::json::json_object_to_query;
|
||||||
|
|
||||||
|
@ -135,16 +135,16 @@ pub fn extract_repository_from_map(param: &HashMap<String, String>) -> Option<Ba
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn connect(repo: &BackupRepository) -> Result<HttpClient, Error> {
|
pub fn connect(repo: &BackupRepository) -> Result<HttpClient, Error> {
|
||||||
connect_do(repo.host(), repo.port(), repo.auth_id(), None, None)
|
let rate_limit = RateLimitConfig::default(); // unlimited
|
||||||
|
connect_do(repo.host(), repo.port(), repo.auth_id(), rate_limit)
|
||||||
.map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
|
.map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn connect_rate_limited(
|
pub fn connect_rate_limited(
|
||||||
repo: &BackupRepository,
|
repo: &BackupRepository,
|
||||||
rate: Option<u64>,
|
rate_limit: RateLimitConfig,
|
||||||
bucket_size: Option<u64>,
|
|
||||||
) -> Result<HttpClient, Error> {
|
) -> Result<HttpClient, Error> {
|
||||||
connect_do(repo.host(), repo.port(), repo.auth_id(), rate, bucket_size)
|
connect_do(repo.host(), repo.port(), repo.auth_id(), rate_limit)
|
||||||
.map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
|
.map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,15 +152,13 @@ fn connect_do(
|
||||||
server: &str,
|
server: &str,
|
||||||
port: u16,
|
port: u16,
|
||||||
auth_id: &Authid,
|
auth_id: &Authid,
|
||||||
rate_limit: Option<u64>,
|
rate_limit: RateLimitConfig,
|
||||||
bucket_size: Option<u64>,
|
|
||||||
) -> Result<HttpClient, Error> {
|
) -> Result<HttpClient, Error> {
|
||||||
let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok();
|
let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok();
|
||||||
|
|
||||||
let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?;
|
let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?;
|
||||||
let options = HttpClientOptions::new_interactive(password, fingerprint)
|
let options = HttpClientOptions::new_interactive(password, fingerprint)
|
||||||
.rate_limit(rate_limit)
|
.rate_limit(rate_limit);
|
||||||
.bucket_size(bucket_size);
|
|
||||||
|
|
||||||
HttpClient::new(server, port, auth_id, options)
|
HttpClient::new(server, port, auth_id, options)
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,8 +23,9 @@ use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
|
||||||
use pbs_api_types::{
|
use pbs_api_types::{
|
||||||
BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA,
|
BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA,
|
||||||
TRAFFIC_CONTROL_BURST_SCHEMA, TRAFFIC_CONTROL_RATE_SCHEMA,
|
TRAFFIC_CONTROL_BURST_SCHEMA, TRAFFIC_CONTROL_RATE_SCHEMA,
|
||||||
Authid, CryptMode, Fingerprint, GroupListItem, PruneListItem, PruneOptions,
|
Authid, CryptMode, Fingerprint, GroupListItem, HumanByte,
|
||||||
SnapshotListItem, StorageStatus,
|
PruneListItem, PruneOptions, RateLimitConfig, SnapshotListItem,
|
||||||
|
StorageStatus,
|
||||||
};
|
};
|
||||||
use pbs_client::{
|
use pbs_client::{
|
||||||
BACKUP_SOURCE_SCHEMA,
|
BACKUP_SOURCE_SCHEMA,
|
||||||
|
@ -640,8 +641,16 @@ async fn create_backup(
|
||||||
verify_chunk_size(size)?;
|
verify_chunk_size(size)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let rate_limit = param["rate"].as_u64();
|
let rate = match param["rate"].as_str() {
|
||||||
let bucket_size = param["burst"].as_u64();
|
Some(s) => Some(s.parse::<HumanByte>()?),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
let burst = match param["burst"].as_str() {
|
||||||
|
Some(s) => Some(s.parse::<HumanByte>()?),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let rate_limit = RateLimitConfig::with_same_inout(rate, burst);
|
||||||
|
|
||||||
let crypto = crypto_parameters(¶m)?;
|
let crypto = crypto_parameters(¶m)?;
|
||||||
|
|
||||||
|
@ -737,7 +746,7 @@ async fn create_backup(
|
||||||
|
|
||||||
let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
|
let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
|
||||||
|
|
||||||
let client = connect_rate_limited(&repo, rate_limit, bucket_size)?;
|
let client = connect_rate_limited(&repo, rate_limit)?;
|
||||||
record_repository(&repo);
|
record_repository(&repo);
|
||||||
|
|
||||||
println!("Starting backup: {}/{}/{}", backup_type, backup_id, BackupDir::backup_time_to_string(backup_time)?);
|
println!("Starting backup: {}/{}/{}", backup_type, backup_id, BackupDir::backup_time_to_string(backup_time)?);
|
||||||
|
@ -1092,10 +1101,18 @@ async fn restore(param: Value) -> Result<Value, Error> {
|
||||||
|
|
||||||
let archive_name = json::required_string_param(¶m, "archive-name")?;
|
let archive_name = json::required_string_param(¶m, "archive-name")?;
|
||||||
|
|
||||||
let rate_limit = param["rate"].as_u64();
|
let rate = match param["rate"].as_str() {
|
||||||
let bucket_size = param["burst"].as_u64();
|
Some(s) => Some(s.parse::<HumanByte>()?),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
let burst = match param["burst"].as_str() {
|
||||||
|
Some(s) => Some(s.parse::<HumanByte>()?),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
let client = connect_rate_limited(&repo, rate_limit, bucket_size)?;
|
let rate_limit = RateLimitConfig::with_same_inout(rate, burst);
|
||||||
|
|
||||||
|
let client = connect_rate_limited(&repo, rate_limit)?;
|
||||||
record_repository(&repo);
|
record_repository(&repo);
|
||||||
|
|
||||||
let path = json::required_string_param(¶m, "snapshot")?;
|
let path = json::required_string_param(¶m, "snapshot")?;
|
||||||
|
|
|
@ -12,7 +12,7 @@ use pbs_client::{HttpClient, HttpClientOptions};
|
||||||
use pbs_api_types::{
|
use pbs_api_types::{
|
||||||
REMOTE_ID_SCHEMA, REMOTE_PASSWORD_SCHEMA, Remote, RemoteConfig, RemoteConfigUpdater,
|
REMOTE_ID_SCHEMA, REMOTE_PASSWORD_SCHEMA, Remote, RemoteConfig, RemoteConfigUpdater,
|
||||||
Authid, PROXMOX_CONFIG_DIGEST_SCHEMA, DATASTORE_SCHEMA, GroupListItem,
|
Authid, PROXMOX_CONFIG_DIGEST_SCHEMA, DATASTORE_SCHEMA, GroupListItem,
|
||||||
DataStoreListItem, SyncJobConfig, PRIV_REMOTE_AUDIT, PRIV_REMOTE_MODIFY,
|
DataStoreListItem, RateLimitConfig, SyncJobConfig, PRIV_REMOTE_AUDIT, PRIV_REMOTE_MODIFY,
|
||||||
};
|
};
|
||||||
use pbs_config::sync;
|
use pbs_config::sync;
|
||||||
|
|
||||||
|
@ -280,8 +280,15 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Helper to get client for remote.cfg entry
|
/// Helper to get client for remote.cfg entry
|
||||||
pub async fn remote_client(remote: &Remote) -> Result<HttpClient, Error> {
|
pub async fn remote_client(
|
||||||
let options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone());
|
remote: &Remote,
|
||||||
|
limit: Option<RateLimitConfig>,
|
||||||
|
) -> Result<HttpClient, Error> {
|
||||||
|
let mut options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone());
|
||||||
|
|
||||||
|
if let Some(limit) = limit {
|
||||||
|
options = options.rate_limit(limit);
|
||||||
|
}
|
||||||
|
|
||||||
let client = HttpClient::new(
|
let client = HttpClient::new(
|
||||||
&remote.config.host,
|
&remote.config.host,
|
||||||
|
@ -325,7 +332,7 @@ pub async fn scan_remote_datastores(name: String) -> Result<Vec<DataStoreListIte
|
||||||
api_err)
|
api_err)
|
||||||
};
|
};
|
||||||
|
|
||||||
let client = remote_client(&remote)
|
let client = remote_client(&remote, None)
|
||||||
.await
|
.await
|
||||||
.map_err(map_remote_err)?;
|
.map_err(map_remote_err)?;
|
||||||
let api_res = client
|
let api_res = client
|
||||||
|
@ -375,7 +382,7 @@ pub async fn scan_remote_groups(name: String, store: String) -> Result<Vec<Group
|
||||||
api_err)
|
api_err)
|
||||||
};
|
};
|
||||||
|
|
||||||
let client = remote_client(&remote)
|
let client = remote_client(&remote, None)
|
||||||
.await
|
.await
|
||||||
.map_err(map_remote_err)?;
|
.map_err(map_remote_err)?;
|
||||||
let api_res = client
|
let api_res = client
|
||||||
|
|
|
@ -9,7 +9,7 @@ use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
|
||||||
use proxmox_sys::task_log;
|
use proxmox_sys::task_log;
|
||||||
|
|
||||||
use pbs_api_types::{
|
use pbs_api_types::{
|
||||||
Authid, SyncJobConfig, GroupFilter, GROUP_FILTER_LIST_SCHEMA,
|
Authid, SyncJobConfig, GroupFilter, RateLimitConfig, GROUP_FILTER_LIST_SCHEMA,
|
||||||
DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
|
DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
|
||||||
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
|
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
|
||||||
};
|
};
|
||||||
|
@ -51,6 +51,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
|
||||||
sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(),
|
sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(),
|
||||||
sync_job.remove_vanished,
|
sync_job.remove_vanished,
|
||||||
sync_job.group_filter.clone(),
|
sync_job.group_filter.clone(),
|
||||||
|
sync_job.limit.clone(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -156,6 +157,10 @@ pub fn do_sync_job(
|
||||||
schema: GROUP_FILTER_LIST_SCHEMA,
|
schema: GROUP_FILTER_LIST_SCHEMA,
|
||||||
optional: true,
|
optional: true,
|
||||||
},
|
},
|
||||||
|
limit: {
|
||||||
|
type: RateLimitConfig,
|
||||||
|
flatten: true,
|
||||||
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
access: {
|
access: {
|
||||||
|
@ -174,6 +179,7 @@ async fn pull (
|
||||||
remote_store: String,
|
remote_store: String,
|
||||||
remove_vanished: Option<bool>,
|
remove_vanished: Option<bool>,
|
||||||
group_filter: Option<Vec<GroupFilter>>,
|
group_filter: Option<Vec<GroupFilter>>,
|
||||||
|
limit: RateLimitConfig,
|
||||||
_info: &ApiMethod,
|
_info: &ApiMethod,
|
||||||
rpcenv: &mut dyn RpcEnvironment,
|
rpcenv: &mut dyn RpcEnvironment,
|
||||||
) -> Result<String, Error> {
|
) -> Result<String, Error> {
|
||||||
|
@ -190,6 +196,7 @@ async fn pull (
|
||||||
auth_id.clone(),
|
auth_id.clone(),
|
||||||
remove_vanished,
|
remove_vanished,
|
||||||
group_filter,
|
group_filter,
|
||||||
|
limit,
|
||||||
)?;
|
)?;
|
||||||
let client = pull_params.client().await?;
|
let client = pull_params.client().await?;
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,10 @@ use http::StatusCode;
|
||||||
use proxmox_router::HttpError;
|
use proxmox_router::HttpError;
|
||||||
use proxmox_sys::task_log;
|
use proxmox_sys::task_log;
|
||||||
|
|
||||||
use pbs_api_types::{Authid, GroupFilter, GroupListItem, Remote, SnapshotListItem};
|
use pbs_api_types::{
|
||||||
|
Authid, GroupFilter, GroupListItem, RateLimitConfig, Remote,
|
||||||
|
SnapshotListItem,
|
||||||
|
};
|
||||||
|
|
||||||
use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
|
use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
|
||||||
use pbs_datastore::data_blob::DataBlob;
|
use pbs_datastore::data_blob::DataBlob;
|
||||||
|
@ -41,6 +44,7 @@ pub struct PullParameters {
|
||||||
owner: Authid,
|
owner: Authid,
|
||||||
remove_vanished: bool,
|
remove_vanished: bool,
|
||||||
group_filter: Option<Vec<GroupFilter>>,
|
group_filter: Option<Vec<GroupFilter>>,
|
||||||
|
limit: RateLimitConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PullParameters {
|
impl PullParameters {
|
||||||
|
@ -51,6 +55,7 @@ impl PullParameters {
|
||||||
owner: Authid,
|
owner: Authid,
|
||||||
remove_vanished: Option<bool>,
|
remove_vanished: Option<bool>,
|
||||||
group_filter: Option<Vec<GroupFilter>>,
|
group_filter: Option<Vec<GroupFilter>>,
|
||||||
|
limit: RateLimitConfig,
|
||||||
) -> Result<Self, Error> {
|
) -> Result<Self, Error> {
|
||||||
let store = DataStore::lookup_datastore(store)?;
|
let store = DataStore::lookup_datastore(store)?;
|
||||||
|
|
||||||
|
@ -66,11 +71,11 @@ impl PullParameters {
|
||||||
remote_store.to_string(),
|
remote_store.to_string(),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(Self { remote, source, store, owner, remove_vanished, group_filter })
|
Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn client(&self) -> Result<HttpClient, Error> {
|
pub async fn client(&self) -> Result<HttpClient, Error> {
|
||||||
crate::api2::config::remote::remote_client(&self.remote).await
|
crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue