use hyper/tokio-openssl instead of hyper/tokio-tls
This exposes the complete SSL setup. And download is much faster now (600MB/s instead of 130MB/s)!
This commit is contained in:
parent
74d0a6bc23
commit
6d1f61b208
@ -22,13 +22,13 @@ bytes = "0.4"
|
|||||||
tokio-threadpool = "0.1"
|
tokio-threadpool = "0.1"
|
||||||
tokio = "0.1"
|
tokio = "0.1"
|
||||||
tokio-fs = "0.1"
|
tokio-fs = "0.1"
|
||||||
tokio-tls = "0.2"
|
tokio-openssl = "0.3"
|
||||||
tokio-signal = "0.2"
|
tokio-signal = "0.2"
|
||||||
native-tls = "0.2"
|
native-tls = "0.2"
|
||||||
http = "0.1"
|
http = "0.1"
|
||||||
h2 = "0.1"
|
h2 = "0.1"
|
||||||
hyper = "0.12"
|
hyper = "0.12"
|
||||||
hyper-tls = "0.3"
|
hyper-openssl = "0.7"
|
||||||
lazy_static = "1.1"
|
lazy_static = "1.1"
|
||||||
regex = "1.0"
|
regex = "1.0"
|
||||||
libc = "0.2"
|
libc = "0.2"
|
||||||
|
@ -68,12 +68,17 @@ pub fn main() -> Result<(), Error> {
|
|||||||
.and_then(|c| {
|
.and_then(|c| {
|
||||||
c.set_nodelay(true).unwrap();
|
c.set_nodelay(true).unwrap();
|
||||||
c.set_recv_buffer_size(1024*1024).unwrap();
|
c.set_recv_buffer_size(1024*1024).unwrap();
|
||||||
let mut builder = native_tls::TlsConnector::builder();
|
|
||||||
builder.danger_accept_invalid_certs(true);
|
use openssl::ssl::*;
|
||||||
let connector = builder.build().unwrap();
|
use tokio_openssl::SslConnectorExt;
|
||||||
let connector = tokio_tls::TlsConnector::from(connector);
|
|
||||||
connector.connect("localhost", c)
|
let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||||
.map_err(Error::from)
|
ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE);
|
||||||
|
|
||||||
|
let connector = ssl_connector_builder.build();
|
||||||
|
|
||||||
|
connector.connect_async("localhost", c)
|
||||||
|
.map_err(|err| format_err!("connect failed - {}", err))
|
||||||
})
|
})
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
.and_then(|c| {
|
.and_then(|c| {
|
||||||
|
@ -1,16 +1,17 @@
|
|||||||
use failure::*;
|
use failure::*;
|
||||||
use futures::*;
|
use futures::*;
|
||||||
use std::path::Path;
|
|
||||||
|
|
||||||
// Simple H2 server to test H2 speed with h2s-client.rs
|
// Simple H2 server to test H2 speed with h2s-client.rs
|
||||||
|
|
||||||
use hyper::{Request, Response, Body};
|
use hyper::{Request, Response, Body};
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
|
|
||||||
use proxmox_backup::client::pipe_to_stream::*;
|
|
||||||
use proxmox_backup::tools;
|
|
||||||
use proxmox_backup::configdir;
|
use proxmox_backup::configdir;
|
||||||
|
|
||||||
|
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio_openssl::SslAcceptorExt;
|
||||||
|
|
||||||
pub fn main() -> Result<(), Error> {
|
pub fn main() -> Result<(), Error> {
|
||||||
|
|
||||||
start_h2_server()?;
|
start_h2_server()?;
|
||||||
@ -18,39 +19,19 @@ pub fn main() -> Result<(), Error> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_certificate<T: AsRef<Path>, U: AsRef<Path>>(
|
|
||||||
key: T,
|
|
||||||
cert: U,
|
|
||||||
) -> Result<openssl::pkcs12::Pkcs12, Error> {
|
|
||||||
let key = tools::file_get_contents(key)?;
|
|
||||||
let cert = tools::file_get_contents(cert)?;
|
|
||||||
|
|
||||||
let key = openssl::pkey::PKey::private_key_from_pem(&key)?;
|
|
||||||
let cert = openssl::x509::X509::from_pem(&cert)?;
|
|
||||||
|
|
||||||
Ok(openssl::pkcs12::Pkcs12::builder()
|
|
||||||
.build("", "", &key, &cert)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn start_h2_server() -> Result<(), Error> {
|
pub fn start_h2_server() -> Result<(), Error> {
|
||||||
|
|
||||||
let cert_path = configdir!("/proxy.pfx");
|
let key_path = configdir!("/proxy.key");
|
||||||
let raw_cert = match std::fs::read(cert_path) {
|
let cert_path = configdir!("/proxy.pem");
|
||||||
Ok(pfx) => pfx,
|
|
||||||
Err(ref err) if err.kind() == std::io::ErrorKind::NotFound => {
|
|
||||||
let pkcs12 = load_certificate(configdir!("/proxy.key"), configdir!("/proxy.pem"))?;
|
|
||||||
pkcs12.to_der()?
|
|
||||||
}
|
|
||||||
Err(err) => bail!("unable to read certificate file {} - {}", cert_path, err),
|
|
||||||
};
|
|
||||||
|
|
||||||
let identity = match native_tls::Identity::from_pkcs12(&raw_cert, "") {
|
let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
|
||||||
Ok(data) => data,
|
acceptor.set_private_key_file(key_path, SslFiletype::PEM)
|
||||||
Err(err) => bail!("unable to decode pkcs12 identity {} - {}", cert_path, err),
|
.map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?;
|
||||||
};
|
acceptor.set_certificate_chain_file(cert_path)
|
||||||
|
.map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?;
|
||||||
|
acceptor.check_private_key().unwrap();
|
||||||
|
|
||||||
let acceptor = native_tls::TlsAcceptor::new(identity)?;
|
let acceptor = Arc::new(acceptor.build());
|
||||||
let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor));
|
|
||||||
|
|
||||||
let listener = TcpListener::bind(&"127.0.0.1:8008".parse().unwrap()).unwrap();
|
let listener = TcpListener::bind(&"127.0.0.1:8008".parse().unwrap()).unwrap();
|
||||||
|
|
||||||
@ -59,7 +40,12 @@ pub fn start_h2_server() -> Result<(), Error> {
|
|||||||
let server = listener
|
let server = listener
|
||||||
.incoming()
|
.incoming()
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
.and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into()))
|
.and_then(move |sock| {
|
||||||
|
sock.set_nodelay(true).unwrap();
|
||||||
|
sock.set_send_buffer_size(1024*1024).unwrap();
|
||||||
|
sock.set_recv_buffer_size(1024*1024).unwrap();
|
||||||
|
acceptor.accept_async(sock).map_err(|e| e.into())
|
||||||
|
})
|
||||||
.then(|r| match r {
|
.then(|r| match r {
|
||||||
// accept()s can fail here with an Err() when eg. the client rejects
|
// accept()s can fail here with an Err() when eg. the client rejects
|
||||||
// the cert and closes the connection, so we follow up with mapping
|
// the cert and closes the connection, so we follow up with mapping
|
||||||
@ -88,7 +74,7 @@ pub fn start_h2_server() -> Result<(), Error> {
|
|||||||
http.http2_initial_stream_window_size(max_window_size);
|
http.http2_initial_stream_window_size(max_window_size);
|
||||||
http.http2_initial_connection_window_size(max_window_size);
|
http.http2_initial_connection_window_size(max_window_size);
|
||||||
|
|
||||||
let service = hyper::service::service_fn(|req: Request<Body>| {
|
let service = hyper::service::service_fn(|_req: Request<Body>| {
|
||||||
println!("Got request");
|
println!("Got request");
|
||||||
let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A,A...]
|
let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A,A...]
|
||||||
let body = Body::from(buffer);
|
let body = Body::from(buffer);
|
||||||
|
@ -1,9 +1,5 @@
|
|||||||
use std::io;
|
|
||||||
use std::path::Path;
|
|
||||||
|
|
||||||
use proxmox_backup::try_block;
|
use proxmox_backup::try_block;
|
||||||
use proxmox_backup::configdir;
|
use proxmox_backup::configdir;
|
||||||
use proxmox_backup::tools;
|
|
||||||
use proxmox_backup::server;
|
use proxmox_backup::server;
|
||||||
use proxmox_backup::tools::daemon;
|
use proxmox_backup::tools::daemon;
|
||||||
use proxmox_backup::api_schema::router::*;
|
use proxmox_backup::api_schema::router::*;
|
||||||
@ -17,6 +13,10 @@ use lazy_static::lazy_static;
|
|||||||
use futures::*;
|
use futures::*;
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
|
|
||||||
|
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio_openssl::SslAcceptorExt;
|
||||||
|
|
||||||
use hyper;
|
use hyper;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
@ -27,20 +27,6 @@ fn main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_certificate<T: AsRef<Path>, U: AsRef<Path>>(
|
|
||||||
key: T,
|
|
||||||
cert: U,
|
|
||||||
) -> Result<openssl::pkcs12::Pkcs12, Error> {
|
|
||||||
let key = tools::file_get_contents(key)?;
|
|
||||||
let cert = tools::file_get_contents(cert)?;
|
|
||||||
|
|
||||||
let key = openssl::pkey::PKey::private_key_from_pem(&key)?;
|
|
||||||
let cert = openssl::x509::X509::from_pem(&cert)?;
|
|
||||||
|
|
||||||
Ok(openssl::pkcs12::Pkcs12::builder()
|
|
||||||
.build("", "", &key, &cert)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run() -> Result<(), Error> {
|
fn run() -> Result<(), Error> {
|
||||||
if let Err(err) = syslog::init(
|
if let Err(err) = syslog::init(
|
||||||
syslog::Facility::LOG_DAEMON,
|
syslog::Facility::LOG_DAEMON,
|
||||||
@ -72,26 +58,22 @@ fn run() -> Result<(), Error> {
|
|||||||
|
|
||||||
let rest_server = RestServer::new(config);
|
let rest_server = RestServer::new(config);
|
||||||
|
|
||||||
let cert_path = configdir!("/proxy.pfx");
|
//openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
|
||||||
let raw_cert = match std::fs::read(cert_path) {
|
let key_path = configdir!("/proxy.key");
|
||||||
Ok(pfx) => pfx,
|
let cert_path = configdir!("/proxy.pem");
|
||||||
Err(ref err) if err.kind() == io::ErrorKind::NotFound => {
|
|
||||||
let pkcs12 = load_certificate(configdir!("/proxy.key"), configdir!("/proxy.pem"))?;
|
|
||||||
pkcs12.to_der()?
|
|
||||||
}
|
|
||||||
Err(err) => bail!("unable to read certificate file {} - {}", cert_path, err),
|
|
||||||
};
|
|
||||||
|
|
||||||
let identity = match native_tls::Identity::from_pkcs12(&raw_cert, "") {
|
let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
|
||||||
Ok(data) => data,
|
acceptor.set_private_key_file(key_path, SslFiletype::PEM)
|
||||||
Err(err) => bail!("unable to decode pkcs12 identity {} - {}", cert_path, err),
|
.map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?;
|
||||||
};
|
acceptor.set_certificate_chain_file(cert_path)
|
||||||
|
.map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?;
|
||||||
|
acceptor.check_private_key().unwrap();
|
||||||
|
|
||||||
|
let acceptor = Arc::new(acceptor.build());
|
||||||
|
|
||||||
let server = daemon::create_daemon(
|
let server = daemon::create_daemon(
|
||||||
([0,0,0,0,0,0,0,0], 8007).into(),
|
([0,0,0,0,0,0,0,0], 8007).into(),
|
||||||
|listener| {
|
|listener| {
|
||||||
let acceptor = native_tls::TlsAcceptor::new(identity)?;
|
|
||||||
let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor));
|
|
||||||
let connections = listener
|
let connections = listener
|
||||||
.incoming()
|
.incoming()
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
@ -99,7 +81,7 @@ fn run() -> Result<(), Error> {
|
|||||||
sock.set_nodelay(true).unwrap();
|
sock.set_nodelay(true).unwrap();
|
||||||
sock.set_send_buffer_size(1024*1024).unwrap();
|
sock.set_send_buffer_size(1024*1024).unwrap();
|
||||||
sock.set_recv_buffer_size(1024*1024).unwrap();
|
sock.set_recv_buffer_size(1024*1024).unwrap();
|
||||||
acceptor.accept(sock).map_err(|e| e.into())
|
acceptor.accept_async(sock).map_err(|e| e.into())
|
||||||
})
|
})
|
||||||
.then(|r| match r {
|
.then(|r| match r {
|
||||||
// accept()s can fail here with an Err() when eg. the client rejects
|
// accept()s can fail here with an Err() when eg. the client rejects
|
||||||
|
@ -1,712 +0,0 @@
|
|||||||
use std::io;
|
|
||||||
use std::process::exit;
|
|
||||||
|
|
||||||
use chrono::Utc;
|
|
||||||
use failure::*;
|
|
||||||
use futures::future::{ok, poll_fn, Future};
|
|
||||||
use futures::try_ready;
|
|
||||||
use futures::{Async, Poll};
|
|
||||||
use http::{Request, Response, StatusCode};
|
|
||||||
use hyper::rt::Stream;
|
|
||||||
use hyper::Body;
|
|
||||||
use tokio::prelude::*;
|
|
||||||
use tokio_fs::file::File;
|
|
||||||
|
|
||||||
use proxmox_protocol::Client as PmxClient;
|
|
||||||
use proxmox_protocol::{BackupStream, ChunkEntry, ChunkStream, IndexType, StreamId};
|
|
||||||
|
|
||||||
use proxmox_backup::client::BackupRepository;
|
|
||||||
|
|
||||||
// This is a temporary client using the backup protocol crate.
|
|
||||||
// Its functionality should be moved to the `proxmox-backup-client` binary instead.
|
|
||||||
// For now this is mostly here to keep in the history an alternative way of connecting to an https
|
|
||||||
// server without hyper-tls in the background.
|
|
||||||
// Note that hyper-tls just wraps native_tls, and so does tokio_tls. So the only way to get
|
|
||||||
// rid of the extra dependency would be to reimplement tokio_tls on top of the openssl crate.
|
|
||||||
|
|
||||||
type HyperConnection<T, B> = hyper::client::conn::Connection<T, B>;
|
|
||||||
type HyperConnType = HyperConnection<tokio_tls::TlsStream<tokio::net::TcpStream>, Body>;
|
|
||||||
|
|
||||||
// Create a future which connects to a TLS-enabled http server.
|
|
||||||
// This would ordinarily be covered by the Connect trait in the higher level hyper interface.
|
|
||||||
// Connect to the server, initiate TLS, finally run hyper's handshake method.
|
|
||||||
fn connect(
|
|
||||||
domain: &str,
|
|
||||||
port: u16,
|
|
||||||
no_cert_validation: bool,
|
|
||||||
) -> impl Future<
|
|
||||||
// Typing out this function signature is almost more work than copying its code body...
|
|
||||||
Item = (hyper::client::conn::SendRequest<Body>, HyperConnType),
|
|
||||||
Error = Error,
|
|
||||||
> {
|
|
||||||
// tokio::net::TcpStream::connect(addr) <- this takes only a single address!
|
|
||||||
// so we need to improvise...:
|
|
||||||
use tokio_threadpool::blocking;
|
|
||||||
|
|
||||||
let domain = domain.to_string();
|
|
||||||
let domain2 = domain.clone();
|
|
||||||
poll_fn(move || {
|
|
||||||
blocking(|| {
|
|
||||||
let conn =
|
|
||||||
std::net::TcpStream::connect((domain.as_str(), port)).map_err(Error::from)?;
|
|
||||||
tokio::net::TcpStream::from_std(conn, &Default::default()).map_err(Error::from)
|
|
||||||
})
|
|
||||||
.map_err(Error::from)
|
|
||||||
})
|
|
||||||
.map_err(Error::from)
|
|
||||||
.flatten()
|
|
||||||
.and_then(move |tcp| {
|
|
||||||
let mut builder = native_tls::TlsConnector::builder();
|
|
||||||
if no_cert_validation {
|
|
||||||
builder.danger_accept_invalid_certs(true);
|
|
||||||
}
|
|
||||||
let connector = tokio_tls::TlsConnector::from(builder.build().unwrap());
|
|
||||||
connector.connect(&domain2, tcp).map_err(Error::from)
|
|
||||||
})
|
|
||||||
.and_then(|tls| hyper::client::conn::handshake(tls).map_err(Error::from))
|
|
||||||
}
|
|
||||||
|
|
||||||
// convenience helper for non-Deserialize data...
|
|
||||||
fn required_string_member(value: &serde_json::Value, member: &str) -> Result<String, Error> {
|
|
||||||
Ok(value
|
|
||||||
.get(member)
|
|
||||||
.ok_or_else(|| format_err!("missing '{}' in response", member))?
|
|
||||||
.as_str()
|
|
||||||
.ok_or_else(|| format_err!("invalid data type for '{}' in response", member))?
|
|
||||||
.to_string())
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Auth {
|
|
||||||
ticket: String,
|
|
||||||
token: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a future which logs in on a proxmox backup server and yields an Auth struct.
|
|
||||||
fn login(
|
|
||||||
domain: &str,
|
|
||||||
port: u16,
|
|
||||||
no_cert_validation: bool,
|
|
||||||
urlbase: &str,
|
|
||||||
user: String,
|
|
||||||
pass: String,
|
|
||||||
) -> impl Future<Item = Auth, Error = Error> {
|
|
||||||
let formdata = Body::from(
|
|
||||||
url::form_urlencoded::Serializer::new(String::new())
|
|
||||||
.append_pair("username", &{ user })
|
|
||||||
.append_pair("password", &{ pass })
|
|
||||||
.finish(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let urlbase = urlbase.to_string();
|
|
||||||
connect(domain, port, no_cert_validation)
|
|
||||||
.and_then(move |(mut client, conn)| {
|
|
||||||
let req = Request::builder()
|
|
||||||
.method("POST")
|
|
||||||
.uri(format!("{}/access/ticket", urlbase))
|
|
||||||
.header("Content-type", "application/x-www-form-urlencoded")
|
|
||||||
.body(formdata)?;
|
|
||||||
Ok((client.send_request(req), conn))
|
|
||||||
})
|
|
||||||
.and_then(|(res, conn)| {
|
|
||||||
let mut conn = Some(conn);
|
|
||||||
res.map(|res| {
|
|
||||||
res.into_body()
|
|
||||||
.concat2()
|
|
||||||
.map_err(Error::from)
|
|
||||||
.and_then(|data| {
|
|
||||||
let data: serde_json::Value = serde_json::from_slice(&data)?;
|
|
||||||
let data = data
|
|
||||||
.get("data")
|
|
||||||
.ok_or_else(|| format_err!("missing 'data' in response"))?;
|
|
||||||
let ticket = required_string_member(data, "ticket")?;
|
|
||||||
let token = required_string_member(data, "CSRFPreventionToken")?;
|
|
||||||
|
|
||||||
Ok(Auth { ticket, token })
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.join(poll_fn(move || {
|
|
||||||
try_ready!(conn.as_mut().unwrap().poll_without_shutdown());
|
|
||||||
Ok(Async::Ready(conn.take().unwrap()))
|
|
||||||
}))
|
|
||||||
.map_err(Error::from)
|
|
||||||
})
|
|
||||||
.and_then(|(res, _conn)| res)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Factored out protocol switching future: Takes a Response future and a connection and verifies
|
|
||||||
// its returned headers and protocol values. Yields a Response and the connection.
|
|
||||||
fn switch_protocols(
|
|
||||||
res: hyper::client::conn::ResponseFuture,
|
|
||||||
conn: HyperConnType,
|
|
||||||
) -> impl Future<Item = (Result<Response<Body>, Error>, HyperConnType), Error = Error> {
|
|
||||||
let mut conn = Some(conn);
|
|
||||||
res.map(|res| {
|
|
||||||
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
|
|
||||||
bail!("unexpected status code - expected SwitchingProtocols");
|
|
||||||
}
|
|
||||||
let upgrade = match res.headers().get("Upgrade") {
|
|
||||||
None => bail!("missing upgrade header in server response!"),
|
|
||||||
Some(u) => u,
|
|
||||||
};
|
|
||||||
if upgrade != "proxmox-backup-protocol-1" {
|
|
||||||
match upgrade.to_str() {
|
|
||||||
Ok(s) => bail!("unexpected upgrade protocol type received: {}", s),
|
|
||||||
_ => bail!("unexpected upgrade protocol type received"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(res)
|
|
||||||
})
|
|
||||||
.map_err(Error::from)
|
|
||||||
.join(poll_fn(move || {
|
|
||||||
try_ready!(conn.as_mut().unwrap().poll_without_shutdown());
|
|
||||||
Ok(Async::Ready(conn.take().unwrap()))
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Base for the two uploaders: DynamicIndexUploader and FixedIndexUploader:
|
|
||||||
struct UploaderBase<S: AsyncRead + AsyncWrite> {
|
|
||||||
client: Option<PmxClient<S>>,
|
|
||||||
wait_id: Option<StreamId>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S: AsyncRead + AsyncWrite> UploaderBase<S> {
|
|
||||||
pub fn new(client: PmxClient<S>) -> Self {
|
|
||||||
Self {
|
|
||||||
client: Some(client),
|
|
||||||
wait_id: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn create_backup(
|
|
||||||
&mut self,
|
|
||||||
index_type: IndexType,
|
|
||||||
backup_type: &str,
|
|
||||||
backup_id: &str,
|
|
||||||
backup_timestamp: i64,
|
|
||||||
filename: &str,
|
|
||||||
chunk_size: usize,
|
|
||||||
file_size: Option<u64>,
|
|
||||||
) -> Result<BackupStream, Error> {
|
|
||||||
if self.wait_id.is_some() {
|
|
||||||
bail!("create_backup cannot be called while awaiting a response");
|
|
||||||
}
|
|
||||||
|
|
||||||
let backup_stream = self.client.as_mut().unwrap().create_backup(
|
|
||||||
index_type,
|
|
||||||
backup_type,
|
|
||||||
backup_id,
|
|
||||||
backup_timestamp,
|
|
||||||
filename,
|
|
||||||
chunk_size,
|
|
||||||
file_size,
|
|
||||||
true,
|
|
||||||
)?;
|
|
||||||
self.wait_id = Some(backup_stream.into());
|
|
||||||
Ok(backup_stream)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn poll_ack(&mut self) -> Poll<(), Error> {
|
|
||||||
if let Some(id) = self.wait_id {
|
|
||||||
if self.client.as_mut().unwrap().wait_for_id(id)? {
|
|
||||||
self.wait_id = None;
|
|
||||||
} else {
|
|
||||||
return Ok(Async::NotReady);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Ok(Async::Ready(()));
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn poll_send(&mut self) -> Poll<(), Error> {
|
|
||||||
match self.client.as_mut().unwrap().poll_send()? {
|
|
||||||
Some(false) => Ok(Async::NotReady),
|
|
||||||
_ => Ok(Async::Ready(())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn upload_chunk(
|
|
||||||
&mut self,
|
|
||||||
info: &ChunkEntry,
|
|
||||||
chunk: &[u8],
|
|
||||||
) -> Result<Option<StreamId>, Error> {
|
|
||||||
self.client.as_mut().unwrap().upload_chunk(info, chunk)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn continue_upload_chunk(&mut self, chunk: &[u8]) -> Result<Option<StreamId>, Error> {
|
|
||||||
let res = self.client.as_mut().unwrap().continue_upload_chunk(chunk)?;
|
|
||||||
if let Some(id) = res {
|
|
||||||
self.wait_id = Some(id);
|
|
||||||
}
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn finish_backup(&mut self, stream: BackupStream) -> Result<(), Error> {
|
|
||||||
let id = stream.into();
|
|
||||||
let (name, _done) = self.client.as_mut().unwrap().finish_backup(stream)?;
|
|
||||||
println!("Server created file: {}", name);
|
|
||||||
self.wait_id = Some(id);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn take_client(&mut self) -> Option<PmxClient<S>> {
|
|
||||||
self.client.take()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Future which creates a backup with a dynamic file:
|
|
||||||
struct DynamicIndexUploader<C: AsyncRead, S: AsyncRead + AsyncWrite> {
|
|
||||||
base: UploaderBase<S>,
|
|
||||||
chunks: ChunkStream<C>,
|
|
||||||
current_chunk: Option<ChunkEntry>,
|
|
||||||
backup_stream: Option<BackupStream>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C: AsyncRead, S: AsyncRead + AsyncWrite> DynamicIndexUploader<C, S> {
|
|
||||||
pub fn new(
|
|
||||||
client: PmxClient<S>,
|
|
||||||
chunks: ChunkStream<C>,
|
|
||||||
backup_type: &str,
|
|
||||||
backup_id: &str,
|
|
||||||
backup_timestamp: i64,
|
|
||||||
filename: &str,
|
|
||||||
chunk_size: usize,
|
|
||||||
) -> Result<Self, Error> {
|
|
||||||
let mut base = UploaderBase::new(client);
|
|
||||||
let stream = base.create_backup(
|
|
||||||
IndexType::Dynamic,
|
|
||||||
backup_type,
|
|
||||||
backup_id,
|
|
||||||
backup_timestamp,
|
|
||||||
filename,
|
|
||||||
chunk_size,
|
|
||||||
None,
|
|
||||||
)?;
|
|
||||||
Ok(Self {
|
|
||||||
base,
|
|
||||||
chunks,
|
|
||||||
current_chunk: None,
|
|
||||||
backup_stream: Some(stream),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_chunk<'a>(chunks: &'a mut ChunkStream<C>) -> Poll<Option<&'a [u8]>, Error> {
|
|
||||||
match chunks.get() {
|
|
||||||
Ok(Some(None)) => Ok(Async::Ready(None)),
|
|
||||||
Ok(Some(Some(chunk))) => Ok(Async::Ready(Some(chunk))),
|
|
||||||
Ok(None) => return Ok(Async::NotReady),
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn finished_chunk(&mut self) -> Result<(), Error> {
|
|
||||||
self.base.client.as_mut().unwrap().dynamic_chunk(
|
|
||||||
self.backup_stream.unwrap(),
|
|
||||||
self.current_chunk.as_ref().unwrap(),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
self.current_chunk = None;
|
|
||||||
self.chunks.next();
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C: AsyncRead, S: AsyncRead + AsyncWrite> Future for DynamicIndexUploader<C, S> {
|
|
||||||
type Item = PmxClient<S>;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Error> {
|
|
||||||
loop {
|
|
||||||
// Process our upload queue if we have one:
|
|
||||||
try_ready!(self.base.poll_send());
|
|
||||||
|
|
||||||
// If we have a chunk in-flight, wait for acknowledgement:
|
|
||||||
try_ready!(self.base.poll_ack());
|
|
||||||
|
|
||||||
// Get our current chunk:
|
|
||||||
let chunk = match try_ready!(Self::get_chunk(&mut self.chunks)) {
|
|
||||||
Some(chunk) => chunk,
|
|
||||||
None => match self.backup_stream.take() {
|
|
||||||
Some(stream) => {
|
|
||||||
self.base.finish_backup(stream)?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
None => return Ok(Async::Ready(self.base.take_client().unwrap())),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
// If the current chunk is in-flight just poll the upload:
|
|
||||||
if self.current_chunk.is_some() {
|
|
||||||
if self.base.continue_upload_chunk(chunk)?.is_some() {
|
|
||||||
self.finished_chunk()?;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let client = self.base.client.as_ref().unwrap();
|
|
||||||
|
|
||||||
// We got a new chunk, see if we need to upload it:
|
|
||||||
self.current_chunk = Some(ChunkEntry::from_data(chunk));
|
|
||||||
let entry = self.current_chunk.as_ref().unwrap();
|
|
||||||
if client.is_chunk_available(entry) {
|
|
||||||
eprintln!("Already available: {}", entry.digest_to_hex());
|
|
||||||
self.finished_chunk()?;
|
|
||||||
} else {
|
|
||||||
eprintln!("New chunk: {}, size {}", entry.digest_to_hex(), entry.len());
|
|
||||||
match self.base.upload_chunk(entry, chunk)? {
|
|
||||||
Some(_id) => {
|
|
||||||
eprintln!("Finished right away!");
|
|
||||||
self.finished_chunk()?;
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// Send-buffer filled up, start polling the upload process.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct FixedIndexUploader<T: AsyncRead, S: AsyncRead + AsyncWrite> {
|
|
||||||
base: UploaderBase<S>,
|
|
||||||
input: T,
|
|
||||||
backup_stream: Option<BackupStream>,
|
|
||||||
current_chunk: Option<ChunkEntry>,
|
|
||||||
chunk_size: usize,
|
|
||||||
index: usize,
|
|
||||||
buffer: Vec<u8>,
|
|
||||||
eof: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsyncRead, S: AsyncRead + AsyncWrite> FixedIndexUploader<T, S> {
|
|
||||||
pub fn new(
|
|
||||||
client: PmxClient<S>,
|
|
||||||
input: T,
|
|
||||||
backup_type: &str,
|
|
||||||
backup_id: &str,
|
|
||||||
backup_timestamp: i64,
|
|
||||||
filename: &str,
|
|
||||||
chunk_size: usize,
|
|
||||||
file_size: u64,
|
|
||||||
) -> Result<Self, Error> {
|
|
||||||
let mut base = UploaderBase::new(client);
|
|
||||||
let stream = base.create_backup(
|
|
||||||
IndexType::Fixed,
|
|
||||||
backup_type,
|
|
||||||
backup_id,
|
|
||||||
backup_timestamp,
|
|
||||||
filename,
|
|
||||||
chunk_size,
|
|
||||||
Some(file_size),
|
|
||||||
)?;
|
|
||||||
Ok(Self {
|
|
||||||
base,
|
|
||||||
input,
|
|
||||||
backup_stream: Some(stream),
|
|
||||||
current_chunk: None,
|
|
||||||
chunk_size,
|
|
||||||
index: 0,
|
|
||||||
buffer: Vec::with_capacity(chunk_size),
|
|
||||||
eof: false,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn fill_chunk(&mut self) -> Poll<bool, io::Error> {
|
|
||||||
let mut pos = self.buffer.len();
|
|
||||||
|
|
||||||
// we hit eof and we want the next chunk, return false:
|
|
||||||
if self.eof && pos == 0 {
|
|
||||||
return Ok(Async::Ready(false));
|
|
||||||
}
|
|
||||||
|
|
||||||
// we still have a full chunk right now:
|
|
||||||
if pos == self.chunk_size {
|
|
||||||
return Ok(Async::Ready(true));
|
|
||||||
}
|
|
||||||
|
|
||||||
// fill it up:
|
|
||||||
unsafe {
|
|
||||||
self.buffer.set_len(self.chunk_size);
|
|
||||||
}
|
|
||||||
let res = loop {
|
|
||||||
match self.input.poll_read(&mut self.buffer[pos..]) {
|
|
||||||
Err(e) => break Err(e),
|
|
||||||
Ok(Async::NotReady) => break Ok(Async::NotReady),
|
|
||||||
Ok(Async::Ready(got)) => {
|
|
||||||
if got == 0 {
|
|
||||||
self.eof = true;
|
|
||||||
break Ok(Async::Ready(true));
|
|
||||||
}
|
|
||||||
pos += got;
|
|
||||||
if pos == self.chunk_size {
|
|
||||||
break Ok(Async::Ready(true));
|
|
||||||
}
|
|
||||||
// read more...
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
unsafe {
|
|
||||||
self.buffer.set_len(pos);
|
|
||||||
}
|
|
||||||
res
|
|
||||||
}
|
|
||||||
|
|
||||||
fn finished_chunk(&mut self) -> Result<(), Error> {
|
|
||||||
self.base.client.as_mut().unwrap().fixed_data(
|
|
||||||
self.backup_stream.unwrap(),
|
|
||||||
self.index,
|
|
||||||
self.current_chunk.as_ref().unwrap(),
|
|
||||||
)?;
|
|
||||||
self.index += 1;
|
|
||||||
self.current_chunk = None;
|
|
||||||
unsafe {
|
|
||||||
// This is how we tell fill_chunk() that it needs to read new data
|
|
||||||
self.buffer.set_len(0);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsyncRead, S: AsyncRead + AsyncWrite> Future for FixedIndexUploader<T, S> {
|
|
||||||
type Item = PmxClient<S>;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Error> {
|
|
||||||
loop {
|
|
||||||
// Process our upload queue if we have one:
|
|
||||||
try_ready!(self.base.poll_send());
|
|
||||||
|
|
||||||
// If we have a chunk in-flight, wait for acknowledgement:
|
|
||||||
try_ready!(self.base.poll_ack());
|
|
||||||
|
|
||||||
// Get our current chunk:
|
|
||||||
if !try_ready!(self.fill_chunk()) {
|
|
||||||
match self.backup_stream.take() {
|
|
||||||
Some(stream) => {
|
|
||||||
self.base.finish_backup(stream)?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
return Ok(Async::Ready(self.base.take_client().unwrap()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let chunk = &self.buffer[..];
|
|
||||||
|
|
||||||
// If the current chunk is in-flight just poll the upload:
|
|
||||||
if self.current_chunk.is_some() {
|
|
||||||
if self.base.continue_upload_chunk(chunk)?.is_some() {
|
|
||||||
self.finished_chunk()?;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let client = self.base.client.as_ref().unwrap();
|
|
||||||
|
|
||||||
// We got a new chunk, see if we need to upload it:
|
|
||||||
self.current_chunk = Some(ChunkEntry::from_data(chunk));
|
|
||||||
let entry = self.current_chunk.as_ref().unwrap();
|
|
||||||
if client.is_chunk_available(entry) {
|
|
||||||
eprintln!("Already available: {}", entry.digest_to_hex());
|
|
||||||
self.finished_chunk()?;
|
|
||||||
} else {
|
|
||||||
eprintln!("New chunk: {}, size {}", entry.digest_to_hex(), entry.len());
|
|
||||||
match self.base.upload_chunk(entry, chunk)? {
|
|
||||||
Some(_id) => {
|
|
||||||
eprintln!("Finished right away!");
|
|
||||||
self.finished_chunk()?;
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// Send-buffer filled up, start polling the upload process.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper-Future for waiting for a polling method on proxmox_protocol::Client to complete:
|
|
||||||
struct ClientWaitFuture<S: AsyncRead + AsyncWrite>(
|
|
||||||
Option<PmxClient<S>>,
|
|
||||||
fn(&mut PmxClient<S>) -> Result<bool, Error>,
|
|
||||||
);
|
|
||||||
|
|
||||||
impl<S: AsyncRead + AsyncWrite> Future for ClientWaitFuture<S> {
|
|
||||||
type Item = PmxClient<S>;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
if (self.1)(self.0.as_mut().unwrap())? {
|
|
||||||
Ok(Async::Ready(self.0.take().unwrap()))
|
|
||||||
} else {
|
|
||||||
Ok(Async::NotReady)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Trait to provide Futures for some proxmox_protocol::Client methods:
|
|
||||||
trait ClientOps<S: AsyncRead + AsyncWrite> {
|
|
||||||
fn poll_handshake(self) -> ClientWaitFuture<S>;
|
|
||||||
fn poll_hashes(self, file: &str) -> Result<ClientWaitFuture<S>, Error>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S: AsyncRead + AsyncWrite> ClientOps<S> for PmxClient<S> {
|
|
||||||
fn poll_handshake(self) -> ClientWaitFuture<S> {
|
|
||||||
ClientWaitFuture(Some(self), PmxClient::<S>::wait_for_handshake)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_hashes(mut self, name: &str) -> Result<ClientWaitFuture<S>, Error> {
|
|
||||||
self.query_hashes(name)?;
|
|
||||||
Ok(ClientWaitFuture(Some(self), PmxClient::<S>::wait_for_hashes))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CLI helper.
|
|
||||||
fn require_arg(args: &mut dyn Iterator<Item = String>, name: &str) -> String {
|
|
||||||
match args.next() {
|
|
||||||
Some(arg) => arg,
|
|
||||||
None => {
|
|
||||||
eprintln!("missing required argument: {}", name);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
// Usage:
|
|
||||||
// ./proxmox-protocol-testclient <type> <id> <filename> [<optional old-file>]
|
|
||||||
//
|
|
||||||
// This will query the remote server for a list of chunks in <old-file> if the argument was
|
|
||||||
// provided, otherwise assumes all chunks are new.
|
|
||||||
|
|
||||||
let mut args = std::env::args().skip(1);
|
|
||||||
let mut repo = require_arg(&mut args, "repository");
|
|
||||||
let use_fixed_chunks = if repo == "--fixed" {
|
|
||||||
repo = require_arg(&mut args, "repository");
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
};
|
|
||||||
let backup_type = require_arg(&mut args, "backup-type");
|
|
||||||
let backup_id = require_arg(&mut args, "backup-id");
|
|
||||||
let filename = require_arg(&mut args, "backup-file-name");
|
|
||||||
// optional previous backup:
|
|
||||||
let previous = args.next().map(|s| s.to_string());
|
|
||||||
|
|
||||||
let repo: BackupRepository = match repo.parse() {
|
|
||||||
Ok(repo) => repo,
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("error parsing repository: {}", e);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let backup_time = Utc::now().timestamp();
|
|
||||||
// Or fake the time to verify we cannot create an already existing backup:
|
|
||||||
//let backup_time = Utc::today().and_hms(3, 25, 55);
|
|
||||||
|
|
||||||
println!(
|
|
||||||
"Uploading file `{}`, type {}, id: {}",
|
|
||||||
filename, backup_type, backup_id
|
|
||||||
);
|
|
||||||
|
|
||||||
let no_cert_validation = true; // FIXME
|
|
||||||
let domain = repo.host().to_owned();
|
|
||||||
let port = 8007;
|
|
||||||
let address = format!("{}:{}", domain, port);
|
|
||||||
let urlbase = format!("https://{}/api2/json", address);
|
|
||||||
|
|
||||||
let user = repo.user().to_string();
|
|
||||||
let pass = match proxmox_backup::tools::tty::read_password("Password: ")
|
|
||||||
.and_then(|x| String::from_utf8(x).map_err(Error::from))
|
|
||||||
{
|
|
||||||
Ok(pass) => pass,
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("error getting password: {}", e);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let store = repo.store().to_owned();
|
|
||||||
|
|
||||||
let stream = File::open(filename.clone())
|
|
||||||
.map_err(Error::from)
|
|
||||||
.join(login(
|
|
||||||
&domain,
|
|
||||||
port,
|
|
||||||
no_cert_validation,
|
|
||||||
&urlbase,
|
|
||||||
user,
|
|
||||||
pass,
|
|
||||||
))
|
|
||||||
.and_then(move |(file, auth)| {
|
|
||||||
ok((file, auth)).join(connect(&domain, port, no_cert_validation))
|
|
||||||
})
|
|
||||||
.and_then(move |((file, auth), (mut client, conn))| {
|
|
||||||
let req = Request::builder()
|
|
||||||
.method("GET")
|
|
||||||
.uri(format!("{}/admin/datastore/{}/test-upload", urlbase, store))
|
|
||||||
.header("Cookie", format!("PBSAuthCookie={}", auth.ticket))
|
|
||||||
.header("CSRFPreventionToken", auth.token)
|
|
||||||
.header("Connection", "Upgrade")
|
|
||||||
.header("Upgrade", "proxmox-backup-protocol-1")
|
|
||||||
.body(Body::empty())?;
|
|
||||||
Ok((file, client.send_request(req), conn))
|
|
||||||
})
|
|
||||||
.and_then(|(file, res, conn)| ok(file).join(switch_protocols(res, conn)))
|
|
||||||
.and_then(|(file, (_, conn))| {
|
|
||||||
let client = PmxClient::new(conn.into_parts().io);
|
|
||||||
file.metadata()
|
|
||||||
.map_err(Error::from)
|
|
||||||
.join(client.poll_handshake())
|
|
||||||
})
|
|
||||||
.and_then(move |((file, meta), client)| {
|
|
||||||
eprintln!("Server said hello");
|
|
||||||
// 2 possible futures of distinct types need an explicit cast to Box<dyn Future>...
|
|
||||||
let fut: Box<dyn Future<Item = _, Error = _> + Send> =
|
|
||||||
if let Some(previous) = previous {
|
|
||||||
let query = client.poll_hashes(&previous)?;
|
|
||||||
Box::new(ok((file, meta)).join(query))
|
|
||||||
} else {
|
|
||||||
Box::new(ok(((file, meta), client)))
|
|
||||||
};
|
|
||||||
Ok(fut)
|
|
||||||
})
|
|
||||||
.flatten()
|
|
||||||
.and_then(move |((file, meta), client)| {
|
|
||||||
eprintln!("starting uploader...");
|
|
||||||
let uploader: Box<dyn Future<Item = _, Error = _> + Send> = if use_fixed_chunks {
|
|
||||||
Box::new(FixedIndexUploader::new(
|
|
||||||
client,
|
|
||||||
file,
|
|
||||||
&backup_type,
|
|
||||||
&backup_id,
|
|
||||||
backup_time,
|
|
||||||
&filename,
|
|
||||||
4 * 1024 * 1024,
|
|
||||||
meta.len(),
|
|
||||||
)?)
|
|
||||||
} else {
|
|
||||||
let chunker = ChunkStream::new(file);
|
|
||||||
Box::new(DynamicIndexUploader::new(
|
|
||||||
client,
|
|
||||||
chunker,
|
|
||||||
&backup_type,
|
|
||||||
&backup_id,
|
|
||||||
backup_time,
|
|
||||||
&filename,
|
|
||||||
4 * 1024 * 1024,
|
|
||||||
)?)
|
|
||||||
};
|
|
||||||
Ok(uploader)
|
|
||||||
})
|
|
||||||
.flatten();
|
|
||||||
|
|
||||||
let stream = stream
|
|
||||||
.and_then(move |_client| {
|
|
||||||
println!("Done");
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.map_err(|e| eprintln!("error: {}", e));
|
|
||||||
hyper::rt::run(stream);
|
|
||||||
}
|
|
@ -16,6 +16,7 @@ use futures::*;
|
|||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
use openssl::ssl::{SslConnector, SslMethod};
|
||||||
|
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
|
use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
|
||||||
@ -36,7 +37,7 @@ struct AuthInfo {
|
|||||||
|
|
||||||
/// HTTP(S) API client
|
/// HTTP(S) API client
|
||||||
pub struct HttpClient {
|
pub struct HttpClient {
|
||||||
client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
|
client: Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>,
|
||||||
server: String,
|
server: String,
|
||||||
auth: BroadcastFuture<AuthInfo>,
|
auth: BroadcastFuture<AuthInfo>,
|
||||||
}
|
}
|
||||||
@ -156,17 +157,19 @@ impl HttpClient {
|
|||||||
bail!("no password input mechanism available");
|
bail!("no password input mechanism available");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_client() -> Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>> {
|
fn build_client() -> Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>> {
|
||||||
let mut builder = native_tls::TlsConnector::builder();
|
|
||||||
// FIXME: We need a CLI option for this!
|
let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||||
builder.danger_accept_invalid_certs(true);
|
|
||||||
let tlsconnector = builder.build().unwrap();
|
ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); // fixme!
|
||||||
|
|
||||||
let mut httpc = hyper::client::HttpConnector::new(1);
|
let mut httpc = hyper::client::HttpConnector::new(1);
|
||||||
httpc.set_nodelay(true); // important for h2 download performance!
|
httpc.set_nodelay(true); // important for h2 download performance!
|
||||||
httpc.set_recv_buf_size(Some(1024*1024)); //important for h2 download performance!
|
httpc.set_recv_buf_size(Some(1024*1024)); //important for h2 download performance!
|
||||||
httpc.enforce_http(false); // we want https...
|
httpc.enforce_http(false); // we want https...
|
||||||
let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
|
|
||||||
https.https_only(true); // force it!
|
let https = hyper_openssl::HttpsConnector::with_connector(httpc, ssl_connector_builder).unwrap();
|
||||||
|
|
||||||
Client::builder()
|
Client::builder()
|
||||||
//.http2_initial_stream_window_size( (1 << 31) - 2)
|
//.http2_initial_stream_window_size( (1 << 31) - 2)
|
||||||
//.http2_initial_connection_window_size( (1 << 31) - 2)
|
//.http2_initial_connection_window_size( (1 << 31) - 2)
|
||||||
@ -356,7 +359,7 @@ impl HttpClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn credentials(
|
fn credentials(
|
||||||
client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
|
client: Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>,
|
||||||
server: String,
|
server: String,
|
||||||
username: String,
|
username: String,
|
||||||
password: String,
|
password: String,
|
||||||
@ -411,7 +414,7 @@ impl HttpClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn api_request(
|
fn api_request(
|
||||||
client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
|
client: Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>,
|
||||||
req: Request<Body>
|
req: Request<Body>
|
||||||
) -> impl Future<Item=Value, Error=Error> {
|
) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
|
||||||
@ -1046,7 +1049,6 @@ impl H2Client {
|
|||||||
pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
|
pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
|
||||||
let request = Self::request_builder("localhost", "POST", path, param).unwrap();
|
let request = Self::request_builder("localhost", "POST", path, param).unwrap();
|
||||||
|
|
||||||
|
|
||||||
self.h2.clone()
|
self.h2.clone()
|
||||||
.ready()
|
.ready()
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
|
Loading…
Reference in New Issue
Block a user