play around with async tasks

This commit is contained in:
Dietmar Maurer 2018-11-10 12:06:39 +01:00
parent 6639c14bd9
commit 805aec1572
4 changed files with 108 additions and 22 deletions

View File

@ -18,6 +18,7 @@ serde_json = "1.0.32"
serde_derive = "1.0.80" serde_derive = "1.0.80"
url = "1.7.1" url = "1.7.1"
futures = "0.1.25" futures = "0.1.25"
tokio = "0.1.11"
http = "0.1.13" http = "0.1.13"
hyper = "0.12.14" hyper = "0.12.14"
lazy_static = "1.1.0" lazy_static = "1.1.0"

View File

@ -6,8 +6,11 @@ use crate::json_schema::*;
use crate::api_info::*; use crate::api_info::*;
use serde_json::{json, Value}; use serde_json::{json, Value};
use futures::future::*;
use tokio::prelude::*;
use hyper::{Method, Body, Request, Response, Server, StatusCode};
fn test_api_handler(param: Value, info: &ApiMethod) -> Result<Value, Error> { fn test_sync_api_handler(param: Value, info: &ApiMethod) -> Result<Value, Error> {
println!("This is a test {}", param); println!("This is a test {}", param);
// let force: Option<bool> = Some(false); // let force: Option<bool> = Some(false);
@ -25,12 +28,30 @@ fn test_api_handler(param: Value, info: &ApiMethod) -> Result<Value, Error> {
Ok(json!(null)) Ok(json!(null))
} }
fn test_async_api_handler(
param: Value,
info: &ApiMethod
) -> Box<Future<Item = Response<Body>, Error = Error> + Send> {
println!("This is a test {}", param);
let task = lazy(|| {
println!("A LAZY TASK");
let mut resp = Response::new(Body::from("A LAZY TASKs RESPONSE"));
*resp.status_mut() = StatusCode::OK;
ok(resp)
});
Box::new(task)
}
pub fn router() -> MethodInfo { pub fn router() -> MethodInfo {
let route = MethodInfo::new() let route = MethodInfo::new()
.get(ApiMethod { .get(ApiMethod {
handler: test_api_handler, handler: test_sync_api_handler,
async_handler: test_async_api_handler,
description: "This is a simple test.", description: "This is a simple test.",
parameters: parameter!{ parameters: parameter!{
force => Boolean!{ force => Boolean!{
@ -43,5 +64,3 @@ pub fn router() -> MethodInfo {
route route
} }

View File

@ -1,7 +1,9 @@
use failure::*; use failure::*;
use futures::future::*;
use crate::json_schema::*; use crate::json_schema::*;
use serde_json::{Value}; use serde_json::{Value};
use hyper::{Body, Response};
use std::collections::HashMap; use std::collections::HashMap;
@ -10,6 +12,7 @@ pub struct ApiMethod {
pub parameters: Jss, pub parameters: Jss,
pub returns: Jss, pub returns: Jss,
pub handler: fn(Value, &ApiMethod) -> Result<Value, Error>, pub handler: fn(Value, &ApiMethod) -> Result<Value, Error>,
pub async_handler: fn(Value, &ApiMethod) -> Box<Future<Item = Response<Body>, Error = Error> + Send>
} }
pub struct MethodInfo { pub struct MethodInfo {

View File

@ -10,7 +10,10 @@ use apitest::api_info::*;
use apitest::json_schema::*; use apitest::json_schema::*;
//use serde_derive::{Serialize, Deserialize}; //use serde_derive::{Serialize, Deserialize};
use serde_json::{json}; use serde_json::{json, Value};
use tokio::prelude::*;
use tokio::timer::Delay;
//use hyper::body::Payload; //use hyper::body::Payload;
use hyper::http::request::Parts; use hyper::http::request::Parts;
@ -18,7 +21,9 @@ use hyper::{Method, Body, Request, Response, Server, StatusCode};
use hyper::rt::{Future, Stream}; use hyper::rt::{Future, Stream};
use hyper::service::service_fn; use hyper::service::service_fn;
use futures::future; use futures::future::*;
use std::time::{Duration, Instant};
type BoxFut = Box<Future<Item = Response<Body>, Error = failure::Error> + Send>; type BoxFut = Box<Future<Item = Response<Body>, Error = failure::Error> + Send>;
@ -33,15 +38,15 @@ macro_rules! error_response {
macro_rules! http_error_future { macro_rules! http_error_future {
($status:ident, $msg:expr) => {{ ($status:ident, $msg:expr) => {{
let resp = error_response!($status, $msg); let resp = error_response!($status, $msg);
return Box::new(futures::future::ok(resp)); return Box::new(ok(resp));
}} }}
} }
fn handle_api_request<'a>( fn get_request_parameters_async<'a>(
info: &'a ApiMethod, info: &'a ApiMethod,
parts: Parts, parts: Parts,
req_body: Body, req_body: Body,
) -> Box<Future<Item = Response<Body>, Error = failure::Error> + Send + 'a> ) -> Box<Future<Item = Value, Error = failure::Error> + Send + 'a>
{ {
let resp = req_body.concat2() let resp = req_body.concat2()
.map_err(|err| format_err!("Promlems reading request body: {}", err)) .map_err(|err| format_err!("Promlems reading request body: {}", err))
@ -69,24 +74,81 @@ fn handle_api_request<'a>(
} }
println!("GOT PARAMS {}", params); println!("GOT PARAMS {}", params);
Ok(params)
});
Box::new(resp)
}
fn handle_async_api_request<'a>(
info: &'a ApiMethod,
parts: Parts,
req_body: Body,
) -> Box<Future<Item = Response<Body>, Error = failure::Error> + Send + 'a>
{
let params = get_request_parameters_async(info, parts, req_body);
let resp = params
.and_then(move |params| {
println!("GOT PARAMS {}", params);
/*
let when = Instant::now() + Duration::from_millis(3000);
let task = Delay::new(when).then(|_| {
println!("A LAZY TASK");
ok(())
});
tokio::spawn(task);
*/
(info.async_handler)(params, info)
});
Box::new(resp)
}
fn handle_sync_api_request<'a>(
info: &'a ApiMethod,
parts: Parts,
req_body: Body,
) -> Box<Future<Item = Response<Body>, Error = failure::Error> + Send + 'a>
{
let params = get_request_parameters_async(info, parts, req_body);
let resp = params
.and_then(move |params| {
println!("GOT PARAMS {}", params);
/*
let when = Instant::now() + Duration::from_millis(3000);
let task = Delay::new(when).then(|_| {
println!("A LAZY TASK");
ok(())
});
tokio::spawn(task);
*/
let res = (info.handler)(params, info)?; let res = (info.handler)(params, info)?;
Ok(res) Ok(res)
}).then(|result| { }).then(|result| {
match result { match result {
Ok(ref value) => { Ok(ref value) => {
let json_str = value.to_string(); let json_str = value.to_string();
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.header("ContentType", "application/json") .header("ContentType", "application/json")
.body(Body::from(json_str))?) .body(Body::from(json_str))?)
}
Err(err) => Ok(error_response!(BAD_REQUEST, err.to_string()))
} }
Err(err) => Ok(error_response!(BAD_REQUEST, err.to_string())) });
}
});
Box::new(resp) Box::new(resp)
} }
@ -128,7 +190,8 @@ fn handle_request(req: Request<Body>) -> BoxFut {
// fixme: handle auth // fixme: handle auth
return handle_api_request(api_method, parts, body); //return handle_sync_api_request(api_method, parts, body);
return handle_async_api_request(api_method, parts, body);
} else { } else {
http_error_future!(NOT_FOUND, "Path not found."); http_error_future!(NOT_FOUND, "Path not found.");
@ -136,7 +199,7 @@ fn handle_request(req: Request<Body>) -> BoxFut {
} }
} }
Box::new(future::ok(Response::new(Body::from("RETURN WEB GUI\n")))) Box::new(ok(Response::new(Body::from("RETURN WEB GUI\n"))))
} }
lazy_static!{ lazy_static!{