some internal combinator-influenced api cleanup
The download methods used to take the destination by value and return them again, since this was required when using combinators before we had `async fn`. But this is just an ugly left-over now. Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
parent
8e6e18b77c
commit
3d571d5509
|
@ -44,8 +44,8 @@ async fn run() -> Result<(), Error> {
|
||||||
|
|
||||||
let mut bytes = 0;
|
let mut bytes = 0;
|
||||||
for _ in 0..100 {
|
for _ in 0..100 {
|
||||||
let writer = DummyWriter { bytes: 0 };
|
let mut writer = DummyWriter { bytes: 0 };
|
||||||
let writer = client.speedtest(writer).await?;
|
client.speedtest(&mut writer).await?;
|
||||||
println!("Received {} bytes", writer.bytes);
|
println!("Received {} bytes", writer.bytes);
|
||||||
bytes += writer.bytes;
|
bytes += writer.bytes;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2199,7 +2199,7 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
|
||||||
true,
|
true,
|
||||||
).await?;
|
).await?;
|
||||||
|
|
||||||
let tmpfile = std::fs::OpenOptions::new()
|
let mut tmpfile = std::fs::OpenOptions::new()
|
||||||
.write(true)
|
.write(true)
|
||||||
.read(true)
|
.read(true)
|
||||||
.custom_flags(libc::O_TMPFILE)
|
.custom_flags(libc::O_TMPFILE)
|
||||||
|
@ -2216,7 +2216,7 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
|
||||||
Arc::new(BufferedDynamicReadAt::new(reader));
|
Arc::new(BufferedDynamicReadAt::new(reader));
|
||||||
let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?;
|
let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?;
|
||||||
|
|
||||||
let tmpfile = client.download(CATALOG_NAME, tmpfile).await?;
|
client.download(CATALOG_NAME, &mut tmpfile).await?;
|
||||||
let index = DynamicIndexReader::new(tmpfile)
|
let index = DynamicIndexReader::new(tmpfile)
|
||||||
.map_err(|err| format_err!("unable to read catalog index - {}", err))?;
|
.map_err(|err| format_err!("unable to read catalog index - {}", err))?;
|
||||||
|
|
||||||
|
|
|
@ -91,7 +91,7 @@ impl BackupReader {
|
||||||
&self,
|
&self,
|
||||||
file_name: &str,
|
file_name: &str,
|
||||||
output: W,
|
output: W,
|
||||||
) -> Result<W, Error> {
|
) -> Result<(), Error> {
|
||||||
let path = "download";
|
let path = "download";
|
||||||
let param = json!({ "file-name": file_name });
|
let param = json!({ "file-name": file_name });
|
||||||
self.h2.download(path, Some(param), output).await
|
self.h2.download(path, Some(param), output).await
|
||||||
|
@ -103,7 +103,7 @@ impl BackupReader {
|
||||||
pub async fn speedtest<W: Write + Send>(
|
pub async fn speedtest<W: Write + Send>(
|
||||||
&self,
|
&self,
|
||||||
output: W,
|
output: W,
|
||||||
) -> Result<W, Error> {
|
) -> Result<(), Error> {
|
||||||
self.h2.download("speedtest", None, output).await
|
self.h2.download("speedtest", None, output).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ impl BackupReader {
|
||||||
&self,
|
&self,
|
||||||
digest: &[u8; 32],
|
digest: &[u8; 32],
|
||||||
output: W,
|
output: W,
|
||||||
) -> Result<W, Error> {
|
) -> Result<(), Error> {
|
||||||
let path = "chunk";
|
let path = "chunk";
|
||||||
let param = json!({ "digest": digest_to_hex(digest) });
|
let param = json!({ "digest": digest_to_hex(digest) });
|
||||||
self.h2.download(path, Some(param), output).await
|
self.h2.download(path, Some(param), output).await
|
||||||
|
@ -127,7 +127,8 @@ impl BackupReader {
|
||||||
|
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
let raw_data = self.download(MANIFEST_BLOB_NAME, Vec::with_capacity(64*1024)).await?;
|
let mut raw_data = Vec::with_capacity(64 * 1024);
|
||||||
|
self.download(MANIFEST_BLOB_NAME, &mut raw_data).await?;
|
||||||
let blob = DataBlob::from_raw(raw_data)?;
|
let blob = DataBlob::from_raw(raw_data)?;
|
||||||
blob.verify_crc()?;
|
blob.verify_crc()?;
|
||||||
let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
|
let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
|
||||||
|
@ -146,13 +147,13 @@ impl BackupReader {
|
||||||
name: &str,
|
name: &str,
|
||||||
) -> Result<DataBlobReader<File>, Error> {
|
) -> Result<DataBlobReader<File>, Error> {
|
||||||
|
|
||||||
let tmpfile = std::fs::OpenOptions::new()
|
let mut tmpfile = std::fs::OpenOptions::new()
|
||||||
.write(true)
|
.write(true)
|
||||||
.read(true)
|
.read(true)
|
||||||
.custom_flags(libc::O_TMPFILE)
|
.custom_flags(libc::O_TMPFILE)
|
||||||
.open("/tmp")?;
|
.open("/tmp")?;
|
||||||
|
|
||||||
let mut tmpfile = self.download(name, tmpfile).await?;
|
self.download(name, &mut tmpfile).await?;
|
||||||
|
|
||||||
let (csum, size) = compute_file_csum(&mut tmpfile)?;
|
let (csum, size) = compute_file_csum(&mut tmpfile)?;
|
||||||
manifest.verify_file(name, &csum, size)?;
|
manifest.verify_file(name, &csum, size)?;
|
||||||
|
@ -172,13 +173,13 @@ impl BackupReader {
|
||||||
name: &str,
|
name: &str,
|
||||||
) -> Result<DynamicIndexReader, Error> {
|
) -> Result<DynamicIndexReader, Error> {
|
||||||
|
|
||||||
let tmpfile = std::fs::OpenOptions::new()
|
let mut tmpfile = std::fs::OpenOptions::new()
|
||||||
.write(true)
|
.write(true)
|
||||||
.read(true)
|
.read(true)
|
||||||
.custom_flags(libc::O_TMPFILE)
|
.custom_flags(libc::O_TMPFILE)
|
||||||
.open("/tmp")?;
|
.open("/tmp")?;
|
||||||
|
|
||||||
let tmpfile = self.download(name, tmpfile).await?;
|
self.download(name, &mut tmpfile).await?;
|
||||||
|
|
||||||
let index = DynamicIndexReader::new(tmpfile)
|
let index = DynamicIndexReader::new(tmpfile)
|
||||||
.map_err(|err| format_err!("unable to read dynamic index '{}' - {}", name, err))?;
|
.map_err(|err| format_err!("unable to read dynamic index '{}' - {}", name, err))?;
|
||||||
|
@ -200,13 +201,13 @@ impl BackupReader {
|
||||||
name: &str,
|
name: &str,
|
||||||
) -> Result<FixedIndexReader, Error> {
|
) -> Result<FixedIndexReader, Error> {
|
||||||
|
|
||||||
let tmpfile = std::fs::OpenOptions::new()
|
let mut tmpfile = std::fs::OpenOptions::new()
|
||||||
.write(true)
|
.write(true)
|
||||||
.read(true)
|
.read(true)
|
||||||
.custom_flags(libc::O_TMPFILE)
|
.custom_flags(libc::O_TMPFILE)
|
||||||
.open("/tmp")?;
|
.open("/tmp")?;
|
||||||
|
|
||||||
let tmpfile = self.download(name, tmpfile).await?;
|
self.download(name, &mut tmpfile).await?;
|
||||||
|
|
||||||
let index = FixedIndexReader::new(tmpfile)
|
let index = FixedIndexReader::new(tmpfile)
|
||||||
.map_err(|err| format_err!("unable to read fixed index '{}' - {}", name, err))?;
|
.map_err(|err| format_err!("unable to read fixed index '{}' - {}", name, err))?;
|
||||||
|
|
|
@ -707,7 +707,7 @@ impl H2Client {
|
||||||
path: &str,
|
path: &str,
|
||||||
param: Option<Value>,
|
param: Option<Value>,
|
||||||
mut output: W,
|
mut output: W,
|
||||||
) -> Result<W, Error> {
|
) -> Result<(), Error> {
|
||||||
let request = Self::request_builder("localhost", "GET", path, param, None).unwrap();
|
let request = Self::request_builder("localhost", "GET", path, param, None).unwrap();
|
||||||
|
|
||||||
let response_future = self.send_request(request, None).await?;
|
let response_future = self.send_request(request, None).await?;
|
||||||
|
@ -727,7 +727,7 @@ impl H2Client {
|
||||||
output.write_all(&chunk)?;
|
output.write_all(&chunk)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(output)
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn upload(
|
pub async fn upload(
|
||||||
|
|
|
@ -47,13 +47,13 @@ async fn download_manifest(
|
||||||
filename: &std::path::Path,
|
filename: &std::path::Path,
|
||||||
) -> Result<std::fs::File, Error> {
|
) -> Result<std::fs::File, Error> {
|
||||||
|
|
||||||
let tmp_manifest_file = std::fs::OpenOptions::new()
|
let mut tmp_manifest_file = std::fs::OpenOptions::new()
|
||||||
.write(true)
|
.write(true)
|
||||||
.create(true)
|
.create(true)
|
||||||
.read(true)
|
.read(true)
|
||||||
.open(&filename)?;
|
.open(&filename)?;
|
||||||
|
|
||||||
let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?;
|
reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?;
|
||||||
|
|
||||||
tmp_manifest_file.seek(SeekFrom::Start(0))?;
|
tmp_manifest_file.seek(SeekFrom::Start(0))?;
|
||||||
|
|
||||||
|
@ -77,13 +77,13 @@ async fn pull_single_archive(
|
||||||
tmp_path.set_extension("tmp");
|
tmp_path.set_extension("tmp");
|
||||||
|
|
||||||
worker.log(format!("sync archive {}", archive_name));
|
worker.log(format!("sync archive {}", archive_name));
|
||||||
let tmpfile = std::fs::OpenOptions::new()
|
let mut tmpfile = std::fs::OpenOptions::new()
|
||||||
.write(true)
|
.write(true)
|
||||||
.create(true)
|
.create(true)
|
||||||
.read(true)
|
.read(true)
|
||||||
.open(&tmp_path)?;
|
.open(&tmp_path)?;
|
||||||
|
|
||||||
let tmpfile = reader.download(archive_name, tmpfile).await?;
|
reader.download(archive_name, &mut tmpfile).await?;
|
||||||
|
|
||||||
match archive_type(archive_name)? {
|
match archive_type(archive_name)? {
|
||||||
ArchiveType::DynamicIndex => {
|
ArchiveType::DynamicIndex => {
|
||||||
|
@ -124,7 +124,7 @@ async fn try_client_log_download(
|
||||||
.open(&tmp_path)?;
|
.open(&tmp_path)?;
|
||||||
|
|
||||||
// Note: be silent if there is no log - only log successful download
|
// Note: be silent if there is no log - only log successful download
|
||||||
if let Ok(_) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
|
if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
|
||||||
if let Err(err) = std::fs::rename(&tmp_path, &path) {
|
if let Err(err) = std::fs::rename(&tmp_path, &path) {
|
||||||
bail!("Atomic rename file {:?} failed - {}", path, err);
|
bail!("Atomic rename file {:?} failed - {}", path, err);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue