src/server/command_socket.rs: correctly handle/spawn handle parallel connections
This commit is contained in:
		| @ -18,7 +18,6 @@ pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Item=(), | ||||
|           F: Send + Sync +'static + Fn(Value) -> Result<Value, Error>, | ||||
| { | ||||
|     let path: PathBuf = path.into(); | ||||
|     let path1: PathBuf = path.clone(); | ||||
|  | ||||
|     let socket = UnixListener::bind(&path)?; | ||||
|  | ||||
| @ -35,29 +34,34 @@ pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Item=(), | ||||
|             let path = path3.clone(); | ||||
|             let path2 = path3.clone(); | ||||
|  | ||||
|             tokio::io::lines(std::io::BufReader::new(rx)) | ||||
|                 .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); }) | ||||
|                 .and_then(move |cmd| { | ||||
|                     let res = try_block!({ | ||||
|                         let param = match cmd.parse::<Value>() { | ||||
|                             Ok(p) => p, | ||||
|                             Err(err) => bail!("unable to parse json value - {}", err), | ||||
|             let abort_future = super::last_worker_future().map_err(|_| {}); | ||||
|  | ||||
|             tokio::spawn( | ||||
|                 tokio::io::lines(std::io::BufReader::new(rx)) | ||||
|                     .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); }) | ||||
|                     .and_then(move |cmd| { | ||||
|                         let res = try_block!({ | ||||
|                             let param = match cmd.parse::<Value>() { | ||||
|                                 Ok(p) => p, | ||||
|                                 Err(err) => bail!("unable to parse json value - {}", err), | ||||
|                             }; | ||||
|  | ||||
|                             f1(param) | ||||
|                         }); | ||||
|  | ||||
|                         let resp = match res { | ||||
|                             Ok(v) => format!("OK: {}\n", v), | ||||
|                             Err(err) => format!("ERROR: {}\n", err), | ||||
|                         }; | ||||
|  | ||||
|                         f1(param) | ||||
|                     }); | ||||
|  | ||||
|                     let resp = match res { | ||||
|                         Ok(v) => format!("OK: {}\n", v), | ||||
|                         Err(err) => format!("ERROR: {}\n", err), | ||||
|                     }; | ||||
|                     Ok(resp) | ||||
|                 }) | ||||
|                 .for_each(move |resp| { | ||||
|                     tx.write_all(resp.as_bytes()) | ||||
|                         .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); }) | ||||
|                 }) | ||||
|  | ||||
|                         Ok(resp) | ||||
|                     }) | ||||
|                     .for_each(move |resp| { | ||||
|                         tx.write_all(resp.as_bytes()) | ||||
|                             .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); }) | ||||
|                     }) | ||||
|                     .select(abort_future) | ||||
|                     .then(move |_| { Ok(()) }) | ||||
|             ) | ||||
|         }); | ||||
|  | ||||
|     let abort_future = super::last_worker_future().map_err(|_| {}); | ||||
|  | ||||
		Reference in New Issue
	
	Block a user