more formatting & use statement fixups

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-08-22 13:46:31 +02:00
parent 89ceb33f89
commit 7a57cb77e1
4 changed files with 36 additions and 40 deletions

View File

@ -1,37 +1,33 @@
use failure::*;
use http::Uri;
use hyper::Body;
use hyper::client::Client;
use xdg::BaseDirectories;
use chrono::{DateTime, Utc};
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::io::Write; use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use http::{Request, Response}; use chrono::{DateTime, Utc};
use http::header::HeaderValue; use failure::*;
use futures::*; use futures::*;
use futures::stream::Stream; use futures::stream::Stream;
use std::sync::atomic::{AtomicUsize, Ordering}; use http::Uri;
use tokio::sync::mpsc; use http::header::HeaderValue;
use http::{Request, Response};
use hyper::Body;
use hyper::client::Client;
use openssl::ssl::{SslConnector, SslMethod}; use openssl::ssl::{SslConnector, SslMethod};
use serde_json::{json, Value}; use serde_json::{json, Value};
use tokio::sync::mpsc;
use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET}; use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
use xdg::BaseDirectories;
use proxmox::tools::{ use proxmox::tools::{
digest_to_hex, digest_to_hex,
fs::{file_get_json, file_set_contents}, fs::{file_get_json, file_set_contents},
}; };
use crate::tools::{self, BroadcastFuture, tty}; use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::tools::futures::{cancellable, Canceller}; use super::pipe_to_stream::PipeToSendStream;
use super::pipe_to_stream::*;
use super::merge_known_chunks::*;
use crate::backup::*; use crate::backup::*;
use crate::tools::futures::{cancellable, Canceller};
use crate::tools::{self, BroadcastFuture, tty};
#[derive(Clone)] #[derive(Clone)]
pub struct AuthInfo { pub struct AuthInfo {

View File

@ -4,7 +4,7 @@ use futures::*;
use crate::backup::ChunkInfo; use crate::backup::ChunkInfo;
pub enum MergedChunkInfo { pub enum MergedChunkInfo {
Known(Vec<(u64,[u8;32])>), Known(Vec<(u64, [u8; 32])>),
New(ChunkInfo), New(ChunkInfo),
} }
@ -17,16 +17,21 @@ pub struct MergeKnownChunksQueue<S> {
buffer: Option<MergedChunkInfo>, buffer: Option<MergedChunkInfo>,
} }
impl <S> MergeKnownChunks for S impl<S> MergeKnownChunks for S
where S: Stream<Item=MergedChunkInfo, Error=Error>, where
S: Stream<Item = MergedChunkInfo, Error = Error>,
{ {
fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self> { fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self> {
MergeKnownChunksQueue { input: self, buffer: None } MergeKnownChunksQueue {
input: self,
buffer: None,
}
} }
} }
impl <S> Stream for MergeKnownChunksQueue<S> impl<S> Stream for MergeKnownChunksQueue<S>
where S: Stream<Item=MergedChunkInfo, Error=Error>, where
S: Stream<Item = MergedChunkInfo, Error = Error>,
{ {
type Item = MergedChunkInfo; type Item = MergedChunkInfo;
type Error = Error; type Error = Error;
@ -48,10 +53,8 @@ impl <S> Stream for MergeKnownChunksQueue<S>
} }
} }
Ok(Async::Ready(Some(mergerd_chunk_info))) => { Ok(Async::Ready(Some(mergerd_chunk_info))) => {
match mergerd_chunk_info { match mergerd_chunk_info {
MergedChunkInfo::Known(list) => { MergedChunkInfo::Known(list) => {
let last = self.buffer.take(); let last = self.buffer.take();
match last { match last {

View File

@ -2,11 +2,10 @@
// //
// See also: hyper/src/proto/h2/mod.rs // See also: hyper/src/proto/h2/mod.rs
use failure::*;
use futures::{try_ready, Async, Future, Poll};
use h2::{SendStream};
use bytes::Bytes; use bytes::Bytes;
use failure::*;
use futures::{try_ready, Async, Future, Poll};
use h2::SendStream;
pub struct PipeToSendStream { pub struct PipeToSendStream {
body_tx: SendStream<Bytes>, body_tx: SendStream<Bytes>,
@ -53,7 +52,6 @@ impl Future for PipeToSendStream {
.map_err(Error::from)?; .map_err(Error::from)?;
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} else { } else {
if let Async::Ready(reason) = self.body_tx.poll_reset().map_err(Error::from)? { if let Async::Ready(reason) = self.body_tx.poll_reset().map_err(Error::from)? {
return Err(format_err!("stream received RST_STREAM: {:?}", reason)); return Err(format_err!("stream received RST_STREAM: {:?}", reason));

View File

@ -1,5 +1,6 @@
use std::thread;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread;
use failure::*; use failure::*;
use tokio::prelude::*; use tokio::prelude::*;
@ -9,13 +10,14 @@ pub struct StorageOperation {
} }
impl StorageOperation { impl StorageOperation {
pub fn new() -> Self { pub fn new() -> Self {
StorageOperation { state: Arc::new(Mutex::new(false)), running: false } StorageOperation {
state: Arc::new(Mutex::new(false)),
running: false,
}
} }
fn run(&mut self, task: task::Task) { fn run(&mut self, task: task::Task) {
let state = self.state.clone(); let state = self.state.clone();
thread::spawn(move || { thread::spawn(move || {
@ -51,11 +53,8 @@ impl Future for StorageOperation {
} }
} }
#[test] #[test]
fn test_storage_future() fn test_storage_future() {
{
let op = StorageOperation::new(); let op = StorageOperation::new();
hyper::rt::run(op.map_err(|e| { hyper::rt::run(op.map_err(|e| {
println!("Got Error: {}", e); println!("Got Error: {}", e);