From 1a69cce50b52a339281ab4f9bcb6f27a425c6541 Mon Sep 17 00:00:00 2001 From: Vicki Pfau Date: Wed, 22 May 2024 21:04:13 -0700 Subject: [PATCH] daemon: Allow context-specific commands on the message channel --- src/daemon/mod.rs | 37 ++++++++++++++++++++++++++----------- src/daemon/root.rs | 14 +++++++++++++- src/daemon/user.rs | 12 +++++++++++- 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 0cda9bd..f53464f 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -8,7 +8,6 @@ use anyhow::{anyhow, ensure, Result}; use serde::{Deserialize, Serialize}; use std::fmt::Debug; -use std::marker::PhantomData; use std::path::PathBuf; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -24,8 +23,8 @@ use crate::sls::{LogLayer, LogReceiver}; use crate::Service; mod config; -mod root; -mod user; +pub(crate) mod root; +pub(crate) mod user; pub use root::daemon as root; pub use user::daemon as user; @@ -33,6 +32,7 @@ pub use user::daemon as user; pub(crate) trait DaemonContext: Sized { type State: for<'a> Deserialize<'a> + Serialize + Default + Debug; type Config: for<'a> Deserialize<'a> + Default + Debug; + type Command: Send; fn state_path(&self) -> Result { let config_path = self.user_config_path()?; @@ -51,17 +51,20 @@ pub(crate) trait DaemonContext: Sized { ) -> Result<()>; async fn reload(&mut self, config: Self::Config, daemon: &mut Daemon) -> Result<()>; + + async fn handle_command(&mut self, cmd: Self::Command, daemon: &mut Daemon) + -> Result<()>; } pub(crate) struct Daemon { services: JoinSet>, token: CancellationToken, - channel: Receiver, - _context: PhantomData, + channel: Receiver>, } #[derive(Debug)] -pub(crate) enum DaemonCommand { +pub(crate) enum DaemonCommand { + ContextCommand(T), ReadConfig, WriteState, } @@ -70,7 +73,7 @@ impl Daemon { pub(crate) async fn new LookupSpan<'a>>( subscriber: S, connection: Connection, - channel: Receiver, + channel: Receiver>, ) -> Result> { let services = JoinSet::new(); let token = CancellationToken::new(); @@ -84,7 +87,6 @@ impl Daemon { services, token, channel, - _context: PhantomData::default(), }; daemon.add_service(log_receiver); @@ -139,7 +141,9 @@ impl Daemon { None => Err(anyhow!("SIGHUP machine broke")), }, msg = self.channel.recv() => match msg { - Some(msg) => self.handle_message(&mut context, msg).await, + Some(msg) => { + self.handle_message(msg, &mut context).await + } None => Err(anyhow!("All senders have been closed")), }, _ = sigquit.recv() => Err(anyhow!("Got SIGQUIT")), @@ -165,8 +169,13 @@ impl Daemon { res.inspect_err(|e| error!("Encountered error: {e}")) } - async fn handle_message(&mut self, context: &mut C, cmd: DaemonCommand) -> Result<()> { + async fn handle_message( + &mut self, + cmd: DaemonCommand, + context: &mut C, + ) -> Result<()> { match cmd { + DaemonCommand::ContextCommand(cmd) => context.handle_command(cmd, self).await, DaemonCommand::ReadConfig => match read_config(context).await { Ok(config) => context.reload(config, self).await, Err(error) => { @@ -179,6 +188,12 @@ impl Daemon { } } -pub(crate) fn channel() -> (Sender, Receiver) { +// Rust doesn't support a good way to simplify this type yet +// See +#[allow(clippy::type_complexity)] +pub(crate) fn channel() -> ( + Sender>, + Receiver>, +) { mpsc::channel(10) } diff --git a/src/daemon/root.rs b/src/daemon/root.rs index 18dbf6f..fb3c532 100644 --- a/src/daemon/root.rs +++ b/src/daemon/root.rs @@ -36,11 +36,15 @@ pub(crate) struct RootState { #[derive(Copy, Clone, Default, Deserialize, Serialize, Debug)] pub(crate) struct RootServicesState {} +#[derive(Debug)] +pub(crate) enum RootCommand {} + struct RootContext {} impl DaemonContext for RootContext { type State = RootState; type Config = RootConfig; + type Command = RootCommand; fn user_config_path(&self) -> Result { Ok(path("/etc/steamos-manager")) @@ -72,6 +76,14 @@ impl DaemonContext for RootContext { // Nothing to do yet Ok(()) } + + async fn handle_command( + &mut self, + _cmd: RootCommand, + _daemon: &mut Daemon, + ) -> Result<()> { + Ok(()) + } } async fn create_connection() -> Result { @@ -93,7 +105,7 @@ pub async fn daemon() -> Result<()> { let stdout_log = fmt::layer(); let subscriber = Registry::default().with(stdout_log); - let (_tx, rx) = channel(); + let (_tx, rx) = channel::(); let connection = match create_connection().await { Ok(c) => c, diff --git a/src/daemon/user.rs b/src/daemon/user.rs index 9ea7ee7..2be5e37 100644 --- a/src/daemon/user.rs +++ b/src/daemon/user.rs @@ -41,6 +41,7 @@ struct UserContext {} impl DaemonContext for UserContext { type State = UserState; type Config = UserConfig; + type Command = (); #[cfg(not(test))] fn user_config_path(&self) -> Result { @@ -79,6 +80,15 @@ impl DaemonContext for UserContext { // Nothing to do yet Ok(()) } + + async fn handle_command( + &mut self, + _cmd: Self::Command, + _daemon: &mut Daemon, + ) -> Result<()> { + // Nothing to do yet + Ok(()) + } } async fn create_connections() -> Result<(Connection, Connection)> { @@ -103,7 +113,7 @@ pub async fn daemon() -> Result<()> { let stdout_log = fmt::layer(); let subscriber = Registry::default().with(stdout_log); - let (_tx, rx) = channel(); + let (_tx, rx) = channel::(); let (_session, system) = match create_connections().await { Ok(c) => c,