job: Add JobManagerInterface to emit JobStarted signal

This adds the JobManagerInterface helper object to emit a signal when a job is
started. It also renames the job prefix to be its own path so that the
JobManagerInterface can claim that as an object.
This commit is contained in:
Vicki Pfau 2024-06-27 19:55:32 -07:00
parent 8412adcd4a
commit 8e3fc1afeb
4 changed files with 100 additions and 9 deletions

View file

@ -405,4 +405,12 @@
</interface>
<interface name="com.steampowered.SteamOSManager1.JobManager">
<signal name="JobStarted">
<arg type="o" name="job"/>
</signal>
</interface>
</node>

View file

@ -15,16 +15,17 @@ use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
use tokio::process::{Child, Command};
use tracing::error;
use zbus::{fdo, interface, zvariant, Connection};
use zbus::{fdo, interface, zvariant, Connection, InterfaceRef, SignalContext};
use crate::error::to_zbus_fdo_error;
const JOB_PREFIX: &str = "/com/steampowered/SteamOSManager1/Job";
const JOB_PREFIX: &str = "/com/steampowered/SteamOSManager1/Jobs";
pub struct JobManager {
// This object manages exported jobs. It spawns processes, numbers them, and
// keeps a handle to the zbus connection to expose the name over the bus.
connection: Connection,
jm_iface: InterfaceRef<JobManagerInterface>,
next_job: u32,
}
@ -34,12 +35,23 @@ struct Job {
exit_code: Option<i32>,
}
struct JobManagerInterface {}
impl JobManager {
pub fn new(conn: Connection) -> JobManager {
JobManager {
connection: conn,
pub async fn new(connection: Connection) -> Result<JobManager> {
let jm_iface = JobManagerInterface {};
let jm_iface: InterfaceRef<JobManagerInterface> = {
// This object needs to be dropped to appease the borrow checker
let object_server = connection.object_server();
object_server.at(JOB_PREFIX, jm_iface).await?;
object_server.interface(JOB_PREFIX).await?
};
Ok(JobManager {
connection,
jm_iface,
next_job: 0,
}
})
}
pub async fn run_process(
@ -49,7 +61,7 @@ impl JobManager {
operation_name: &str,
) -> fdo::Result<zvariant::OwnedObjectPath> {
// Run the given executable and give back an object path
let path = format!("{}{}", JOB_PREFIX, self.next_job);
let path = format!("{}/{}", JOB_PREFIX, self.next_job);
self.next_job += 1;
let job = Job::spawn(executable, args)
.await
@ -59,10 +71,23 @@ impl JobManager {
.object_server()
.at(path.as_str(), job)
.await?;
zvariant::OwnedObjectPath::try_from(path).map_err(to_zbus_fdo_error)
let object_path = zvariant::OwnedObjectPath::try_from(path).map_err(to_zbus_fdo_error)?;
JobManagerInterface::job_started(self.jm_iface.signal_context(), object_path.as_ref())
.await?;
Ok(object_path)
}
}
#[interface(name = "com.steampowered.SteamOSManager1.JobManager")]
impl JobManagerInterface {
#[zbus(signal)]
async fn job_started(
signal_ctxt: &SignalContext<'_>,
job: zvariant::ObjectPath<'_>,
) -> zbus::Result<()>;
}
impl Job {
async fn spawn(executable: &str, args: &[impl AsRef<OsStr>]) -> Result<Job> {
let child = Command::new(executable).args(args).spawn()?;
@ -176,8 +201,55 @@ impl Job {
#[cfg(test)]
pub(crate) mod test {
use super::*;
use crate::proxy::JobManagerProxy;
use crate::testing;
use anyhow::anyhow;
use nix::sys::signal::Signal;
use tokio::sync::oneshot;
use tokio_stream::StreamExt;
use zbus::ConnectionBuilder;
#[tokio::test]
async fn test_job_emitted() {
let _h = testing::start();
let connection = ConnectionBuilder::session()
.expect("session")
.build()
.await
.expect("connection");
let sender = connection.unique_name().unwrap().to_owned();
let mut pm = JobManager::new(connection).await.expect("pm");
let (tx, rx) = oneshot::channel::<()>();
let job = tokio::spawn(async move {
let connection = ConnectionBuilder::session()?.build().await?;
let jm = JobManagerProxy::builder(&connection)
.destination(sender)?
.build()
.await?;
let mut spawned = jm.receive_job_started().await?;
let next = spawned.next();
let _ = tx.send(());
next.await.ok_or(anyhow!("nothing"))
});
rx.await.expect("rx");
let object = pm
.run_process("/usr/bin/true", &[] as &[&OsStr], "")
.await
.expect("path");
assert_eq!(object.as_ref(), "/com/steampowered/SteamOSManager1/Jobs/0");
let job = job.await.expect("job");
let job = job.expect("job2");
let path = job.args().expect("args").job;
assert_eq!(object.as_ref(), path);
}
#[tokio::test]
async fn test_job_manager() {

View file

@ -55,7 +55,7 @@ impl SteamOSManager {
fan_control: FanControl::new(connection.clone()),
wifi_debug_mode: WifiDebugMode::Off,
should_trace: variant().await? == HardwareVariant::Galileo,
job_manager: JobManager::new(connection.clone()),
job_manager: JobManager::new(connection.clone()).await?,
connection,
channel,
})

View file

@ -176,3 +176,14 @@ trait Job {
/// Wait method
fn wait(&self) -> zbus::Result<i32>;
}
#[proxy(
default_service = "com.steampowered.SteamOSManager1",
default_path = "/com/steampowered/SteamOSManager1/Jobs",
interface = "com.steampowered.SteamOSManager1.JobManager"
)]
trait JobManager {
/// JobStarted signal
#[zbus(signal)]
fn job_started(&self, job: zbus::zvariant::ObjectPath<'_>) -> zbus::Result<()>;
}