From b582d51c90f2324b38e97651f7734fb645442e7d Mon Sep 17 00:00:00 2001 From: Vicki Pfau Date: Fri, 24 May 2024 17:21:23 -0700 Subject: [PATCH] daemon: Add message channel for sending commands to the daemon object --- src/daemon/mod.rs | 33 ++++++++++++++++++++++++++++++++- src/daemon/root.rs | 5 +++-- src/daemon/user.rs | 5 +++-- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index a826e2c..0cda9bd 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -11,6 +11,7 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::path::PathBuf; use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{error, info}; @@ -18,7 +19,7 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::LookupSpan; use zbus::connection::Connection; -use crate::daemon::config::{read_config, read_state}; +use crate::daemon::config::{read_config, read_state, write_state}; use crate::sls::{LogLayer, LogReceiver}; use crate::Service; @@ -55,13 +56,21 @@ pub(crate) trait DaemonContext: Sized { pub(crate) struct Daemon { services: JoinSet>, token: CancellationToken, + channel: Receiver, _context: PhantomData, } +#[derive(Debug)] +pub(crate) enum DaemonCommand { + ReadConfig, + WriteState, +} + impl Daemon { pub(crate) async fn new LookupSpan<'a>>( subscriber: S, connection: Connection, + channel: Receiver, ) -> Result> { let services = JoinSet::new(); let token = CancellationToken::new(); @@ -74,6 +83,7 @@ impl Daemon { let mut daemon = Daemon { services, token, + channel, _context: PhantomData::default(), }; daemon.add_service(log_receiver); @@ -128,6 +138,10 @@ impl Daemon { } None => Err(anyhow!("SIGHUP machine broke")), }, + msg = self.channel.recv() => match msg { + Some(msg) => self.handle_message(&mut context, msg).await, + None => Err(anyhow!("All senders have been closed")), + }, _ = sigquit.recv() => Err(anyhow!("Got SIGQUIT")), } .inspect_err(|e| error!("Encountered error running: {e}")); @@ -150,4 +164,21 @@ impl Daemon { res.inspect_err(|e| error!("Encountered error: {e}")) } + + async fn handle_message(&mut self, context: &mut C, cmd: DaemonCommand) -> Result<()> { + match cmd { + DaemonCommand::ReadConfig => match read_config(context).await { + Ok(config) => context.reload(config, self).await, + Err(error) => { + error!("Failed to load configuration: {error}"); + Ok(()) + } + }, + DaemonCommand::WriteState => write_state(context).await, + } + } +} + +pub(crate) fn channel() -> (Sender, Receiver) { + mpsc::channel(10) } diff --git a/src/daemon/root.rs b/src/daemon/root.rs index 1a8f621..18dbf6f 100644 --- a/src/daemon/root.rs +++ b/src/daemon/root.rs @@ -14,7 +14,7 @@ use tracing_subscriber::{fmt, Registry}; use zbus::connection::Connection; use zbus::ConnectionBuilder; -use crate::daemon::{Daemon, DaemonContext}; +use crate::daemon::{channel, Daemon, DaemonContext}; use crate::ds_inhibit::Inhibitor; use crate::manager::root::SteamOSManager; use crate::path; @@ -93,6 +93,7 @@ pub async fn daemon() -> Result<()> { let stdout_log = fmt::layer(); let subscriber = Registry::default().with(stdout_log); + let (_tx, rx) = channel(); let connection = match create_connection().await { Ok(c) => c, @@ -104,7 +105,7 @@ pub async fn daemon() -> Result<()> { }; let context = RootContext {}; - let mut daemon = Daemon::new(subscriber, connection.clone()).await?; + let mut daemon = Daemon::new(subscriber, connection.clone(), rx).await?; let ftrace = Ftrace::init(connection.clone()).await?; daemon.add_service(ftrace); diff --git a/src/daemon/user.rs b/src/daemon/user.rs index 18eb64f..9ea7ee7 100644 --- a/src/daemon/user.rs +++ b/src/daemon/user.rs @@ -16,7 +16,7 @@ use xdg::BaseDirectories; use zbus::connection::Connection; use zbus::ConnectionBuilder; -use crate::daemon::{Daemon, DaemonContext}; +use crate::daemon::{channel, Daemon, DaemonContext}; use crate::manager::user::SteamOSManager; use crate::path; @@ -103,6 +103,7 @@ pub async fn daemon() -> Result<()> { let stdout_log = fmt::layer(); let subscriber = Registry::default().with(stdout_log); + let (_tx, rx) = channel(); let (_session, system) = match create_connections().await { Ok(c) => c, @@ -114,7 +115,7 @@ pub async fn daemon() -> Result<()> { }; let context = UserContext {}; - let mut daemon = Daemon::new(subscriber, system).await?; + let mut daemon = Daemon::new(subscriber, system, rx).await?; daemon.run(context).await }