use std::net::TcpListener; use std::os::unix::io::AsRawFd; use anyhow::{bail, format_err, Error}; use futures::future::{FutureExt, TryFutureExt}; use hyper::body::Body; use hyper::http::request::Parts; use hyper::upgrade::Upgraded; use serde_json::{json, Value}; use tokio::io::{AsyncBufReadExt, BufReader}; use proxmox::api::router::{Router, SubdirMap}; use proxmox::api::{ api, schema::*, ApiHandler, ApiMethod, ApiResponseFuture, Permission, RpcEnvironment, }; use proxmox::list_subdirs_api_method; use proxmox::tools::websocket::WebSocket; use proxmox::{identity, sortable}; use crate::api2::types::*; use crate::config::acl::PRIV_SYS_CONSOLE; use crate::server::WorkerTask; use crate::tools; use crate::tools::ticket::{self, Empty, Ticket}; pub mod apt; pub mod disks; pub mod dns; pub mod network; pub mod tasks; pub mod subscription; pub(crate) mod rrd; mod journal; pub(crate) mod services; mod status; mod syslog; mod time; mod report; pub const SHELL_CMD_SCHEMA: Schema = StringSchema::new("The command to run.") .format(&ApiStringFormat::Enum(&[ EnumEntry::new("login", "Login"), EnumEntry::new("upgrade", "Upgrade"), ])) .schema(); #[api( protected: true, input: { properties: { node: { schema: NODE_SCHEMA, }, cmd: { schema: SHELL_CMD_SCHEMA, optional: true, }, }, }, returns: { type: Object, description: "Object with the user, ticket, port and upid", properties: { user: { description: "", type: String, }, ticket: { description: "", type: String, }, port: { description: "", type: String, }, upid: { description: "", type: String, }, } }, access: { description: "Restricted to users on realm 'pam'", permission: &Permission::Privilege(&["system"], PRIV_SYS_CONSOLE, false), } )] /// Call termproxy and return shell ticket async fn termproxy( cmd: Option, rpcenv: &mut dyn RpcEnvironment, ) -> Result { // intentionally user only for now let auth_id: Authid = rpcenv .get_auth_id() .ok_or_else(|| format_err!("no authid available"))? .parse()?; if auth_id.is_token() { bail!("API tokens cannot access this API endpoint"); } let userid = auth_id.user(); if userid.realm() != "pam" { bail!("only pam users can use the console"); } let path = "/system"; // use port 0 and let the kernel decide which port is free let listener = TcpListener::bind("localhost:0")?; let port = listener.local_addr()?.port(); let ticket = Ticket::new(ticket::TERM_PREFIX, &Empty)? .sign( crate::auth_helpers::private_auth_key(), Some(&ticket::term_aad(&userid, &path, port)), )?; let mut command = Vec::new(); match cmd.as_ref().map(|x| x.as_str()) { Some("login") | None => { command.push("login"); if userid == "root@pam" { command.push("-f"); command.push("root"); } } Some("upgrade") => { if userid != "root@pam" { bail!("only root@pam can upgrade"); } // TODO: add nicer/safer wrapper like in PVE instead command.push("sh"); command.push("-c"); command.push("apt full-upgrade; bash -l"); } _ => bail!("invalid command"), }; let username = userid.name().to_owned(); let upid = WorkerTask::spawn( "termproxy", None, auth_id, false, move |worker| async move { // move inside the worker so that it survives and does not close the port // remove CLOEXEC from listenere so that we can reuse it in termproxy tools::fd_change_cloexec(listener.as_raw_fd(), false)?; let mut arguments: Vec<&str> = Vec::new(); let fd_string = listener.as_raw_fd().to_string(); arguments.push(&fd_string); arguments.extend_from_slice(&[ "--path", &path, "--perm", "Sys.Console", "--authport", "82", "--port-as-fd", "--", ]); arguments.extend_from_slice(&command); let mut cmd = tokio::process::Command::new("/usr/bin/termproxy"); cmd.args(&arguments) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()); let mut child = cmd.spawn().expect("error executing termproxy"); let stdout = child.stdout.take().expect("no child stdout handle"); let stderr = child.stderr.take().expect("no child stderr handle"); let worker_stdout = worker.clone(); let stdout_fut = async move { let mut reader = BufReader::new(stdout).lines(); while let Some(line) = reader.next_line().await? { worker_stdout.log(line); } Ok::<(), Error>(()) }; let worker_stderr = worker.clone(); let stderr_fut = async move { let mut reader = BufReader::new(stderr).lines(); while let Some(line) = reader.next_line().await? { worker_stderr.warn(line); } Ok::<(), Error>(()) }; let mut needs_kill = false; let res = tokio::select!{ res = child.wait() => { let exit_code = res?; if !exit_code.success() { match exit_code.code() { Some(code) => bail!("termproxy exited with {}", code), None => bail!("termproxy exited by signal"), } } Ok(()) }, res = stdout_fut => res, res = stderr_fut => res, res = worker.abort_future() => { needs_kill = true; res.map_err(Error::from) } }; if needs_kill { if res.is_ok() { child.kill().await?; return Ok(()); } if let Err(err) = child.kill().await { worker.warn(format!("error killing termproxy: {}", err)); } else if let Err(err) = child.wait().await { worker.warn(format!("error awaiting termproxy: {}", err)); } } res }, )?; // FIXME: We're returning the user NAME only? Ok(json!({ "user": username, "ticket": ticket, "port": port, "upid": upid, })) } #[sortable] pub const API_METHOD_WEBSOCKET: ApiMethod = ApiMethod::new( &ApiHandler::AsyncHttp(&upgrade_to_websocket), &ObjectSchema::new( "Upgraded to websocket", &sorted!([ ("node", false, &NODE_SCHEMA), ( "vncticket", false, &StringSchema::new("Terminal ticket").schema() ), ("port", false, &IntegerSchema::new("Terminal port").schema()), ]), ), ) .access( Some("The user needs Sys.Console on /system."), &Permission::Privilege(&["system"], PRIV_SYS_CONSOLE, false), ); fn upgrade_to_websocket( parts: Parts, req_body: Body, param: Value, _info: &ApiMethod, rpcenv: Box, ) -> ApiResponseFuture { async move { // intentionally user only for now let auth_id: Authid = rpcenv .get_auth_id() .ok_or_else(|| format_err!("no authid available"))? .parse()?; if auth_id.is_token() { bail!("API tokens cannot access this API endpoint"); } let userid = auth_id.user(); let ticket = tools::required_string_param(¶m, "vncticket")?; let port: u16 = tools::required_integer_param(¶m, "port")? as u16; // will be checked again by termproxy Ticket::::parse(ticket)? .verify( crate::auth_helpers::public_auth_key(), ticket::TERM_PREFIX, Some(&ticket::term_aad(&userid, "/system", port)), )?; let (ws, response) = WebSocket::new(parts.headers)?; crate::server::spawn_internal_task(async move { let conn: Upgraded = match req_body.on_upgrade().map_err(Error::from).await { Ok(upgraded) => upgraded, _ => bail!("error"), }; let local = tokio::net::TcpStream::connect(format!("localhost:{}", port)).await?; ws.serve_connection(conn, local).await }); Ok(response) } .boxed() } pub const SUBDIRS: SubdirMap = &[ ("apt", &apt::ROUTER), ("disks", &disks::ROUTER), ("dns", &dns::ROUTER), ("journal", &journal::ROUTER), ("network", &network::ROUTER), ("report", &report::ROUTER), ("rrd", &rrd::ROUTER), ("services", &services::ROUTER), ("status", &status::ROUTER), ("subscription", &subscription::ROUTER), ("syslog", &syslog::ROUTER), ("tasks", &tasks::ROUTER), ("termproxy", &Router::new().post(&API_METHOD_TERMPROXY)), ("time", &time::ROUTER), ( "vncwebsocket", &Router::new().upgrade(&API_METHOD_WEBSOCKET), ), ]; pub const ROUTER: Router = Router::new() .get(&list_subdirs_api_method!(SUBDIRS)) .subdirs(SUBDIRS);