fix #2870: renew tickets in HttpClient

by packing the auth into a RwLock and starting a background
future that renews the ticket every 15 minutes

we still use the BroadcastFuture for the first ticket and only
if that is finished we start the scheduled future

we have to store an abort handle for the renewal future and abort it when
the http client is dropped, so we do not request new tickets forever

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
Dominik Csapak 2020-09-15 11:15:00 +02:00 committed by Dietmar Maurer
parent 9626c28619
commit dd4b42bac1
1 changed files with 58 additions and 9 deletions

View File

@ -1,6 +1,7 @@
use std::io::Write; use std::io::Write;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use futures::*; use futures::*;
@ -29,7 +30,7 @@ use crate::tools::{self, BroadcastFuture, DEFAULT_ENCODE_SET};
#[derive(Clone)] #[derive(Clone)]
pub struct AuthInfo { pub struct AuthInfo {
pub username: String, pub userid: Userid,
pub ticket: String, pub ticket: String,
pub token: String, pub token: String,
} }
@ -99,7 +100,9 @@ pub struct HttpClient {
client: Client<HttpsConnector>, client: Client<HttpsConnector>,
server: String, server: String,
fingerprint: Arc<Mutex<Option<String>>>, fingerprint: Arc<Mutex<Option<String>>>,
auth: BroadcastFuture<AuthInfo>, first_auth: BroadcastFuture<()>,
auth: Arc<RwLock<AuthInfo>>,
ticket_abort: futures::future::AbortHandle,
_options: HttpClientOptions, _options: HttpClientOptions,
} }
@ -317,6 +320,41 @@ impl HttpClient {
} }
}; };
let auth = Arc::new(RwLock::new(AuthInfo {
userid: userid.clone(),
ticket: password.clone(),
token: "".to_string(),
}));
let server2 = server.to_string();
let client2 = client.clone();
let auth2 = auth.clone();
let prefix2 = options.prefix.clone();
let renewal_future = async move {
loop {
tokio::time::delay_for(Duration::new(60*15, 0)).await; // 15 minutes
let (userid, ticket) = {
let authinfo = auth2.read().unwrap().clone();
(authinfo.userid, authinfo.ticket)
};
match Self::credentials(client2.clone(), server2.clone(), userid, ticket).await {
Ok(auth) => {
if use_ticket_cache & &prefix2.is_some() {
let _ = store_ticket_info(prefix2.as_ref().unwrap(), &server2, &auth.userid.to_string(), &auth.ticket, &auth.token);
}
*auth2.write().unwrap() = auth;
},
Err(err) => {
eprintln!("re-authentication failed: {}", err);
return;
}
}
}
};
let (renewal_future, ticket_abort) = futures::future::abortable(renewal_future);
let login_future = Self::credentials( let login_future = Self::credentials(
client.clone(), client.clone(),
server.to_owned(), server.to_owned(),
@ -325,13 +363,14 @@ impl HttpClient {
).map_ok({ ).map_ok({
let server = server.to_string(); let server = server.to_string();
let prefix = options.prefix.clone(); let prefix = options.prefix.clone();
let authinfo = auth.clone();
move |auth| { move |auth| {
if use_ticket_cache & &prefix.is_some() { if use_ticket_cache & &prefix.is_some() {
let _ = store_ticket_info(prefix.as_ref().unwrap(), &server, &auth.username, &auth.ticket, &auth.token); let _ = store_ticket_info(prefix.as_ref().unwrap(), &server, &auth.userid.to_string(), &auth.ticket, &auth.token);
} }
*authinfo.write().unwrap() = auth;
auth tokio::spawn(renewal_future);
} }
}); });
@ -339,7 +378,9 @@ impl HttpClient {
client, client,
server: String::from(server), server: String::from(server),
fingerprint: verified_fingerprint, fingerprint: verified_fingerprint,
auth: BroadcastFuture::new(Box::new(login_future)), auth,
ticket_abort,
first_auth: BroadcastFuture::new(Box::new(login_future)),
_options: options, _options: options,
}) })
} }
@ -349,7 +390,9 @@ impl HttpClient {
/// Login is done on demand, so this is only required if you need /// Login is done on demand, so this is only required if you need
/// access to authentication data in 'AuthInfo'. /// access to authentication data in 'AuthInfo'.
pub async fn login(&self) -> Result<AuthInfo, Error> { pub async fn login(&self) -> Result<AuthInfo, Error> {
self.auth.listen().await self.first_auth.listen().await?;
let authinfo = self.auth.read().unwrap();
Ok(authinfo.clone())
} }
/// Returns the optional fingerprint passed to the new() constructor. /// Returns the optional fingerprint passed to the new() constructor.
@ -588,7 +631,7 @@ impl HttpClient {
let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap(); let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
let cred = Self::api_request(client, req).await?; let cred = Self::api_request(client, req).await?;
let auth = AuthInfo { let auth = AuthInfo {
username: cred["data"]["username"].as_str().unwrap().to_owned(), userid: cred["data"]["username"].as_str().unwrap().parse()?,
ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(), ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(), token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
}; };
@ -666,6 +709,12 @@ impl HttpClient {
} }
} }
impl Drop for HttpClient {
fn drop(&mut self) {
self.ticket_abort.abort();
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct H2Client { pub struct H2Client {