From be3a0295b6ec996d4fc90ee8699768bbef26166b Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Fri, 9 Jul 2021 15:29:42 +0200 Subject: [PATCH] client: import updates Signed-off-by: Wolfgang Bumiller --- src/client/backup_writer.rs | 14 +++++++------- src/client/http_client.rs | 3 ++- src/client/merge_known_chunks.rs | 6 +++--- src/client/pipe_to_stream.rs | 2 +- src/client/pxar_backup_stream.rs | 3 ++- src/client/remote_chunk_reader.rs | 8 ++++++-- 6 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs index 7ef90793..5ab38b77 100644 --- a/src/client/backup_writer.rs +++ b/src/client/backup_writer.rs @@ -1,12 +1,12 @@ use std::collections::HashSet; +use std::future::Future; use std::os::unix::fs::OpenOptionsExt; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use anyhow::{bail, format_err, Error}; -use futures::future::AbortHandle; -use futures::stream::Stream; -use futures::*; +use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt}; +use futures::stream::{Stream, StreamExt, TryStreamExt}; use serde_json::{json, Value}; use tokio::io::AsyncReadExt; use tokio::sync::{mpsc, oneshot}; @@ -453,7 +453,7 @@ impl BackupWriter { .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option)| { match (response, merged_chunk_info) { (Some(response), MergedChunkInfo::Known(list)) => { - future::Either::Left( + Either::Left( response .map_err(Error::from) .and_then(H2Client::h2api_response) @@ -463,7 +463,7 @@ impl BackupWriter { ) } (None, MergedChunkInfo::Known(list)) => { - future::Either::Right(future::ok(MergedChunkInfo::Known(list))) + Either::Right(future::ok(MergedChunkInfo::Known(list))) } _ => unreachable!(), } @@ -736,7 +736,7 @@ impl BackupWriter { let new_info = MergedChunkInfo::Known(vec![(offset, digest)]); - future::Either::Left(h2.send_request(request, upload_data).and_then( + Either::Left(h2.send_request(request, upload_data).and_then( move |response| async move { upload_queue .send((new_info, Some(response))) @@ -747,7 +747,7 @@ impl BackupWriter { }, )) } else { - future::Either::Right(async move { + Either::Right(async move { upload_queue .send((merged_chunk_info, None)) .await diff --git a/src/client/http_client.rs b/src/client/http_client.rs index e05ee601..0f3ec729 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -23,8 +23,9 @@ use proxmox::{ use proxmox_http::client::HttpsConnector; use proxmox_http::uri::build_authority; +use pbs_api_types::{Authid, Userid}; + use super::pipe_to_stream::PipeToSendStream; -use crate::api2::types::{Authid, Userid}; use crate::tools::{ self, BroadcastFuture, diff --git a/src/client/merge_known_chunks.rs b/src/client/merge_known_chunks.rs index 16e9ab9a..ef7a8f9f 100644 --- a/src/client/merge_known_chunks.rs +++ b/src/client/merge_known_chunks.rs @@ -1,11 +1,11 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use anyhow::{Error}; -use futures::*; +use anyhow::Error; +use futures::{ready, Stream}; use pin_project::pin_project; -use crate::backup::ChunkInfo; +use pbs_datastore::data_blob::ChunkInfo; pub enum MergedChunkInfo { Known(Vec<(u64, [u8; 32])>), diff --git a/src/client/pipe_to_stream.rs b/src/client/pipe_to_stream.rs index 63a2d818..d461b1df 100644 --- a/src/client/pipe_to_stream.rs +++ b/src/client/pipe_to_stream.rs @@ -5,8 +5,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use bytes::Bytes; use anyhow::{format_err, Error}; +use bytes::Bytes; use futures::{ready, Future}; use h2::SendStream; diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs index 1d3fc228..86bc8583 100644 --- a/src/client/pxar_backup_stream.rs +++ b/src/client/pxar_backup_stream.rs @@ -12,7 +12,8 @@ use nix::dir::Dir; use nix::fcntl::OFlag; use nix::sys::stat::Mode; -use crate::backup::CatalogWriter; +use pbs_datastore::catalog::CatalogWriter; + use crate::tools::{ StdChannelWriter, TokioWriterAdapter, diff --git a/src/client/remote_chunk_reader.rs b/src/client/remote_chunk_reader.rs index 62e6feaa..78c79af2 100644 --- a/src/client/remote_chunk_reader.rs +++ b/src/client/remote_chunk_reader.rs @@ -5,10 +5,14 @@ use std::sync::{Arc, Mutex}; use anyhow::{bail, Error}; -use super::BackupReader; -use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk}; +use pbs_datastore::{CryptConfig, CryptMode}; +use pbs_datastore::data_blob::DataBlob; +use pbs_datastore::read_chunk::ReadChunk; use pbs_runtime::block_on; +use super::BackupReader; +use crate::backup::AsyncReadChunk; + /// Read chunks from remote host using ``BackupReader`` #[derive(Clone)] pub struct RemoteChunkReader {