diff --git a/src/client/http_client.rs b/src/client/http_client.rs index a190ce53..5158fc69 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -1,37 +1,33 @@ -use failure::*; - -use http::Uri; -use hyper::Body; -use hyper::client::Client; -use xdg::BaseDirectories; -use chrono::{DateTime, Utc}; use std::collections::HashSet; -use std::sync::{Arc, Mutex}; use std::io::Write; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; -use http::{Request, Response}; -use http::header::HeaderValue; - +use chrono::{DateTime, Utc}; +use failure::*; use futures::*; use futures::stream::Stream; -use std::sync::atomic::{AtomicUsize, Ordering}; -use tokio::sync::mpsc; +use http::Uri; +use http::header::HeaderValue; +use http::{Request, Response}; +use hyper::Body; +use hyper::client::Client; use openssl::ssl::{SslConnector, SslMethod}; - use serde_json::{json, Value}; +use tokio::sync::mpsc; use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET}; +use xdg::BaseDirectories; use proxmox::tools::{ digest_to_hex, fs::{file_get_json, file_set_contents}, }; -use crate::tools::{self, BroadcastFuture, tty}; -use crate::tools::futures::{cancellable, Canceller}; -use super::pipe_to_stream::*; -use super::merge_known_chunks::*; - +use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks}; +use super::pipe_to_stream::PipeToSendStream; use crate::backup::*; +use crate::tools::futures::{cancellable, Canceller}; +use crate::tools::{self, BroadcastFuture, tty}; #[derive(Clone)] pub struct AuthInfo { diff --git a/src/client/merge_known_chunks.rs b/src/client/merge_known_chunks.rs index c5255bdf..ff3b3661 100644 --- a/src/client/merge_known_chunks.rs +++ b/src/client/merge_known_chunks.rs @@ -4,7 +4,7 @@ use futures::*; use crate::backup::ChunkInfo; pub enum MergedChunkInfo { - Known(Vec<(u64,[u8;32])>), + Known(Vec<(u64, [u8; 32])>), New(ChunkInfo), } @@ -17,16 +17,21 @@ pub struct MergeKnownChunksQueue { buffer: Option, } -impl MergeKnownChunks for S - where S: Stream, +impl MergeKnownChunks for S +where + S: Stream, { fn merge_known_chunks(self) -> MergeKnownChunksQueue { - MergeKnownChunksQueue { input: self, buffer: None } + MergeKnownChunksQueue { + input: self, + buffer: None, + } } } -impl Stream for MergeKnownChunksQueue - where S: Stream, +impl Stream for MergeKnownChunksQueue +where + S: Stream, { type Item = MergedChunkInfo; type Error = Error; @@ -48,10 +53,8 @@ impl Stream for MergeKnownChunksQueue } } Ok(Async::Ready(Some(mergerd_chunk_info))) => { - match mergerd_chunk_info { MergedChunkInfo::Known(list) => { - let last = self.buffer.take(); match last { diff --git a/src/client/pipe_to_stream.rs b/src/client/pipe_to_stream.rs index 4a972f29..5968f02f 100644 --- a/src/client/pipe_to_stream.rs +++ b/src/client/pipe_to_stream.rs @@ -2,11 +2,10 @@ // // See also: hyper/src/proto/h2/mod.rs -use failure::*; - -use futures::{try_ready, Async, Future, Poll}; -use h2::{SendStream}; use bytes::Bytes; +use failure::*; +use futures::{try_ready, Async, Future, Poll}; +use h2::SendStream; pub struct PipeToSendStream { body_tx: SendStream, @@ -53,7 +52,6 @@ impl Future for PipeToSendStream { .map_err(Error::from)?; return Ok(Async::Ready(())); - } else { if let Async::Ready(reason) = self.body_tx.poll_reset().map_err(Error::from)? { return Err(format_err!("stream received RST_STREAM: {:?}", reason)); diff --git a/src/storage/futures.rs b/src/storage/futures.rs index da64f936..83a47771 100644 --- a/src/storage/futures.rs +++ b/src/storage/futures.rs @@ -1,5 +1,6 @@ -use std::thread; use std::sync::{Arc, Mutex}; +use std::thread; + use failure::*; use tokio::prelude::*; @@ -9,13 +10,14 @@ pub struct StorageOperation { } impl StorageOperation { - pub fn new() -> Self { - StorageOperation { state: Arc::new(Mutex::new(false)), running: false } + StorageOperation { + state: Arc::new(Mutex::new(false)), + running: false, + } } fn run(&mut self, task: task::Task) { - let state = self.state.clone(); thread::spawn(move || { @@ -51,11 +53,8 @@ impl Future for StorageOperation { } } - #[test] -fn test_storage_future() -{ - +fn test_storage_future() { let op = StorageOperation::new(); hyper::rt::run(op.map_err(|e| { println!("Got Error: {}", e);