From 6d1f61b208822a0e79b70beaccd0c0a0ae7b43f7 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Tue, 2 Jul 2019 13:33:58 +0200 Subject: [PATCH] use hyper/tokio-openssl instead of hyper/tokio-tls This exposes the complete SSL setup. And download is much faster now (600MB/s instead of 130MB/s)! --- Cargo.toml | 4 +- src/bin/h2s-client.rs | 17 +- src/bin/h2s-server.rs | 54 +- src/bin/proxmox-backup-proxy.rs | 50 +- src/bin/proxmox-protocol-testclient.rs | 712 ------------------------- src/client/http_client.rs | 24 +- 6 files changed, 62 insertions(+), 799 deletions(-) delete mode 100644 src/bin/proxmox-protocol-testclient.rs diff --git a/Cargo.toml b/Cargo.toml index 54e526fe..70098bb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,13 +22,13 @@ bytes = "0.4" tokio-threadpool = "0.1" tokio = "0.1" tokio-fs = "0.1" -tokio-tls = "0.2" +tokio-openssl = "0.3" tokio-signal = "0.2" native-tls = "0.2" http = "0.1" h2 = "0.1" hyper = "0.12" -hyper-tls = "0.3" +hyper-openssl = "0.7" lazy_static = "1.1" regex = "1.0" libc = "0.2" diff --git a/src/bin/h2s-client.rs b/src/bin/h2s-client.rs index 040d2747..a26191e2 100644 --- a/src/bin/h2s-client.rs +++ b/src/bin/h2s-client.rs @@ -68,12 +68,17 @@ pub fn main() -> Result<(), Error> { .and_then(|c| { c.set_nodelay(true).unwrap(); c.set_recv_buffer_size(1024*1024).unwrap(); - let mut builder = native_tls::TlsConnector::builder(); - builder.danger_accept_invalid_certs(true); - let connector = builder.build().unwrap(); - let connector = tokio_tls::TlsConnector::from(connector); - connector.connect("localhost", c) - .map_err(Error::from) + + use openssl::ssl::*; + use tokio_openssl::SslConnectorExt; + + let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap(); + ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); + + let connector = ssl_connector_builder.build(); + + connector.connect_async("localhost", c) + .map_err(|err| format_err!("connect failed - {}", err)) }) .map_err(Error::from) .and_then(|c| { diff --git a/src/bin/h2s-server.rs b/src/bin/h2s-server.rs index a3800192..478aa98a 100644 --- a/src/bin/h2s-server.rs +++ b/src/bin/h2s-server.rs @@ -1,16 +1,17 @@ use failure::*; use futures::*; -use std::path::Path; // Simple H2 server to test H2 speed with h2s-client.rs use hyper::{Request, Response, Body}; use tokio::net::TcpListener; -use proxmox_backup::client::pipe_to_stream::*; -use proxmox_backup::tools; use proxmox_backup::configdir; +use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype}; +use std::sync::Arc; +use tokio_openssl::SslAcceptorExt; + pub fn main() -> Result<(), Error> { start_h2_server()?; @@ -18,39 +19,19 @@ pub fn main() -> Result<(), Error> { Ok(()) } -fn load_certificate, U: AsRef>( - key: T, - cert: U, -) -> Result { - let key = tools::file_get_contents(key)?; - let cert = tools::file_get_contents(cert)?; - - let key = openssl::pkey::PKey::private_key_from_pem(&key)?; - let cert = openssl::x509::X509::from_pem(&cert)?; - - Ok(openssl::pkcs12::Pkcs12::builder() - .build("", "", &key, &cert)?) -} - pub fn start_h2_server() -> Result<(), Error> { - let cert_path = configdir!("/proxy.pfx"); - let raw_cert = match std::fs::read(cert_path) { - Ok(pfx) => pfx, - Err(ref err) if err.kind() == std::io::ErrorKind::NotFound => { - let pkcs12 = load_certificate(configdir!("/proxy.key"), configdir!("/proxy.pem"))?; - pkcs12.to_der()? - } - Err(err) => bail!("unable to read certificate file {} - {}", cert_path, err), - }; + let key_path = configdir!("/proxy.key"); + let cert_path = configdir!("/proxy.pem"); - let identity = match native_tls::Identity::from_pkcs12(&raw_cert, "") { - Ok(data) => data, - Err(err) => bail!("unable to decode pkcs12 identity {} - {}", cert_path, err), - }; + let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + acceptor.set_private_key_file(key_path, SslFiletype::PEM) + .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?; + acceptor.set_certificate_chain_file(cert_path) + .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?; + acceptor.check_private_key().unwrap(); - let acceptor = native_tls::TlsAcceptor::new(identity)?; - let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor)); + let acceptor = Arc::new(acceptor.build()); let listener = TcpListener::bind(&"127.0.0.1:8008".parse().unwrap()).unwrap(); @@ -59,7 +40,12 @@ pub fn start_h2_server() -> Result<(), Error> { let server = listener .incoming() .map_err(Error::from) - .and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into())) + .and_then(move |sock| { + sock.set_nodelay(true).unwrap(); + sock.set_send_buffer_size(1024*1024).unwrap(); + sock.set_recv_buffer_size(1024*1024).unwrap(); + acceptor.accept_async(sock).map_err(|e| e.into()) + }) .then(|r| match r { // accept()s can fail here with an Err() when eg. the client rejects // the cert and closes the connection, so we follow up with mapping @@ -88,7 +74,7 @@ pub fn start_h2_server() -> Result<(), Error> { http.http2_initial_stream_window_size(max_window_size); http.http2_initial_connection_window_size(max_window_size); - let service = hyper::service::service_fn(|req: Request| { + let service = hyper::service::service_fn(|_req: Request| { println!("Got request"); let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A,A...] let body = Body::from(buffer); diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index a3255a72..fe9e628a 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -1,9 +1,5 @@ -use std::io; -use std::path::Path; - use proxmox_backup::try_block; use proxmox_backup::configdir; -use proxmox_backup::tools; use proxmox_backup::server; use proxmox_backup::tools::daemon; use proxmox_backup::api_schema::router::*; @@ -17,6 +13,10 @@ use lazy_static::lazy_static; use futures::*; use futures::stream::Stream; +use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype}; +use std::sync::Arc; +use tokio_openssl::SslAcceptorExt; + use hyper; fn main() { @@ -27,20 +27,6 @@ fn main() { } } -fn load_certificate, U: AsRef>( - key: T, - cert: U, -) -> Result { - let key = tools::file_get_contents(key)?; - let cert = tools::file_get_contents(cert)?; - - let key = openssl::pkey::PKey::private_key_from_pem(&key)?; - let cert = openssl::x509::X509::from_pem(&cert)?; - - Ok(openssl::pkcs12::Pkcs12::builder() - .build("", "", &key, &cert)?) -} - fn run() -> Result<(), Error> { if let Err(err) = syslog::init( syslog::Facility::LOG_DAEMON, @@ -72,26 +58,22 @@ fn run() -> Result<(), Error> { let rest_server = RestServer::new(config); - let cert_path = configdir!("/proxy.pfx"); - let raw_cert = match std::fs::read(cert_path) { - Ok(pfx) => pfx, - Err(ref err) if err.kind() == io::ErrorKind::NotFound => { - let pkcs12 = load_certificate(configdir!("/proxy.key"), configdir!("/proxy.pem"))?; - pkcs12.to_der()? - } - Err(err) => bail!("unable to read certificate file {} - {}", cert_path, err), - }; + //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes + let key_path = configdir!("/proxy.key"); + let cert_path = configdir!("/proxy.pem"); - let identity = match native_tls::Identity::from_pkcs12(&raw_cert, "") { - Ok(data) => data, - Err(err) => bail!("unable to decode pkcs12 identity {} - {}", cert_path, err), - }; + let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + acceptor.set_private_key_file(key_path, SslFiletype::PEM) + .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?; + acceptor.set_certificate_chain_file(cert_path) + .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?; + acceptor.check_private_key().unwrap(); + + let acceptor = Arc::new(acceptor.build()); let server = daemon::create_daemon( ([0,0,0,0,0,0,0,0], 8007).into(), |listener| { - let acceptor = native_tls::TlsAcceptor::new(identity)?; - let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor)); let connections = listener .incoming() .map_err(Error::from) @@ -99,7 +81,7 @@ fn run() -> Result<(), Error> { sock.set_nodelay(true).unwrap(); sock.set_send_buffer_size(1024*1024).unwrap(); sock.set_recv_buffer_size(1024*1024).unwrap(); - acceptor.accept(sock).map_err(|e| e.into()) + acceptor.accept_async(sock).map_err(|e| e.into()) }) .then(|r| match r { // accept()s can fail here with an Err() when eg. the client rejects diff --git a/src/bin/proxmox-protocol-testclient.rs b/src/bin/proxmox-protocol-testclient.rs deleted file mode 100644 index e595f7e7..00000000 --- a/src/bin/proxmox-protocol-testclient.rs +++ /dev/null @@ -1,712 +0,0 @@ -use std::io; -use std::process::exit; - -use chrono::Utc; -use failure::*; -use futures::future::{ok, poll_fn, Future}; -use futures::try_ready; -use futures::{Async, Poll}; -use http::{Request, Response, StatusCode}; -use hyper::rt::Stream; -use hyper::Body; -use tokio::prelude::*; -use tokio_fs::file::File; - -use proxmox_protocol::Client as PmxClient; -use proxmox_protocol::{BackupStream, ChunkEntry, ChunkStream, IndexType, StreamId}; - -use proxmox_backup::client::BackupRepository; - -// This is a temporary client using the backup protocol crate. -// Its functionality should be moved to the `proxmox-backup-client` binary instead. -// For now this is mostly here to keep in the history an alternative way of connecting to an https -// server without hyper-tls in the background. -// Note that hyper-tls just wraps native_tls, and so does tokio_tls. So the only way to get -// rid of the extra dependency would be to reimplement tokio_tls on top of the openssl crate. - -type HyperConnection = hyper::client::conn::Connection; -type HyperConnType = HyperConnection, Body>; - -// Create a future which connects to a TLS-enabled http server. -// This would ordinarily be covered by the Connect trait in the higher level hyper interface. -// Connect to the server, initiate TLS, finally run hyper's handshake method. -fn connect( - domain: &str, - port: u16, - no_cert_validation: bool, -) -> impl Future< - // Typing out this function signature is almost more work than copying its code body... - Item = (hyper::client::conn::SendRequest, HyperConnType), - Error = Error, -> { - // tokio::net::TcpStream::connect(addr) <- this takes only a single address! - // so we need to improvise...: - use tokio_threadpool::blocking; - - let domain = domain.to_string(); - let domain2 = domain.clone(); - poll_fn(move || { - blocking(|| { - let conn = - std::net::TcpStream::connect((domain.as_str(), port)).map_err(Error::from)?; - tokio::net::TcpStream::from_std(conn, &Default::default()).map_err(Error::from) - }) - .map_err(Error::from) - }) - .map_err(Error::from) - .flatten() - .and_then(move |tcp| { - let mut builder = native_tls::TlsConnector::builder(); - if no_cert_validation { - builder.danger_accept_invalid_certs(true); - } - let connector = tokio_tls::TlsConnector::from(builder.build().unwrap()); - connector.connect(&domain2, tcp).map_err(Error::from) - }) - .and_then(|tls| hyper::client::conn::handshake(tls).map_err(Error::from)) -} - -// convenience helper for non-Deserialize data... -fn required_string_member(value: &serde_json::Value, member: &str) -> Result { - Ok(value - .get(member) - .ok_or_else(|| format_err!("missing '{}' in response", member))? - .as_str() - .ok_or_else(|| format_err!("invalid data type for '{}' in response", member))? - .to_string()) -} - -struct Auth { - ticket: String, - token: String, -} - -// Create a future which logs in on a proxmox backup server and yields an Auth struct. -fn login( - domain: &str, - port: u16, - no_cert_validation: bool, - urlbase: &str, - user: String, - pass: String, -) -> impl Future { - let formdata = Body::from( - url::form_urlencoded::Serializer::new(String::new()) - .append_pair("username", &{ user }) - .append_pair("password", &{ pass }) - .finish(), - ); - - let urlbase = urlbase.to_string(); - connect(domain, port, no_cert_validation) - .and_then(move |(mut client, conn)| { - let req = Request::builder() - .method("POST") - .uri(format!("{}/access/ticket", urlbase)) - .header("Content-type", "application/x-www-form-urlencoded") - .body(formdata)?; - Ok((client.send_request(req), conn)) - }) - .and_then(|(res, conn)| { - let mut conn = Some(conn); - res.map(|res| { - res.into_body() - .concat2() - .map_err(Error::from) - .and_then(|data| { - let data: serde_json::Value = serde_json::from_slice(&data)?; - let data = data - .get("data") - .ok_or_else(|| format_err!("missing 'data' in response"))?; - let ticket = required_string_member(data, "ticket")?; - let token = required_string_member(data, "CSRFPreventionToken")?; - - Ok(Auth { ticket, token }) - }) - }) - .join(poll_fn(move || { - try_ready!(conn.as_mut().unwrap().poll_without_shutdown()); - Ok(Async::Ready(conn.take().unwrap())) - })) - .map_err(Error::from) - }) - .and_then(|(res, _conn)| res) -} - -// Factored out protocol switching future: Takes a Response future and a connection and verifies -// its returned headers and protocol values. Yields a Response and the connection. -fn switch_protocols( - res: hyper::client::conn::ResponseFuture, - conn: HyperConnType, -) -> impl Future, Error>, HyperConnType), Error = Error> { - let mut conn = Some(conn); - res.map(|res| { - if res.status() != StatusCode::SWITCHING_PROTOCOLS { - bail!("unexpected status code - expected SwitchingProtocols"); - } - let upgrade = match res.headers().get("Upgrade") { - None => bail!("missing upgrade header in server response!"), - Some(u) => u, - }; - if upgrade != "proxmox-backup-protocol-1" { - match upgrade.to_str() { - Ok(s) => bail!("unexpected upgrade protocol type received: {}", s), - _ => bail!("unexpected upgrade protocol type received"), - } - } - Ok(res) - }) - .map_err(Error::from) - .join(poll_fn(move || { - try_ready!(conn.as_mut().unwrap().poll_without_shutdown()); - Ok(Async::Ready(conn.take().unwrap())) - })) -} - -// Base for the two uploaders: DynamicIndexUploader and FixedIndexUploader: -struct UploaderBase { - client: Option>, - wait_id: Option, -} - -impl UploaderBase { - pub fn new(client: PmxClient) -> Self { - Self { - client: Some(client), - wait_id: None, - } - } - - pub fn create_backup( - &mut self, - index_type: IndexType, - backup_type: &str, - backup_id: &str, - backup_timestamp: i64, - filename: &str, - chunk_size: usize, - file_size: Option, - ) -> Result { - if self.wait_id.is_some() { - bail!("create_backup cannot be called while awaiting a response"); - } - - let backup_stream = self.client.as_mut().unwrap().create_backup( - index_type, - backup_type, - backup_id, - backup_timestamp, - filename, - chunk_size, - file_size, - true, - )?; - self.wait_id = Some(backup_stream.into()); - Ok(backup_stream) - } - - pub fn poll_ack(&mut self) -> Poll<(), Error> { - if let Some(id) = self.wait_id { - if self.client.as_mut().unwrap().wait_for_id(id)? { - self.wait_id = None; - } else { - return Ok(Async::NotReady); - } - } - return Ok(Async::Ready(())); - } - - pub fn poll_send(&mut self) -> Poll<(), Error> { - match self.client.as_mut().unwrap().poll_send()? { - Some(false) => Ok(Async::NotReady), - _ => Ok(Async::Ready(())), - } - } - - pub fn upload_chunk( - &mut self, - info: &ChunkEntry, - chunk: &[u8], - ) -> Result, Error> { - self.client.as_mut().unwrap().upload_chunk(info, chunk) - } - - pub fn continue_upload_chunk(&mut self, chunk: &[u8]) -> Result, Error> { - let res = self.client.as_mut().unwrap().continue_upload_chunk(chunk)?; - if let Some(id) = res { - self.wait_id = Some(id); - } - Ok(res) - } - - pub fn finish_backup(&mut self, stream: BackupStream) -> Result<(), Error> { - let id = stream.into(); - let (name, _done) = self.client.as_mut().unwrap().finish_backup(stream)?; - println!("Server created file: {}", name); - self.wait_id = Some(id); - Ok(()) - } - - pub fn take_client(&mut self) -> Option> { - self.client.take() - } -} - -// Future which creates a backup with a dynamic file: -struct DynamicIndexUploader { - base: UploaderBase, - chunks: ChunkStream, - current_chunk: Option, - backup_stream: Option, -} - -impl DynamicIndexUploader { - pub fn new( - client: PmxClient, - chunks: ChunkStream, - backup_type: &str, - backup_id: &str, - backup_timestamp: i64, - filename: &str, - chunk_size: usize, - ) -> Result { - let mut base = UploaderBase::new(client); - let stream = base.create_backup( - IndexType::Dynamic, - backup_type, - backup_id, - backup_timestamp, - filename, - chunk_size, - None, - )?; - Ok(Self { - base, - chunks, - current_chunk: None, - backup_stream: Some(stream), - }) - } - - fn get_chunk<'a>(chunks: &'a mut ChunkStream) -> Poll, Error> { - match chunks.get() { - Ok(Some(None)) => Ok(Async::Ready(None)), - Ok(Some(Some(chunk))) => Ok(Async::Ready(Some(chunk))), - Ok(None) => return Ok(Async::NotReady), - Err(e) => return Err(e), - } - } - - fn finished_chunk(&mut self) -> Result<(), Error> { - self.base.client.as_mut().unwrap().dynamic_chunk( - self.backup_stream.unwrap(), - self.current_chunk.as_ref().unwrap(), - )?; - - self.current_chunk = None; - self.chunks.next(); - Ok(()) - } -} - -impl Future for DynamicIndexUploader { - type Item = PmxClient; - type Error = Error; - - fn poll(&mut self) -> Poll { - loop { - // Process our upload queue if we have one: - try_ready!(self.base.poll_send()); - - // If we have a chunk in-flight, wait for acknowledgement: - try_ready!(self.base.poll_ack()); - - // Get our current chunk: - let chunk = match try_ready!(Self::get_chunk(&mut self.chunks)) { - Some(chunk) => chunk, - None => match self.backup_stream.take() { - Some(stream) => { - self.base.finish_backup(stream)?; - continue; - } - None => return Ok(Async::Ready(self.base.take_client().unwrap())), - }, - }; - - // If the current chunk is in-flight just poll the upload: - if self.current_chunk.is_some() { - if self.base.continue_upload_chunk(chunk)?.is_some() { - self.finished_chunk()?; - } - continue; - } - - let client = self.base.client.as_ref().unwrap(); - - // We got a new chunk, see if we need to upload it: - self.current_chunk = Some(ChunkEntry::from_data(chunk)); - let entry = self.current_chunk.as_ref().unwrap(); - if client.is_chunk_available(entry) { - eprintln!("Already available: {}", entry.digest_to_hex()); - self.finished_chunk()?; - } else { - eprintln!("New chunk: {}, size {}", entry.digest_to_hex(), entry.len()); - match self.base.upload_chunk(entry, chunk)? { - Some(_id) => { - eprintln!("Finished right away!"); - self.finished_chunk()?; - } - None => { - // Send-buffer filled up, start polling the upload process. - continue; - } - } - } - } - } -} - -struct FixedIndexUploader { - base: UploaderBase, - input: T, - backup_stream: Option, - current_chunk: Option, - chunk_size: usize, - index: usize, - buffer: Vec, - eof: bool, -} - -impl FixedIndexUploader { - pub fn new( - client: PmxClient, - input: T, - backup_type: &str, - backup_id: &str, - backup_timestamp: i64, - filename: &str, - chunk_size: usize, - file_size: u64, - ) -> Result { - let mut base = UploaderBase::new(client); - let stream = base.create_backup( - IndexType::Fixed, - backup_type, - backup_id, - backup_timestamp, - filename, - chunk_size, - Some(file_size), - )?; - Ok(Self { - base, - input, - backup_stream: Some(stream), - current_chunk: None, - chunk_size, - index: 0, - buffer: Vec::with_capacity(chunk_size), - eof: false, - }) - } - - fn fill_chunk(&mut self) -> Poll { - let mut pos = self.buffer.len(); - - // we hit eof and we want the next chunk, return false: - if self.eof && pos == 0 { - return Ok(Async::Ready(false)); - } - - // we still have a full chunk right now: - if pos == self.chunk_size { - return Ok(Async::Ready(true)); - } - - // fill it up: - unsafe { - self.buffer.set_len(self.chunk_size); - } - let res = loop { - match self.input.poll_read(&mut self.buffer[pos..]) { - Err(e) => break Err(e), - Ok(Async::NotReady) => break Ok(Async::NotReady), - Ok(Async::Ready(got)) => { - if got == 0 { - self.eof = true; - break Ok(Async::Ready(true)); - } - pos += got; - if pos == self.chunk_size { - break Ok(Async::Ready(true)); - } - // read more... - } - } - }; - unsafe { - self.buffer.set_len(pos); - } - res - } - - fn finished_chunk(&mut self) -> Result<(), Error> { - self.base.client.as_mut().unwrap().fixed_data( - self.backup_stream.unwrap(), - self.index, - self.current_chunk.as_ref().unwrap(), - )?; - self.index += 1; - self.current_chunk = None; - unsafe { - // This is how we tell fill_chunk() that it needs to read new data - self.buffer.set_len(0); - } - Ok(()) - } -} - -impl Future for FixedIndexUploader { - type Item = PmxClient; - type Error = Error; - - fn poll(&mut self) -> Poll { - loop { - // Process our upload queue if we have one: - try_ready!(self.base.poll_send()); - - // If we have a chunk in-flight, wait for acknowledgement: - try_ready!(self.base.poll_ack()); - - // Get our current chunk: - if !try_ready!(self.fill_chunk()) { - match self.backup_stream.take() { - Some(stream) => { - self.base.finish_backup(stream)?; - continue; - } - None => { - return Ok(Async::Ready(self.base.take_client().unwrap())); - } - } - }; - - let chunk = &self.buffer[..]; - - // If the current chunk is in-flight just poll the upload: - if self.current_chunk.is_some() { - if self.base.continue_upload_chunk(chunk)?.is_some() { - self.finished_chunk()?; - } - continue; - } - - let client = self.base.client.as_ref().unwrap(); - - // We got a new chunk, see if we need to upload it: - self.current_chunk = Some(ChunkEntry::from_data(chunk)); - let entry = self.current_chunk.as_ref().unwrap(); - if client.is_chunk_available(entry) { - eprintln!("Already available: {}", entry.digest_to_hex()); - self.finished_chunk()?; - } else { - eprintln!("New chunk: {}, size {}", entry.digest_to_hex(), entry.len()); - match self.base.upload_chunk(entry, chunk)? { - Some(_id) => { - eprintln!("Finished right away!"); - self.finished_chunk()?; - } - None => { - // Send-buffer filled up, start polling the upload process. - continue; - } - } - } - } - } -} - -// Helper-Future for waiting for a polling method on proxmox_protocol::Client to complete: -struct ClientWaitFuture( - Option>, - fn(&mut PmxClient) -> Result, -); - -impl Future for ClientWaitFuture { - type Item = PmxClient; - type Error = Error; - - fn poll(&mut self) -> Poll { - if (self.1)(self.0.as_mut().unwrap())? { - Ok(Async::Ready(self.0.take().unwrap())) - } else { - Ok(Async::NotReady) - } - } -} - -// Trait to provide Futures for some proxmox_protocol::Client methods: -trait ClientOps { - fn poll_handshake(self) -> ClientWaitFuture; - fn poll_hashes(self, file: &str) -> Result, Error>; -} - -impl ClientOps for PmxClient { - fn poll_handshake(self) -> ClientWaitFuture { - ClientWaitFuture(Some(self), PmxClient::::wait_for_handshake) - } - - fn poll_hashes(mut self, name: &str) -> Result, Error> { - self.query_hashes(name)?; - Ok(ClientWaitFuture(Some(self), PmxClient::::wait_for_hashes)) - } -} - -// CLI helper. -fn require_arg(args: &mut dyn Iterator, name: &str) -> String { - match args.next() { - Some(arg) => arg, - None => { - eprintln!("missing required argument: {}", name); - exit(1); - } - } -} - -fn main() { - // Usage: - // ./proxmox-protocol-testclient [] - // - // This will query the remote server for a list of chunks in if the argument was - // provided, otherwise assumes all chunks are new. - - let mut args = std::env::args().skip(1); - let mut repo = require_arg(&mut args, "repository"); - let use_fixed_chunks = if repo == "--fixed" { - repo = require_arg(&mut args, "repository"); - true - } else { - false - }; - let backup_type = require_arg(&mut args, "backup-type"); - let backup_id = require_arg(&mut args, "backup-id"); - let filename = require_arg(&mut args, "backup-file-name"); - // optional previous backup: - let previous = args.next().map(|s| s.to_string()); - - let repo: BackupRepository = match repo.parse() { - Ok(repo) => repo, - Err(e) => { - eprintln!("error parsing repository: {}", e); - exit(1); - } - }; - - let backup_time = Utc::now().timestamp(); - // Or fake the time to verify we cannot create an already existing backup: - //let backup_time = Utc::today().and_hms(3, 25, 55); - - println!( - "Uploading file `{}`, type {}, id: {}", - filename, backup_type, backup_id - ); - - let no_cert_validation = true; // FIXME - let domain = repo.host().to_owned(); - let port = 8007; - let address = format!("{}:{}", domain, port); - let urlbase = format!("https://{}/api2/json", address); - - let user = repo.user().to_string(); - let pass = match proxmox_backup::tools::tty::read_password("Password: ") - .and_then(|x| String::from_utf8(x).map_err(Error::from)) - { - Ok(pass) => pass, - Err(e) => { - eprintln!("error getting password: {}", e); - exit(1); - } - }; - let store = repo.store().to_owned(); - - let stream = File::open(filename.clone()) - .map_err(Error::from) - .join(login( - &domain, - port, - no_cert_validation, - &urlbase, - user, - pass, - )) - .and_then(move |(file, auth)| { - ok((file, auth)).join(connect(&domain, port, no_cert_validation)) - }) - .and_then(move |((file, auth), (mut client, conn))| { - let req = Request::builder() - .method("GET") - .uri(format!("{}/admin/datastore/{}/test-upload", urlbase, store)) - .header("Cookie", format!("PBSAuthCookie={}", auth.ticket)) - .header("CSRFPreventionToken", auth.token) - .header("Connection", "Upgrade") - .header("Upgrade", "proxmox-backup-protocol-1") - .body(Body::empty())?; - Ok((file, client.send_request(req), conn)) - }) - .and_then(|(file, res, conn)| ok(file).join(switch_protocols(res, conn))) - .and_then(|(file, (_, conn))| { - let client = PmxClient::new(conn.into_parts().io); - file.metadata() - .map_err(Error::from) - .join(client.poll_handshake()) - }) - .and_then(move |((file, meta), client)| { - eprintln!("Server said hello"); - // 2 possible futures of distinct types need an explicit cast to Box... - let fut: Box + Send> = - if let Some(previous) = previous { - let query = client.poll_hashes(&previous)?; - Box::new(ok((file, meta)).join(query)) - } else { - Box::new(ok(((file, meta), client))) - }; - Ok(fut) - }) - .flatten() - .and_then(move |((file, meta), client)| { - eprintln!("starting uploader..."); - let uploader: Box + Send> = if use_fixed_chunks { - Box::new(FixedIndexUploader::new( - client, - file, - &backup_type, - &backup_id, - backup_time, - &filename, - 4 * 1024 * 1024, - meta.len(), - )?) - } else { - let chunker = ChunkStream::new(file); - Box::new(DynamicIndexUploader::new( - client, - chunker, - &backup_type, - &backup_id, - backup_time, - &filename, - 4 * 1024 * 1024, - )?) - }; - Ok(uploader) - }) - .flatten(); - - let stream = stream - .and_then(move |_client| { - println!("Done"); - Ok(()) - }) - .map_err(|e| eprintln!("error: {}", e)); - hyper::rt::run(stream); -} diff --git a/src/client/http_client.rs b/src/client/http_client.rs index d4903c0c..57bd8b19 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -16,6 +16,7 @@ use futures::*; use futures::stream::Stream; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::sync::mpsc; +use openssl::ssl::{SslConnector, SslMethod}; use serde_json::{json, Value}; use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET}; @@ -36,7 +37,7 @@ struct AuthInfo { /// HTTP(S) API client pub struct HttpClient { - client: Client>, + client: Client>, server: String, auth: BroadcastFuture, } @@ -156,17 +157,19 @@ impl HttpClient { bail!("no password input mechanism available"); } - fn build_client() -> Client> { - let mut builder = native_tls::TlsConnector::builder(); - // FIXME: We need a CLI option for this! - builder.danger_accept_invalid_certs(true); - let tlsconnector = builder.build().unwrap(); + fn build_client() -> Client> { + + let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap(); + + ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); // fixme! + let mut httpc = hyper::client::HttpConnector::new(1); httpc.set_nodelay(true); // important for h2 download performance! httpc.set_recv_buf_size(Some(1024*1024)); //important for h2 download performance! httpc.enforce_http(false); // we want https... - let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector)); - https.https_only(true); // force it! + + let https = hyper_openssl::HttpsConnector::with_connector(httpc, ssl_connector_builder).unwrap(); + Client::builder() //.http2_initial_stream_window_size( (1 << 31) - 2) //.http2_initial_connection_window_size( (1 << 31) - 2) @@ -356,7 +359,7 @@ impl HttpClient { } fn credentials( - client: Client>, + client: Client>, server: String, username: String, password: String, @@ -411,7 +414,7 @@ impl HttpClient { } fn api_request( - client: Client>, + client: Client>, req: Request ) -> impl Future { @@ -1046,7 +1049,6 @@ impl H2Client { pub fn upload(&self, path: &str, param: Option, data: Vec) -> impl Future { let request = Self::request_builder("localhost", "POST", path, param).unwrap(); - self.h2.clone() .ready() .map_err(Error::from)