diff --git a/Cargo.toml b/Cargo.toml index 2dae2905..10a07e32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,6 @@ path = "src/lib.rs" [dependencies] proxmox = { git = "ssh://gitolite3@proxdev.maurer-it.com/rust/proxmox", version = "0.1" } -proxmox-protocol = { path = "proxmox-protocol" } log = "0.4" syslog = "4.0" failure = "0.1" diff --git a/proxmox-protocol/Cargo.toml b/proxmox-protocol/Cargo.toml deleted file mode 100644 index 5f995479..00000000 --- a/proxmox-protocol/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "proxmox-protocol" -version = "0.1.0" -authors = [ - "Dietmar Maurer ", - "Wolfgang Bumiller ", -] -edition = "2018" - -[lib] -crate-type = ['lib', 'cdylib'] - -[dependencies] -chrono = "0.4" -endian_trait = { version = "0.6", features = ["arrays"] } -errno = "0.2" -failure = "0.1" -libc = "0.2" -openssl = "0.10" -serde_json = "1.0" -url = "1.7" diff --git a/proxmox-protocol/proxmox-protocol.h b/proxmox-protocol/proxmox-protocol.h deleted file mode 100644 index ef4d4c38..00000000 --- a/proxmox-protocol/proxmox-protocol.h +++ /dev/null @@ -1,100 +0,0 @@ -#pragma once - -#include -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -typedef int64_t proxmox_backup_read_cb(void *opaque, void *buffer, uint64_t size); -typedef int64_t proxmox_backup_write_cb(void *opaque, const void *buffer, uint64_t size); -typedef void proxmox_backup_drop_cb(void *opaque); - -typedef struct ProxmoxBackup ProxmoxBackup; - -extern ProxmoxBackup *proxmox_backup_new( - void *opaque, - proxmox_backup_read_cb *read_cb, - proxmox_backup_write_cb *write_cb, - proxmox_backup_drop_cb *drop_cb); - -extern void proxmox_backup_done(ProxmoxBackup *self); - -extern void proxmox_backup_clear_err(ProxmoxBackup *self); -extern const char* proxmox_backup_get_error(const ProxmoxBackup *self); - -extern bool proxmox_backup_is_eof(const ProxmoxBackup *self); - -extern int proxmox_backup_wait_for_handshake(ProxmoxBackup *self); - -extern int proxmox_backup_query_hashes(ProxmoxBackup *self, const char *file_name); -extern int proxmox_backup_wait_for_hashes(ProxmoxBackup *self); - -extern bool proxmox_backup_is_chunk_available(ProxmoxBackup *self, const void *digest); -extern int proxmox_backup_upload_chunk( - ProxmoxBackup *self, - const void *digest, - const void *data, - uint64_t size); -extern int proxmox_backup_continue_upload(ProxmoxBackup *self); - -extern int proxmox_backup_poll_read(ProxmoxBackup *self); -extern int proxmox_backup_poll_send(ProxmoxBackup *self); - -extern int proxmox_backup_wait_for_id(ProxmoxBackup *self, int id); -extern int proxmox_backup_discard_id(ProxmoxBackup *self, int id); - -extern int proxmox_backup_create( - ProxmoxBackup *self, - bool dynamic, - const char *backup_type, - const char *backup_id, - int64_t time_epoch, - const char *file_name, - size_t chunk_size, - int64_t file_size, - bool is_new); - -extern int proxmox_backup_dynamic_data( - ProxmoxBackup *self, - int stream, - const void *digest, - uint64_t size); - -extern int proxmox_backup_fixed_data( - ProxmoxBackup *self, - int stream, - size_t index, - const void *digest); - -extern int proxmox_backup_finish_backup( - ProxmoxBackup *self, - int stream, - char **remote_path); - -typedef struct ProxmoxChunker ProxmoxChunker; -extern ProxmoxChunker *proxmox_chunker_new(uint64_t chunk_size_avg); -extern void proxmox_chunker_done(ProxmoxChunker *self); -extern uint64_t proxmox_chunker_scan(ProxmoxChunker *self, const void *data, size_t size); - -extern void proxmox_chunk_digest(const void *data, 
size_t size, uint8_t (*digest)[32]); - -typedef struct ProxmoxConnector ProxmoxConnector; -extern ProxmoxConnector *proxmox_connector_new( - const char *user, - const char *server, - const char *store); -extern void proxmox_connector_drop(ProxmoxConnector *self); -extern int proxmox_connector_set_password(ProxmoxConnector *self, const char *password); -extern int proxmox_connector_set_ticket( - ProxmoxConnector *self, - const char *ticket, - const char *token); -extern void proxmox_connector_set_certificate_validation(ProxmoxConnector *self, bool on); -extern ProxmoxBackup *proxmox_connector_connect(ProxmoxConnector *self); - -#ifdef __cplusplus -} -#endif diff --git a/proxmox-protocol/src/c_chunker.rs b/proxmox-protocol/src/c_chunker.rs deleted file mode 100644 index c24730eb..00000000 --- a/proxmox-protocol/src/c_chunker.rs +++ /dev/null @@ -1,49 +0,0 @@ -//! C API for the Chunker. - -use std::os::raw::c_void; - -use libc::size_t; - -use crate::Chunker; - -/// Creates a new chunker instance. -#[no_mangle] -pub extern "C" fn proxmox_chunker_new(chunk_size_avg: size_t) -> *mut Chunker { - Box::leak(Box::new(Chunker::new(chunk_size_avg as usize))) -} - -/// Drops an instance of a chunker. The pointer must be valid or `NULL`. -#[no_mangle] -pub extern "C" fn proxmox_chunker_done(me: *mut Chunker) { - if !me.is_null() { - unsafe { - Box::from_raw(me); - } - } -} - -/// Scan the specified data for a chunk border. Returns 0 if none was found, or a positive offset -/// to a border. -#[no_mangle] -pub extern "C" fn proxmox_chunker_scan( - me: *mut Chunker, - data: *const c_void, - size: size_t, -) -> size_t { - let me = unsafe { &mut *me }; - me.scan(unsafe { std::slice::from_raw_parts(data as *const u8, size as usize) }) as size_t -} - -/// Compute a chunk digest. This is mostly a convenience method to avoid having to lookup the right -/// digest method for your language of choice. -#[no_mangle] -pub extern "C" fn proxmox_chunk_digest( - data: *const c_void, - size: size_t, - out_digest: *mut [u8; 32], -) { - let digest = crate::FixedChunk::from_data(unsafe { - std::slice::from_raw_parts(data as *const u8, size as usize) - }); - unsafe { *out_digest = digest.0 }; -} diff --git a/proxmox-protocol/src/c_client.rs b/proxmox-protocol/src/c_client.rs deleted file mode 100644 index ee0dc656..00000000 --- a/proxmox-protocol/src/c_client.rs +++ /dev/null @@ -1,519 +0,0 @@ -//! For the C API we need to provide a `Client` compatible with C. In rust `Client` takes a -//! `T: Read + Write`, so we need to provide a way for C to provide callbacks to us to -//! implement this. - -use std::ffi::{CStr, CString}; -use std::io::{self, Read, Write}; -use std::os::raw::{c_char, c_int, c_void}; - -use failure::{bail, format_err, Error}; -use libc::size_t; - -/// Read callback. The first parameter is the `opaque` parameter passed to `proxmox_backup_new`, -/// the rest are the usual read function parameters. This should return the number of bytes -/// actually read, zero on EOF, or a negative `errno` value on error (eg. `-EAGAIN`). -pub type ReadFn = extern "C" fn(opaque: *mut c_void, buf: *mut u8, size: u64) -> i64; - -/// Write callback. The first parameter is the `opaque` parameter passed to `proxmox_backup_new`, -/// the rest are the usual write function parameters. This should return the number of bytes -/// actually written, or a negative `errno` value on error (eg. `-EAGAIN`). -pub type WriteFn = extern "C" fn(opaque: *mut c_void, buf: *const u8, size: u64) -> i64; - -/// Optional drop callback. 
This is called when the Client gets destroyed and allows freeing -/// resources associated with the opaque object behind the C API socket. -pub type DropFn = extern "C" fn(opaque: *mut c_void); - -/// Stores the external C callbacks for communicating with the protocol socket. -pub struct CApiSocket { - opaque: *mut c_void, - read: ReadFn, - write: WriteFn, - drop: Option, -} - -impl CApiSocket { - fn from_io(stream: T) -> Self { - let opaque = Box::leak(Box::new(stream)); - Self { - opaque: opaque as *mut T as _, - read: c_read_fn::, - write: c_write_fn::, - drop: Some(c_drop_fn::), - } - } -} - -/// A client instance using C callbacks for reading from and writing to the protocol socket. -pub struct CClient { - client: crate::Client, - error: Option, - upload: Option<(*const u8, usize)>, -} - -impl CClient { - fn set_error(&mut self, err: Error) -> c_int { - self.error = Some(match CString::new(err.to_string()) { - Ok(cs) => cs, - Err(_) => CString::new("").unwrap(), - }); - -1 - } - - #[inline(always)] - fn bool_result(&mut self, res: Result) -> c_int { - match res { - Ok(false) => 0, - Ok(true) => 1, - Err(e) => self.set_error(e), - } - } - - #[inline(always)] - fn bool_call(&mut self, func: F) -> c_int - where - F: FnOnce(&mut crate::Client) -> Result, - { - let res = func(&mut self.client); - self.bool_result(res) - } - - #[inline(always)] - fn int_call(&mut self, func: F) -> c_int - where - F: FnOnce(&mut crate::Client) -> Result, - { - match func(&mut self.client) { - Ok(v) => v, - Err(e) => self.set_error(e.into()), - } - } -} - -impl Read for CApiSocket { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let rc = (self.read)(self.opaque, buf.as_mut_ptr(), buf.len() as u64); - if rc < 0 { - Err(io::Error::from_raw_os_error((-rc) as i32)) - } else { - Ok(rc as usize) - } - } -} - -impl Write for CApiSocket { - fn write(&mut self, buf: &[u8]) -> io::Result { - let rc = (self.write)(self.opaque, buf.as_ptr(), buf.len() as u64); - if rc < 0 { - Err(io::Error::from_raw_os_error((-rc) as i32)) - } else { - Ok(rc as usize) - } - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -impl Drop for CApiSocket { - fn drop(&mut self) { - if let Some(drop) = self.drop { - drop(self.opaque); - } - } -} - -extern "C" fn c_read_fn(opaque: *mut c_void, buf: *mut u8, size: u64) -> i64 { - let stream = unsafe { &mut *(opaque as *mut T) }; - let buf = unsafe { std::slice::from_raw_parts_mut(buf, size as usize) }; - - match stream.read(buf) { - Ok(size) => size as i64, - Err(err) => { - match err.raw_os_error() { - Some(err) => -(err as i64), - None => { - eprintln!("error reading from stream: {}", err); - -libc::EIO as i64 - } - } - }, - } -} - -extern "C" fn c_write_fn(opaque: *mut c_void, buf: *const u8, size: u64) -> i64 { - let stream = unsafe { &mut *(opaque as *mut T) }; - let buf = unsafe { std::slice::from_raw_parts(buf, size as usize) }; - - match stream.write(buf) { - Ok(size) => size as i64, - Err(err) => { - match err.raw_os_error() { - Some(err) => -(err as i64), - None => { - eprintln!("error writing to stream: {}", err); - -libc::EIO as i64 - } - } - }, - } -} - -extern "C" fn c_drop_fn(opaque: *mut c_void) { - unsafe { - Box::from_raw(opaque as *mut T); - } -} - -pub(crate) fn make_c_compatible_client(stream: T) -> crate::Client { - crate::Client::new(CApiSocket::from_io(stream)) -} - -pub(crate) fn make_c_client(client: crate::Client) -> *mut CClient { - Box::leak(Box::new(CClient { - client, - error: None, - upload: None, - })) -} - -/// Creates a new instance of a 
backup protocol client. -/// -/// # Arguments -/// -/// * `opaque` - An opaque pointer passed to the two provided callback methods. -/// * `read` - The read callback. -/// * `write` - The write callback. -#[no_mangle] -pub extern "C" fn proxmox_backup_new( - opaque: *mut c_void, - read: ReadFn, - write: WriteFn, - drop: Option, -) -> *mut CClient { - make_c_client(crate::Client::new(CApiSocket { - opaque, - read, - write, - drop, - })) -} - -/// Drops an instance of a backup protocol client. The pointer must be valid or `NULL`. -#[no_mangle] -pub extern "C" fn proxmox_backup_done(me: *mut CClient) { - if !me.is_null() { - unsafe { - Box::from_raw(me); - } - } -} - -/// Returns a C String describing the last error or `NULL` if there was none. -#[no_mangle] -pub extern "C" fn proxmox_backup_get_error(me: *const CClient) -> *const c_char { - let me = unsafe { &*me }; - match me.error { - Some(ref e) => e.as_ptr(), - None => std::ptr::null(), - } -} - -/// Returns true if the `read` callback had previously returned `EOF`. -#[no_mangle] -pub extern "C" fn proxmox_backup_is_eof(me: *const CClient) -> bool { - let me = unsafe { &*me }; - me.client.eof() -} - -/// The data polling methods usually pass errors from the callbacks through to the original caller. -/// Since the protocol needs to be non-blocking-IO safe and therefore able to resumine at any point -/// where `-EAGAIN` can be returned by the callbacks, it is up to the caller which errors are to be -/// considered fatal, but any error returned by callbacks which is not `-EAGAIN` will result in an -/// internal error flag to be set which has to be cleared before trying to resume normal -/// operations. -#[no_mangle] -pub extern "C" fn proxmox_backup_clear_err(me: *mut CClient) { - let me = unsafe { &mut *me }; - me.client.clear_err(); - me.error = None; -} - -/// Polls for data and checks whether the protocol handshake has been made successfully. -/// Returns `1` if the handshake was successful, `0` if it is not yet complete or `-1` on error. -#[no_mangle] -pub extern "C" fn proxmox_backup_wait_for_handshake(me: *mut CClient) -> c_int { - let me = unsafe { &mut *me }; - me.bool_call(move |c| c.wait_for_handshake()) -} - -fn check_string(s: *const c_char) -> Result<&'static str, Error> { - if s.is_null() { - bail!("NULL string"); - } - Ok(std::str::from_utf8(unsafe { - CStr::from_ptr(s).to_bytes() - })?) -} - -/// Request the list of hashes for a backup file in order to prevent duplicates from being sent to -/// the server. This simply causes an internal list to be filled. Only one such operation can be -/// performed simultaneously. To wait for its completion see `proxmox_backup_wait_for_hashes`. -/// -/// If the file name is `NULL` or not a valid UTF-8 string, this function returns an error without -/// putting the protocol handler in an error state. -/// -/// Returns `0` on success, `-1` otherwise. -#[no_mangle] -pub extern "C" fn proxmox_backup_query_hashes(me: *mut CClient, file_name: *const c_char) -> c_int { - let me = unsafe { &mut *me }; - - me.int_call(move |client| { - let file_name = check_string(file_name)?; - client.query_hashes(file_name)?; - Ok(0) - }) -} - -/// If there is an ongoing hash list request, this will poll the data stream. -/// -/// Returns `1` if the transfer is complete (or there was no transfer to begin with), `0` if it is -/// incomplete, or `-1` if an error occurred. 
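The query/wait pair documented above is a thin wrapper around `Client::query_hashes()` and `Client::wait_for_hashes()` from `client.rs`. A minimal sketch of that flow on the Rust side, assuming a blocking transport; the archive name and the helper functions are made up for illustration:

```rust
use std::io::{Read, Write};

use failure::Error;
use proxmox_protocol::{Client, FixedChunk};

// Sketch: ask the server for the known chunk digests of an existing archive so
// duplicate chunks can be skipped later. The archive name is a placeholder.
fn load_known_hashes<S: Read + Write>(client: &mut Client<S>) -> Result<(), Error> {
    client.query_hashes("root.catar.didx")?;
    while !client.wait_for_hashes()? {
        // blocking transport: simply poll again; this returns true once the
        // final (empty) HashListPart packet has arrived
    }
    Ok(())
}

// Later, before uploading a chunk:
fn chunk_already_known<S: Read + Write>(client: &Client<S>, data: &[u8]) -> bool {
    client.is_chunk_available(&FixedChunk::from_data(data))
}
```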
-#[no_mangle] -pub extern "C" fn proxmox_backup_wait_for_hashes(me: *mut CClient) -> c_int { - let me = unsafe { &mut *me }; - me.bool_call(move |c| c.wait_for_hashes()) -} - -/// Check if a chunk of the provided digest is known to the this client instance. Note that this -/// does not query the server for this information, and is only useful after a call to -/// `proxmox_backup_query_hashes` or after uploading something. -#[no_mangle] -pub extern "C" fn proxmox_backup_is_chunk_available(me: *const CClient, digest: *const u8) -> bool { - let me = unsafe { &*me }; - let digest = unsafe { &*(digest as *const [u8; 32]) }; - me.client.is_chunk_available(digest) -} - -/// Begin uploading a chunk to the server. This attempts to upload the data right away, but if the -/// writer may fail due to non-blocking I/O in which case the `proxmox_backup_continue_upload` -/// function must be used. -/// -/// Returns `0` if the upload is incomplete, a positive ID if the upload was completed immediately, -/// or `-1` on error. -/// -/// The ID returned on success can be used to wait for the server to acknowledge that the chunk has -/// been written successfully. Use `proxmox_backup_wait_for_id` to do this. If confirmation is not -/// required, the ID should be released via `proxmox_backup_discard_id`. -#[no_mangle] -pub extern "C" fn proxmox_backup_upload_chunk( - me: *mut CClient, - digest: *const u8, - data: *const u8, - size: u64, -) -> c_int { - let me = unsafe { &mut *me }; - let digest: &[u8; 32] = unsafe { &*(digest as *const [u8; 32]) }; - let size = size as usize; - let slice: &[u8] = unsafe { std::slice::from_raw_parts(data, size) }; - match me.client.upload_chunk(digest, slice) { - Ok(Some(id)) => id.0 as c_int, - Ok(None) => { - me.upload = Some((data, size)); - 0 - } - Err(e) => me.set_error(e), - } -} - -/// If an upload did not finish immediately (`proxmox_backup_upload_chunk` returned `0`), this -/// function must be used to retry sending the rest of the data. -/// -/// Returns `0` if the upload is incomplete, a positive ID if the upload was completed immediately, -/// or `-1` on error. -#[no_mangle] -pub extern "C" fn proxmox_backup_continue_upload(me: *mut CClient) -> c_int { - let me = unsafe { &mut *me }; - match me.upload { - Some((data, len)) => { - let slice: &[u8] = unsafe { std::slice::from_raw_parts(data, len) }; - match me.client.continue_upload_chunk(slice) { - Ok(Some(id)) => id.0 as c_int, - Ok(None) => 0, - Err(e) => me.set_error(e), - } - } - None => me.set_error(format_err!("no upload currently running")), - } -} - -/// Run the main receive loop. Returns `0` on success, `-1` on error. -#[no_mangle] -pub extern "C" fn proxmox_backup_poll_read(me: *mut CClient) -> c_int { - let me = unsafe { &mut *me }; - match me.client.poll_read(false) { - Ok(_) => 0, - Err(e) => me.set_error(e), - } -} - -/// Run the main send loop. If the `write` callback returned `-EAGAIN`, during an operation, the -/// protocol handler keeps the data to be sent in a write queue. This function will attempt to -/// continue writing out the remaining data. See individual function descriptions for when this is -/// necessary. -/// -/// Returns `1` if the queue is now empty, `0` if there is still data in the queue, or `-1` on -/// error. -#[no_mangle] -pub extern "C" fn proxmox_backup_poll_send(me: *mut CClient) -> c_int { - let me = unsafe { &mut *me }; - me.bool_call(move |c| Ok(c.poll_send()?.unwrap_or(true))) -} - -/// Run the main receive loop and check for confirmation of a stream with the specified ID. 
-/// -/// Returns `1` if the transaction was confirmed, `0` if not, or `-1` on error. -/// -/// Note that once this function returned `1` for an ID, the id is considered to be free for -/// recycling and should not be used for further calls. -#[no_mangle] -pub extern "C" fn proxmox_backup_wait_for_id(me: *mut CClient, id: c_int) -> c_int { - let me = unsafe { &mut *me }; - me.bool_call(move |c| c.wait_for_id(crate::StreamId(id as u8))) -} - -/// Notifies the protocol handler that we do not bother waiting for confirmation of an ID. The ID -/// may immediately be recycled for future transactions, thus the user should not use it for any -/// further function calls. -/// -/// Returns `0` on success, `-1` on error. -#[no_mangle] -pub extern "C" fn proxmox_backup_discard_id(me: *mut CClient, id: c_int) -> c_int { - let me = unsafe { &mut *me }; - match me.client.discard_id(crate::StreamId(id as u8)) { - Ok(_) => 0, - Err(e) => me.set_error(e), - } -} - -/// Create a new backup. The returned ID should be waited upon via `proxmox_backup_wait_for_id`, -/// which returns true once the server confirmed the creation of the backup. -#[no_mangle] -pub extern "C" fn proxmox_backup_create( - me: *mut CClient, - dynamic: bool, - backup_type: *const c_char, // "host", "ct", "vm" - backup_id: *const c_char, - time_epoch: i64, - file_name: *const c_char, - chunk_size: size_t, - file_size: i64, - is_new: bool, -) -> c_int { - let me = unsafe { &mut *me }; - me.int_call(move |client| { - let index_type = match dynamic { - false => crate::IndexType::Fixed, - _ => crate::IndexType::Dynamic, - }; - - let backup_type = check_string(backup_type)?; - let backup_id = check_string(backup_id)?; - let file_name = check_string(file_name)?; - - Ok(client - .create_backup( - index_type, - backup_type, - backup_id, - time_epoch, - file_name, - chunk_size as _, - if file_size < 0 { - None - } else { - Some(file_size as u64) - }, - is_new, - )? - .0 as c_int) - }) -} - -/// Send a dynamic chunk entry. -/// -/// If the entry was sent out successfully this returns `1`. If the `write` callback returned -/// `-EAGAIN` this returns `0` and the data is queued, after which `proxmox_backup_poll_send` -/// should be used to continue sending the data. -/// On error `-1` is returned. -#[no_mangle] -pub extern "C" fn proxmox_backup_dynamic_data( - me: *mut CClient, - stream: c_int, - digest: *const [u8; 32], - size: u64, -) -> c_int { - let me = unsafe { &mut *me }; - me.bool_call(move |client| { - client.dynamic_data(crate::BackupStream(stream as u8), unsafe { &*digest }, size) - }) -} - -/// Send a fixed chunk entry. -/// -/// If the entry was sent out successfully this returns `1`. If the `write` callback returned -/// `-EAGAIN` this returns `0` and the data is queued, after which `proxmox_backup_poll_send` -/// should be used to continue sending the data. -/// On error `-1` is returned. -#[no_mangle] -pub extern "C" fn proxmox_backup_fixed_data( - me: *mut CClient, - stream: c_int, - index: size_t, - digest: *const [u8; 32], -) -> c_int { - let me = unsafe { &mut *me }; - me.bool_call(move |client| { - client.fixed_data(crate::BackupStream(stream as u8), index as usize, unsafe { - &*digest - }) - }) -} - -/// Finish a running backup. -/// -/// Tells the server that the backup is supposed to be considered complete. If the request could be -/// sent out entirely `1` is returned. If the underlying socket is non-blocking and the packet -/// wasn't finished `0` is returned, after which `proxmox_backup_poll_send` should be used. 
-/// -/// Once the request was sent out successfully, the client should wait for acknowledgement by the -/// remote server via `proxmox_backup_wait_for_id`, passing the backup stream ID as parameter. -/// -/// Finally, if the client wishes to know the exact name the server stored the file under, the -/// `remote_path` parameter can be non-`NULL` to receive a string containing the file name, which -/// must be freed by the caller! -/// -/// Returns: `1` on success, possibly `0` for non-blocking I/O, `-1` on error. -#[no_mangle] -pub extern "C" fn proxmox_backup_finish_backup( - me: *mut CClient, - stream: c_int, - remote_path: *mut *mut c_char, -) -> c_int { - let me = unsafe { &mut *me }; - me.int_call(move |client| { - let (path, ack) = client.finish_backup(crate::BackupStream(stream as u8))?; - - if !remote_path.is_null() { - // would be shorter with the unstable map_or_else - let cstr = CString::new(path) - .map(|cs| cs.into_raw()) - .unwrap_or(std::ptr::null_mut()); - unsafe { - *remote_path = cstr; - } - } - - Ok(if ack { 1 } else { 0 }) - }) -} diff --git a/proxmox-protocol/src/c_connector.rs b/proxmox-protocol/src/c_connector.rs deleted file mode 100644 index bb82d0f8..00000000 --- a/proxmox-protocol/src/c_connector.rs +++ /dev/null @@ -1,145 +0,0 @@ -//! C API for the `Connector`. - -use std::ffi::CStr; -use std::os::raw::{c_char, c_int}; - -use crate::Connector; - -#[inline(always)] -fn with_errno(err: c_int, value: T) -> T { - errno::set_errno(errno::Errno(err)); - value -} - -#[inline] -fn checkstr(ptr: *const c_char) -> Option { - if ptr.is_null() { - return None; - } - - let cstr = unsafe { CStr::from_ptr(ptr) }; - match cstr.to_str() { - Ok(s) => Some(s.to_string()), - Err(_) => None, - } -} - -#[inline(always)] -fn wrap_buildcall(me: *mut Connector, func: F) -where - F: FnOnce(Connector) -> Connector, -{ - let me = unsafe { &mut *me }; - let moved_me = std::mem::replace(me, unsafe { std::mem::uninitialized() }); - std::mem::forget(std::mem::replace(me, func(moved_me))); -} - -/// Create a connector object. -/// -/// Returns a valid pointer or `NULL` on error, with `errno` set. -/// -/// Errors: -/// * `EINVAL`: a required parameter was `NULL` or contained invalid bytes. -#[no_mangle] -pub extern "C" fn proxmox_connector_new( - user: *const c_char, - server: *const c_char, - store: *const c_char, -) -> *mut Connector { - let (user, server, store) = match (checkstr(user), checkstr(server), checkstr(store)) { - (Some(user), Some(server), Some(store)) => (user, server, store), - _ => return with_errno(libc::EINVAL, std::ptr::null_mut()), - }; - - Box::leak(Box::new(Connector::new(user, server, store))) -} - -/// If a connector is not required anymore and has not been used up via a call to -/// `proxmox_connector_connect`, this can be used to free the associated resources. -#[no_mangle] -pub extern "C" fn proxmox_connector_drop(me: *mut Connector) { - unsafe { Box::from_raw(me) }; -} - -/// Use a password -/// -/// Returns `0` on success, a negative `errno` value on error. -/// -/// Errors: -/// * `EINVAL`: a required parameter was `NULL` or contained invalid bytes. -#[no_mangle] -pub extern "C" fn proxmox_connector_set_password( - me: *mut Connector, - password: *const c_char, -) -> c_int { - let password = match checkstr(password) { - Some(pw) => pw, - _ => return -libc::EINVAL, - }; - - wrap_buildcall(me, move |me| me.password(password)); - - 0 -} - -/// Use an existing ticket. -/// -/// Returns `0` on success, a negative `errno` value on error. 
-/// -/// Errors: -/// * `EINVAL`: a required parameter was `NULL` or contained invalid bytes. -#[no_mangle] -pub extern "C" fn proxmox_connector_set_ticket( - me: *mut Connector, - ticket: *const c_char, - token: *const c_char, -) -> c_int { - let (ticket, token) = match (checkstr(ticket), checkstr(token)) { - (Some(ticket), Some(token)) => (ticket, token), - _ => return -libc::EINVAL, - }; - - wrap_buildcall(me, move |me| me.ticket(ticket, token)); - - 0 -} - -/// Change whether certificate validation should be used on the connector. -#[no_mangle] -pub extern "C" fn proxmox_connector_set_certificate_validation(me: *mut Connector, on: bool) { - let me = unsafe { &mut *me }; - - wrap_buildcall(me, move |me| me.certificate_validation(on)); -} - -/// Initiate the connection. This consumes the Connector, invalidating the pointer to it! -/// -/// Returns a `ProxmoxBackup*`, or `NULL` on error. -#[no_mangle] -pub extern "C" fn proxmox_connector_connect( - me: *mut Connector, -) -> *mut crate::c_client::CClient { - let boxed = unsafe { Box::from_raw(me) }; - let me = *boxed; - match me.do_connect() { - Ok(stream) => { - let mut client = crate::c_client::make_c_compatible_client(stream); - match client.wait_for_handshake() { - Ok(true) => crate::c_client::make_c_client(client), - Ok(false) => { - // This is a synchronous blocking connection, so this should be impossible: - eprintln!("proxmox backup protocol error handshake did not complete?"); - std::ptr::null_mut() - } - Err(err) => { - eprintln!("error during handshake with backup server: {}", err); - std::ptr::null_mut() - } - } - } - Err(err) => { - eprintln!("error connecting to backup server: {}", err); - std::ptr::null_mut() - } - } -} diff --git a/proxmox-protocol/src/chunk_stream.rs b/proxmox-protocol/src/chunk_stream.rs deleted file mode 100644 index 4502b90b..00000000 --- a/proxmox-protocol/src/chunk_stream.rs +++ /dev/null @@ -1,118 +0,0 @@ -use std::io::Read; - -use failure::Error; - -use crate::Chunker; - -pub struct ChunkStream { - input: T, - buffer: Vec, - fill: usize, - pos: usize, - keep: bool, - eof: bool, - chunker: Chunker, -} - -impl ChunkStream { - pub fn new(input: T) -> Self { - Self { - input, - buffer: Vec::new(), - fill: 0, - pos: 0, - keep: false, - eof: false, - chunker: Chunker::new(4 * 1024 * 1024), - } - } - - pub fn stream(&mut self) -> &mut Self { - self - } - - fn fill_buf(&mut self) -> Result { - if self.fill == self.buffer.len() { - let mut more = self.buffer.len(); // just double it - if more == 0 { - more = 1024 * 1024; // at the start, make a 1M buffer - } - // we need more data: - self.buffer.reserve(more); - unsafe { - self.buffer.set_len(self.buffer.capacity()); - } - } - - match self.input.read(&mut self.buffer[self.fill..]) { - Ok(more) => { - if more == 0 { - self.eof = true; - } - self.fill += more; - Ok(true) - } - Err(err) => { - if err.kind() == std::io::ErrorKind::WouldBlock { - Ok(false) - } else { - Err(err.into()) - } - } - } - } - - fn consume(&mut self) { - assert!(self.fill >= self.pos); - - let remaining = self.fill - self.pos; - unsafe { - std::ptr::copy_nonoverlapping( - &self.buffer[self.pos] as *const u8, - self.buffer.as_mut_ptr(), - remaining, - ); - } - self.fill = remaining; - self.pos = 0; - } - - pub fn next(&mut self) { - self.keep = false; - } - - // This crate should not depend on the futures create, so we use another Option instead of - // Async. 
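The `Option<Option<...>>` convention mentioned in the comment above (used instead of `futures::Async`) is easiest to see in a driving loop. A rough sketch, assuming a blocking reader so that a would-block result can simply be retried; the callback-based helper is hypothetical:

```rust
use std::io::Read;

use failure::Error;
use proxmox_protocol::ChunkStream;

// Sketch: feed a reader through the chunker and hand every finished chunk to a
// callback. `get()` yields Ok(None) on WouldBlock, Ok(Some(None)) at EOF and
// Ok(Some(Some(chunk))) for a completed chunk; `next()` releases the current
// chunk so the following `get()` scans for a new boundary.
fn for_each_chunk<R: Read>(input: R, mut handle: impl FnMut(&[u8])) -> Result<(), Error> {
    let mut stream = ChunkStream::new(input);
    loop {
        match stream.get()? {
            None => continue,    // reader returned WouldBlock; retry
            Some(None) => break, // EOF reached
            Some(Some(chunk)) => handle(chunk),
        }
        stream.next();
    }
    Ok(())
}
```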
- pub fn get(&mut self) -> Result>, Error> { - if self.keep { - return Ok(Some(Some(&self.buffer[0..self.pos]))); - } - - if self.eof { - return Ok(Some(None)); - } - - if self.pos != 0 { - self.consume(); - } - - loop { - match self.fill_buf() { - Ok(true) => (), - Ok(false) => return Ok(None), - Err(err) => return Err(err), - } - - // Note that if we hit EOF we hit a hard boundary... - let boundary = self.chunker.scan(&self.buffer[self.pos..self.fill]); - if boundary == 0 && !self.eof { - self.pos = self.fill; - continue; - } - - self.pos += boundary; - self.keep = true; - return Ok(Some(Some(&self.buffer[0..self.pos]))); - } - } -} diff --git a/proxmox-protocol/src/client.rs b/proxmox-protocol/src/client.rs deleted file mode 100644 index 86cc6c07..00000000 --- a/proxmox-protocol/src/client.rs +++ /dev/null @@ -1,603 +0,0 @@ -use std::borrow::Borrow; -use std::collections::hash_map; -use std::collections::{HashMap, HashSet}; -use std::io::{Read, Write}; -use std::mem; -use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; - -use endian_trait::Endian; -use failure::*; - -use crate::common; -use crate::protocol::*; -use crate::tools::swapped_data_to_buf; -use crate::{ChunkEntry, FixedChunk, IndexType}; - -#[derive(Clone, Copy, Eq, PartialEq)] -#[repr(transparent)] -pub struct BackupStream(pub(crate) u8); - -#[derive(Clone, Copy, Eq, PartialEq)] -#[repr(transparent)] -pub struct StreamId(pub(crate) u8); - -impl From for StreamId { - fn from(v: BackupStream) -> Self { - Self(v.0) - } -} - -struct BackupStreamData { - id: u8, - index_type: IndexType, - pos: u64, - path: Option, -} - -pub enum AckState { - Waiting, // no ack received yet. - Received, // already received an ack, but the user hasn't seen it yet. - Ignore, // client doesn't care. - AwaitingData, // waiting for something other than an 'Ok' packet -} - -pub struct Client -where - S: Read + Write, -{ - chunks: RwLock>, - common: common::Connection, - handshake_done: bool, - - cur_id: u8, - free_ids: Vec, - waiting_ids: HashMap, - hash_download: Option, - - upload_chunk: Option, - upload_id: u8, - upload_pos: usize, - upload_state: u8, - - streams: HashMap, -} - -type Result = std::result::Result; - -impl Client -where - S: Read + Write, -{ - pub fn new(socket: S) -> Self { - Self { - chunks: RwLock::new(HashSet::new()), - common: common::Connection::new(socket), - handshake_done: false, - - cur_id: 1, - free_ids: Vec::new(), - waiting_ids: HashMap::new(), - hash_download: None, - - upload_state: 0, - upload_pos: 0, - upload_id: 0, - upload_chunk: None, - - streams: HashMap::new(), - } - } - - pub fn eof(&self) -> bool { - self.common.eof - } - - pub fn error(&self) -> bool { - self.common.error - } - - /// It is safe to clear the error after an `io::ErrorKind::Interrupted`. 
- pub fn clear_err(&mut self) { - self.common.clear_err() - } - - pub fn wait_for_handshake(&mut self) -> Result { - if !self.handshake_done { - self.poll_read(true)?; - } - Ok(self.handshake_done) - } - - pub fn query_hashes(&mut self, file_name: &str) -> Result<()> { - if self.hash_download.is_some() { - bail!("hash query already in progress"); - } - - let id = self.next_id()?; - let mut packet = Packet::builder(id, PacketType::GetHashList); - packet - .write_data(client::GetHashList { - name_length: file_name.len() as u16, - }) - .write_buf(file_name.as_bytes()); - self.common.queue_data(packet.finish())?; - self.hash_download = Some(id); - Ok(()) - } - - pub fn wait_for_hashes(&mut self) -> Result { - while self.hash_download.is_some() { - if !self.poll_read(true)? { - break; - } - } - Ok(self.hash_download.is_none()) - } - - fn chunk_read_lock(&self) -> Result>> { - self.chunks - .read() - .map_err(|_| format_err!("lock poisoned, disconnecting client...")) - } - - pub fn is_chunk_available>(&self, chunk: &T) -> bool { - self.chunk_read_lock().unwrap().contains(chunk.borrow()) - } - - /// Attempts to upload a chunk. Returns an error state only on fatal errors. If the underlying - /// writer returns a `WouldBlock` error this returns `None` and `continue_upload_chunk` has to - /// be called until the chunk is uploaded completely. During this time no other operations - /// should be performed on the this object! - /// See `continue_upload_chunk()` for a description of the returned value. - pub fn upload_chunk(&mut self, info: &T, data: &[u8]) -> Result> - where - T: Borrow, - { - if self.upload_chunk.is_some() { - bail!("cannot concurrently upload multiple chunks"); - } - - self.upload_id = self.next_id()?; - self.upload_chunk = Some(info.borrow().clone()); - self.next_upload_state(0); - self.continue_upload_chunk(data) - } - - fn next_upload_state(&mut self, state: u8) { - self.upload_state = state; - self.upload_pos = 0; - } - - // This is split into a static method not borrowing self so the buffer can point into self... - fn do_upload_write( - writer: &mut common::Connection, - buf: &[u8], - ) -> Result> { - match writer.write_some(buf) { - Ok(put) => Ok(Some((put, put == buf.len()))), - Err(e) => { - if e.kind() == std::io::ErrorKind::WouldBlock { - Ok(None) - } else { - Err(e.into()) - } - } - } - } - - fn after_upload_write(&mut self, put: Option<(usize, bool)>, next_state: u8) -> bool { - match put { - None => return false, - Some((put, done)) => { - if done { - self.next_upload_state(next_state); - } else { - self.upload_pos += put; - } - done - } - } - } - - fn upload_write(&mut self, buf: &[u8], next_state: u8) -> Result { - let wrote = Self::do_upload_write(&mut self.common, buf)?; - Ok(self.after_upload_write(wrote, next_state)) - } - - /// If an `upload_chunk()` call returned `Ok(false)` this needs to be used to complete the - /// upload process as the chunk may have already been partially written to the socket. - /// This function will return `Ok(false)` on `WouldBlock` errors just like `upload_chunk()` - /// will, after which the caller should wait for the writer to become write-ready and then - /// call this method again. 
- /// Once the complete chunk packet has been written to the underlying socket, this returns a - /// packet ID which can be waited upon for completion - pub fn continue_upload_chunk(&mut self, data: &[u8]) -> Result> { - loop { - match self.upload_state { - // Writing the packet header: - 0 => { - let len = mem::size_of::() - + mem::size_of::() - + data.len(); - let packet = Packet { - id: self.upload_id, - pkttype: PacketType::UploadChunk as _, - length: len as _, - } - .to_le(); - let buf = unsafe { swapped_data_to_buf(&packet) }; - if !self.upload_write(&buf[self.upload_pos..], 1)? { - return Ok(None); - } - } - // Writing the hash: - 1 => { - let chunk = self.upload_chunk.as_ref().unwrap(); - let buf = &chunk.0[self.upload_pos..]; - let wrote = Self::do_upload_write(&mut self.common, buf)?; - if !self.after_upload_write(wrote, 2) { - return Ok(None); - } - } - // Writing the data: - 2 => { - if !self.upload_write(&data[self.upload_pos..], 3)? { - return Ok(None); - } - } - // Done: - 3 => { - self.upload_chunk = None; - self.expect_ok_for_id(self.upload_id); - return Ok(Some(StreamId(self.upload_id))); - } - n => bail!("bad chunk upload state: {}", n), - } - } - } - - // generic data polling method, returns true if at least one packet was received - pub fn poll_read(&mut self, one: bool) -> Result { - if self.common.eof { - // polls after EOF are errors: - bail!("server disconnected"); - } - - if !self.common.poll_read()? { - // On the client side we do not expect a server-side disconnect, so error out! - if self.common.eof { - bail!("server disconnected"); - } - return Ok(false); - } - - loop { - match self.common.current_packet_type { - PacketType::Ok => self.recv_ok()?, - PacketType::Hello => self.recv_hello()?, - PacketType::HashListPart => self.recv_hash_list()?, - PacketType::BackupCreated => self.backup_created()?, - _ => bail!( - "server sent an unexpected packet of type {}", - self.common.current_packet_type as u32, - ), - } - self.common.next()?; - if one || !self.common.poll_read()? 
{ - break; - } - } - Ok(true) - } - - // None => nothing was queued - // Some(true) => queue finished - // Some(false) => queue not finished - pub fn poll_send(&mut self) -> Result> { - self.common.poll_send() - } - - // private helpermethods - - fn next_id(&mut self) -> Result { - if let Some(id) = self.free_ids.pop() { - return Ok(id); - } - if self.cur_id < 0xff { - self.cur_id += 1; - return Ok(self.cur_id - 1); - } - bail!("too many concurrent transactions"); - } - - fn free_id(&mut self, id: u8) { - self.free_ids.push(id); - } - - fn expect_ok_for_id(&mut self, id: u8) { - self.waiting_ids.insert(id, AckState::Waiting); - } - - fn expect_response_for_id(&mut self, id: u8) { - self.waiting_ids.insert(id, AckState::AwaitingData); - } - - // Methods handling received packets: - - fn recv_hello(&mut self) -> Result<()> { - let hello = self.common.read_unaligned_data::(0)?; - - if hello.magic != server::HELLO_MAGIC { - bail!("received an invalid hello packet"); - } - - let ver = hello.version; - if ver != server::PROTOCOL_VERSION { - bail!( - "hello packet contained incompatible protocol version: {} (!= {})", - ver, - server::PROTOCOL_VERSION, - ); - } - - self.handshake_done = true; - self.free_id(0); - Ok(()) - } - - fn chunk_write_lock(&self) -> Result>> { - self.chunks - .write() - .map_err(|_| format_err!("lock poisoned, disconnecting client...")) - } - - fn recv_hash_list(&mut self) -> Result<()> { - let data = self.common.packet_data(); - if data.len() == 0 { - // No more hashes, we're done - self.hash_download = None; - return Ok(()); - } - - if (data.len() % mem::size_of::()) != 0 { - bail!("hash list contains invalid size"); - } - - let chunk_list: &[FixedChunk] = unsafe { - std::slice::from_raw_parts( - data.as_ptr() as *const FixedChunk, - data.len() / mem::size_of::(), - ) - }; - - let mut my_chunks = self.chunk_write_lock()?; - for c in chunk_list { - eprintln!("Got chunk '{}'", c.digest_to_hex()); - my_chunks.insert(c.clone()); - } - - Ok(()) - } - - fn ack_id(&mut self, id: u8, data_packet: bool) -> Result<()> { - use hash_map::Entry::*; - - match self.waiting_ids.entry(id) { - Vacant(_) => bail!("received unexpected packet for transaction id {}", id), - Occupied(mut entry) => match entry.get() { - AckState::Ignore => { - entry.remove(); - } - AckState::Received => bail!("duplicate Ack received for transaction id {}", id), - AckState::Waiting => { - if data_packet { - bail!("received data packet while expecting simple Ok for {}", id); - } - *entry.get_mut() = AckState::Received; - } - AckState::AwaitingData => { - if !data_packet { - bail!( - "received empty Ok while waiting for data on stream id {}", - id - ); - } - *entry.get_mut() = AckState::Received; - } - }, - } - Ok(()) - } - - fn recv_ok(&mut self) -> Result<()> { - self.ack_id(self.common.current_packet.id, false) - } - - pub fn wait_for_id(&mut self, id: StreamId) -> Result { - if !self.waiting_ids.contains_key(&id.0) { - bail!("wait_for_id() called on unexpected id {}", id.0); - } - - loop { - if !self.poll_read(true)? 
{ - return Ok(false); - } - - use hash_map::Entry::*; - match self.waiting_ids.entry(id.0) { - Vacant(_) => return Ok(true), - Occupied(entry) => match entry.get() { - AckState::Received => { - entry.remove(); - return Ok(true); - } - _ => continue, - }, - } - } - } - - pub fn discard_id(&mut self, id: StreamId) -> Result<()> { - use hash_map::Entry::*; - match self.waiting_ids.entry(id.0) { - Vacant(_) => bail!("discard_id called with unknown id {}", id.0), - Occupied(mut entry) => match entry.get() { - AckState::Ignore => (), - AckState::Received => { - entry.remove(); - } - AckState::Waiting | AckState::AwaitingData => { - *entry.get_mut() = AckState::Ignore; - } - }, - } - Ok(()) - } - - pub fn create_backup( - &mut self, - index_type: IndexType, - backup_type: &str, - id: &str, - timestamp: i64, - file_name: &str, - chunk_size: usize, - file_size: Option, - is_new: bool, - ) -> Result { - let backup_type = backup_type::name_to_id(backup_type)?; - - if id.len() > 0xff { - bail!("id too long"); - } - - if file_name.len() > 0xff { - bail!("file name too long"); - } - - let mut flags: backup_flags::Type = 0; - if is_new { - flags |= backup_flags::EXCL; - } - if index_type == IndexType::Dynamic { - flags |= backup_flags::DYNAMIC_CHUNKS; - if file_size.is_some() { - bail!("file size must be None on dynamic backup streams"); - } - } else if file_size.is_none() { - bail!("file size is mandatory for fixed backup streams"); - } - - let packet_id = self.next_id()?; - let mut packet = Packet::builder(packet_id, PacketType::CreateBackup); - packet - .write_data(client::CreateBackup { - backup_type, - id_length: id.len() as _, - timestamp: timestamp as u64, - flags, - name_length: file_name.len() as _, - chunk_size: chunk_size as _, - file_size: file_size.unwrap_or(0) as u64, - }) - .write_buf(id.as_bytes()) - .write_buf(file_name.as_bytes()); - - self.streams.insert( - packet_id, - BackupStreamData { - id: packet_id, - index_type, - pos: 0, - path: None, - }, - ); - - self.expect_response_for_id(packet_id); - self.common.queue_data(packet.finish())?; - Ok(BackupStream(packet_id)) - } - - fn backup_created(&mut self) -> Result<()> { - let info = self - .common - .read_unaligned_data::(0)?; - let data = &self.common.packet_data()[mem::size_of_val(&info)..]; - if data.len() != info.path_length as usize { - bail!("backup-created packet has invalid length"); - } - let name = std::str::from_utf8(data)?; - let pkt_id = self.common.current_packet.id; - self.streams - .get_mut(&pkt_id) - .ok_or_else(|| format_err!("BackupCreated response for invalid stream: {}", pkt_id))? 
- .path = Some(name.to_string()); - self.ack_id(pkt_id, true)?; - Ok(()) - } - - pub fn dynamic_chunk(&mut self, stream: BackupStream, entry: &ChunkEntry) -> Result { - self.dynamic_data(stream, &entry.hash, entry.size) - } - - pub fn dynamic_data>( - &mut self, - stream: BackupStream, - digest: &T, - size: u64, - ) -> Result { - let data = self - .streams - .get_mut(&stream.0) - .ok_or_else(|| format_err!("no such active backup stream"))?; - - if data.index_type != IndexType::Dynamic { - bail!("dynamic_data called for stream of static chunks"); - } - - let mut packet = Packet::builder(data.id, PacketType::BackupDataDynamic); - packet - .write_data(data.pos as u64) - .write_buf(&digest.borrow().0); - data.pos += size; - - self.common.queue_data(packet.finish()) - } - - pub fn fixed_data>( - &mut self, - stream: BackupStream, - index: usize, - digest: &T, - ) -> Result { - let data = self - .streams - .get_mut(&stream.0) - .ok_or_else(|| format_err!("no such active backup stream"))?; - - if data.index_type != IndexType::Fixed { - bail!("fixed_data called for stream of dynamic chunks"); - } - - let mut packet = Packet::builder(data.id, PacketType::BackupDataFixed); - packet - .write_data(index as u64) - .write_buf(&digest.borrow().0); - - self.common.queue_data(packet.finish()) - } - - pub fn finish_backup(&mut self, stream: BackupStream) -> Result<(String, bool)> { - let path = self - .streams - .remove(&stream.0) - .ok_or_else(|| format_err!("no such active backup stream"))? - .path - .unwrap_or_else(|| "".to_string()); - let done = self - .common - .queue_data(Packet::simple(stream.0, PacketType::BackupFinished))?; - self.expect_ok_for_id(stream.0); - Ok((path, done)) - } -} diff --git a/proxmox-protocol/src/common.rs b/proxmox-protocol/src/common.rs deleted file mode 100644 index 3c7012b4..00000000 --- a/proxmox-protocol/src/common.rs +++ /dev/null @@ -1,274 +0,0 @@ -use std::io::{self, Read, Write}; -use std::mem; -use std::ptr; - -use failure::*; - -use endian_trait::Endian; - -use crate::protocol::*; - -type Result = std::result::Result; - -pub(crate) struct Connection -where - S: Read + Write, -{ - socket: S, - pub buffer: Vec, - pub current_packet: Packet, - pub current_packet_type: PacketType, - pub error: bool, - pub eof: bool, - upload_queue: Option<(Vec, usize)>, -} - -impl Connection -where - S: Read + Write, -{ - pub fn new(socket: S) -> Self { - Self { - socket, - buffer: Vec::new(), - current_packet: unsafe { mem::zeroed() }, - current_packet_type: PacketType::Error, - error: false, - eof: false, - upload_queue: None, - } - } - - pub fn write_some(&mut self, buf: &[u8]) -> std::io::Result { - self.socket.write(buf) - } - - /// It is safe to clear the error after an `io::ErrorKind::Interrupted`. - pub fn clear_err(&mut self) { - self.error = false; - } - - // None => nothing was queued - // Some(true) => queue finished - // Some(false) => queue not finished - pub fn poll_send(&mut self) -> Result> { - if let Some((ref data, ref mut pos)) = self.upload_queue { - loop { - match self.socket.write(&data[*pos..]) { - Ok(put) => { - *pos += put; - if *pos == data.len() { - self.upload_queue = None; - return Ok(Some(true)); - } - // Keep writing - continue; - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - return Ok(Some(false)); - } - Err(e) => return Err(e.into()), - } - } - } else { - Ok(None) - } - } - - // Returns true when the data was also sent out, false if the queue is now full. - // For now we only allow a single dataset to be queued at once. 
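Taken together, the `create_backup()`, `dynamic_data()` and `finish_backup()` methods above form the write side of a dynamically chunked backup. A hedged sketch of that sequence over a blocking transport follows; backup type, id, timestamp, file name and chunk size are placeholders, and the chunk digests are assumed to have been uploaded (or found via the hash list) already:

```rust
use std::io::{Read, Write};

use failure::Error;
use proxmox_protocol::{Client, FixedChunk, IndexType};

// Sketch: create a dynamic-index file, reference its chunks, finish the stream
// and wait for the server's acknowledgement. All literals are illustrative.
fn write_dynamic_index<S: Read + Write>(
    client: &mut Client<S>,
    chunks: &[(FixedChunk, u64)], // (digest, chunk size) pairs, already uploaded
) -> Result<String, Error> {
    let stream = client.create_backup(
        IndexType::Dynamic,
        "host",            // backup type
        "myhost",          // backup id
        1_555_555_555,     // timestamp (seconds since the epoch)
        "root.catar.didx", // file name
        4 * 1024 * 1024,   // average chunk size
        None,              // dynamic streams carry no fixed file size
        true,              // fail if the backup already exists
    )?;
    // wait until the server acknowledged the CreateBackup packet
    while !client.wait_for_id(stream.into())? {}

    for (digest, size) in chunks {
        // returns false only when the packet is still queued (non-blocking I/O)
        client.dynamic_data(stream, digest, *size)?;
    }

    let (path, _sent) = client.finish_backup(stream)?;
    while !client.wait_for_id(stream.into())? {}
    Ok(path)
}
```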
- pub fn queue_data(&mut self, buf: Vec) -> Result { - if self.upload_queue.is_some() { - bail!("upload queue clash"); - } - - self.upload_queue = Some((buf, 0)); - - match self.poll_send()? { - None => unreachable!(), // We literally just set self.upload_queue to Some(value) - Some(v) => Ok(v), - } - } - - // Returns 'true' if there's data available, 'false' if there isn't (if the - // underlying reader returned `WouldBlock` or the `read()` was short). - // Other errors are propagated. - pub fn poll_read(&mut self) -> Result { - if self.eof { - return Ok(false); - } - - if self.error { - eprintln!("refusing to read from a client in error state"); - bail!("client is in error state"); - } - - match self.poll_data_do() { - Ok(has_packet) => Ok(has_packet), - Err(e) => { - self.error = true; - Err(e) - } - } - } - - fn poll_data_do(&mut self) -> Result { - if !self.read_packet()? { - return Ok(false); - } - - if self.current_packet.length > MAX_PACKET_SIZE { - bail!("client tried to send a huge packet"); - } - - if !self.fill_packet()? { - return Ok(false); - } - - Ok(true) - } - - pub fn packet_length(&self) -> usize { - self.current_packet.length as usize - } - - pub fn packet_data(&self) -> &[u8] { - let beg = mem::size_of::(); - let end = self.packet_length(); - &self.buffer[beg..end] - } - - pub fn next(&mut self) -> Result<()> { - let pktlen = self.packet_length(); - unsafe { - if self.buffer.len() != pktlen { - std::ptr::copy_nonoverlapping( - &self.buffer[pktlen], - &mut self.buffer[0], - self.buffer.len() - pktlen, - ); - } - self.buffer.set_len(self.buffer.len() - pktlen); - } - Ok(()) - } - - // NOTE: After calling this you must `self.buffer.set_len()` when done! - #[must_use] - fn buffer_set_min_size(&mut self, size: usize) -> usize { - if self.buffer.capacity() < size { - self.buffer.reserve(size - self.buffer.len()); - } - let start = self.buffer.len(); - unsafe { - self.buffer.set_len(size); - } - start - } - - fn fill_buffer(&mut self, size: usize) -> Result { - if self.buffer.len() >= size { - return Ok(true); - } - let mut filled = self.buffer_set_min_size(size); - loop { - // We don't use read_exact to not block too long or busy-read on nonblocking sockets... - match self.socket.read(&mut self.buffer[filled..]) { - Ok(got) => { - if got == 0 { - self.eof = true; - unsafe { - self.buffer.set_len(filled); - } - return Ok(false); - } - filled += got; - if filled >= size { - unsafe { - self.buffer.set_len(filled); - } - return Ok(true); - } - // reloop - } - Err(e) => { - unsafe { - self.buffer.set_len(filled); - } - return Err(e.into()); - } - } - } - } - - fn read_packet_do(&mut self) -> Result { - if !self.fill_buffer(mem::size_of::())? 
{ - return Ok(false); - } - - self.current_packet = self.read_unaligned::(0)?.from_le(); - - self.current_packet_type = match PacketType::try_from(self.current_packet.pkttype) { - Some(t) => t, - None => bail!("unexpected packet type"), - }; - - let length = self.current_packet.length; - if (length as usize) < mem::size_of::() { - bail!("received packet of bad length ({})", length); - } - - Ok(true) - } - - fn read_packet(&mut self) -> Result { - match self.read_packet_do() { - Ok(b) => Ok(b), - Err(e) => { - if let Some(ioe) = e.downcast_ref::() { - if ioe.kind() == io::ErrorKind::WouldBlock { - return Ok(false); - } - } - Err(e) - } - } - } - - fn read_unaligned(&self, offset: usize) -> Result { - if offset + mem::size_of::() > self.buffer.len() { - bail!("buffer underrun"); - } - Ok(unsafe { ptr::read_unaligned(&self.buffer[offset] as *const _ as *const T) }.from_le()) - } - - pub fn read_unaligned_data(&self, offset: usize) -> Result { - self.read_unaligned(offset + mem::size_of::()) - } - - fn fill_packet(&mut self) -> Result { - self.fill_buffer(self.current_packet.length as usize) - } - - // convenience helpers: - - pub fn assert_size(&self, size: usize) -> Result<()> { - if self.packet_data().len() != size { - bail!( - "protocol error: invalid packet size (type {})", - self.current_packet.pkttype, - ); - } - Ok(()) - } - - pub fn assert_atleast(&self, size: usize) -> Result<()> { - if self.packet_data().len() < size { - bail!( - "protocol error: invalid packet size (type {})", - self.current_packet.pkttype, - ); - } - Ok(()) - } -} diff --git a/proxmox-protocol/src/connect.rs b/proxmox-protocol/src/connect.rs deleted file mode 100644 index 37b653b2..00000000 --- a/proxmox-protocol/src/connect.rs +++ /dev/null @@ -1,225 +0,0 @@ -//! This module provides a `Connector` used to log into a Proxmox Backup API server and connect to -//! the proxmox protocol via an HTTP Upgrade request. - -use std::io::{Read, Write}; -use std::net::TcpStream; - -use failure::{bail, format_err, Error}; -use openssl::ssl::{self, SslStream}; -use url::form_urlencoded; - -use crate::Client; - -enum Authentication { - Password(String), - Ticket(String, String), -} - -/// Connector used to log into a Proxmox Backup API server and open a backup protocol connection. -/// If successful, this will create a `Client` used to communicate over the Proxmox Backup -/// Protocol. -pub struct Connector { - user: String, - server: String, - store: String, - auth: Option, - certificate_validation: bool, -} - -fn build_login(host: &str, user: &str, pass: &str) -> Vec { - let formdata = form_urlencoded::Serializer::new(String::new()) - .append_pair("username", user) - .append_pair("password", pass) - .finish(); - - format!("\ - POST /api2/json/access/ticket HTTP/1.1\r\n\ - host: {}\r\n\ - content-length: {}\r\n\ - content-type: application/x-www-form-urlencoded\r\n\ - \r\n\ - {}", - host, - formdata.as_bytes().len(), - formdata, - ) - .into_bytes() -} - -fn build_protocol_connect(host: &str, store: &str, ticket: &str, token: &str) -> Vec { - format!("\ - GET /api2/json/admin/datastore/{}/test-upload HTTP/1.1\r\n\ - host: {}\r\n\ - connection: upgrade\r\n\ - upgrade: proxmox-backup-protocol-1\r\n\ - cookie: PBSAuthCookie={}\r\n\ - CSRFPreventionToken: {}\r\n\ - \r\n", - store, - host, - ticket, - token, - ) - .into_bytes() -} - -// Minimalistic http response reader. The only things we care about here are the status code and -// the payload... 
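The request builders above produce the raw login and protocol-upgrade HTTP messages; from Rust they are normally driven through the `Connector` defined further down in this file. A short usage sketch with placeholder credentials, host and datastore:

```rust
use failure::Error;
use proxmox_protocol::Connector;

// Sketch: authenticate against a Proxmox Backup API server and upgrade the
// connection to the backup protocol. User, host, password and store are
// placeholders; `connect()` returns a blocking client on success.
fn open_backup_connection() -> Result<(), Error> {
    let client = Connector::new(
        "root@pam".to_string(),
        "backup.example.com:8007".to_string(),
        "mystore".to_string(),
    )
    .password("secret".to_string())
    .connect()?;

    // `client` is now ready for create_backup(), upload_chunk(), etc.
    let _ = client;
    Ok(())
}
```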
-fn read_http_response(sock: T) -> Result<(u16, Vec), Error> { - use std::io::BufRead; - let mut reader = std::io::BufReader::new(sock); - - let mut status = String::new(); - reader.read_line(&mut status)?; - - let status = status.trim_end(); - let mut parts = status.splitn(3, ' '); - let _version = parts - .next() - .ok_or_else(|| format_err!("bad http response (missing version)"))?; - let code = parts - .next() - .ok_or_else(|| format_err!("bad http response (missing status code)"))?; - let _reason = parts.next(); - - let code: u16 = code.parse()?; - - // We need the payload's length if there is one: - let mut length: Option = None; - let mut line = String::new(); - loop { - line.clear(); - reader.read_line(&mut line)?; - let line = line.trim_end(); - - if line.len() == 0 { - break; - } - - let parts: Vec<&str> = line.splitn(2, ':').collect(); - if parts.len() != 2 { - bail!("invalid header in http response"); - } - - let name = parts[0].trim().to_lowercase().to_string(); - let value = parts[1].trim(); - - // The only important header (important to know how much we need to read!) - if name == "content-length" { - length = Some(value.parse()?); - } - - // Don't care about any other header contents currently... - } - - match length { - None => Ok((code, Vec::new())), - Some(length) => { - let length = length as usize; - - let mut out = Vec::with_capacity(length); - unsafe { - out.set_len(length); - } - - reader.read_exact(&mut out)?; - Ok((code, out)) - }, - } -} - -fn parse_login_response(data: &[u8]) -> Result<(String, String), Error> { - let value: serde_json::Value = serde_json::from_slice(data)?; - let ticket = value["data"]["ticket"] - .as_str() - .ok_or_else(|| format_err!("no ticket found in login response"))? - .to_string(); - let token = value["data"]["CSRFPreventionToken"] - .as_str() - .ok_or_else(|| format_err!("no token found in login response"))? - .to_string(); - Ok((ticket, token)) -} - -impl Connector { - /// Create a new connector for a specified user, server and remote backup store. - pub fn new(user: String, server: String, store: String) -> Self { - Self { - user, - server, - store, - auth: None, - certificate_validation: true, - } - } - - /// Use a password to authenticate with the remote server. - pub fn password(mut self, pass: String) -> Self { - self.auth = Some(Authentication::Password(pass)); - self - } - - /// Use an already existing ticket to connect to the server. - pub fn ticket(mut self, ticket: String, token: String) -> Self { - self.auth = Some(Authentication::Ticket(ticket, token)); - self - } - - /// Disable TLS certificate validation. - pub fn certificate_validation(mut self, on: bool) -> Self { - self.certificate_validation = on; - self - } - - pub(crate) fn do_connect(self) -> Result, Error> { - if self.auth.is_none() { - bail!("missing authentication"); - } - - let stream = TcpStream::connect(&self.server)?; - - let mut connector = ssl::SslConnector::builder(ssl::SslMethod::tls())?; - if !self.certificate_validation { - connector.set_verify(ssl::SslVerifyMode::NONE); - } - let connector = connector.build(); - - let mut stream = connector.connect(&self.server, stream)?; - let (ticket, token) = match self.auth { - None => unreachable!(), // checked above - Some(Authentication::Password(password)) => { - let login_request = build_login(&self.server, &self.user, &password); - stream.write_all(&login_request)?; - - let (code, ticket) = read_http_response(&mut stream)?; - if code != 200 { - bail!("login failed"); - } - - parse_login_response(&ticket)? 
- } - Some(Authentication::Ticket(ticket, token)) => (ticket, token), - }; - - let protocol_request = build_protocol_connect(&self.server, &self.store, &ticket, &token); - stream.write_all(&protocol_request)?; - let (code, _empty_body) = read_http_response(&mut stream)?; - if code != 101 { - bail!("expected 101 Switching Protocol, received code: {}", code); - } - - Ok(stream) - } - - /// This creates creates a synchronous client (via blocking I/O), tries to authenticate with - /// the server and connect to the protocol endpoint. On success, a `Client` is returned. - pub fn connect(self) -> Result>, Error> { - let stream = self.do_connect()?; - - let mut client = Client::new(stream); - if !client.wait_for_handshake()? { - bail!("handshake failed"); - } - Ok(client) - } -} diff --git a/proxmox-protocol/src/lib.rs b/proxmox-protocol/src/lib.rs deleted file mode 100644 index d08e94c9..00000000 --- a/proxmox-protocol/src/lib.rs +++ /dev/null @@ -1,24 +0,0 @@ -pub(crate) mod common; - -pub mod protocol; -pub mod server; -pub mod tools; - -mod chunk_stream; -pub use chunk_stream::*; - -mod chunker; -pub use chunker::*; - -mod client; -pub use client::*; - -mod connect; -pub use connect::*; - -mod types; -pub use types::*; - -pub mod c_chunker; -pub mod c_client; -pub mod c_connector; diff --git a/proxmox-protocol/src/protocol.rs b/proxmox-protocol/src/protocol.rs deleted file mode 100644 index 1cd958f2..00000000 --- a/proxmox-protocol/src/protocol.rs +++ /dev/null @@ -1,281 +0,0 @@ -use std::mem; - -use endian_trait::Endian; - -// There's no reason to have more than that in a single packet... -pub const MAX_PACKET_SIZE: u32 = 16 * 1024 * 1024; - -// Each packet has a transaction ID (eg. when uploading multiple disks each -// upload is a separate stream). -#[derive(Endian)] -#[repr(C, packed)] -pub struct Packet { - pub id: u8, // request/command id - pub pkttype: u8, // packet type - pub length: u32, // data length before the next packet - - // content is attached directly afterwards -} - -impl Packet { - pub fn builder(id: u8, pkttype: PacketType) -> PacketBuilder { - PacketBuilder::new(id, pkttype) - } - - pub fn simple(id: u8, pkttype: PacketType) -> Vec { - Self::builder(id, pkttype).finish() - } -} - -#[derive(Endian, Clone, Copy)] -#[repr(u8)] -pub enum ErrorId { - Generic, - Busy, -} - -#[repr(u8)] -#[derive(Clone, Copy)] -pub enum PacketType { - /// First packet sent by the server. - Hello, - - /// Generic acknowledgement. - Ok, - - /// Error packet sent by the server, this cancels the request for which it is produced. - Error, - - /// The client wants the list of available hashes in order to know which ones require an - /// upload. - /// - /// The body should contain a backup file name of which to retrieve the hash list. - /// - /// Server responds with a sequence of ``HashListPart`` packets. - GetHashList, - - /// Array of hashes. The number of hashes in a packet is calculated from the packet length as - /// provided in the ``Packet`` struct. An empty packet indicates the end of the list. We send a - /// sequence of such packets because we don't know whether the server will be keeping the list - /// in memory yet, so it might not know the number in advance and may be iterating through - /// directories until it hits an end. It can produce the network packets asynchronously while - /// walking the chunk dir. - HashListPart, - - /// Client requests to download chunks via a hash list from the server. 
The number of chunks - /// can be derived from the length of this request, so it works similar to ``HashListPart``, - /// but there's only 1 chunk list per request ID. - /// - /// The server responds with a sequence of ``Chunk`` packets or ``Error``. - DownloadChunks, - - /// The response to ``DownloadChunks``. One packet per requested chunk. - Chunk, - - /// The upload of a chunk can happen independent from the ongoing backup - /// streams. Server responds with an ``OK``. - UploadChunk, - - /// Create a file in a new or existing backup. Contains all the metadata of - /// a file. - /// - /// The server responds with ``BackupCreated`` or ``Error``. On ``BackupCreated`` the client - /// may proceed to send as many ``BackupData...`` packets as necessary to fill the file. - /// The sequence is finished by the client with a ``BackupFinished``. - CreateBackup, - - /// Successful from the server to a client's ``CreateBackup`` packet. Contains the server side - /// path relative to the store. - BackupCreated, - - /// This packet contains an array of references to fixed sized chunks. Clients should upload - /// chunks via ``UploadChunk`` packets before using them in this type of packet. A non-existent - /// chunk is an error. - /// - /// The server produces an ``Error`` packet in case of an error. - BackupDataFixed, - - /// This packet contains an array of references to dynamic sized chunks. Clients should upload - /// chunks via ``UploadChunk`` packets before using them in this type of packet. A non-existent - /// chunk is an error. - /// - /// The server produces an ``Error`` packet in case of an error. - BackupDataDynamic, - - /// This ends a backup file. The server responds with an ``OK`` or an ``Error`` packet. - BackupFinished, -} - -// Nightly has a std::convert::TryFrom, actually... -impl PacketType { - pub fn try_from(v: u8) -> Option { - if v <= PacketType::BackupFinished as u8 { - Some(unsafe { std::mem::transmute(v) }) - } else { - None - } - } -} - -// Not using bitflags! for Endian derive... -pub mod backup_flags { - pub type Type = u8; - /// The backup must not exist yet. - pub const EXCL: Type = 0x00000001; - /// The data represents a raw file - pub const RAW: Type = 0x00000002; - /// The data uses dynamically sized chunks (catar file) - pub const DYNAMIC_CHUNKS: Type = 0x00000004; -} - -pub mod backup_type { - pub type Type = u8; - pub const VM: Type = 0; - pub const CT: Type = 1; - pub const HOST: Type = 2; - - use failure::{bail, Error}; - pub fn id_to_name(id: Type) -> Result<&'static str, Error> { - Ok(match id { - VM => "vm", - CT => "ct", - HOST => "host", - n => bail!("unknown backup type id: {}", n), - }) - } - - pub fn name_to_id(id: &str) -> Result { - Ok(match id { - "vm" => VM, - "ct" => CT, - "host" => HOST, - n => bail!("unknown backup type name: {}", n), - }) - } -} - -#[repr(C, packed)] -#[derive(Endian)] -pub struct DynamicChunk { - pub offset: u64, - pub digest: [u8; 32], -} - -pub mod server { - use endian_trait::Endian; - - pub const PROTOCOL_VERSION: u32 = 1; - - pub const HELLO_MAGIC: [u8; 8] = *b"PMXBCKUP"; - - pub const HELLO_VERSION: u32 = 1; // the current version - #[derive(Endian)] - #[repr(C, packed)] - pub struct Hello { - pub magic: [u8; 8], - pub version: u32, - } - - #[derive(Endian)] - #[repr(C, packed)] - pub struct Error { - pub id: u8, - } - - #[derive(Endian)] - #[repr(C, packed)] - pub struct Chunk { - pub hash: super::DynamicChunk, - // Data follows here... 
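// The deleted `PacketType::try_from` above relies on a range check plus an
// unsafe transmute (std's `TryFrom`, mentioned in the comment, has since
// become stable). A reduced, match-based sketch on a trimmed-down enum that
// avoids unsafe entirely; extending it to the full variant list is mechanical.
#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(u8)]
enum PacketType {
    Hello = 0,
    Ok = 1,
    Error = 2,
    BackupFinished = 3,
}

impl PacketType {
    fn try_from_u8(v: u8) -> Option<Self> {
        Some(match v {
            0 => PacketType::Hello,
            1 => PacketType::Ok,
            2 => PacketType::Error,
            3 => PacketType::BackupFinished,
            _ => return None,
        })
    }
}

fn main() {
    assert_eq!(PacketType::try_from_u8(2), Some(PacketType::Error));
    assert_eq!(PacketType::try_from_u8(200), None);
}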
- } - - #[derive(Endian)] - #[repr(C, packed)] - pub struct BackupCreated { - pub path_length: u16, - // path follows here - } -} - -pub mod client { - use endian_trait::Endian; - - #[derive(Endian)] - #[repr(C, packed)] - pub struct UploadChunk { - pub hash: crate::FixedChunk, - } - - #[derive(Endian)] - #[repr(C, packed)] - pub struct CreateBackup { - pub backup_type: super::backup_type::Type, - pub id_length: u8, // length of the ID string - pub timestamp: u64, // seconds since the epoch - pub flags: super::backup_flags::Type, - pub name_length: u8, // file name length - pub chunk_size: u32, // average or "fixed" chunk size - pub file_size: u64, // size for fixed size files (must be 0 if DYNAMIC_CHUNKS is set) - - // ``id_length`` bytes of ID follow - // ``name_length`` bytes of file name follow - // Further packets contain the data or chunks - } - - #[derive(Endian)] - #[repr(C, packed)] - pub struct GetHashList { - pub name_length: u16, - // name follows as payload - } -} - -pub struct PacketBuilder { - data: Vec, -} - -impl PacketBuilder { - pub fn new(id: u8, pkttype: PacketType) -> Self { - let data = Vec::with_capacity(mem::size_of::()); - let mut me = Self { data }; - me.write_data( - Packet { - id, - pkttype: pkttype as _, - length: 0, - } - .to_le(), - ); - me - } - - pub fn reserve(&mut self, more: usize) -> &mut Self { - self.data.reserve(more); - self - } - - pub fn write_buf(&mut self, buf: &[u8]) -> &mut Self { - self.data.extend_from_slice(buf); - self - } - - pub fn write_data(&mut self, data: T) -> &mut Self { - self.write_data_noswap(&data.to_le()) - } - - pub fn write_data_noswap(&mut self, data: &T) -> &mut Self { - self.write_buf(unsafe { - std::slice::from_raw_parts(data as *const T as *const u8, mem::size_of::()) - }) - } - - pub fn finish(mut self) -> Vec { - let length = self.data.len(); - assert!(length >= mem::size_of::()); - unsafe { - let head = self.data.as_mut_ptr() as *mut Packet; - std::ptr::write_unaligned((&mut (*head).length) as *mut u32, (length as u32).to_le()); - } - self.data - } -} diff --git a/proxmox-protocol/src/server.rs b/proxmox-protocol/src/server.rs deleted file mode 100644 index 20e84173..00000000 --- a/proxmox-protocol/src/server.rs +++ /dev/null @@ -1,466 +0,0 @@ -use std::collections::hash_map::{self, HashMap}; -use std::io::{Read, Write}; -use std::{mem, ptr}; - -use failure::*; - -use endian_trait::Endian; - -use crate::common; -use crate::protocol::*; -use crate::ChunkEntry; -use crate::FixedChunk; - -type Result = std::result::Result; - -pub trait ChunkList: Send { - fn next(&mut self) -> Result>; -} - -/// This provides callbacks used by a `Connection` when it receives a packet. -pub trait HandleClient { - /// The protocol handler will call this when the client produces an irrecoverable error. - fn error(&self); - - /// The client wants the list of hashes, the provider should provide an iterator over chunk - /// entries. - fn get_chunk_list(&self, backup_name: &str) -> Result>; - - /// The client has uploaded a chunk, we should add it to the chunk store. Return whether the - /// chunk was new. - fn upload_chunk(&self, chunk: &ChunkEntry, data: &[u8]) -> Result; - - /// The client wants to create a backup. Since multiple backup streams can happen in parallel, - /// this should return a handler used to create the individual streams. - /// The handler will be informed about success via the ``finish()`` method. 
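// The `CreateBackup` request above carries two variable-length strings after
// the fixed header: `id_length` bytes of backup ID followed by `name_length`
// bytes of file name, both UTF-8. A reduced std-only sketch of that tail
// encoding and the validation the server performs; the field set and helper
// names are trimmed for illustration.
fn encode_tail(id: &str, name: &str) -> (u8, u8, Vec<u8>) {
    let mut payload = Vec::new();
    payload.extend_from_slice(id.as_bytes());
    payload.extend_from_slice(name.as_bytes());
    (id.len() as u8, name.len() as u8, payload)
}

fn decode_tail(id_len: u8, name_len: u8, payload: &[u8]) -> Result<(&str, &str), String> {
    let (id_len, name_len) = (id_len as usize, name_len as usize);
    if payload.len() != id_len + name_len {
        return Err("payload length does not match id_length + name_length".to_string());
    }
    let id = std::str::from_utf8(&payload[..id_len]).map_err(|e| e.to_string())?;
    let name = std::str::from_utf8(&payload[id_len..]).map_err(|e| e.to_string())?;
    Ok((id, name))
}

fn main() {
    let (id_len, name_len, payload) = encode_tail("100", "root.catar");
    assert_eq!(
        decode_tail(id_len, name_len, &payload).unwrap(),
        ("100", "root.catar")
    );
}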
- fn create_backup( - &self, - backup_type: &str, - id: &str, - timestamp: i64, - new: bool, - ) -> Result>; -} - -/// A single backup may contain multiple files. Currently we represent this via a hierarchy where -/// the `HandleBackup` trait is instantiated for each backup, which is responsible for -/// instantiating the `BackupFile` trait objects. -pub trait HandleBackup { - /// All individual streams for this backup have been successfully finished. - fn finish(&mut self) -> Result<()>; - - /// Create a specific file in this backup. - fn create_file( - &self, - name: &str, - fixed_size: Option, - chunk_size: usize, - ) -> Result>; -} - -/// This handles backup files created by calling `create_file` on a `Backup`. -pub trait BackupFile { - /// Backup use the server-local timestamp formatting, so we want to be able to tell the client - /// the real remote path: - fn relative_path(&self) -> &str; - - /// The client wants to add a chunk to a fixed index file at a certain position. - fn add_fixed_data(&mut self, index: u64, chunk: &FixedChunk) -> Result<()>; - - /// The client wants to add a chunks to a dynamic index file. - fn add_dynamic_data(&mut self, chunk: &DynamicChunk) -> Result<()>; - - /// This notifies the handler that the backup has finished successfully. This should commit the - /// data to the store for good. After this the client will receive an "ok". - /// - /// If the Drop handler gets called before this method, the backup was aborted due to an error - /// or the client disconnected unexpectedly, in which case cleanup of temporary files should be - /// performed. - fn finish(&mut self) -> Result<()>; -} - -#[derive(Clone, Eq, Hash, PartialEq)] -struct BackupId(backup_type::Type, String, i64); - -/// Associates a socket with the server side of the backup protocol. -/// The communcation channel should be `Read + Write` and may be non-blocking (provided it -/// correctly returns `io::ErrorKind::WouldBlock`). -/// The handler has to implement the `HandleClient` trait to provide callbacks used while -/// communicating with the client. -pub struct Connection -where - S: Read + Write, - H: HandleClient, -{ - handler: H, - common: common::Connection, - - // states: - - // If this is set we are currently transferring our hash list to the client: - hash_list: Option<( - u8, // data stream ID - Box, - )>, - - // currently active 'backups' (handlers for a specific BackupDir) - backups: HashMap>, - - // currently active backup *file* streams - backup_files: HashMap>, -} - -impl Connection -where - S: Read + Write, - H: HandleClient, -{ - pub fn new(socket: S, handler: H) -> Result { - let mut me = Self { - handler, - common: common::Connection::new(socket), - hash_list: None, - backups: HashMap::new(), - backup_files: HashMap::new(), - }; - - me.send_hello()?; - Ok(me) - } - - fn send_hello(&mut self) -> Result<()> { - let mut packet = Packet::builder(0, PacketType::Hello); - packet.write_data(server::Hello { - magic: server::HELLO_MAGIC, - version: server::PROTOCOL_VERSION, - }); - self.common.queue_data(packet.finish())?; - Ok(()) - } - - pub fn eof(&self) -> bool { - self.common.eof - } - - /// It is safe to clear the error after an `io::ErrorKind::Interrupted`. - pub fn clear_err(&mut self) { - self.common.clear_err() - } - - pub fn main(&mut self) -> Result<()> { - self.poll_read()?; - self.poll_send()?; - Ok(()) - } - - // If this returns an error it is considered fatal and the connection should be dropped! 
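// `send_hello` above queues a Hello packet with id 0 carrying the server's
// magic and protocol version. Assuming the derived `Endian::to_le` leaves the
// byte array untouched and writes the u32 little-endian, the wire image is
// 18 bytes; a sketch spelling that out.
fn hello_packet() -> Vec<u8> {
    let mut out = Vec::with_capacity(18);
    out.push(0u8); // packet id 0
    out.push(0u8); // PacketType::Hello (first variant, discriminant 0)
    out.extend_from_slice(&18u32.to_le_bytes()); // total length, header included
    out.extend_from_slice(b"PMXBCKUP"); // server::HELLO_MAGIC
    out.extend_from_slice(&1u32.to_le_bytes()); // server::PROTOCOL_VERSION
    out
}

fn main() {
    let pkt = hello_packet();
    assert_eq!(pkt.len(), 18);
    assert_eq!(&pkt[6..14], b"PMXBCKUP");
}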
- fn poll_read(&mut self) -> Result<()> { - if self.common.eof { - // polls after EOF are errors: - bail!("client disconnected"); - } - - if !self.common.poll_read()? { - // No data available - if self.common.eof { - bail!("client disconnected"); - } - return Ok(()); - } - - // we received a packet, handle it: - - loop { - use PacketType::*; - match self.common.current_packet_type { - GetHashList => self.hash_list_requested()?, - UploadChunk => self.receive_chunk()?, - CreateBackup => self.create_backup()?, - BackupDataDynamic => self.backup_data_dynamic()?, - BackupDataFixed => self.backup_data_fixed()?, - BackupFinished => self.backup_finished()?, - _ => bail!( - "client sent an unexpected packet of type {}", - self.common.current_packet_type as u32, - ), - }; - self.common.next()?; - if !self.common.poll_read()? { - break; - } - } - - Ok(()) - } - - fn poll_send(&mut self) -> Result<()> { - if self.common.error { - eprintln!("refusing to send datato client in error state"); - bail!("client is in error state"); - } - - if let Some(false) = self.common.poll_send()? { - // send queue is not finished, don't add anything else... - return Ok(()); - } - - // Queue has either finished or was empty, see if we should enqueue more data: - if self.hash_list.is_some() { - return self.send_hash_list(); - } - Ok(()) - } - - fn hash_list_requested(&mut self) -> Result<()> { - // Verify protocol: GetHashList is an empty packet. - let request = self.common.read_unaligned_data::(0)?; - self.common - .assert_size(mem::size_of_val(&request) + request.name_length as usize)?; - let name_bytes = &self.common.packet_data()[mem::size_of_val(&request)..]; - let name = std::str::from_utf8(name_bytes)?; - - // We support only one active hash list stream: - if self.hash_list.is_some() { - return self.respond_error(ErrorId::Busy); - } - - self.hash_list = Some(( - self.common.current_packet.id, - self.handler.get_chunk_list(name)?, - )); - - Ok(()) - } - - fn send_hash_list(&mut self) -> Result<()> { - loop { - let (stream_id, hash_iter) = self.hash_list.as_mut().unwrap(); - - let max_chunks_per_packet = (MAX_PACKET_SIZE as usize - mem::size_of::()) - / mem::size_of::(); - - let mut packet = Packet::builder(*stream_id, PacketType::HashListPart); - packet.reserve(mem::size_of::() * max_chunks_per_packet); - - let mut count = 0; - for _ in 0..max_chunks_per_packet { - let entry: &[u8; 32] = match hash_iter.next() { - Ok(Some(entry)) => entry, - Ok(None) => break, - Err(e) => { - eprintln!("error sending chunk list to client: {}", e); - continue; - } - }; - - packet.write_buf(entry); - count += 1; - } - - let can_send_more = self.common.queue_data(packet.finish())?; - - if count == 0 { - // We just sent the EOF packet, clear our iterator state! 
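// Worked numbers for the packing in `send_hash_list` above: with a 16 MiB
// packet limit, a 6-byte packed header and 32-byte digests, one HashListPart
// carries up to 524_287 hashes; an empty part then terminates the list.
fn main() {
    const MAX_PACKET_SIZE: usize = 16 * 1024 * 1024;
    const HEADER_SIZE: usize = 6; // packed Packet: id u8 + type u8 + length u32
    const DIGEST_SIZE: usize = 32; // FixedChunk is a [u8; 32]

    let max_chunks_per_packet = (MAX_PACKET_SIZE - HEADER_SIZE) / DIGEST_SIZE;
    assert_eq!(max_chunks_per_packet, 524_287);
}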
- self.hash_list = None; - break; - } - - if !can_send_more { - break; - } - } - Ok(()) - } - - fn respond_error(&mut self, kind: ErrorId) -> Result<()> { - self.respond_value(PacketType::Error, kind)?; - Ok(()) - } - - fn respond_value(&mut self, pkttype: PacketType, data: T) -> Result<()> { - let mut packet = Packet::builder(self.common.current_packet.id, pkttype); - packet.write_data(data); - self.common.queue_data(packet.finish())?; - Ok(()) - } - - fn respond_empty(&mut self, pkttype: PacketType) -> Result<()> { - self.common - .queue_data(Packet::simple(self.common.current_packet.id, pkttype))?; - Ok(()) - } - - fn respond_ok(&mut self) -> Result<()> { - self.respond_empty(PacketType::Ok) - } - - fn receive_chunk(&mut self) -> Result<()> { - self.common - .assert_atleast(mem::size_of::())?; - let data = self.common.packet_data(); - let (hash, data) = data.split_at(mem::size_of::()); - if data.len() == 0 { - bail!("received an empty chunk"); - } - let entry = ChunkEntry::from_data(data); - if entry.hash != hash { - let cli_hash = crate::tools::digest_to_hex(hash); - let data_hash = entry.digest_to_hex(); - bail!( - "client claimed data with digest {} has digest {}", - data_hash, - cli_hash - ); - } - let _new = self.handler.upload_chunk(&entry, data)?; - self.respond_ok() - } - - fn create_backup(&mut self) -> Result<()> { - if self - .backup_files - .contains_key(&self.common.current_packet.id) - { - bail!("stream id already in use..."); - } - - let create_msg = self.common.read_unaligned_data::(0)?; - - // simple data: - let flags = create_msg.flags; - let backup_type = create_msg.backup_type; - let time = create_msg.timestamp as i64; - - // text comes from the payload data after the CreateBackup struct: - let data = self.common.packet_data(); - let payload = &data[mem::size_of_val(&create_msg)..]; - - // there must be exactly the ID and the file name in the payload: - let id_len = create_msg.id_length as usize; - let name_len = create_msg.name_length as usize; - let expected_len = id_len + name_len; - if payload.len() < expected_len { - bail!("client sent incomplete CreateBackup request"); - } else if payload.len() > expected_len { - bail!("client sent excess data in CreateBackup request"); - } - - // id and file name must be utf8: - let id = std::str::from_utf8(&payload[0..id_len]) - .map_err(|e| format_err!("client-requested backup id is invalid: {}", e))?; - let file_name = std::str::from_utf8(&payload[id_len..]) - .map_err(|e| format_err!("client-requested backup file name invalid: {}", e))?; - - // Sanity check dynamic vs fixed: - let is_dynamic = (flags & backup_flags::DYNAMIC_CHUNKS) != 0; - let file_size = match (is_dynamic, create_msg.file_size) { - (false, size) => Some(size), - (true, 0) => None, - (true, _) => bail!("file size of dynamic streams must be zero"), - }; - - // search or create the handler: - let hashmap_id = BackupId(backup_type, id.to_string(), time); - let handle = match self.backups.entry(hashmap_id) { - hash_map::Entry::Vacant(entry) => entry.insert(self.handler.create_backup( - backup_type::id_to_name(backup_type)?, - id, - time, - (flags & backup_flags::EXCL) != 0, - )?), - hash_map::Entry::Occupied(entry) => entry.into_mut(), - }; - let file = handle.create_file(file_name, file_size, create_msg.chunk_size as usize)?; - - let mut response = - Packet::builder(self.common.current_packet.id, PacketType::BackupCreated); - let path = file.relative_path(); - if path.len() > 0xffff { - bail!("path too long"); - } - response - 
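// `receive_chunk` above recomputes the SHA-256 digest of the uploaded data
// and rejects the chunk when it differs from the digest the client claimed.
// A minimal sketch of that check using the `openssl` crate the deleted code
// already depends on; the helper name is hypothetical.
use openssl::sha::sha256;

fn verify_chunk(claimed: &[u8; 32], data: &[u8]) -> bool {
    sha256(data) == *claimed
}

fn main() {
    let data = b"some chunk payload";
    let digest = sha256(data);
    assert!(verify_chunk(&digest, data));
    assert!(!verify_chunk(&digest, b"tampered payload"));
}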
.write_data(server::BackupCreated { - path_length: path.len() as _, - }) - .write_buf(path.as_bytes()); - self.common.queue_data(response.finish())?; - - self.backup_files - .insert(self.common.current_packet.id, file); - - Ok(()) - } - - fn backup_data_dynamic(&mut self) -> Result<()> { - let stream_id = self.common.current_packet.id; - let file = self - .backup_files - .get_mut(&stream_id) - .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?; - - let mut data = self.common.packet_data(); - // Data consists of (offset: u64, hash: [u8; 32]) - let entry_len = mem::size_of::(); - - while data.len() >= entry_len { - let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const DynamicChunk) }; - data = &data[entry_len..]; - - entry.offset = entry.offset.from_le(); - file.add_dynamic_data(&entry)?; - } - - if data.len() != 0 { - bail!( - "client sent excess data ({} bytes) after dynamic chunk indices!", - data.len() - ); - } - - Ok(()) - } - - fn backup_data_fixed(&mut self) -> Result<()> { - let stream_id = self.common.current_packet.id; - let file = self - .backup_files - .get_mut(&stream_id) - .ok_or_else(|| format_err!("BackupDataFixed for invalid stream id {}", stream_id))?; - - let mut data = self.common.packet_data(); - // Data consists of (index: u64, hash: [u8; 32]) - #[repr(C, packed)] - struct IndexedChunk { - index: u64, - digest: FixedChunk, - } - let entry_len = mem::size_of::(); - - while data.len() >= entry_len { - let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const IndexedChunk) }; - data = &data[entry_len..]; - - entry.index = entry.index.from_le(); - file.add_fixed_data(entry.index, &entry.digest)?; - } - - if data.len() != 0 { - bail!( - "client sent excess data ({} bytes) after dynamic chunk indices!", - data.len() - ); - } - - Ok(()) - } - - fn backup_finished(&mut self) -> Result<()> { - let stream_id = self.common.current_packet.id; - let mut file = self - .backup_files - .remove(&stream_id) - .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?; - file.finish()?; - self.respond_ok() - } -} diff --git a/proxmox-protocol/src/tools.rs b/proxmox-protocol/src/tools.rs deleted file mode 100644 index a614e28f..00000000 --- a/proxmox-protocol/src/tools.rs +++ /dev/null @@ -1,47 +0,0 @@ -use failure::{bail, Error}; - -pub fn digest_to_hex(digest: &[u8]) -> String { - const HEX_CHARS: &'static [u8; 16] = b"0123456789abcdef"; - - let mut buf = Vec::::with_capacity(digest.len() * 2); - - for i in 0..digest.len() { - buf.push(HEX_CHARS[(digest[i] >> 4) as usize]); - buf.push(HEX_CHARS[(digest[i] & 0xf) as usize]); - } - - unsafe { String::from_utf8_unchecked(buf) } -} - -pub unsafe fn swapped_data_to_buf(data: &T) -> &[u8] { - std::slice::from_raw_parts(data as *const T as *const u8, std::mem::size_of::()) -} - -fn hex_nibble(c: u8) -> Result { - Ok(match c { - b'0'..=b'9' => c - b'0', - b'a'..=b'f' => c - b'a' + 0xa, - b'A'..=b'F' => c - b'A' + 0xa, - _ => bail!("not a hex digit: {}", c as char), - }) -} - -#[inline] -pub fn parse_hex_digest>(hex: T) -> Result<[u8; 32], Error> { - let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() }; - - let hex = hex.as_ref(); - - if hex.len() != 64 { - bail!( - "invalid hex digest ({} instead of 64 digits long)", - hex.len() - ); - } - - for i in 0..32 { - digest[i] = (hex_nibble(hex[i * 2])? 
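// `backup_data_dynamic` above walks packed 40-byte entries (a little-endian
// u64 offset followed by a 32-byte digest) using `ptr::read_unaligned`. A
// safe std-only alternative with the same wire layout, sketched with
// `chunks_exact`; the helper name is hypothetical.
use std::convert::TryInto;

fn parse_dynamic_entries(data: &[u8]) -> Result<Vec<(u64, [u8; 32])>, String> {
    const ENTRY_LEN: usize = 8 + 32;
    if data.len() % ENTRY_LEN != 0 {
        return Err(format!(
            "client sent excess data ({} bytes) after dynamic chunk indices",
            data.len() % ENTRY_LEN
        ));
    }
    let mut out = Vec::with_capacity(data.len() / ENTRY_LEN);
    for entry in data.chunks_exact(ENTRY_LEN) {
        let offset = u64::from_le_bytes(entry[..8].try_into().unwrap());
        let mut digest = [0u8; 32];
        digest.copy_from_slice(&entry[8..]);
        out.push((offset, digest));
    }
    Ok(out)
}

fn main() {
    let mut wire = Vec::new();
    wire.extend_from_slice(&4096u64.to_le_bytes());
    wire.extend_from_slice(&[0xabu8; 32]);
    let entries = parse_dynamic_entries(&wire).unwrap();
    assert_eq!(entries, vec![(4096u64, [0xabu8; 32])]);
}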
<< 4) + hex_nibble(hex[i * 2 + 1])?; - } - - Ok(digest) -} diff --git a/proxmox-protocol/src/types.rs b/proxmox-protocol/src/types.rs deleted file mode 100644 index 7bf759af..00000000 --- a/proxmox-protocol/src/types.rs +++ /dev/null @@ -1,98 +0,0 @@ -use std::borrow::Borrow; - -use endian_trait::Endian; -use failure::*; - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum IndexType { - Fixed, - Dynamic, -} - -#[derive(Endian, Clone, Debug, Eq, Hash, PartialEq)] -#[repr(transparent)] -pub struct FixedChunk(pub [u8; 32]); - -impl FixedChunk { - pub fn new(hash: [u8; 32]) -> Self { - Self(hash) - } - - pub fn from_hex>(hex: T) -> Result { - Ok(Self::new(crate::tools::parse_hex_digest(hex.as_ref())?)) - } - - pub fn from_data(data: &[u8]) -> Self { - let mut hasher = openssl::sha::Sha256::new(); - hasher.update(data); - Self::new(hasher.finish()) - } - - pub fn digest_to_hex(&self) -> String { - crate::tools::digest_to_hex(&self.0) - } -} - -#[derive(Endian, Clone, Copy, Debug, Hash)] -#[repr(C, packed)] -pub struct ChunkEntry { - pub hash: [u8; 32], - pub size: u64, -} - -impl ChunkEntry { - pub fn new(hash: [u8; 32], size: u64) -> Self { - Self { hash, size } - } - - pub fn from_hex>(hex: T, size: u64) -> Result { - Ok(Self::new( - crate::tools::parse_hex_digest(hex.as_ref())?, - size, - )) - } - - pub fn len(&self) -> u64 { - self.size - } - - pub fn from_data(data: &[u8]) -> Self { - let mut hasher = openssl::sha::Sha256::new(); - hasher.update(data); - Self::new(hasher.finish(), data.len() as u64) - } - - pub fn digest_to_hex(&self) -> String { - crate::tools::digest_to_hex(&self.hash) - } - - pub fn to_fixed(&self) -> FixedChunk { - FixedChunk(self.hash) - } -} - -impl PartialEq for ChunkEntry { - fn eq(&self, other: &Self) -> bool { - self.size == other.size && self.hash == other.hash - } -} - -impl Eq for ChunkEntry {} - -impl Into for ChunkEntry { - fn into(self) -> FixedChunk { - FixedChunk(self.hash) - } -} - -impl Borrow for ChunkEntry { - fn borrow(&self) -> &FixedChunk { - unsafe { std::mem::transmute(&self.hash) } - } -} - -impl Borrow for [u8; 32] { - fn borrow(&self) -> &FixedChunk { - unsafe { std::mem::transmute(self) } - } -} diff --git a/src/backup.rs b/src/backup.rs index ce6d5922..887e7361 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -134,6 +134,9 @@ pub use checksum_reader::*; mod checksum_writer; pub use checksum_writer::*; +mod chunker; +pub use chunker::*; + mod data_chunk; pub use data_chunk::*; @@ -155,8 +158,6 @@ pub use chunk_stream::*; mod chunk_stat; pub use chunk_stat::*; -pub use proxmox_protocol::Chunker; - mod read_chunk; pub use read_chunk::*; diff --git a/src/backup/chunk_stream.rs b/src/backup/chunk_stream.rs index a8a77e09..d542a328 100644 --- a/src/backup/chunk_stream.rs +++ b/src/backup/chunk_stream.rs @@ -1,10 +1,9 @@ -use failure::*; - -use proxmox_protocol::Chunker; -use futures::{Async, Poll}; -use futures::stream::Stream; - use bytes::BytesMut; +use failure::*; +use futures::stream::Stream; +use futures::{Async, Poll}; + +use super::Chunker; /// Split input stream into dynamic sized chunks pub struct ChunkStream { diff --git a/proxmox-protocol/src/chunker.rs b/src/backup/chunker.rs similarity index 59% rename from proxmox-protocol/src/chunker.rs rename to src/backup/chunker.rs index 088d6b67..4ef7d0bf 100644 --- a/proxmox-protocol/src/chunker.rs +++ b/src/backup/chunker.rs @@ -1,4 +1,3 @@ - /// Note: window size 32 or 64, is faster because we can /// speedup modulo operations, but always computes hash 0 /// for constant data streams .. 
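// `tools.rs` above converts digests to and from hex, filling the output
// array via the now-deprecated `mem::uninitialized`. A std-only sketch of
// the same round trip that simply zero-initializes the array instead.
fn digest_to_hex(digest: &[u8; 32]) -> String {
    digest.iter().map(|b| format!("{:02x}", b)).collect()
}

fn parse_hex_digest(hex: &str) -> Result<[u8; 32], String> {
    if hex.len() != 64 {
        return Err(format!("invalid hex digest ({} instead of 64 digits long)", hex.len()));
    }
    let bytes = hex.as_bytes();
    let mut digest = [0u8; 32];
    for (i, out) in digest.iter_mut().enumerate() {
        let hi = (bytes[i * 2] as char).to_digit(16).ok_or("not a hex digit")?;
        let lo = (bytes[i * 2 + 1] as char).to_digit(16).ok_or("not a hex digit")?;
        *out = ((hi << 4) | lo) as u8;
    }
    Ok(digest)
}

fn main() {
    let digest = [0x5au8; 32];
    let hex = digest_to_hex(&digest);
    assert_eq!(hex.len(), 64);
    assert_eq!(parse_hex_digest(&hex).unwrap(), digest);
}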
0,0,0,0,0,0 @@ -34,74 +33,41 @@ pub struct Chunker { } const BUZHASH_TABLE: [u32; 256] = [ - 0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, - 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801, - 0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c, - 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494, - 0xec85c4e6, 0xb7d33edc, 0xe549b544, 0xfdeda5aa, - 0x882bf287, 0x3116737c, 0x05569956, 0xe8cc1f68, - 0x0806ac5e, 0x22a14443, 0x15297e10, 0x50d090e7, - 0x4ba60f6f, 0xefd9f1a7, 0x5c5c885c, 0x82482f93, - 0x9bfd7c64, 0x0b3e7276, 0xf2688e77, 0x8fad8abc, - 0xb0509568, 0xf1ada29f, 0xa53efdfe, 0xcb2b1d00, - 0xf2a9e986, 0x6463432b, 0x95094051, 0x5a223ad2, - 0x9be8401b, 0x61e579cb, 0x1a556a14, 0x5840fdc2, - 0x9261ddf6, 0xcde002bb, 0x52432bb0, 0xbf17373e, - 0x7b7c222f, 0x2955ed16, 0x9f10ca59, 0xe840c4c9, - 0xccabd806, 0x14543f34, 0x1462417a, 0x0d4a1f9c, - 0x087ed925, 0xd7f8f24c, 0x7338c425, 0xcf86c8f5, - 0xb19165cd, 0x9891c393, 0x325384ac, 0x0308459d, - 0x86141d7e, 0xc922116a, 0xe2ffa6b6, 0x53f52aed, - 0x2cd86197, 0xf5b9f498, 0xbf319c8f, 0xe0411fae, - 0x977eb18c, 0xd8770976, 0x9833466a, 0xc674df7f, - 0x8c297d45, 0x8ca48d26, 0xc49ed8e2, 0x7344f874, - 0x556f79c7, 0x6b25eaed, 0xa03e2b42, 0xf68f66a4, - 0x8e8b09a2, 0xf2e0e62a, 0x0d3a9806, 0x9729e493, - 0x8c72b0fc, 0x160b94f6, 0x450e4d3d, 0x7a320e85, - 0xbef8f0e1, 0x21d73653, 0x4e3d977a, 0x1e7b3929, - 0x1cc6c719, 0xbe478d53, 0x8d752809, 0xe6d8c2c6, - 0x275f0892, 0xc8acc273, 0x4cc21580, 0xecc4a617, - 0xf5f7be70, 0xe795248a, 0x375a2fe9, 0x425570b6, - 0x8898dcf8, 0xdc2d97c4, 0x0106114b, 0x364dc22f, - 0x1e0cad1f, 0xbe63803c, 0x5f69fac2, 0x4d5afa6f, - 0x1bc0dfb5, 0xfb273589, 0x0ea47f7b, 0x3c1c2b50, - 0x21b2a932, 0x6b1223fd, 0x2fe706a8, 0xf9bd6ce2, - 0xa268e64e, 0xe987f486, 0x3eacf563, 0x1ca2018c, - 0x65e18228, 0x2207360a, 0x57cf1715, 0x34c37d2b, - 0x1f8f3cde, 0x93b657cf, 0x31a019fd, 0xe69eb729, - 0x8bca7b9b, 0x4c9d5bed, 0x277ebeaf, 0xe0d8f8ae, - 0xd150821c, 0x31381871, 0xafc3f1b0, 0x927db328, - 0xe95effac, 0x305a47bd, 0x426ba35b, 0x1233af3f, - 0x686a5b83, 0x50e072e5, 0xd9d3bb2a, 0x8befc475, - 0x487f0de6, 0xc88dff89, 0xbd664d5e, 0x971b5d18, - 0x63b14847, 0xd7d3c1ce, 0x7f583cf3, 0x72cbcb09, - 0xc0d0a81c, 0x7fa3429b, 0xe9158a1b, 0x225ea19a, - 0xd8ca9ea3, 0xc763b282, 0xbb0c6341, 0x020b8293, - 0xd4cd299d, 0x58cfa7f8, 0x91b4ee53, 0x37e4d140, - 0x95ec764c, 0x30f76b06, 0x5ee68d24, 0x679c8661, - 0xa41979c2, 0xf2b61284, 0x4fac1475, 0x0adb49f9, - 0x19727a23, 0x15a7e374, 0xc43a18d5, 0x3fb1aa73, - 0x342fc615, 0x924c0793, 0xbee2d7f0, 0x8a279de9, - 0x4aa2d70c, 0xe24dd37f, 0xbe862c0b, 0x177c22c2, - 0x5388e5ee, 0xcd8a7510, 0xf901b4fd, 0xdbc13dbc, - 0x6c0bae5b, 0x64efe8c7, 0x48b02079, 0x80331a49, - 0xca3d8ae6, 0xf3546190, 0xfed7108b, 0xc49b941b, - 0x32baf4a9, 0xeb833a4a, 0x88a3f1a5, 0x3a91ce0a, - 0x3cc27da1, 0x7112e684, 0x4a3096b1, 0x3794574c, - 0xa3c8b6f3, 0x1d213941, 0x6e0a2e00, 0x233479f1, - 0x0f4cd82f, 0x6093edd2, 0x5d7d209e, 0x464fe319, - 0xd4dcac9e, 0x0db845cb, 0xfb5e4bc3, 0xe0256ce1, - 0x09fb4ed1, 0x0914be1e, 0xa5bdb2c3, 0xc6eb57bb, - 0x30320350, 0x3f397e91, 0xa67791bc, 0x86bc0e2c, - 0xefa0a7e2, 0xe9ff7543, 0xe733612c, 0xd185897b, - 0x329e5388, 0x91dd236b, 0x2ecb0d93, 0xf4d82a3d, - 0x35b5c03f, 0xe4e606f0, 0x05b21843, 0x37b45964, - 0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, - 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11, + 0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801, + 0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c, 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494, + 0xec85c4e6, 0xb7d33edc, 0xe549b544, 0xfdeda5aa, 0x882bf287, 
0x3116737c, 0x05569956, 0xe8cc1f68, + 0x0806ac5e, 0x22a14443, 0x15297e10, 0x50d090e7, 0x4ba60f6f, 0xefd9f1a7, 0x5c5c885c, 0x82482f93, + 0x9bfd7c64, 0x0b3e7276, 0xf2688e77, 0x8fad8abc, 0xb0509568, 0xf1ada29f, 0xa53efdfe, 0xcb2b1d00, + 0xf2a9e986, 0x6463432b, 0x95094051, 0x5a223ad2, 0x9be8401b, 0x61e579cb, 0x1a556a14, 0x5840fdc2, + 0x9261ddf6, 0xcde002bb, 0x52432bb0, 0xbf17373e, 0x7b7c222f, 0x2955ed16, 0x9f10ca59, 0xe840c4c9, + 0xccabd806, 0x14543f34, 0x1462417a, 0x0d4a1f9c, 0x087ed925, 0xd7f8f24c, 0x7338c425, 0xcf86c8f5, + 0xb19165cd, 0x9891c393, 0x325384ac, 0x0308459d, 0x86141d7e, 0xc922116a, 0xe2ffa6b6, 0x53f52aed, + 0x2cd86197, 0xf5b9f498, 0xbf319c8f, 0xe0411fae, 0x977eb18c, 0xd8770976, 0x9833466a, 0xc674df7f, + 0x8c297d45, 0x8ca48d26, 0xc49ed8e2, 0x7344f874, 0x556f79c7, 0x6b25eaed, 0xa03e2b42, 0xf68f66a4, + 0x8e8b09a2, 0xf2e0e62a, 0x0d3a9806, 0x9729e493, 0x8c72b0fc, 0x160b94f6, 0x450e4d3d, 0x7a320e85, + 0xbef8f0e1, 0x21d73653, 0x4e3d977a, 0x1e7b3929, 0x1cc6c719, 0xbe478d53, 0x8d752809, 0xe6d8c2c6, + 0x275f0892, 0xc8acc273, 0x4cc21580, 0xecc4a617, 0xf5f7be70, 0xe795248a, 0x375a2fe9, 0x425570b6, + 0x8898dcf8, 0xdc2d97c4, 0x0106114b, 0x364dc22f, 0x1e0cad1f, 0xbe63803c, 0x5f69fac2, 0x4d5afa6f, + 0x1bc0dfb5, 0xfb273589, 0x0ea47f7b, 0x3c1c2b50, 0x21b2a932, 0x6b1223fd, 0x2fe706a8, 0xf9bd6ce2, + 0xa268e64e, 0xe987f486, 0x3eacf563, 0x1ca2018c, 0x65e18228, 0x2207360a, 0x57cf1715, 0x34c37d2b, + 0x1f8f3cde, 0x93b657cf, 0x31a019fd, 0xe69eb729, 0x8bca7b9b, 0x4c9d5bed, 0x277ebeaf, 0xe0d8f8ae, + 0xd150821c, 0x31381871, 0xafc3f1b0, 0x927db328, 0xe95effac, 0x305a47bd, 0x426ba35b, 0x1233af3f, + 0x686a5b83, 0x50e072e5, 0xd9d3bb2a, 0x8befc475, 0x487f0de6, 0xc88dff89, 0xbd664d5e, 0x971b5d18, + 0x63b14847, 0xd7d3c1ce, 0x7f583cf3, 0x72cbcb09, 0xc0d0a81c, 0x7fa3429b, 0xe9158a1b, 0x225ea19a, + 0xd8ca9ea3, 0xc763b282, 0xbb0c6341, 0x020b8293, 0xd4cd299d, 0x58cfa7f8, 0x91b4ee53, 0x37e4d140, + 0x95ec764c, 0x30f76b06, 0x5ee68d24, 0x679c8661, 0xa41979c2, 0xf2b61284, 0x4fac1475, 0x0adb49f9, + 0x19727a23, 0x15a7e374, 0xc43a18d5, 0x3fb1aa73, 0x342fc615, 0x924c0793, 0xbee2d7f0, 0x8a279de9, + 0x4aa2d70c, 0xe24dd37f, 0xbe862c0b, 0x177c22c2, 0x5388e5ee, 0xcd8a7510, 0xf901b4fd, 0xdbc13dbc, + 0x6c0bae5b, 0x64efe8c7, 0x48b02079, 0x80331a49, 0xca3d8ae6, 0xf3546190, 0xfed7108b, 0xc49b941b, + 0x32baf4a9, 0xeb833a4a, 0x88a3f1a5, 0x3a91ce0a, 0x3cc27da1, 0x7112e684, 0x4a3096b1, 0x3794574c, + 0xa3c8b6f3, 0x1d213941, 0x6e0a2e00, 0x233479f1, 0x0f4cd82f, 0x6093edd2, 0x5d7d209e, 0x464fe319, + 0xd4dcac9e, 0x0db845cb, 0xfb5e4bc3, 0xe0256ce1, 0x09fb4ed1, 0x0914be1e, 0xa5bdb2c3, 0xc6eb57bb, + 0x30320350, 0x3f397e91, 0xa67791bc, 0x86bc0e2c, 0xefa0a7e2, 0xe9ff7543, 0xe733612c, 0xd185897b, + 0x329e5388, 0x91dd236b, 0x2ecb0d93, 0xf4d82a3d, 0x35b5c03f, 0xe4e606f0, 0x05b21843, 0x37b45964, + 0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11, ]; impl Chunker { - /// Create a new Chunker instance, which produces and average /// chunk size of `chunk_size_avg` (need to be a power of two). 
We /// allow variation from `chunk_size_avg/4` up to a maximum of @@ -122,17 +88,17 @@ impl Chunker { panic!("got unexpected chunk size - not a power of two."); } - let break_test_mask = (chunk_size_avg*2 - 1) as u32; - let break_test_minimum = break_test_mask - 2; + let break_test_mask = (chunk_size_avg * 2 - 1) as u32; + let break_test_minimum = break_test_mask - 2; Self { h: 0, window_size: 0, chunk_size: 0, - chunk_size_min: chunk_size_avg>>2, - chunk_size_max: chunk_size_avg<<2, + chunk_size_min: chunk_size_avg >> 2, + chunk_size_max: chunk_size_avg << 2, _chunk_size_avg: chunk_size_avg, - _discriminator: discriminator, + _discriminator: discriminator, break_test_mask: break_test_mask, break_test_minimum: break_test_minimum, window: [0u8; CA_CHUNKER_WINDOW_SIZE], @@ -144,14 +110,13 @@ impl Chunker { /// later on), or another value indicating the position of a /// border. pub fn scan(&mut self, data: &[u8]) -> usize { - let window_len = self.window.len(); let data_len = data.len(); let mut pos = 0; - if self.window_size < window_len { - let need = window_len - self.window_size; + if self.window_size < window_len { + let need = window_len - self.window_size; let copy_len = if need < data_len { need } else { data_len }; for _i in 0..copy_len { @@ -165,7 +130,7 @@ impl Chunker { self.chunk_size += copy_len; // return if window is still not full - if self.window_size < window_len { + if self.window_size < window_len { return 0; } } @@ -203,12 +168,15 @@ impl Chunker { } // fast implementation avoiding modulo - // #[inline(always)] + // #[inline(always)] fn shall_break(&self) -> bool { + if self.chunk_size >= self.chunk_size_max { + return true; + } - if self.chunk_size >= self.chunk_size_max { return true; } - - if self.chunk_size < self.chunk_size_min { return false; } + if self.chunk_size < self.chunk_size_min { + return false; + } //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!! 
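// Worked numbers for the derivation in `Chunker::new` above, assuming the
// 64 KiB average chunk size used by the tests (a power of two, as required):
// minimum and maximum chunk sizes are avg/4 and avg*4, and the break test
// mask is avg*2 - 1.
fn main() {
    let chunk_size_avg: usize = 64 * 1024;
    assert!(chunk_size_avg.is_power_of_two());

    let chunk_size_min = chunk_size_avg >> 2; // 16 KiB
    let chunk_size_max = chunk_size_avg << 2; // 256 KiB
    let break_test_mask = (chunk_size_avg * 2 - 1) as u32;
    let break_test_minimum = break_test_mask - 2;

    assert_eq!(chunk_size_min, 16 * 1024);
    assert_eq!(chunk_size_max, 256 * 1024);
    assert_eq!(break_test_mask, 0x1ffff);
    assert_eq!(break_test_minimum, 0x1fffd);
}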
@@ -233,16 +201,15 @@ impl Chunker { #[test] fn test_chunker1() { - let mut buffer = Vec::new(); - for i in 0..256*1024 { + for i in 0..(256 * 1024) { for j in 0..4 { - let byte = ((i >> (j<<3))&0xff) as u8; + let byte = ((i >> (j << 3)) & 0xff) as u8; buffer.push(byte); } } - let mut chunker = Chunker::new(64*1024); + let mut chunker = Chunker::new(64 * 1024); let mut pos = 0; let mut last = 0; @@ -252,17 +219,17 @@ fn test_chunker1() { // test1: feed single bytes while pos < buffer.len() { - let k = chunker.scan(&buffer[pos..pos+1]); + let k = chunker.scan(&buffer[pos..pos + 1]); pos += 1; if k != 0 { let prev = last; last = pos; - chunks1.push((prev, pos-prev)); - } + chunks1.push((prev, pos - prev)); + } } chunks1.push((last, buffer.len() - last)); - let mut chunker = Chunker::new(64*1024); + let mut chunker = Chunker::new(64 * 1024); let mut pos = 0; @@ -272,7 +239,7 @@ fn test_chunker1() { if k != 0 { chunks2.push((pos, k)); pos += k; - } else { + } else { break; } } @@ -280,7 +247,6 @@ fn test_chunker1() { chunks2.push((pos, buffer.len() - pos)); if chunks1 != chunks2 { - let mut size1 = 0; for (_offset, len) in &chunks1 { size1 += len; @@ -293,14 +259,13 @@ fn test_chunker1() { } println!("Chunks2:{}\n{:?}\n", size2, chunks2); - if size1 != 256*4*1024 { + if size1 != 256 * 4 * 1024 { panic!("wrong size for chunks1"); } - if size2 != 256*4*1024 { + if size2 != 256 * 4 * 1024 { panic!("wrong size for chunks2"); } panic!("got different chunks"); } - } diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs index 6103bdd1..90c8b511 100644 --- a/src/backup/dynamic_index.rs +++ b/src/backup/dynamic_index.rs @@ -10,12 +10,12 @@ use uuid::Uuid; use proxmox::tools::io::ReadExt; use proxmox::tools::vec; -use proxmox_protocol::Chunker; +use super::Chunker; +use super::IndexFile; use super::chunk_stat::ChunkStat; use super::chunk_store::ChunkStore; use super::read_chunk::ReadChunk; -use super::IndexFile; use super::{DataChunk, DataChunkBuilder}; use crate::tools;
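// A sketch of the whole-buffer scanning pattern exercised by the second loop
// of `test_chunker1` above: feed the remaining slice to `scan`, which either
// returns a boundary offset or 0 once no further boundary is found. The
// import path is an assumption based on the new `pub use chunker::*;` in
// `src/backup.rs`.
use proxmox_backup::backup::Chunker;

fn split_into_chunks(data: &[u8]) -> Vec<(usize, usize)> {
    let mut chunker = Chunker::new(64 * 1024);
    let mut chunks = Vec::new();
    let mut pos = 0;
    loop {
        let boundary = chunker.scan(&data[pos..]);
        if boundary == 0 {
            // no further boundary; the remainder becomes the last chunk
            chunks.push((pos, data.len() - pos));
            return chunks;
        }
        chunks.push((pos, boundary));
        pos += boundary;
    }
}

fn main() {
    let data = vec![0u8; 1024 * 1024];
    let chunks = split_into_chunks(&data);
    // every byte ends up in exactly one chunk
    assert_eq!(chunks.iter().map(|(_, len)| *len).sum::<usize>(), data.len());
}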