client: import updates

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2021-07-09 15:29:42 +02:00
parent aa2838c27a
commit be3a0295b6
6 changed files with 21 additions and 15 deletions

View File

@ -1,12 +1,12 @@
use std::collections::HashSet;
use std::future::Future;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
use futures::future::AbortHandle;
use futures::stream::Stream;
use futures::*;
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
@ -453,7 +453,7 @@ impl BackupWriter {
.and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
match (response, merged_chunk_info) {
(Some(response), MergedChunkInfo::Known(list)) => {
future::Either::Left(
Either::Left(
response
.map_err(Error::from)
.and_then(H2Client::h2api_response)
@ -463,7 +463,7 @@ impl BackupWriter {
)
}
(None, MergedChunkInfo::Known(list)) => {
future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
Either::Right(future::ok(MergedChunkInfo::Known(list)))
}
_ => unreachable!(),
}
@ -736,7 +736,7 @@ impl BackupWriter {
let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
future::Either::Left(h2.send_request(request, upload_data).and_then(
Either::Left(h2.send_request(request, upload_data).and_then(
move |response| async move {
upload_queue
.send((new_info, Some(response)))
@ -747,7 +747,7 @@ impl BackupWriter {
},
))
} else {
future::Either::Right(async move {
Either::Right(async move {
upload_queue
.send((merged_chunk_info, None))
.await

View File

@ -23,8 +23,9 @@ use proxmox::{
use proxmox_http::client::HttpsConnector;
use proxmox_http::uri::build_authority;
use pbs_api_types::{Authid, Userid};
use super::pipe_to_stream::PipeToSendStream;
use crate::api2::types::{Authid, Userid};
use crate::tools::{
self,
BroadcastFuture,

View File

@ -1,11 +1,11 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{Error};
use futures::*;
use anyhow::Error;
use futures::{ready, Stream};
use pin_project::pin_project;
use crate::backup::ChunkInfo;
use pbs_datastore::data_blob::ChunkInfo;
pub enum MergedChunkInfo {
Known(Vec<(u64, [u8; 32])>),

View File

@ -5,8 +5,8 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use anyhow::{format_err, Error};
use bytes::Bytes;
use futures::{ready, Future};
use h2::SendStream;

View File

@ -12,7 +12,8 @@ use nix::dir::Dir;
use nix::fcntl::OFlag;
use nix::sys::stat::Mode;
use crate::backup::CatalogWriter;
use pbs_datastore::catalog::CatalogWriter;
use crate::tools::{
StdChannelWriter,
TokioWriterAdapter,

View File

@ -5,10 +5,14 @@ use std::sync::{Arc, Mutex};
use anyhow::{bail, Error};
use super::BackupReader;
use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk};
use pbs_datastore::{CryptConfig, CryptMode};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::read_chunk::ReadChunk;
use pbs_runtime::block_on;
use super::BackupReader;
use crate::backup::AsyncReadChunk;
/// Read chunks from remote host using ``BackupReader``
#[derive(Clone)]
pub struct RemoteChunkReader {