src/server/worker_task.rs: Avoid using pbs-api-type::Authid
Because we want to move worker_task.rs into proxmox-rest-server crate.
This commit is contained in:
		| @ -8,8 +8,6 @@ use proxmox::api::schema::{ApiStringFormat, ApiType, Schema, StringSchema, Array | ||||
| use proxmox::const_regex; | ||||
| use proxmox::sys::linux::procfs; | ||||
|  | ||||
| use crate::Authid; | ||||
|  | ||||
| /// Unique Process/Task Identifier | ||||
| /// | ||||
| /// We use this to uniquely identify worker task. UPIDs have a short | ||||
| @ -37,7 +35,7 @@ pub struct UPID { | ||||
|     /// Worker ID (arbitrary ASCII string) | ||||
|     pub worker_id: Option<String>, | ||||
|     /// The authenticated entity who started the task | ||||
|     pub auth_id: Authid, | ||||
|     pub auth_id: String, | ||||
|     /// The node name. | ||||
|     pub node: String, | ||||
| } | ||||
| @ -71,7 +69,7 @@ impl UPID { | ||||
|     pub fn new( | ||||
|         worker_type: &str, | ||||
|         worker_id: Option<String>, | ||||
|         auth_id: Authid, | ||||
|         auth_id: String, | ||||
|     ) -> Result<Self, Error> { | ||||
|  | ||||
|         let pid = unsafe { libc::getpid() }; | ||||
| @ -82,6 +80,10 @@ impl UPID { | ||||
|             bail!("illegal characters in worker type '{}'", worker_type); | ||||
|         } | ||||
|  | ||||
|         if auth_id.contains(bad) { | ||||
|             bail!("illegal characters in auth_id '{}'", auth_id); | ||||
|         } | ||||
|  | ||||
|         static WORKER_TASK_NEXT_ID: AtomicUsize = AtomicUsize::new(0); | ||||
|  | ||||
|         let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst); | ||||
| @ -184,7 +186,7 @@ pub struct TaskListItem { | ||||
|     /// Worker ID (arbitrary ASCII string) | ||||
|     pub worker_id: Option<String>, | ||||
|     /// The authenticated entity who started the task | ||||
|     pub user: Authid, | ||||
|     pub user: String, | ||||
|     /// The task end time (Epoch) | ||||
|     #[serde(skip_serializing_if="Option::is_none")] | ||||
|     pub endtime: Option<i64>, | ||||
| @ -200,4 +202,3 @@ pub const NODE_TASKS_LIST_TASKS_RETURN_TYPE: ReturnType = ReturnType { | ||||
|         &TaskListItem::API_SCHEMA, | ||||
|     ).schema(), | ||||
| }; | ||||
|  | ||||
|  | ||||
| @ -722,7 +722,7 @@ pub fn verify( | ||||
|     let upid_str = WorkerTask::new_thread( | ||||
|         worker_type, | ||||
|         Some(worker_id), | ||||
|         auth_id.clone(), | ||||
|         auth_id.to_string(), | ||||
|         to_stdout, | ||||
|         move |worker| { | ||||
|             let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore); | ||||
| @ -862,7 +862,7 @@ pub fn prune( | ||||
|  | ||||
|  | ||||
|     // We use a WorkerTask just to have a task log, but run synchrounously | ||||
|     let worker = WorkerTask::new("prune", Some(worker_id), auth_id, true)?; | ||||
|     let worker = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?; | ||||
|  | ||||
|     if keep_all { | ||||
|         worker.log("No prune selection - keeping all files."); | ||||
| @ -957,7 +957,7 @@ pub fn prune_datastore( | ||||
|     let upid_str = WorkerTask::new_thread( | ||||
|         "prune", | ||||
|         Some(store.clone()), | ||||
|         auth_id.clone(), | ||||
|         auth_id.to_string(), | ||||
|         to_stdout, | ||||
|         move |worker| crate::server::prune_datastore( | ||||
|             worker.clone(), | ||||
|  | ||||
| @ -525,7 +525,7 @@ impl BackupEnvironment { | ||||
|         WorkerTask::new_thread( | ||||
|             "verify", | ||||
|             Some(worker_id), | ||||
|             self.auth_id.clone(), | ||||
|             self.auth_id.to_string(), | ||||
|             false, | ||||
|             move |worker| { | ||||
|                 worker.log("Automatically verifying newly added snapshot"); | ||||
|  | ||||
| @ -166,7 +166,7 @@ async move { | ||||
|     if !is_new { bail!("backup directory already exists."); } | ||||
|  | ||||
|  | ||||
|     WorkerTask::spawn(worker_type, Some(worker_id), auth_id.clone(), true, move |worker| { | ||||
|     WorkerTask::spawn(worker_type, Some(worker_id), auth_id.to_string(), true, move |worker| { | ||||
|         let mut env = BackupEnvironment::new( | ||||
|             env_type, auth_id, worker.clone(), datastore, backup_dir); | ||||
|  | ||||
|  | ||||
| @ -215,7 +215,7 @@ fn register_account( | ||||
|     WorkerTask::spawn( | ||||
|         "acme-register", | ||||
|         Some(name.to_string()), | ||||
|         auth_id, | ||||
|         auth_id.to_string(), | ||||
|         true, | ||||
|         move |worker| async move { | ||||
|             let mut client = AcmeClient::new(directory); | ||||
| @ -275,7 +275,7 @@ pub fn update_account( | ||||
|     WorkerTask::spawn( | ||||
|         "acme-update", | ||||
|         Some(name.to_string()), | ||||
|         auth_id, | ||||
|         auth_id.to_string(), | ||||
|         true, | ||||
|         move |_worker| async move { | ||||
|             let data = match contact { | ||||
| @ -320,7 +320,7 @@ pub fn deactivate_account( | ||||
|     WorkerTask::spawn( | ||||
|         "acme-deactivate", | ||||
|         Some(name.to_string()), | ||||
|         auth_id, | ||||
|         auth_id.to_string(), | ||||
|         true, | ||||
|         move |worker| async move { | ||||
|             match AcmeClient::load(&name) | ||||
|  | ||||
| @ -119,9 +119,9 @@ pub fn create_datastore( | ||||
|     WorkerTask::new_thread( | ||||
|         "create-datastore", | ||||
|         Some(config.name.to_string()), | ||||
|         auth_id, | ||||
|         auth_id.to_string(), | ||||
|         to_stdout, | ||||
|         move |worker| do_create_datastore(lock, section_config, config, Some(&worker)), | ||||
|        move |worker| do_create_datastore(lock, section_config, config, Some(&worker)), | ||||
|     ) | ||||
| } | ||||
|  | ||||
|  | ||||
| @ -14,7 +14,7 @@ use proxmox_apt::repositories::{ | ||||
| use proxmox_http::ProxyConfig; | ||||
|  | ||||
| use pbs_api_types::{ | ||||
|     Authid, APTUpdateInfo, NODE_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, UPID_SCHEMA, | ||||
|     APTUpdateInfo, NODE_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, UPID_SCHEMA, | ||||
|     PRIV_SYS_AUDIT, PRIV_SYS_MODIFY, | ||||
| }; | ||||
|  | ||||
| @ -154,7 +154,7 @@ pub fn apt_update_database( | ||||
|     rpcenv: &mut dyn RpcEnvironment, | ||||
| ) -> Result<String, Error> { | ||||
|  | ||||
|     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; | ||||
|     let auth_id = rpcenv.get_auth_id().unwrap(); | ||||
|     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; | ||||
|  | ||||
|     let upid_str = WorkerTask::new_thread("aptupdate", None, auth_id, to_stdout, move |worker| { | ||||
|  | ||||
| @ -11,7 +11,7 @@ use proxmox::api::router::SubdirMap; | ||||
| use proxmox::api::{api, Permission, Router, RpcEnvironment}; | ||||
| use proxmox::list_subdirs_api_method; | ||||
|  | ||||
| use pbs_api_types::{Authid, NODE_SCHEMA, PRIV_SYS_MODIFY}; | ||||
| use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_MODIFY}; | ||||
| use pbs_buildcfg::configdir; | ||||
| use pbs_tools::cert; | ||||
|  | ||||
| @ -530,7 +530,7 @@ fn spawn_certificate_worker( | ||||
|  | ||||
|     let (node_config, _digest) = crate::config::node::config()?; | ||||
|  | ||||
|     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; | ||||
|     let auth_id = rpcenv.get_auth_id().unwrap(); | ||||
|  | ||||
|     WorkerTask::spawn(name, None, auth_id, true, move |worker| async move { | ||||
|         if let Some(cert) = order_certificate(worker, &node_config).await? { | ||||
| @ -559,7 +559,7 @@ pub fn revoke_acme_cert(rpcenv: &mut dyn RpcEnvironment) -> Result<String, Error | ||||
|  | ||||
|     let cert_pem = get_certificate_pem()?; | ||||
|  | ||||
|     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; | ||||
|     let auth_id = rpcenv.get_auth_id().unwrap(); | ||||
|  | ||||
|     WorkerTask::spawn( | ||||
|         "acme-revoke-cert", | ||||
|  | ||||
| @ -7,7 +7,7 @@ use proxmox::api::section_config::SectionConfigData; | ||||
| use proxmox::api::router::Router; | ||||
|  | ||||
| use pbs_api_types::{ | ||||
|     Authid, DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA, | ||||
|     DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA, | ||||
|     DATASTORE_SCHEMA, UPID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY, | ||||
| }; | ||||
|  | ||||
| @ -146,7 +146,7 @@ pub fn create_datastore_disk( | ||||
|  | ||||
|     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; | ||||
|  | ||||
|     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; | ||||
|     let auth_id = rpcenv.get_auth_id().unwrap(); | ||||
|  | ||||
|     let info = get_disk_usage_info(&disk, true)?; | ||||
|  | ||||
|  | ||||
| @ -7,7 +7,7 @@ use proxmox::{sortable, identity}; | ||||
| use proxmox::{list_subdirs_api_method}; | ||||
|  | ||||
| use pbs_api_types::{ | ||||
|     Authid, UPID_SCHEMA, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA, | ||||
|     UPID_SCHEMA, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA, | ||||
|     PRIV_SYS_AUDIT, PRIV_SYS_MODIFY, | ||||
| }; | ||||
|  | ||||
| @ -144,7 +144,7 @@ pub fn initialize_disk( | ||||
|  | ||||
|     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; | ||||
|  | ||||
|     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; | ||||
|     let auth_id = rpcenv.get_auth_id().unwrap(); | ||||
|  | ||||
|     let info = get_disk_usage_info(&disk, true)?; | ||||
|  | ||||
|  | ||||
| @ -8,7 +8,7 @@ use proxmox::api::{ | ||||
| use proxmox::api::router::Router; | ||||
|  | ||||
| use pbs_api_types::{ | ||||
|     Authid, ZpoolListItem, ZfsRaidLevel, ZfsCompressionType, DataStoreConfig, | ||||
|     ZpoolListItem, ZfsRaidLevel, ZfsCompressionType, DataStoreConfig, | ||||
|     NODE_SCHEMA, ZPOOL_NAME_SCHEMA, DATASTORE_SCHEMA, DISK_ARRAY_SCHEMA, | ||||
|     DISK_LIST_SCHEMA, ZFS_ASHIFT_SCHEMA, UPID_SCHEMA, | ||||
|     PRIV_SYS_AUDIT, PRIV_SYS_MODIFY, | ||||
| @ -168,7 +168,7 @@ pub fn create_zpool( | ||||
|  | ||||
|     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; | ||||
|  | ||||
|     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; | ||||
|     let auth_id = rpcenv.get_auth_id().unwrap(); | ||||
|  | ||||
|     let add_datastore = add_datastore.unwrap_or(false); | ||||
|  | ||||
|  | ||||
| @ -146,7 +146,7 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu | ||||
|     let upid = WorkerTask::spawn( | ||||
|         "termproxy", | ||||
|         None, | ||||
|         auth_id, | ||||
|         auth_id.to_string(), | ||||
|         false, | ||||
|         move |worker| async move { | ||||
|             // move inside the worker so that it survives and does not close the port | ||||
|  | ||||
| @ -703,7 +703,7 @@ pub async fn reload_network_config( | ||||
|  | ||||
|     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; | ||||
|  | ||||
|     let upid_str = WorkerTask::spawn("srvreload", Some(String::from("networking")), auth_id, true, |_worker| async { | ||||
|     let upid_str = WorkerTask::spawn("srvreload", Some(String::from("networking")), auth_id.to_string(), true, |_worker| async { | ||||
|  | ||||
|         let _ = std::fs::rename(network::NETWORK_INTERFACES_NEW_FILENAME, network::NETWORK_INTERFACES_FILENAME); | ||||
|  | ||||
|  | ||||
| @ -195,7 +195,7 @@ fn run_service_command(service: &str, cmd: &str, auth_id: Authid) -> Result<Valu | ||||
|     let upid = WorkerTask::new_thread( | ||||
|         &workerid, | ||||
|         Some(service.clone()), | ||||
|         auth_id, | ||||
|         auth_id.to_string(), | ||||
|         false, | ||||
|         move |_worker| { | ||||
|  | ||||
|  | ||||
| @ -99,8 +99,8 @@ fn check_job_store(upid: &UPID, store: &str) -> bool { | ||||
| } | ||||
|  | ||||
| fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> { | ||||
|     let task_auth_id = &upid.auth_id; | ||||
|     if auth_id == task_auth_id | ||||
|     let task_auth_id: Authid = upid.auth_id.parse()?; | ||||
|     if auth_id == &task_auth_id | ||||
|         || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) { | ||||
|         // task owner can always read | ||||
|         Ok(()) | ||||
| @ -200,6 +200,8 @@ async fn get_task_status( | ||||
|     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; | ||||
|     check_task_access(&auth_id, &upid)?; | ||||
|  | ||||
|     let task_auth_id: Authid = upid.auth_id.parse()?; | ||||
|  | ||||
|     let mut result = json!({ | ||||
|         "upid": param["upid"], | ||||
|         "node": upid.node, | ||||
| @ -208,11 +210,11 @@ async fn get_task_status( | ||||
|         "starttime": upid.starttime, | ||||
|         "type": upid.worker_type, | ||||
|         "id": upid.worker_id, | ||||
|         "user": upid.auth_id.user(), | ||||
|         "user": task_auth_id.user(), | ||||
|     }); | ||||
|  | ||||
|     if upid.auth_id.is_token() { | ||||
|         result["tokenid"] = Value::from(upid.auth_id.tokenname().unwrap().as_str()); | ||||
|     if task_auth_id.is_token() { | ||||
|         result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str()); | ||||
|     } | ||||
|  | ||||
|     if crate::server::worker_is_active(&upid).await? { | ||||
| @ -344,10 +346,11 @@ fn stop_task( | ||||
|  | ||||
|     let upid = extract_upid(¶m)?; | ||||
|  | ||||
|     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; | ||||
|     let auth_id = rpcenv.get_auth_id().unwrap(); | ||||
|  | ||||
|     if auth_id != upid.auth_id { | ||||
|         let user_info = CachedUserInfo::new()?; | ||||
|         let auth_id: Authid = auth_id.parse()?; | ||||
|         user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?; | ||||
|     } | ||||
|  | ||||
|  | ||||
| @ -81,7 +81,7 @@ pub fn do_sync_job( | ||||
|     let upid_str = WorkerTask::spawn( | ||||
|         &worker_type, | ||||
|         Some(job_id.clone()), | ||||
|         auth_id.clone(), | ||||
|         auth_id.to_string(), | ||||
|         to_stdout, | ||||
|         move |worker| async move { | ||||
|  | ||||
| @ -183,7 +183,7 @@ async fn pull ( | ||||
|     let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?; | ||||
|  | ||||
|     // fixme: set to_stdout to false? | ||||
|     let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.clone(), true, move |worker| async move { | ||||
|     let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move { | ||||
|  | ||||
|         worker.log(format!("sync datastore '{}' start", store)); | ||||
|  | ||||
|  | ||||
| @ -143,7 +143,7 @@ fn upgrade_to_backup_reader_protocol( | ||||
|  | ||||
|         let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time()); | ||||
|  | ||||
|         WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move { | ||||
|         WorkerTask::spawn("reader", Some(worker_id), auth_id.to_string(), true, move |worker| async move { | ||||
|             let _guard = _guard; | ||||
|  | ||||
|             let mut env = ReaderEnvironment::new( | ||||
|  | ||||
| @ -195,7 +195,7 @@ pub fn do_tape_backup_job( | ||||
|     let upid_str = WorkerTask::new_thread( | ||||
|         &worker_type, | ||||
|         Some(job_id.clone()), | ||||
|         auth_id.clone(), | ||||
|         auth_id.to_string(), | ||||
|         to_stdout, | ||||
|         move |worker| { | ||||
|             job.start(&worker.upid().to_string())?; | ||||
| @ -376,7 +376,7 @@ pub fn backup( | ||||
|     let upid_str = WorkerTask::new_thread( | ||||
|         "tape-backup", | ||||
|         Some(job_id), | ||||
|         auth_id, | ||||
|         auth_id.to_string(), | ||||
|         to_stdout, | ||||
|         move |worker| { | ||||
|             let _drive_lock = drive_lock; // keep lock guard | ||||
|  | ||||
| @ -87,7 +87,7 @@ where | ||||
|     let (config, _digest) = pbs_config::drive::config()?; | ||||
|     let lock_guard = lock_tape_device(&config, &drive)?; | ||||
|  | ||||
|     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; | ||||
|     let auth_id = rpcenv.get_auth_id().unwrap(); | ||||
|     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; | ||||
|  | ||||
|     WorkerTask::new_thread(worker_type, job_id, auth_id, to_stdout, move |worker| { | ||||
|  | ||||
| @ -275,7 +275,7 @@ pub fn restore( | ||||
|     let upid_str = WorkerTask::new_thread( | ||||
|         "tape-restore", | ||||
|         Some(taskid), | ||||
|         auth_id.clone(), | ||||
|         auth_id.to_string(), | ||||
|         to_stdout, | ||||
|         move |worker| { | ||||
|             let _drive_lock = drive_lock; // keep lock guard | ||||
|  | ||||
| @ -745,7 +745,7 @@ async fn schedule_task_log_rotate() { | ||||
|     if let Err(err) = WorkerTask::new_thread( | ||||
|         worker_type, | ||||
|         None, | ||||
|         Authid::root_auth_id().clone(), | ||||
|         Authid::root_auth_id().to_string(), | ||||
|         false, | ||||
|         move |worker| { | ||||
|             job.start(&worker.upid().to_string())?; | ||||
|  | ||||
| @ -26,7 +26,7 @@ pub fn do_garbage_collection_job( | ||||
|     let upid_str = WorkerTask::new_thread( | ||||
|         &worker_type, | ||||
|         Some(store.clone()), | ||||
|         auth_id.clone(), | ||||
|         auth_id.to_string(), | ||||
|         to_stdout, | ||||
|         move |worker| { | ||||
|             job.start(&worker.upid().to_string())?; | ||||
|  | ||||
| @ -105,7 +105,7 @@ pub fn do_prune_job( | ||||
|     let upid_str = WorkerTask::new_thread( | ||||
|         &worker_type, | ||||
|         Some(job.jobname().to_string()), | ||||
|         auth_id.clone(), | ||||
|         auth_id.to_string(), | ||||
|         false, | ||||
|         move |worker| { | ||||
|             job.start(&worker.upid().to_string())?; | ||||
|  | ||||
| @ -36,7 +36,7 @@ pub fn do_verification_job( | ||||
|     let upid_str = WorkerTask::new_thread( | ||||
|         &worker_type, | ||||
|         Some(job_id.clone()), | ||||
|         auth_id.clone(), | ||||
|         auth_id.to_string(), | ||||
|         to_stdout, | ||||
|         move |worker| { | ||||
|             job.start(&worker.upid().to_string())?; | ||||
|  | ||||
| @ -18,7 +18,7 @@ use proxmox::tools::fs::{create_path, replace_file, CreateOptions}; | ||||
|  | ||||
| use pbs_buildcfg; | ||||
| use pbs_tools::logrotate::{LogRotate, LogRotateFiles}; | ||||
| use pbs_api_types::{Authid, UPID}; | ||||
| use pbs_api_types::UPID; | ||||
| use pbs_config::{open_backup_lockfile, BackupLockGuard}; | ||||
| use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions}; | ||||
|  | ||||
| @ -589,7 +589,7 @@ struct WorkerTaskData { | ||||
|  | ||||
| impl WorkerTask { | ||||
|  | ||||
|     pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> { | ||||
|     pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> { | ||||
|         let upid = UPID::new(worker_type, worker_id, auth_id)?; | ||||
|         let task_id = upid.task_id; | ||||
|  | ||||
| @ -640,7 +640,7 @@ impl WorkerTask { | ||||
|     pub fn spawn<F, T>( | ||||
|         worker_type: &str, | ||||
|         worker_id: Option<String>, | ||||
|         auth_id: Authid, | ||||
|         auth_id: String, | ||||
|         to_stdout: bool, | ||||
|         f: F, | ||||
|     ) -> Result<String, Error> | ||||
| @ -662,7 +662,7 @@ impl WorkerTask { | ||||
|     pub fn new_thread<F>( | ||||
|         worker_type: &str, | ||||
|         worker_id: Option<String>, | ||||
|         auth_id: Authid, | ||||
|         auth_id: String, | ||||
|         to_stdout: bool, | ||||
|         f: F, | ||||
|     ) -> Result<String, Error> | ||||
|  | ||||
		Reference in New Issue
	
	Block a user