diff --git a/src/daemon/user.rs b/src/daemon/user.rs index 4ebf25d..0d9a2b1 100644 --- a/src/daemon/user.rs +++ b/src/daemon/user.rs @@ -8,7 +8,7 @@ use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{unbounded_channel, Sender}; use tracing::error; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, Registry}; @@ -18,9 +18,11 @@ use zbus::connection::Connection; use zbus::ConnectionBuilder; use crate::daemon::{channel, Daemon, DaemonCommand, DaemonContext}; +use crate::job::{JobManager, JobManagerService}; use crate::manager::user::SteamOSManager; use crate::path; use crate::udev::UdevMonitor; +use crate::Service; #[derive(Copy, Clone, Default, Deserialize, Debug)] #[serde(default)] @@ -101,20 +103,25 @@ impl DaemonContext for UserContext { pub(crate) type Command = DaemonCommand<()>; -async fn create_connections(channel: Sender) -> Result<(Connection, Connection)> { +async fn create_connections( + channel: Sender, +) -> Result<(Connection, Connection, impl Service)> { let system = Connection::system().await?; let connection = ConnectionBuilder::session()? .name("com.steampowered.SteamOSManager1")? .build() .await?; - let manager = SteamOSManager::new(connection.clone(), &system, channel).await?; + let (tx, rx) = unbounded_channel(); + let job_manager = JobManager::new(connection.clone()).await?; + let manager = SteamOSManager::new(connection.clone(), system.clone(), channel, tx).await?; + let service = JobManagerService::new(job_manager, rx, system.clone()); connection .object_server() .at("/com/steampowered/SteamOSManager1", manager) .await?; - Ok((connection, system)) + Ok((connection, system, service)) } pub async fn daemon() -> Result<()> { @@ -125,7 +132,7 @@ pub async fn daemon() -> Result<()> { let subscriber = Registry::default().with(stdout_log); let (tx, rx) = channel::(); - let (session, system) = match create_connections(tx).await { + let (session, system, mirror_service) = match create_connections(tx).await { Ok(c) => c, Err(e) => { let _guard = tracing::subscriber::set_default(subscriber); @@ -137,5 +144,7 @@ pub async fn daemon() -> Result<()> { let context = UserContext { session }; let mut daemon = Daemon::new(subscriber, system, rx).await?; + daemon.add_service(mirror_service); + daemon.run(context).await } diff --git a/src/job.rs b/src/job.rs index d1521d9..6da15ac 100644 --- a/src/job.rs +++ b/src/job.rs @@ -5,24 +5,28 @@ * SPDX-License-Identifier: MIT */ -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use libc::pid_t; use nix::sys::signal; use nix::sys::signal::Signal; use nix::unistd::Pid; use std::collections::HashMap; -use std::ffi::OsStr; +use std::ffi::{OsStr, OsString}; use std::io::Cursor; use std::os::unix::process::ExitStatusExt; use std::process::ExitStatus; use tokio::process::{Child, Command}; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::oneshot; +use tokio_stream::StreamExt; use tracing::error; use zbus::fdo::{self, IntrospectableProxy}; use zbus::{interface, zvariant, Connection, Interface, InterfaceRef, SignalContext}; use zbus_xml::Node; use crate::error::{to_zbus_fdo_error, zbus_to_zbus_fdo}; -use crate::proxy::JobProxy; +use crate::proxy::{JobManagerProxy, JobProxy}; +use crate::Service; const JOB_PREFIX: &str = "/com/steampowered/SteamOSManager1/Jobs"; @@ -43,10 +47,32 @@ struct Job { struct JobManagerInterface {} +pub struct JobManagerService { + job_manager: JobManager, + channel: UnboundedReceiver, + connection: Connection, +} + struct MirroredJob { job: JobProxy<'static>, } +pub enum JobManagerCommand { + MirrorConnection(Connection), + MirrorJob { + connection: Connection, + path: zvariant::OwnedObjectPath, + reply: oneshot::Sender>, + }, + #[allow(unused)] + RunProcess { + executable: String, + args: Vec, + operation_name: String, + reply: oneshot::Sender>, + }, +} + impl JobManager { pub async fn new(connection: Connection) -> Result { let jm_iface = JobManagerInterface {}; @@ -278,15 +304,87 @@ impl MirroredJob { } } +impl JobManagerService { + pub fn new( + job_manager: JobManager, + channel: UnboundedReceiver, + connection: Connection, + ) -> JobManagerService { + JobManagerService { + job_manager, + channel, + connection, + } + } + + async fn handle_command(&mut self, command: JobManagerCommand) -> Result<()> { + match command { + JobManagerCommand::MirrorConnection(connection) => { + self.job_manager.mirror_connection(&connection).await? + } + JobManagerCommand::MirrorJob { + connection, + path, + reply, + } => { + let path = self.job_manager.mirror_job(&connection, path).await; + reply + .send(path) + .map_err(|e| anyhow!("Failed to send reply {e:?}"))?; + } + JobManagerCommand::RunProcess { + executable, + args, + operation_name, + reply, + } => { + let path = self + .job_manager + .run_process(&executable, &args, &operation_name) + .await; + reply + .send(path) + .map_err(|e| anyhow!("Failed to send reply {e:?}"))?; + } + } + Ok(()) + } +} + +impl Service for JobManagerService { + const NAME: &'static str = "job-manager"; + + async fn run(&mut self) -> Result<()> { + let jm = JobManagerProxy::new(&self.connection).await?; + let mut stream = jm.receive_job_started().await?; + + loop { + tokio::select! { + Some(job) = stream.next() => { + let path = job.args()?.job; + self.job_manager + .mirror_job(&self.connection, path) + .await?; + }, + message = self.channel.recv() => { + let message = match message { + None => bail!("Job manager service channel broke"), + Some(message) => message, + }; + self.handle_command(message).await.inspect_err(|e| error!("Failed to handle command: {e}"))?; + }, + } + } + } +} + #[cfg(test)] pub(crate) mod test { use super::*; - use crate::proxy::JobManagerProxy; use crate::testing; use anyhow::anyhow; use nix::sys::signal::Signal; use tokio::sync::oneshot; - use tokio_stream::StreamExt; use zbus::ConnectionBuilder; #[tokio::test] diff --git a/src/manager/user.rs b/src/manager/user.rs index dcfc22e..a7e6172 100644 --- a/src/manager/user.rs +++ b/src/manager/user.rs @@ -8,8 +8,9 @@ use anyhow::Result; use std::collections::HashMap; -use tokio::sync::mpsc::Sender; -use tracing::{error, warn}; +use tokio::sync::mpsc::{Sender, UnboundedSender}; +use tokio::sync::oneshot; +use tracing::error; use zbus::proxy::Builder; use zbus::zvariant::{self, Fd}; use zbus::{fdo, interface, CacheProperties, Connection, Proxy, SignalContext}; @@ -19,7 +20,7 @@ use crate::daemon::user::Command; use crate::daemon::DaemonCommand; use crate::error::{to_zbus_error, to_zbus_fdo_error, zbus_to_zbus_fdo}; use crate::hardware::check_support; -use crate::job::JobManager; +use crate::job::JobManagerCommand; use crate::power::{ get_available_cpu_scaling_governors, get_cpu_scaling_governor, get_gpu_clocks, get_gpu_performance_level, get_gpu_power_profile, get_gpu_power_profiles, get_tdp_limit, @@ -44,16 +45,26 @@ macro_rules! method { macro_rules! job_method { ($self:expr, $method:expr, $($args:expr),+) => { - $self.job_manager.mirror_job::( - $self.proxy.connection(), - method!($self, $method, $($args),+)? - ).await + { + let (tx, rx) = oneshot::channel(); + $self.job_manager.send(JobManagerCommand::MirrorJob { + connection: $self.proxy.connection().clone(), + path: method!($self, $method, $($args),+)?, + reply: tx, + }).map_err(to_zbus_fdo_error)?; + rx.await.map_err(to_zbus_fdo_error)? + } }; ($self:expr, $method:expr) => { - $self.job_manager.mirror_job::( - $self.proxy.connection(), - method!($self, $method)? - ).await + { + let (tx, rx) = oneshot::channel(); + $self.job_manager.send(JobManagerCommand::MirrorJob { + connection: $self.proxy.connection().clone(), + path: method!($self, $method)?, + reply: tx, + }).map_err(to_zbus_fdo_error)?; + rx.await.map_err(to_zbus_fdo_error)? + } }; } @@ -81,34 +92,28 @@ pub struct SteamOSManager { proxy: Proxy<'static>, hdmi_cec: HdmiCecControl<'static>, channel: Sender, - job_manager: JobManager, + job_manager: UnboundedSender, } impl SteamOSManager { pub async fn new( connection: Connection, - system_conn: &Connection, + system_conn: Connection, channel: Sender, + job_manager: UnboundedSender, ) -> Result { let hdmi_cec = HdmiCecControl::new(&connection).await?; - let mut job_manager = JobManager::new(connection).await?; - if let Err(e) = job_manager.mirror_connection(system_conn).await { - warn!("Could not mirror jobs: {e}"); - match e { - fdo::Error::ServiceUnknown(_) => (), - e => Err(e)?, - } - } - + let proxy = Builder::new(&system_conn) + .destination("com.steampowered.SteamOSManager1")? + .path("/com/steampowered/SteamOSManager1")? + .interface("com.steampowered.SteamOSManager1.RootManager")? + .cache_properties(CacheProperties::No) + .build() + .await?; + job_manager.send(JobManagerCommand::MirrorConnection(system_conn))?; Ok(SteamOSManager { hdmi_cec, - proxy: Builder::new(system_conn) - .destination("com.steampowered.SteamOSManager1")? - .path("/com/steampowered/SteamOSManager1")? - .interface("com.steampowered.SteamOSManager1.RootManager")? - .cache_properties(CacheProperties::No) - .build() - .await?, + proxy, job_manager, channel, }) @@ -395,6 +400,7 @@ mod test { use std::iter::zip; use std::time::Duration; use tokio::fs::read; + use tokio::sync::mpsc::unbounded_channel; use tokio::time::sleep; use zbus::{Connection, Interface}; use zbus_xml::{Method, Node, Property}; @@ -406,9 +412,11 @@ mod test { async fn start() -> Result { let mut handle = testing::start(); - let (tx, _rx) = channel::(); + let (tx_ctx, _rx_ctx) = channel::(); + let (tx_job, _rx_job) = unbounded_channel::(); let connection = handle.new_dbus().await?; - let manager = SteamOSManager::new(connection.clone(), &connection, tx).await?; + let manager = + SteamOSManager::new(connection.clone(), connection.clone(), tx_ctx, tx_job).await?; connection .object_server() .at("/com/steampowered/SteamOSManager1", manager)