src/client/http_client.rs: cleanup, make login fully async

This commit is contained in:
Dietmar Maurer 2019-04-28 10:55:03 +02:00
parent 3dceb9b304
commit 5a2df00004
2 changed files with 192 additions and 286 deletions

View File

@ -24,6 +24,7 @@ use regex::Regex;
use xdg::BaseDirectories; use xdg::BaseDirectories;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use futures::*;
lazy_static! { lazy_static! {
static ref BACKUPSPEC_REGEX: Regex = Regex::new(r"^([a-zA-Z0-9_-]+\.(?:pxar|raw)):(.+)$").unwrap(); static ref BACKUPSPEC_REGEX: Regex = Regex::new(r"^([a-zA-Z0-9_-]+\.(?:pxar|raw)):(.+)$").unwrap();
@ -135,7 +136,7 @@ fn backup_directory<P: AsRef<Path>>(
let body = Body::wrap_stream(stream); let body = Body::wrap_stream(stream);
client.upload("application/x-proxmox-backup-pxar", body, &path)?; client.upload("application/x-proxmox-backup-pxar", body, &path).wait()?;
Ok(()) Ok(())
} }
@ -235,11 +236,11 @@ fn list_backup_groups(
let repo_url = tools::required_string_param(&param, "repository")?; let repo_url = tools::required_string_param(&param, "repository")?;
let repo: BackupRepository = repo_url.parse()?; let repo: BackupRepository = repo_url.parse()?;
let mut client = HttpClient::new(repo.host(), repo.user()); let client = HttpClient::new(repo.host(), repo.user());
let path = format!("api2/json/admin/datastore/{}/groups", repo.store()); let path = format!("api2/json/admin/datastore/{}/groups", repo.store());
let mut result = client.get(&path)?; let mut result = client.get(&path).wait()?;
record_repository(&repo); record_repository(&repo);
@ -300,12 +301,12 @@ fn list_snapshots(
"backup-id": group.backup_id(), "backup-id": group.backup_id(),
}))?; }))?;
let mut client = HttpClient::new(repo.host(), repo.user()); let client = HttpClient::new(repo.host(), repo.user());
let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query); let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query);
// fixme: params // fixme: params
let result = client.get(&path)?; let result = client.get(&path).wait()?;
record_repository(&repo); record_repository(&repo);
@ -353,7 +354,7 @@ fn forget_snapshots(
let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query); let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query);
let result = client.delete(&path)?; let result = client.delete(&path).wait()?;
record_repository(&repo); record_repository(&repo);
@ -373,7 +374,7 @@ fn start_garbage_collection(
let path = format!("api2/json/admin/datastore/{}/gc", repo.store()); let path = format!("api2/json/admin/datastore/{}/gc", repo.store());
let result = client.post(&path)?; let result = client.post(&path, None).wait()?;
record_repository(&repo); record_repository(&repo);
@ -446,8 +447,6 @@ fn create_backup(
let mut client = HttpClient::new(repo.host(), repo.user()); let mut client = HttpClient::new(repo.host(), repo.user());
client.login()?; // login before starting backup
record_repository(&repo); record_repository(&repo);
println!("Starting backup"); println!("Starting backup");
@ -503,8 +502,6 @@ fn restore(
let mut client = HttpClient::new(repo.host(), repo.user()); let mut client = HttpClient::new(repo.host(), repo.user());
client.login()?; // login before starting
record_repository(&repo); record_repository(&repo);
let path = tools::required_string_param(&param, "snapshot")?; let path = tools::required_string_param(&param, "snapshot")?;
@ -520,7 +517,7 @@ fn restore(
}))?; }))?;
let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), subquery); let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), subquery);
let result = client.get(&path)?; let result = client.get(&path).wait()?;
let list = result["data"].as_array().unwrap(); let list = result["data"].as_array().unwrap();
if list.len() == 0 { if list.len() == 0 {
@ -553,7 +550,7 @@ fn restore(
let target = PathBuf::from(target); let target = PathBuf::from(target);
let writer = PxarDecodeWriter::new(&target, true)?; let writer = PxarDecodeWriter::new(&target, true)?;
client.download(&path, Box::new(writer))?; client.download(&path, Box::new(writer)).wait()?;
} else { } else {
bail!("unknown file extensions - unable to download '{}'", archive_name); bail!("unknown file extensions - unable to download '{}'", archive_name);
} }
@ -576,18 +573,19 @@ fn prune(
param.as_object_mut().unwrap().remove("repository"); param.as_object_mut().unwrap().remove("repository");
let result = client.post_json(&path, param)?; let result = client.post(&path, Some(param)).wait()?;
record_repository(&repo); record_repository(&repo);
Ok(result) Ok(result)
} }
// like get, but simply ignore errors and return Null instead
fn try_get(repo: &BackupRepository, url: &str) -> Value { fn try_get(repo: &BackupRepository, url: &str) -> Value {
let mut client = HttpClient::new(repo.host(), repo.user()); let client = HttpClient::new(repo.host(), repo.user());
let mut resp = match client.try_get(url) { let mut resp = match client.get(url).wait() {
Ok(v) => v, Ok(v) => v,
_ => return Value::Null, _ => return Value::Null,
}; };
@ -858,6 +856,9 @@ fn main() {
.insert("restore".to_owned(), restore_cmd_def.into()) .insert("restore".to_owned(), restore_cmd_def.into())
.insert("snapshots".to_owned(), snapshots_cmd_def.into()); .insert("snapshots".to_owned(), snapshots_cmd_def.into());
run_cli_command(cmd_def.into()); hyper::rt::run(futures::future::lazy(move || {
run_cli_command(cmd_def.into());
Ok(())
}));
} }

View File

@ -3,25 +3,32 @@ use failure::*;
use http::Uri; use http::Uri;
use hyper::Body; use hyper::Body;
use hyper::client::Client; use hyper::client::Client;
use hyper::rt::{self, Future};
use xdg::BaseDirectories; use xdg::BaseDirectories;
use chrono::Utc; use chrono::Utc;
use http::Request; use http::Request;
use http::header::HeaderValue;
use futures::Future;
use futures::stream::Stream; use futures::stream::Stream;
use serde_json::{json, Value}; use serde_json::{json, Value};
use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET}; use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
use crate::tools::{self, tty}; use crate::tools::{self, BroadcastFuture, tty};
#[derive(Clone)]
struct AuthInfo {
username: String,
ticket: String,
token: String,
}
/// HTTP(S) API client /// HTTP(S) API client
pub struct HttpClient { pub struct HttpClient {
username: String, client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
server: String, server: String,
auth: BroadcastFuture<AuthInfo>,
ticket: Option<String>,
token: Option<String>
} }
fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> { fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> {
@ -104,15 +111,17 @@ fn load_ticket_info(server: &str, username: &str) -> Option<(String, String)> {
impl HttpClient { impl HttpClient {
pub fn new(server: &str, username: &str) -> Self { pub fn new(server: &str, username: &str) -> Self {
let client = Self::build_client();
let login = Self::credentials(client.clone(), server, username);
Self { Self {
client,
server: String::from(server), server: String::from(server),
username: String::from(username), auth: BroadcastFuture::new(login),
ticket: None,
token: None,
} }
} }
fn get_password(&self) -> Result<String, Error> { fn get_password(_username: &str) -> Result<String, Error> {
use std::env::VarError::*; use std::env::VarError::*;
match std::env::var("PBS_PASSWORD") { match std::env::var("PBS_PASSWORD") {
Ok(p) => return Ok(p), Ok(p) => return Ok(p),
@ -130,34 +139,161 @@ impl HttpClient {
bail!("no password input mechanism available"); bail!("no password input mechanism available");
} }
fn build_client() -> Result<Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>, Error> { fn build_client() -> Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>> {
let mut builder = native_tls::TlsConnector::builder(); let mut builder = native_tls::TlsConnector::builder();
// FIXME: We need a CLI option for this! // FIXME: We need a CLI option for this!
builder.danger_accept_invalid_certs(true); builder.danger_accept_invalid_certs(true);
let tlsconnector = builder.build()?; let tlsconnector = builder.build().unwrap();
let mut httpc = hyper::client::HttpConnector::new(1); let mut httpc = hyper::client::HttpConnector::new(1);
httpc.enforce_http(false); // we want https... httpc.enforce_http(false); // we want https...
let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector)); let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
https.https_only(true); // force it! https.https_only(true); // force it!
let client = Client::builder().build::<_, Body>(https); Client::builder().build::<_, Body>(https)
Ok(client)
} }
fn run_request( pub fn request(&self, mut req: Request<Body>) -> impl Future<Item=Value, Error=Error> {
request: Request<Body>,
) -> Result<Value, Error> {
let client = Self::build_client()?;
let (tx, rx) = std::sync::mpsc::channel(); let login = self.auth.listen();
let future = client let client = self.client.clone();
.request(request)
login.and_then(move |auth| {
let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
let request = Self::api_request(client, req);
request
})
}
pub fn get(&self, path: &str) -> impl Future<Item=Value, Error=Error> {
let req = Self::request_builder(&self.server, "GET", path, None).unwrap();
self.request(req)
}
pub fn delete(&mut self, path: &str) -> impl Future<Item=Value, Error=Error> {
let req = Self::request_builder(&self.server, "DELETE", path, None).unwrap();
self.request(req)
}
pub fn post(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
self.request(req)
}
pub fn download(&mut self, path: &str, mut output: Box<dyn std::io::Write + Send>) -> impl Future<Item=(), Error=Error> {
let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
let login = self.auth.listen();
let client = self.client.clone();
login.and_then(move |auth| {
let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
client.request(req)
.map_err(Error::from)
.and_then(|resp| {
let _status = resp.status(); // fixme: ??
resp.into_body()
.map_err(Error::from)
.for_each(move |chunk| {
output.write_all(&chunk)?;
Ok(())
})
})
})
}
pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> impl Future<Item=Value, Error=Error> {
let path = path.trim_matches('/');
let url: Uri = format!("https://{}:8007/{}", &self.server, path).parse().unwrap();
let req = Request::builder()
.method("POST")
.uri(url)
.header("User-Agent", "proxmox-backup-client/1.0")
.header("Content-Type", content_type)
.body(body).unwrap();
self.request(req)
}
fn credentials(
client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
server: &str,
username: &str,
) -> Box<Future<Item=AuthInfo, Error=Error> + Send> {
let server = server.to_owned();
let server2 = server.to_owned();
let username = username.to_owned();
let create_request = futures::future::lazy(move || {
let data = if let Some((ticket, _token)) = load_ticket_info(&server, &username) {
json!({ "username": username, "password": ticket })
} else {
let password = match Self::get_password(&username) {
Ok(p) => p,
Err(err) => {
return futures::future::Either::A(futures::future::err(err));
}
};
json!({ "username": username, "password": password })
};
let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
futures::future::Either::B(Self::api_request(client, req))
});
let login_future = create_request
.and_then(move |cred| {
let auth = AuthInfo {
username: cred["data"]["username"].as_str().unwrap().to_owned(),
ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
};
let _ = store_ticket_info(&server2, &auth.username, &auth.ticket, &auth.token);
Ok(auth)
});
Box::new(login_future)
}
fn api_request(
client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
req: Request<Body>
) -> impl Future<Item=Value, Error=Error> {
client.request(req)
.map_err(Error::from) .map_err(Error::from)
.and_then(|resp| { .and_then(|resp| {
let status = resp.status(); let status = resp.status();
resp.into_body().concat2().map_err(Error::from) resp
.into_body()
.concat2()
.map_err(Error::from)
.and_then(move |data| { .and_then(move |data| {
let text = String::from_utf8(data.to_vec()).unwrap(); let text = String::from_utf8(data.to_vec()).unwrap();
@ -173,265 +309,34 @@ impl HttpClient {
} }
}) })
}) })
.then(move |res| {
tx.send(res).unwrap();
Ok(())
});
// drop client, else client keeps connectioon open (keep-alive feature)
drop(client);
rt::run(future);
rx.recv().unwrap()
} }
fn run_download( pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
request: Request<Body>,
mut output: Box<dyn std::io::Write + Send>,
) -> Result<(), Error> {
let client = Self::build_client()?;
let (tx, rx) = std::sync::mpsc::channel();
let future = client
.request(request)
.map_err(Error::from)
.and_then(move |resp| {
let _status = resp.status(); // fixme: ??
resp.into_body()
.map_err(Error::from)
.for_each(move |chunk| {
output.write_all(&chunk)?;
Ok(())
})
})
.then(move |res| {
tx.send(res).unwrap();
Ok(())
});
// drop client, else client keeps connectioon open (keep-alive feature)
drop(client);
rt::run(future);
rx.recv().unwrap()
}
pub fn download(&mut self, path: &str, output: Box<dyn std::io::Write + Send>) -> Result<(), Error> {
let path = path.trim_matches('/'); let path = path.trim_matches('/');
let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?; let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
let (ticket, _token) = self.login()?; if let Some(data) = data {
if method == "POST" {
let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string(); let request = Request::builder()
.method(method)
let request = Request::builder() .uri(url)
.method("GET") .header("User-Agent", "proxmox-backup-client/1.0")
.uri(url) .header(hyper::header::CONTENT_TYPE, "application/json")
.header("User-Agent", "proxmox-backup-client/1.0") .body(Body::from(data.to_string()))?;
.header("Cookie", format!("PBSAuthCookie={}", enc_ticket)) return Ok(request);
.body(Body::empty())?; } else {
unimplemented!();
Self::run_download(request, output)
}
pub fn get(&mut self, path: &str) -> Result<Value, Error> {
let path = path.trim_matches('/');
let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
let (ticket, _token) = self.login()?;
let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
let request = Request::builder()
.method("GET")
.uri(url)
.header("User-Agent", "proxmox-backup-client/1.0")
.header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
.body(Body::empty())?;
Self::run_request(request)
}
/// like get(), but use existing credentials (never asks for password).
/// this simply fails when there is no ticket
pub fn try_get(&mut self, path: &str) -> Result<Value, Error> {
let path = path.trim_matches('/');
let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
let mut credentials = None;
if let Some(ref ticket) = self.ticket {
if let Some(ref token) = self.token {
credentials = Some((ticket.clone(), token.clone()));
} }
} }
if credentials == None {
if let Some((ticket, token)) = load_ticket_info(&self.server, &self.username) {
credentials = Some((ticket.clone(), token.clone()));
}
}
if credentials == None {
bail!("unable to get credentials");
}
let (ticket, _token) = credentials.unwrap();
let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
let request = Request::builder() let request = Request::builder()
.method("GET") .method(method)
.uri(url) .uri(url)
.header("User-Agent", "proxmox-backup-client/1.0") .header("User-Agent", "proxmox-backup-client/1.0")
.header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
.body(Body::empty())?;
Self::run_request(request)
}
pub fn delete(&mut self, path: &str) -> Result<Value, Error> {
let path = path.trim_matches('/');
let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
let (ticket, token) = self.login()?;
let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
let request = Request::builder()
.method("DELETE")
.uri(url)
.header("User-Agent", "proxmox-backup-client/1.0")
.header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
.header("CSRFPreventionToken", token)
.body(Body::empty())?;
Self::run_request(request)
}
pub fn post(&mut self, path: &str) -> Result<Value, Error> {
let path = path.trim_matches('/');
let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
let (ticket, token) = self.login()?;
let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
let request = Request::builder()
.method("POST")
.uri(url)
.header("User-Agent", "proxmox-backup-client/1.0")
.header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
.header("CSRFPreventionToken", token)
.header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded") .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(Body::empty())?; .body(Body::empty())?;
Self::run_request(request) Ok(request)
}
pub fn post_json(&mut self, path: &str, data: Value) -> Result<Value, Error> {
let path = path.trim_matches('/');
let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
let (ticket, token) = self.login()?;
let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
let request = Request::builder()
.method("POST")
.uri(url)
.header("User-Agent", "proxmox-backup-client/1.0")
.header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
.header("CSRFPreventionToken", token)
.header(hyper::header::CONTENT_TYPE, "application/json")
.body(Body::from(data.to_string()))?;
Self::run_request(request)
}
fn try_login(&mut self, password: &str) -> Result<(String, String), Error> {
let url: Uri = format!("https://{}:8007/{}", self.server, "/api2/json/access/ticket").parse()?;
let query = url::form_urlencoded::Serializer::new(String::new())
.append_pair("username", &self.username)
.append_pair("password", &password)
.finish();
let request = Request::builder()
.method("POST")
.uri(url)
.header("User-Agent", "proxmox-backup-client/1.0")
.header("Content-Type", "application/x-www-form-urlencoded")
.body(Body::from(query))?;
let auth_res = Self::run_request(request)?;
let ticket = match auth_res["data"]["ticket"].as_str() {
Some(t) => t,
None => bail!("got unexpected respose for login request."),
};
let token = match auth_res["data"]["CSRFPreventionToken"].as_str() {
Some(t) => t,
None => bail!("got unexpected respose for login request."),
};
Ok((ticket.to_owned(), token.to_owned()))
}
pub fn login(&mut self) -> Result<(String, String), Error> {
if let Some(ref ticket) = self.ticket {
if let Some(ref token) = self.token {
return Ok((ticket.clone(), token.clone()));
}
}
if let Some((ticket, _token)) = load_ticket_info(&self.server, &self.username) {
if let Ok((ticket, token)) = self.try_login(&ticket) {
let _ = store_ticket_info(&self.server, &self.username, &ticket, &token);
return Ok((ticket.to_owned(), token.to_owned()))
}
}
let password = self.get_password()?;
let (ticket, token) = self.try_login(&password)?;
let _ = store_ticket_info(&self.server, &self.username, &ticket, &token);
Ok((ticket.to_owned(), token.to_owned()))
}
pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> Result<Value, Error> {
let path = path.trim_matches('/');
let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
let (ticket, token) = self.login()?;
let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
let request = Request::builder()
.method("POST")
.uri(url)
.header("User-Agent", "proxmox-backup-client/1.0")
.header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
.header("CSRFPreventionToken", token)
.header("Content-Type", content_type)
.body(body)?;
Self::run_request(request)
} }
} }