job: Add service to listen for jobs starting

This commit is contained in:
Vicki Pfau 2024-07-09 17:18:57 -07:00
parent 3c56afe921
commit 90b382cf7e
3 changed files with 156 additions and 41 deletions

View file

@ -8,7 +8,7 @@
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::PathBuf; use std::path::PathBuf;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{unbounded_channel, Sender};
use tracing::error; use tracing::error;
use tracing_subscriber::prelude::*; use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, Registry}; use tracing_subscriber::{fmt, Registry};
@ -18,9 +18,11 @@ use zbus::connection::Connection;
use zbus::ConnectionBuilder; use zbus::ConnectionBuilder;
use crate::daemon::{channel, Daemon, DaemonCommand, DaemonContext}; use crate::daemon::{channel, Daemon, DaemonCommand, DaemonContext};
use crate::job::{JobManager, JobManagerService};
use crate::manager::user::SteamOSManager; use crate::manager::user::SteamOSManager;
use crate::path; use crate::path;
use crate::udev::UdevMonitor; use crate::udev::UdevMonitor;
use crate::Service;
#[derive(Copy, Clone, Default, Deserialize, Debug)] #[derive(Copy, Clone, Default, Deserialize, Debug)]
#[serde(default)] #[serde(default)]
@ -101,20 +103,25 @@ impl DaemonContext for UserContext {
pub(crate) type Command = DaemonCommand<()>; pub(crate) type Command = DaemonCommand<()>;
async fn create_connections(channel: Sender<Command>) -> Result<(Connection, Connection)> { async fn create_connections(
channel: Sender<Command>,
) -> Result<(Connection, Connection, impl Service)> {
let system = Connection::system().await?; let system = Connection::system().await?;
let connection = ConnectionBuilder::session()? let connection = ConnectionBuilder::session()?
.name("com.steampowered.SteamOSManager1")? .name("com.steampowered.SteamOSManager1")?
.build() .build()
.await?; .await?;
let manager = SteamOSManager::new(connection.clone(), &system, channel).await?; let (tx, rx) = unbounded_channel();
let job_manager = JobManager::new(connection.clone()).await?;
let manager = SteamOSManager::new(connection.clone(), system.clone(), channel, tx).await?;
let service = JobManagerService::new(job_manager, rx, system.clone());
connection connection
.object_server() .object_server()
.at("/com/steampowered/SteamOSManager1", manager) .at("/com/steampowered/SteamOSManager1", manager)
.await?; .await?;
Ok((connection, system)) Ok((connection, system, service))
} }
pub async fn daemon() -> Result<()> { pub async fn daemon() -> Result<()> {
@ -125,7 +132,7 @@ pub async fn daemon() -> Result<()> {
let subscriber = Registry::default().with(stdout_log); let subscriber = Registry::default().with(stdout_log);
let (tx, rx) = channel::<UserContext>(); let (tx, rx) = channel::<UserContext>();
let (session, system) = match create_connections(tx).await { let (session, system, mirror_service) = match create_connections(tx).await {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
let _guard = tracing::subscriber::set_default(subscriber); let _guard = tracing::subscriber::set_default(subscriber);
@ -137,5 +144,7 @@ pub async fn daemon() -> Result<()> {
let context = UserContext { session }; let context = UserContext { session };
let mut daemon = Daemon::new(subscriber, system, rx).await?; let mut daemon = Daemon::new(subscriber, system, rx).await?;
daemon.add_service(mirror_service);
daemon.run(context).await daemon.run(context).await
} }

View file

@ -5,24 +5,28 @@
* SPDX-License-Identifier: MIT * SPDX-License-Identifier: MIT
*/ */
use anyhow::{bail, Result}; use anyhow::{anyhow, bail, Result};
use libc::pid_t; use libc::pid_t;
use nix::sys::signal; use nix::sys::signal;
use nix::sys::signal::Signal; use nix::sys::signal::Signal;
use nix::unistd::Pid; use nix::unistd::Pid;
use std::collections::HashMap; use std::collections::HashMap;
use std::ffi::OsStr; use std::ffi::{OsStr, OsString};
use std::io::Cursor; use std::io::Cursor;
use std::os::unix::process::ExitStatusExt; use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus; use std::process::ExitStatus;
use tokio::process::{Child, Command}; use tokio::process::{Child, Command};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot;
use tokio_stream::StreamExt;
use tracing::error; use tracing::error;
use zbus::fdo::{self, IntrospectableProxy}; use zbus::fdo::{self, IntrospectableProxy};
use zbus::{interface, zvariant, Connection, Interface, InterfaceRef, SignalContext}; use zbus::{interface, zvariant, Connection, Interface, InterfaceRef, SignalContext};
use zbus_xml::Node; use zbus_xml::Node;
use crate::error::{to_zbus_fdo_error, zbus_to_zbus_fdo}; use crate::error::{to_zbus_fdo_error, zbus_to_zbus_fdo};
use crate::proxy::JobProxy; use crate::proxy::{JobManagerProxy, JobProxy};
use crate::Service;
const JOB_PREFIX: &str = "/com/steampowered/SteamOSManager1/Jobs"; const JOB_PREFIX: &str = "/com/steampowered/SteamOSManager1/Jobs";
@ -43,10 +47,32 @@ struct Job {
struct JobManagerInterface {} struct JobManagerInterface {}
pub struct JobManagerService {
job_manager: JobManager,
channel: UnboundedReceiver<JobManagerCommand>,
connection: Connection,
}
struct MirroredJob { struct MirroredJob {
job: JobProxy<'static>, job: JobProxy<'static>,
} }
pub enum JobManagerCommand {
MirrorConnection(Connection),
MirrorJob {
connection: Connection,
path: zvariant::OwnedObjectPath,
reply: oneshot::Sender<fdo::Result<zvariant::OwnedObjectPath>>,
},
#[allow(unused)]
RunProcess {
executable: String,
args: Vec<OsString>,
operation_name: String,
reply: oneshot::Sender<fdo::Result<zvariant::OwnedObjectPath>>,
},
}
impl JobManager { impl JobManager {
pub async fn new(connection: Connection) -> Result<JobManager> { pub async fn new(connection: Connection) -> Result<JobManager> {
let jm_iface = JobManagerInterface {}; let jm_iface = JobManagerInterface {};
@ -278,15 +304,87 @@ impl MirroredJob {
} }
} }
impl JobManagerService {
pub fn new(
job_manager: JobManager,
channel: UnboundedReceiver<JobManagerCommand>,
connection: Connection,
) -> JobManagerService {
JobManagerService {
job_manager,
channel,
connection,
}
}
async fn handle_command(&mut self, command: JobManagerCommand) -> Result<()> {
match command {
JobManagerCommand::MirrorConnection(connection) => {
self.job_manager.mirror_connection(&connection).await?
}
JobManagerCommand::MirrorJob {
connection,
path,
reply,
} => {
let path = self.job_manager.mirror_job(&connection, path).await;
reply
.send(path)
.map_err(|e| anyhow!("Failed to send reply {e:?}"))?;
}
JobManagerCommand::RunProcess {
executable,
args,
operation_name,
reply,
} => {
let path = self
.job_manager
.run_process(&executable, &args, &operation_name)
.await;
reply
.send(path)
.map_err(|e| anyhow!("Failed to send reply {e:?}"))?;
}
}
Ok(())
}
}
impl Service for JobManagerService {
const NAME: &'static str = "job-manager";
async fn run(&mut self) -> Result<()> {
let jm = JobManagerProxy::new(&self.connection).await?;
let mut stream = jm.receive_job_started().await?;
loop {
tokio::select! {
Some(job) = stream.next() => {
let path = job.args()?.job;
self.job_manager
.mirror_job(&self.connection, path)
.await?;
},
message = self.channel.recv() => {
let message = match message {
None => bail!("Job manager service channel broke"),
Some(message) => message,
};
self.handle_command(message).await.inspect_err(|e| error!("Failed to handle command: {e}"))?;
},
}
}
}
}
#[cfg(test)] #[cfg(test)]
pub(crate) mod test { pub(crate) mod test {
use super::*; use super::*;
use crate::proxy::JobManagerProxy;
use crate::testing; use crate::testing;
use anyhow::anyhow; use anyhow::anyhow;
use nix::sys::signal::Signal; use nix::sys::signal::Signal;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio_stream::StreamExt;
use zbus::ConnectionBuilder; use zbus::ConnectionBuilder;
#[tokio::test] #[tokio::test]

View file

@ -8,8 +8,9 @@
use anyhow::Result; use anyhow::Result;
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{Sender, UnboundedSender};
use tracing::{error, warn}; use tokio::sync::oneshot;
use tracing::error;
use zbus::proxy::Builder; use zbus::proxy::Builder;
use zbus::zvariant::{self, Fd}; use zbus::zvariant::{self, Fd};
use zbus::{fdo, interface, CacheProperties, Connection, Proxy, SignalContext}; use zbus::{fdo, interface, CacheProperties, Connection, Proxy, SignalContext};
@ -19,7 +20,7 @@ use crate::daemon::user::Command;
use crate::daemon::DaemonCommand; use crate::daemon::DaemonCommand;
use crate::error::{to_zbus_error, to_zbus_fdo_error, zbus_to_zbus_fdo}; use crate::error::{to_zbus_error, to_zbus_fdo_error, zbus_to_zbus_fdo};
use crate::hardware::check_support; use crate::hardware::check_support;
use crate::job::JobManager; use crate::job::JobManagerCommand;
use crate::power::{ use crate::power::{
get_available_cpu_scaling_governors, get_cpu_scaling_governor, get_gpu_clocks, get_available_cpu_scaling_governors, get_cpu_scaling_governor, get_gpu_clocks,
get_gpu_performance_level, get_gpu_power_profile, get_gpu_power_profiles, get_tdp_limit, get_gpu_performance_level, get_gpu_power_profile, get_gpu_power_profiles, get_tdp_limit,
@ -44,16 +45,26 @@ macro_rules! method {
macro_rules! job_method { macro_rules! job_method {
($self:expr, $method:expr, $($args:expr),+) => { ($self:expr, $method:expr, $($args:expr),+) => {
$self.job_manager.mirror_job::<zvariant::OwnedObjectPath>( {
$self.proxy.connection(), let (tx, rx) = oneshot::channel();
method!($self, $method, $($args),+)? $self.job_manager.send(JobManagerCommand::MirrorJob {
).await connection: $self.proxy.connection().clone(),
path: method!($self, $method, $($args),+)?,
reply: tx,
}).map_err(to_zbus_fdo_error)?;
rx.await.map_err(to_zbus_fdo_error)?
}
}; };
($self:expr, $method:expr) => { ($self:expr, $method:expr) => {
$self.job_manager.mirror_job::<zvariant::OwnedObjectPath>( {
$self.proxy.connection(), let (tx, rx) = oneshot::channel();
method!($self, $method)? $self.job_manager.send(JobManagerCommand::MirrorJob {
).await connection: $self.proxy.connection().clone(),
path: method!($self, $method)?,
reply: tx,
}).map_err(to_zbus_fdo_error)?;
rx.await.map_err(to_zbus_fdo_error)?
}
}; };
} }
@ -81,34 +92,28 @@ pub struct SteamOSManager {
proxy: Proxy<'static>, proxy: Proxy<'static>,
hdmi_cec: HdmiCecControl<'static>, hdmi_cec: HdmiCecControl<'static>,
channel: Sender<Command>, channel: Sender<Command>,
job_manager: JobManager, job_manager: UnboundedSender<JobManagerCommand>,
} }
impl SteamOSManager { impl SteamOSManager {
pub async fn new( pub async fn new(
connection: Connection, connection: Connection,
system_conn: &Connection, system_conn: Connection,
channel: Sender<Command>, channel: Sender<Command>,
job_manager: UnboundedSender<JobManagerCommand>,
) -> Result<Self> { ) -> Result<Self> {
let hdmi_cec = HdmiCecControl::new(&connection).await?; let hdmi_cec = HdmiCecControl::new(&connection).await?;
let mut job_manager = JobManager::new(connection).await?; let proxy = Builder::new(&system_conn)
if let Err(e) = job_manager.mirror_connection(system_conn).await {
warn!("Could not mirror jobs: {e}");
match e {
fdo::Error::ServiceUnknown(_) => (),
e => Err(e)?,
}
}
Ok(SteamOSManager {
hdmi_cec,
proxy: Builder::new(system_conn)
.destination("com.steampowered.SteamOSManager1")? .destination("com.steampowered.SteamOSManager1")?
.path("/com/steampowered/SteamOSManager1")? .path("/com/steampowered/SteamOSManager1")?
.interface("com.steampowered.SteamOSManager1.RootManager")? .interface("com.steampowered.SteamOSManager1.RootManager")?
.cache_properties(CacheProperties::No) .cache_properties(CacheProperties::No)
.build() .build()
.await?, .await?;
job_manager.send(JobManagerCommand::MirrorConnection(system_conn))?;
Ok(SteamOSManager {
hdmi_cec,
proxy,
job_manager, job_manager,
channel, channel,
}) })
@ -395,6 +400,7 @@ mod test {
use std::iter::zip; use std::iter::zip;
use std::time::Duration; use std::time::Duration;
use tokio::fs::read; use tokio::fs::read;
use tokio::sync::mpsc::unbounded_channel;
use tokio::time::sleep; use tokio::time::sleep;
use zbus::{Connection, Interface}; use zbus::{Connection, Interface};
use zbus_xml::{Method, Node, Property}; use zbus_xml::{Method, Node, Property};
@ -406,9 +412,11 @@ mod test {
async fn start() -> Result<TestHandle> { async fn start() -> Result<TestHandle> {
let mut handle = testing::start(); let mut handle = testing::start();
let (tx, _rx) = channel::<UserContext>(); let (tx_ctx, _rx_ctx) = channel::<UserContext>();
let (tx_job, _rx_job) = unbounded_channel::<JobManagerCommand>();
let connection = handle.new_dbus().await?; let connection = handle.new_dbus().await?;
let manager = SteamOSManager::new(connection.clone(), &connection, tx).await?; let manager =
SteamOSManager::new(connection.clone(), connection.clone(), tx_ctx, tx_job).await?;
connection connection
.object_server() .object_server()
.at("/com/steampowered/SteamOSManager1", manager) .at("/com/steampowered/SteamOSManager1", manager)