proxmox-rest-server: OutputFormatter: add new format_data_streaming method
that takes the data in form of a `Box<dyn SerializableReturn + Send>` instead of a Value. Implement it in json and extjs formatter, by starting a thread and stream the serialized data via a `BufWriter<SenderWriter>` and use the Receiver side as a stream for the response body. Signed-off-by: Dominik Csapak <d.csapak@proxmox.com> Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
		
				
					committed by
					
						 Wolfgang Bumiller
						Wolfgang Bumiller
					
				
			
			
				
	
			
			
			
						parent
						
							9c3b29bd8f
						
					
				
				
					commit
					2ef2c0fe0c
				
			| @ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] } | |||||||
| serde_json = "1.0" | serde_json = "1.0" | ||||||
| tokio = { version = "1.6", features = ["signal", "process"] } | tokio = { version = "1.6", features = ["signal", "process"] } | ||||||
| tokio-openssl = "0.6.1" | tokio-openssl = "0.6.1" | ||||||
|  | tokio-stream = "0.1.0" | ||||||
| tower-service = "0.3.0" | tower-service = "0.3.0" | ||||||
| url = "2.1" | url = "2.1" | ||||||
|  |  | ||||||
|  | |||||||
| @ -7,7 +7,7 @@ use serde_json::{json, Value}; | |||||||
| use hyper::header; | use hyper::header; | ||||||
| use hyper::{Body, Response, StatusCode}; | use hyper::{Body, Response, StatusCode}; | ||||||
|  |  | ||||||
| use proxmox_router::{HttpError, RpcEnvironment}; | use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn}; | ||||||
| use proxmox_schema::ParameterError; | use proxmox_schema::ParameterError; | ||||||
|  |  | ||||||
| /// Extension to set error message for server side logging | /// Extension to set error message for server side logging | ||||||
| @ -18,6 +18,13 @@ pub trait OutputFormatter: Send + Sync { | |||||||
|     /// Transform json data into a http response |     /// Transform json data into a http response | ||||||
|     fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>; |     fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>; | ||||||
|  |  | ||||||
|  |     /// Transform serializable data into a streaming http response | ||||||
|  |     fn format_data_streaming( | ||||||
|  |         &self, | ||||||
|  |         data: Box<dyn SerializableReturn + Send>, | ||||||
|  |         rpcenv: &dyn RpcEnvironment, | ||||||
|  |     ) -> Result<Response<Body>, Error>; | ||||||
|  |  | ||||||
|     /// Transform errors into a http response |     /// Transform errors into a http response | ||||||
|     fn format_error(&self, err: Error) -> Response<Body>; |     fn format_error(&self, err: Error) -> Response<Body>; | ||||||
|  |  | ||||||
| @ -50,6 +57,16 @@ fn json_data_response(data: Value) -> Response<Body> { | |||||||
|     response |     response | ||||||
| } | } | ||||||
|  |  | ||||||
|  | fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> { | ||||||
|  |     let response = Response::builder() | ||||||
|  |         .header( | ||||||
|  |             header::CONTENT_TYPE, | ||||||
|  |             header::HeaderValue::from_static(JSON_CONTENT_TYPE), | ||||||
|  |         ) | ||||||
|  |         .body(body)?; | ||||||
|  |     Ok(response) | ||||||
|  | } | ||||||
|  |  | ||||||
| fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) { | fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) { | ||||||
|     let attributes = match rpcenv.result_attrib().as_object() { |     let attributes = match rpcenv.result_attrib().as_object() { | ||||||
|         Some(attr) => attr, |         Some(attr) => attr, | ||||||
| @ -61,6 +78,22 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | fn start_data_streaming( | ||||||
|  |     value: Value, | ||||||
|  |     data: Box<dyn SerializableReturn + Send>, | ||||||
|  | ) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> { | ||||||
|  |     let (writer, reader) = tokio::sync::mpsc::channel(1); | ||||||
|  |  | ||||||
|  |     tokio::task::spawn_blocking(move || { | ||||||
|  |         let output = proxmox_async::blocking::SenderWriter::from_sender(writer); | ||||||
|  |         let mut output = std::io::BufWriter::new(output); | ||||||
|  |         let mut serializer = serde_json::Serializer::new(&mut output); | ||||||
|  |         let _ = data.sender_serialize(&mut serializer, value); | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     reader | ||||||
|  | } | ||||||
|  |  | ||||||
| struct JsonFormatter(); | struct JsonFormatter(); | ||||||
|  |  | ||||||
| /// Format data as ``application/json`` | /// Format data as ``application/json`` | ||||||
| @ -84,6 +117,21 @@ impl OutputFormatter for JsonFormatter { | |||||||
|         json_data_response(result) |         json_data_response(result) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn format_data_streaming( | ||||||
|  |         &self, | ||||||
|  |         data: Box<dyn SerializableReturn + Send>, | ||||||
|  |         rpcenv: &dyn RpcEnvironment, | ||||||
|  |     ) -> Result<Response<Body>, Error> { | ||||||
|  |         let mut value = json!({}); | ||||||
|  |  | ||||||
|  |         add_result_attributes(&mut value, rpcenv); | ||||||
|  |  | ||||||
|  |         let reader = start_data_streaming(value, data); | ||||||
|  |         let stream = tokio_stream::wrappers::ReceiverStream::new(reader); | ||||||
|  |  | ||||||
|  |         json_data_response_streaming(Body::wrap_stream(stream)) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     fn format_error(&self, err: Error) -> Response<Body> { |     fn format_error(&self, err: Error) -> Response<Body> { | ||||||
|         let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() { |         let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() { | ||||||
|             let mut resp = Response::new(Body::from(apierr.message.clone())); |             let mut resp = Response::new(Body::from(apierr.message.clone())); | ||||||
| @ -140,6 +188,23 @@ impl OutputFormatter for ExtJsFormatter { | |||||||
|         json_data_response(result) |         json_data_response(result) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn format_data_streaming( | ||||||
|  |         &self, | ||||||
|  |         data: Box<dyn SerializableReturn + Send>, | ||||||
|  |         rpcenv: &dyn RpcEnvironment, | ||||||
|  |     ) -> Result<Response<Body>, Error> { | ||||||
|  |         let mut value = json!({ | ||||||
|  |             "success": true, | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |         add_result_attributes(&mut value, rpcenv); | ||||||
|  |  | ||||||
|  |         let reader = start_data_streaming(value, data); | ||||||
|  |         let stream = tokio_stream::wrappers::ReceiverStream::new(reader); | ||||||
|  |  | ||||||
|  |         json_data_response_streaming(Body::wrap_stream(stream)) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     fn format_error(&self, err: Error) -> Response<Body> { |     fn format_error(&self, err: Error) -> Response<Body> { | ||||||
|         let mut errors = HashMap::new(); |         let mut errors = HashMap::new(); | ||||||
|  |  | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user