daemon: Add message channel for sending commands to the daemon object

This commit is contained in:
Vicki Pfau 2024-05-24 17:21:23 -07:00 committed by Jeremy Whiting
parent c6113ee739
commit b582d51c90
3 changed files with 38 additions and 5 deletions

View file

@ -11,6 +11,7 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::path::PathBuf;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
@ -18,7 +19,7 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::LookupSpan;
use zbus::connection::Connection;
use crate::daemon::config::{read_config, read_state};
use crate::daemon::config::{read_config, read_state, write_state};
use crate::sls::{LogLayer, LogReceiver};
use crate::Service;
@ -55,13 +56,21 @@ pub(crate) trait DaemonContext: Sized {
pub(crate) struct Daemon<C: DaemonContext> {
services: JoinSet<Result<()>>,
token: CancellationToken,
channel: Receiver<DaemonCommand>,
_context: PhantomData<C>,
}
#[derive(Debug)]
pub(crate) enum DaemonCommand {
ReadConfig,
WriteState,
}
impl<C: DaemonContext> Daemon<C> {
pub(crate) async fn new<S: SubscriberExt + Send + Sync + for<'a> LookupSpan<'a>>(
subscriber: S,
connection: Connection,
channel: Receiver<DaemonCommand>,
) -> Result<Daemon<C>> {
let services = JoinSet::new();
let token = CancellationToken::new();
@ -74,6 +83,7 @@ impl<C: DaemonContext> Daemon<C> {
let mut daemon = Daemon {
services,
token,
channel,
_context: PhantomData::default(),
};
daemon.add_service(log_receiver);
@ -128,6 +138,10 @@ impl<C: DaemonContext> Daemon<C> {
}
None => Err(anyhow!("SIGHUP machine broke")),
},
msg = self.channel.recv() => match msg {
Some(msg) => self.handle_message(&mut context, msg).await,
None => Err(anyhow!("All senders have been closed")),
},
_ = sigquit.recv() => Err(anyhow!("Got SIGQUIT")),
}
.inspect_err(|e| error!("Encountered error running: {e}"));
@ -150,4 +164,21 @@ impl<C: DaemonContext> Daemon<C> {
res.inspect_err(|e| error!("Encountered error: {e}"))
}
async fn handle_message(&mut self, context: &mut C, cmd: DaemonCommand) -> Result<()> {
match cmd {
DaemonCommand::ReadConfig => match read_config(context).await {
Ok(config) => context.reload(config, self).await,
Err(error) => {
error!("Failed to load configuration: {error}");
Ok(())
}
},
DaemonCommand::WriteState => write_state(context).await,
}
}
}
pub(crate) fn channel() -> (Sender<DaemonCommand>, Receiver<DaemonCommand>) {
mpsc::channel(10)
}

View file

@ -14,7 +14,7 @@ use tracing_subscriber::{fmt, Registry};
use zbus::connection::Connection;
use zbus::ConnectionBuilder;
use crate::daemon::{Daemon, DaemonContext};
use crate::daemon::{channel, Daemon, DaemonContext};
use crate::ds_inhibit::Inhibitor;
use crate::manager::root::SteamOSManager;
use crate::path;
@ -93,6 +93,7 @@ pub async fn daemon() -> Result<()> {
let stdout_log = fmt::layer();
let subscriber = Registry::default().with(stdout_log);
let (_tx, rx) = channel();
let connection = match create_connection().await {
Ok(c) => c,
@ -104,7 +105,7 @@ pub async fn daemon() -> Result<()> {
};
let context = RootContext {};
let mut daemon = Daemon::new(subscriber, connection.clone()).await?;
let mut daemon = Daemon::new(subscriber, connection.clone(), rx).await?;
let ftrace = Ftrace::init(connection.clone()).await?;
daemon.add_service(ftrace);

View file

@ -16,7 +16,7 @@ use xdg::BaseDirectories;
use zbus::connection::Connection;
use zbus::ConnectionBuilder;
use crate::daemon::{Daemon, DaemonContext};
use crate::daemon::{channel, Daemon, DaemonContext};
use crate::manager::user::SteamOSManager;
use crate::path;
@ -103,6 +103,7 @@ pub async fn daemon() -> Result<()> {
let stdout_log = fmt::layer();
let subscriber = Registry::default().with(stdout_log);
let (_tx, rx) = channel();
let (_session, system) = match create_connections().await {
Ok(c) => c,
@ -114,7 +115,7 @@ pub async fn daemon() -> Result<()> {
};
let context = UserContext {};
let mut daemon = Daemon::new(subscriber, system).await?;
let mut daemon = Daemon::new(subscriber, system, rx).await?;
daemon.run(context).await
}