From 603cd7217cfef83bd26db3445ef1cbedfd7ef4b2 Mon Sep 17 00:00:00 2001 From: Vicki Pfau Date: Tue, 16 Jul 2024 20:09:22 -0700 Subject: [PATCH] job: Split out from process Rename ProcessManager to JobManager, as that's what it really handles. This also renames get_command_object_path, which actually spawns a job instead of just getting something. Further, it moves job spawning into the Job object instead of being a helper function on ProcessManager. --- src/job.rs | 242 +++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/manager/root.rs | 23 +++-- src/process.rs | 244 +------------------------------------------- 4 files changed, 258 insertions(+), 252 deletions(-) create mode 100644 src/job.rs diff --git a/src/job.rs b/src/job.rs new file mode 100644 index 0000000..c739a2c --- /dev/null +++ b/src/job.rs @@ -0,0 +1,242 @@ +/* + * Copyright © 2023 Collabora Ltd. + * Copyright © 2024 Valve Software + * + * SPDX-License-Identifier: MIT + */ + +use anyhow::{bail, Result}; +use libc::pid_t; +use nix::sys::signal; +use nix::sys::signal::Signal; +use nix::unistd::Pid; +use std::ffi::OsStr; +use std::os::unix::process::ExitStatusExt; +use std::process::ExitStatus; +use tokio::process::{Child, Command}; +use tracing::error; +use zbus::{fdo, interface, zvariant, Connection}; + +use crate::error::to_zbus_fdo_error; + +const JOB_PREFIX: &str = "/com/steampowered/SteamOSManager1/Job"; + +pub struct JobManager { + // This object manages exported jobs. It spawns processes, numbers them, and + // keeps a handle to the zbus connection to expose the name over the bus. + connection: Connection, + next_job: u32, +} + +pub struct Job { + process: Child, + paused: bool, + exit_code: Option, +} + +impl JobManager { + pub fn new(conn: Connection) -> JobManager { + JobManager { + connection: conn, + next_job: 0, + } + } + + pub async fn run_process( + &mut self, + executable: &str, + args: &[impl AsRef], + operation_name: &str, + ) -> fdo::Result { + // Run the given executable and give back an object path + let path = format!("{}{}", JOB_PREFIX, self.next_job); + self.next_job += 1; + let job = Job::spawn(executable, args) + .await + .inspect_err(|message| error!("Error {operation_name}: {message}")) + .map_err(to_zbus_fdo_error)?; + self.connection + .object_server() + .at(path.as_str(), job) + .await?; + zvariant::OwnedObjectPath::try_from(path).map_err(to_zbus_fdo_error) + } +} + +impl Job { + async fn spawn(executable: &str, args: &[impl AsRef]) -> Result { + let child = Command::new(executable).args(args).spawn()?; + Ok(Job { + process: child, + paused: false, + exit_code: None, + }) + } + + fn send_signal(&self, signal: nix::sys::signal::Signal) -> Result<()> { + let pid = match self.process.id() { + Some(id) => id, + None => bail!("Unable to get pid from command, it likely finished running"), + }; + let pid: pid_t = match pid.try_into() { + Ok(pid) => pid, + Err(message) => bail!("Unable to get pid_t from command {message}"), + }; + signal::kill(Pid::from_raw(pid), signal)?; + Ok(()) + } + + fn update_exit_code(&mut self, status: ExitStatus) -> Result { + if let Some(code) = status.code() { + self.exit_code = Some(code); + Ok(code) + } else if let Some(signal) = status.signal() { + self.exit_code = Some(-signal); + Ok(-signal) + } else { + bail!("Process exited without return code or signal"); + } + } + + fn try_wait(&mut self) -> Result> { + if self.exit_code.is_none() { + // If we don't already have an exit code, try to wait for the process + if let Some(status) = self.process.try_wait()? { + self.update_exit_code(status)?; + } + } + Ok(self.exit_code) + } + + async fn wait_internal(&mut self) -> Result { + if let Some(code) = self.exit_code { + // Just give the exit_code if we have it already + Ok(code) + } else { + // Otherwise wait for the process + let status = self.process.wait().await?; + self.update_exit_code(status) + } + } +} + +#[interface(name = "com.steampowered.SteamOSManager1.Job")] +impl Job { + pub async fn pause(&mut self) -> fdo::Result<()> { + if self.paused { + return Err(fdo::Error::Failed("Already paused".to_string())); + } + // Pause the given process if possible + // Return true on success, false otherwise + let result = self.send_signal(Signal::SIGSTOP).map_err(to_zbus_fdo_error); + self.paused = true; + result + } + + pub async fn resume(&mut self) -> fdo::Result<()> { + // Resume the given process if possible + if !self.paused { + return Err(fdo::Error::Failed("Not paused".to_string())); + } + let result = self.send_signal(Signal::SIGCONT).map_err(to_zbus_fdo_error); + self.paused = false; + result + } + + pub async fn cancel(&mut self, force: bool) -> fdo::Result<()> { + if self.try_wait().map_err(to_zbus_fdo_error)?.is_none() { + self.send_signal(match force { + true => Signal::SIGKILL, + false => Signal::SIGTERM, + }) + .map_err(to_zbus_fdo_error)?; + if self.paused { + self.resume().await?; + } + } + Ok(()) + } + + pub async fn wait(&mut self) -> fdo::Result { + if self.paused { + self.resume().await?; + } + + let code = match self.wait_internal().await.map_err(to_zbus_fdo_error) { + Ok(v) => v, + Err(_) => { + return Err(fdo::Error::Failed("Unable to get exit code".to_string())); + } + }; + self.exit_code = Some(code); + Ok(code) + } +} + +#[cfg(test)] +pub(crate) mod test { + use super::*; + use crate::testing; + use nix::sys::signal::Signal; + + #[tokio::test] + async fn test_job_manager() { + let _h = testing::start(); + + let mut false_process = Job::spawn("/bin/false", &[] as &[String; 0]).await.unwrap(); + let mut true_process = Job::spawn("/bin/true", &[] as &[String; 0]).await.unwrap(); + + let mut pause_process = Job::spawn("/usr/bin/sleep", &["0.2"]).await.unwrap(); + pause_process.pause().await.expect("pause"); + + assert_eq!( + pause_process.pause().await.unwrap_err(), + fdo::Error::Failed("Already paused".to_string()) + ); + + pause_process.resume().await.expect("resume"); + + assert_eq!( + pause_process.resume().await.unwrap_err(), + fdo::Error::Failed("Not paused".to_string()) + ); + + // Sleep gives 0 exit code when done, -1 when we haven't waited for it yet + assert_eq!(pause_process.wait().await.unwrap(), 0); + + assert_eq!(false_process.wait().await.unwrap(), 1); + assert_eq!(true_process.wait().await.unwrap(), 0); + } + + #[tokio::test] + async fn test_multikill() { + let _h = testing::start(); + + let mut sleep_process = Job::spawn("/usr/bin/sleep", &["0.1"]).await.unwrap(); + sleep_process.cancel(true).await.expect("kill"); + + // Killing a process should be idempotent + sleep_process.cancel(true).await.expect("kill"); + + assert_eq!( + sleep_process.wait().await.unwrap(), + -(Signal::SIGKILL as i32) + ); + } + + #[tokio::test] + async fn test_terminate_unpause() { + let _h = testing::start(); + + let mut pause_process = Job::spawn("/usr/bin/sleep", &["0.2"]).await.unwrap(); + pause_process.pause().await.expect("pause"); + assert_eq!(pause_process.try_wait().expect("try_wait"), None); + + // Canceling a process should unpause it + pause_process.cancel(false).await.expect("pause"); + assert_eq!( + pause_process.wait().await.unwrap(), + -(Signal::SIGTERM as i32) + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index 1126061..3c0f6b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ use tracing::{info, warn}; mod ds_inhibit; mod error; +mod job; mod manager; mod process; mod sls; diff --git a/src/manager/root.rs b/src/manager/root.rs index aeea2f0..9549d29 100644 --- a/src/manager/root.rs +++ b/src/manager/root.rs @@ -19,11 +19,12 @@ use crate::daemon::root::{Command, RootCommand}; use crate::daemon::DaemonCommand; use crate::error::{to_zbus_error, to_zbus_fdo_error}; use crate::hardware::{variant, FanControl, FanControlState, HardwareVariant}; +use crate::job::JobManager; use crate::power::{ set_cpu_scaling_governor, set_gpu_clocks, set_gpu_performance_level, set_gpu_power_profile, set_tdp_limit, CPUScalingGovernor, GPUPerformanceLevel, GPUPowerProfile, }; -use crate::process::{run_script, script_output, ProcessManager}; +use crate::process::{run_script, script_output}; use crate::wifi::{ set_wifi_backend, set_wifi_debug_mode, set_wifi_power_management_state, WifiBackend, WifiDebugMode, WifiPowerManagement, @@ -45,7 +46,7 @@ pub struct SteamOSManager { // Whether we should use trace-cmd or not. // True on galileo devices, false otherwise should_trace: bool, - process_manager: ProcessManager, + job_manager: JobManager, } impl SteamOSManager { @@ -54,7 +55,7 @@ impl SteamOSManager { fan_control: FanControl::new(connection.clone()), wifi_debug_mode: WifiDebugMode::Off, should_trace: variant().await? == HardwareVariant::Galileo, - process_manager: ProcessManager::new(connection.clone()), + job_manager: JobManager::new(connection.clone()), connection, channel, }) @@ -137,15 +138,15 @@ impl SteamOSManager { async fn update_bios(&mut self) -> fdo::Result { // Update the bios as needed - self.process_manager - .get_command_object_path("/usr/bin/jupiter-biosupdate", &["--auto"], "updating BIOS") + self.job_manager + .run_process("/usr/bin/jupiter-biosupdate", &["--auto"], "updating BIOS") .await } async fn update_dock(&mut self) -> fdo::Result { // Update the dock firmware as needed - self.process_manager - .get_command_object_path( + self.job_manager + .run_process( "/usr/lib/jupiter-dock-updater/jupiter-dock-updater.sh", &[] as &[String; 0], "updating dock", @@ -155,8 +156,8 @@ impl SteamOSManager { async fn trim_devices(&mut self) -> fdo::Result { // Run steamos-trim-devices script - self.process_manager - .get_command_object_path( + self.job_manager + .run_process( "/usr/lib/hwsupport/trim-devices.sh", &[] as &[String; 0], "trimming devices", @@ -174,8 +175,8 @@ impl SteamOSManager { if !validate { args.push("--skip-validation"); } - self.process_manager - .get_command_object_path( + self.job_manager + .run_process( "/usr/lib/hwsupport/format-device.sh", args.as_ref(), format!("formatting {device}").as_str(), diff --git a/src/process.rs b/src/process.rs index 16c9166..db72317 100644 --- a/src/process.rs +++ b/src/process.rs @@ -5,176 +5,10 @@ * SPDX-License-Identifier: MIT */ -use anyhow::{anyhow, bail, Result}; -use libc::pid_t; -use nix::sys::signal; -use nix::sys::signal::Signal; -use nix::unistd::Pid; +use anyhow::{anyhow, Result}; use std::ffi::OsStr; -use std::os::unix::process::ExitStatusExt; -use std::process::ExitStatus; -use tokio::process::{Child, Command}; -use tracing::error; -use zbus::{fdo, interface, zvariant, Connection}; - -use crate::error::to_zbus_fdo_error; - -const PROCESS_PREFIX: &str = "/com/steampowered/SteamOSManager1/Job"; - -pub struct ProcessManager { - // The thing that manages subprocesses. - // Keeps a handle to the zbus connection and - // what the next process id on the bus should be - connection: Connection, - next_process: u32, -} - -pub struct Job { - process: Child, - paused: bool, - exit_code: Option, -} - -impl ProcessManager { - pub fn new(conn: Connection) -> ProcessManager { - ProcessManager { - connection: conn, - next_process: 0, - } - } - - pub async fn get_command_object_path( - &mut self, - executable: &str, - args: &[impl AsRef], - operation_name: &str, - ) -> fdo::Result { - // Run the given executable and give back an object path - let path = format!("{}{}", PROCESS_PREFIX, self.next_process); - self.next_process += 1; - let pm = ProcessManager::run_long_command(executable, args) - .await - .inspect_err(|message| error!("Error {operation_name}: {message}")) - .map_err(to_zbus_fdo_error)?; - self.connection - .object_server() - .at(path.as_str(), pm) - .await?; - zvariant::OwnedObjectPath::try_from(path).map_err(to_zbus_fdo_error) - } - - pub async fn run_long_command(executable: &str, args: &[impl AsRef]) -> Result { - // Run the given executable with the given arguments - // Return an id that can be used later to pause/cancel/resume as needed - let child = Command::new(executable).args(args).spawn()?; - Ok(Job { - process: child, - paused: false, - exit_code: None, - }) - } -} - -impl Job { - fn send_signal(&self, signal: nix::sys::signal::Signal) -> Result<()> { - let pid = match self.process.id() { - Some(id) => id, - None => bail!("Unable to get pid from command, it likely finished running"), - }; - let pid: pid_t = match pid.try_into() { - Ok(pid) => pid, - Err(message) => bail!("Unable to get pid_t from command {message}"), - }; - signal::kill(Pid::from_raw(pid), signal)?; - Ok(()) - } - - fn update_exit_code(&mut self, status: ExitStatus) -> Result { - if let Some(code) = status.code() { - self.exit_code = Some(code); - Ok(code) - } else if let Some(signal) = status.signal() { - self.exit_code = Some(-signal); - Ok(-signal) - } else { - bail!("Process exited without return code or signal"); - } - } - - fn try_wait(&mut self) -> Result> { - if self.exit_code.is_none() { - // If we don't already have an exit code, try to wait for the process - if let Some(status) = self.process.try_wait()? { - self.update_exit_code(status)?; - } - } - Ok(self.exit_code) - } - - async fn wait_internal(&mut self) -> Result { - if let Some(code) = self.exit_code { - // Just give the exit_code if we have it already - Ok(code) - } else { - // Otherwise wait for the process - let status = self.process.wait().await?; - self.update_exit_code(status) - } - } -} - -#[interface(name = "com.steampowered.SteamOSManager1.Job")] -impl Job { - pub async fn pause(&mut self) -> fdo::Result<()> { - if self.paused { - return Err(fdo::Error::Failed("Already paused".to_string())); - } - // Pause the given process if possible - // Return true on success, false otherwise - let result = self.send_signal(Signal::SIGSTOP).map_err(to_zbus_fdo_error); - self.paused = true; - result - } - - pub async fn resume(&mut self) -> fdo::Result<()> { - // Resume the given process if possible - if !self.paused { - return Err(fdo::Error::Failed("Not paused".to_string())); - } - let result = self.send_signal(Signal::SIGCONT).map_err(to_zbus_fdo_error); - self.paused = false; - result - } - - pub async fn cancel(&mut self, force: bool) -> fdo::Result<()> { - if self.try_wait().map_err(to_zbus_fdo_error)?.is_none() { - self.send_signal(match force { - true => Signal::SIGKILL, - false => Signal::SIGTERM, - }) - .map_err(to_zbus_fdo_error)?; - if self.paused { - self.resume().await?; - } - } - Ok(()) - } - - pub async fn wait(&mut self) -> fdo::Result { - if self.paused { - self.resume().await?; - } - - let code = match self.wait_internal().await.map_err(to_zbus_fdo_error) { - Ok(v) => v, - Err(_) => { - return Err(fdo::Error::Failed("Unable to get exit code".to_string())); - } - }; - self.exit_code = Some(code); - Ok(code) - } -} +#[cfg(not(test))] +use tokio::process::Command; #[cfg(not(test))] pub async fn script_exit_code(executable: &str, args: &[impl AsRef]) -> Result { @@ -225,7 +59,6 @@ pub async fn script_output(executable: &str, args: &[impl AsRef]) -> Resu pub(crate) mod test { use super::*; use crate::testing; - use nix::sys::signal::Signal; pub fn ok(_: &str, _: &[&OsStr]) -> Result<(i32, String)> { Ok((0, String::from("ok"))) @@ -239,77 +72,6 @@ pub(crate) mod test { Err(anyhow!("oops!")) } - #[tokio::test] - async fn test_process_manager() { - let _h = testing::start(); - - let mut false_process = ProcessManager::run_long_command("/bin/false", &[] as &[String; 0]) - .await - .unwrap(); - let mut true_process = ProcessManager::run_long_command("/bin/true", &[] as &[String; 0]) - .await - .unwrap(); - - let mut pause_process = ProcessManager::run_long_command("/usr/bin/sleep", &["0.2"]) - .await - .unwrap(); - pause_process.pause().await.expect("pause"); - - assert_eq!( - pause_process.pause().await.unwrap_err(), - fdo::Error::Failed("Already paused".to_string()) - ); - - pause_process.resume().await.expect("resume"); - - assert_eq!( - pause_process.resume().await.unwrap_err(), - fdo::Error::Failed("Not paused".to_string()) - ); - - // Sleep gives 0 exit code when done, -1 when we haven't waited for it yet - assert_eq!(pause_process.wait().await.unwrap(), 0); - - assert_eq!(false_process.wait().await.unwrap(), 1); - assert_eq!(true_process.wait().await.unwrap(), 0); - } - - #[tokio::test] - async fn test_multikill() { - let _h = testing::start(); - - let mut sleep_process = ProcessManager::run_long_command("/usr/bin/sleep", &["0.1"]) - .await - .unwrap(); - sleep_process.cancel(true).await.expect("kill"); - - // Killing a process should be idempotent - sleep_process.cancel(true).await.expect("kill"); - - assert_eq!( - sleep_process.wait().await.unwrap(), - -(Signal::SIGKILL as i32) - ); - } - - #[tokio::test] - async fn test_terminate_unpause() { - let _h = testing::start(); - - let mut pause_process = ProcessManager::run_long_command("/usr/bin/sleep", &["0.2"]) - .await - .unwrap(); - pause_process.pause().await.expect("pause"); - assert_eq!(pause_process.try_wait().expect("try_wait"), None); - - // Canceling a process should unpause it - pause_process.cancel(false).await.expect("pause"); - assert_eq!( - pause_process.wait().await.unwrap(), - -(Signal::SIGTERM as i32) - ); - } - #[tokio::test] async fn test_run_script() { let h = testing::start();