src/api2/admin/datastore/h2upload.rs: log to worker task
This commit is contained in:
		| @ -2,6 +2,7 @@ use failure::*; | |||||||
| use lazy_static::lazy_static; | use lazy_static::lazy_static; | ||||||
|  |  | ||||||
| use std::collections::HashMap; | use std::collections::HashMap; | ||||||
|  | use std::sync::Arc; | ||||||
|  |  | ||||||
| use futures::*; | use futures::*; | ||||||
| use hyper::header::{HeaderValue, UPGRADE}; | use hyper::header::{HeaderValue, UPGRADE}; | ||||||
| @ -30,14 +31,13 @@ lazy_static!{ | |||||||
|  |  | ||||||
| pub struct BackupService { | pub struct BackupService { | ||||||
|     rpcenv: RestEnvironment, |     rpcenv: RestEnvironment, | ||||||
|  |     worker: Arc<WorkerTask>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl BackupService { | impl BackupService { | ||||||
|  |  | ||||||
|     fn new(rpcenv: &RpcEnvironment) -> Self { |     fn new(rpcenv: RestEnvironment, worker: Arc<WorkerTask>) -> Self { | ||||||
|         let mut rpcenv = RestEnvironment::new(rpcenv.env_type()); |         Self { rpcenv, worker } | ||||||
|         rpcenv.set_user(rpcenv.get_user()); |  | ||||||
|         Self { rpcenv } |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn handle_request(&self, req: Request<Body>) -> BoxFut { |     fn handle_request(&self, req: Request<Body>) -> BoxFut { | ||||||
| @ -53,8 +53,8 @@ impl BackupService { | |||||||
|  |  | ||||||
|         let formatter = &JSON_FORMATTER; |         let formatter = &JSON_FORMATTER; | ||||||
|  |  | ||||||
|         println!("H2 REQUEST {} {}", method, path); |         self.worker.log(format!("H2 REQUEST {} {}", method, path)); | ||||||
|         println!("H2 COMPO {:?}", components); |         self.worker.log(format!("H2 COMPO {:?}", components)); | ||||||
|  |  | ||||||
|         let mut uri_param = HashMap::new(); |         let mut uri_param = HashMap::new(); | ||||||
|  |  | ||||||
| @ -71,6 +71,23 @@ impl BackupService { | |||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn log_response(worker: Arc<WorkerTask>, method: hyper::Method, path: &str, resp: &Response<Body>) { | ||||||
|  |  | ||||||
|  |         let status = resp.status(); | ||||||
|  |  | ||||||
|  |         if !status.is_success() { | ||||||
|  |             let reason = status.canonical_reason().unwrap_or("unknown reason"); | ||||||
|  |             let client = "unknown"; // fixme: howto get peer_addr ? | ||||||
|  |  | ||||||
|  |             let mut message = "request failed"; | ||||||
|  |             if let Some(data) = resp.extensions().get::<ErrorMessageExtension>() { | ||||||
|  |                 message = &data.0; | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             worker.log(format!("{} {}: {} {}: [client {}] {}", method.as_str(), path, status.as_str(), reason, client, message)); | ||||||
|  |         } | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Drop for  BackupService { | impl Drop for  BackupService { | ||||||
| @ -86,25 +103,26 @@ impl hyper::service::Service for BackupService { | |||||||
|     type Future = Box<Future<Item = Response<Body>, Error = Self::Error> + Send>; |     type Future = Box<Future<Item = Response<Body>, Error = Self::Error> + Send>; | ||||||
|  |  | ||||||
|     fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future { |     fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future { | ||||||
|         let _path = req.uri().path().to_owned(); |         let path = req.uri().path().to_owned(); | ||||||
|         let _method = req.method().clone(); |         let method = req.method().clone(); | ||||||
|  |         let worker = self.worker.clone(); | ||||||
|  |  | ||||||
|         Box::new(self.handle_request(req).then(move |result| { |         Box::new(self.handle_request(req).then(move |result| { | ||||||
|             match result { |             match result { | ||||||
|                 Ok(res) => { |                 Ok(res) => { | ||||||
|                     //log_response(method, &path, &res); |                     Self::log_response(worker, method, &path, &res); | ||||||
|                     Ok::<_, hyper::Error>(res) |                     Ok::<_, hyper::Error>(res) | ||||||
|                 } |                 } | ||||||
|                 Err(err) => { |                 Err(err) => { | ||||||
|                     if let Some(apierr) = err.downcast_ref::<HttpError>() { |                     if let Some(apierr) = err.downcast_ref::<HttpError>() { | ||||||
|                         let mut resp = Response::new(Body::from(apierr.message.clone())); |                         let mut resp = Response::new(Body::from(apierr.message.clone())); | ||||||
|                         *resp.status_mut() = apierr.code; |                         *resp.status_mut() = apierr.code; | ||||||
|                         //log_response(method, &path, &resp); |                         Self::log_response(worker, method, &path, &resp); | ||||||
|                         Ok(resp) |                         Ok(resp) | ||||||
|                     } else { |                     } else { | ||||||
|                         let mut resp = Response::new(Body::from(err.to_string())); |                         let mut resp = Response::new(Body::from(err.to_string())); | ||||||
|                         *resp.status_mut() = StatusCode::BAD_REQUEST; |                         *resp.status_mut() = StatusCode::BAD_REQUEST; | ||||||
|                         //log_response(method, &path, &resp); |                         Self::log_response(worker, method, &path, &resp); | ||||||
|                         Ok(resp) |                         Ok(resp) | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
| @ -138,9 +156,13 @@ fn upgrade_h2upload( | |||||||
|  |  | ||||||
|     let worker_id = String::from("test2workerid"); |     let worker_id = String::from("test2workerid"); | ||||||
|  |  | ||||||
|     let service = BackupService::new(rpcenv); |  | ||||||
|  |  | ||||||
|     WorkerTask::spawn("test2_download", Some(worker_id), &rpcenv.get_user().unwrap(), false, move |worker| { |     let mut rpcenv1 = RestEnvironment::new(rpcenv.env_type()); | ||||||
|  |     rpcenv1.set_user(rpcenv.get_user()); | ||||||
|  |  | ||||||
|  |     WorkerTask::spawn("test2_download", Some(worker_id), &rpcenv.get_user().unwrap(), true, move |worker| { | ||||||
|  |         let service = BackupService::new(rpcenv1, worker.clone()); | ||||||
|  |  | ||||||
|         req_body |         req_body | ||||||
|             .on_upgrade() |             .on_upgrade() | ||||||
|             .map_err(Error::from) |             .map_err(Error::from) | ||||||
| @ -200,7 +222,6 @@ fn test1_get ( | |||||||
|     _rpcenv: &mut RpcEnvironment, |     _rpcenv: &mut RpcEnvironment, | ||||||
| ) -> Result<Value, Error> { | ) -> Result<Value, Error> { | ||||||
|  |  | ||||||
|  |  | ||||||
|     Ok(Value::Null) |     Ok(Value::Null) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user