Merge branch 'endrift/sysfs-writer' into 'master'

Add sysfs writer task

See merge request holo/steamos-manager!11
This commit is contained in:
Vicki Pfau 2025-06-18 16:30:33 -07:00
commit f65b49c3de
5 changed files with 182 additions and 34 deletions

View file

@ -21,6 +21,7 @@ use crate::ds_inhibit::Inhibitor;
use crate::inputplumber::DeckService; use crate::inputplumber::DeckService;
use crate::manager::root::SteamOSManager; use crate::manager::root::SteamOSManager;
use crate::path; use crate::path;
use crate::power::SysfsWriterService;
use crate::sls::ftrace::Ftrace; use crate::sls::ftrace::Ftrace;
#[derive(Copy, Clone, Default, Deserialize, Debug)] #[derive(Copy, Clone, Default, Deserialize, Debug)]
@ -127,6 +128,9 @@ impl DaemonContext for RootContext {
let ip = DeckService::init(connection); let ip = DeckService::init(connection);
daemon.add_service(ip); daemon.add_service(ip);
let sysfs = SysfsWriterService::init()?;
daemon.add_service(sysfs);
self.reload_ds_inhibit(daemon).await?; self.reload_ds_inhibit(daemon).await?;
Ok(()) Ok(())

View file

@ -21,7 +21,7 @@ use zbus::fdo::ObjectManager;
use crate::daemon::{channel, Daemon, DaemonCommand, DaemonContext}; use crate::daemon::{channel, Daemon, DaemonCommand, DaemonContext};
use crate::job::{JobManager, JobManagerService}; use crate::job::{JobManager, JobManagerService};
use crate::manager::user::create_interfaces; use crate::manager::user::{create_interfaces, SignalRelayService};
use crate::path; use crate::path;
use crate::power::TdpManagerService; use crate::power::TdpManagerService;
use crate::udev::UdevMonitor; use crate::udev::UdevMonitor;
@ -114,6 +114,7 @@ async fn create_connections(
Connection, Connection,
JobManagerService, JobManagerService,
Result<TdpManagerService>, Result<TdpManagerService>,
SignalRelayService,
)> { )> {
let system = Connection::system().await?; let system = Connection::system().await?;
let connection = Builder::session()? let connection = Builder::session()?
@ -133,9 +134,16 @@ async fn create_connections(
None None
}; };
create_interfaces(connection.clone(), system.clone(), channel, jm_tx, tdp_tx).await?; let signal_relay_service =
create_interfaces(connection.clone(), system.clone(), channel, jm_tx, tdp_tx).await?;
Ok((connection, system, jm_service, tdp_service)) Ok((
connection,
system,
jm_service,
tdp_service,
signal_relay_service,
))
} }
pub async fn daemon() -> Result<()> { pub async fn daemon() -> Result<()> {
@ -146,20 +154,22 @@ pub async fn daemon() -> Result<()> {
let subscriber = Registry::default().with(stdout_log); let subscriber = Registry::default().with(stdout_log);
let (tx, rx) = channel::<UserContext>(); let (tx, rx) = channel::<UserContext>();
let (session, system, mirror_service, tdp_service) = match create_connections(tx).await { let (session, system, mirror_service, tdp_service, signal_relay_service) =
Ok(c) => c, match create_connections(tx).await {
Err(e) => { Ok(c) => c,
let _guard = tracing::subscriber::set_default(subscriber); Err(e) => {
error!("Error connecting to DBus: {}", e); let _guard = tracing::subscriber::set_default(subscriber);
bail!(e); error!("Error connecting to DBus: {}", e);
} bail!(e);
}; }
};
let context = UserContext { let context = UserContext {
session: session.clone(), session: session.clone(),
}; };
let mut daemon = Daemon::new(subscriber, system, rx).await?; let mut daemon = Daemon::new(subscriber, system, rx).await?;
daemon.add_service(signal_relay_service);
daemon.add_service(mirror_service); daemon.add_service(mirror_service);
if let Ok(tdp_service) = tdp_service { if let Ok(tdp_service) = tdp_service {
daemon.add_service(tdp_service); daemon.add_service(tdp_service);

View file

@ -10,6 +10,7 @@ use anyhow::{anyhow, Result};
use std::collections::HashMap; use std::collections::HashMap;
use std::ffi::OsStr; use std::ffi::OsStr;
use tokio::fs::File; use tokio::fs::File;
use tokio::spawn;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tracing::{error, info}; use tracing::{error, info};
@ -29,7 +30,7 @@ use crate::platform::platform_config;
use crate::power::{ use crate::power::{
set_cpu_scaling_governor, set_gpu_clocks, set_gpu_performance_level, set_gpu_power_profile, set_cpu_scaling_governor, set_gpu_clocks, set_gpu_performance_level, set_gpu_power_profile,
set_max_charge_level, set_platform_profile, tdp_limit_manager, CPUScalingGovernor, set_max_charge_level, set_platform_profile, tdp_limit_manager, CPUScalingGovernor,
GPUPerformanceLevel, GPUPowerProfile, TdpLimitManager, GPUPerformanceLevel, GPUPowerProfile, SysfsWritten, TdpLimitManager,
}; };
use crate::process::{run_script, script_output}; use crate::process::{run_script, script_output};
use crate::wifi::{ use crate::wifi::{
@ -444,10 +445,34 @@ impl SteamOSManager {
.map_err(to_zbus_fdo_error) .map_err(to_zbus_fdo_error)
} }
async fn set_max_charge_level(&self, level: i32) -> fdo::Result<()> { #[zbus(signal)]
set_max_charge_level(if level == -1 { 0 } else { level }) async fn max_charge_level_changed(signal_emitter: &SignalEmitter<'_>) -> zbus::Result<()>;
async fn set_max_charge_level(
&self,
level: i32,
#[zbus(connection)] connection: &Connection,
) -> fdo::Result<()> {
let written = set_max_charge_level(if level == -1 { 0 } else { level })
.await .await
.map_err(to_zbus_fdo_error) .map_err(to_zbus_fdo_error)?;
let connection = connection.clone();
spawn(async move {
match written.await {
Ok(SysfsWritten::Written(res)) => {
if let Ok(interface) = connection
.object_server()
.interface::<_, Self>("/com/steampowered/SteamOSManager1")
.await
{
interface.max_charge_level_changed().await?;
}
res
}
_ => Ok(()),
}
});
Ok(())
} }
async fn set_performance_profile(&self, profile: &str) -> fdo::Result<()> { async fn set_performance_profile(&self, profile: &str) -> fdo::Result<()> {

View file

@ -10,6 +10,7 @@ use anyhow::{Error, Result};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::mpsc::{Sender, UnboundedSender}; use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio_stream::StreamExt;
use tracing::error; use tracing::error;
use zbus::object_server::SignalEmitter; use zbus::object_server::SignalEmitter;
use zbus::proxy::{Builder, CacheProperties}; use zbus::proxy::{Builder, CacheProperties};
@ -35,7 +36,7 @@ use crate::screenreader::{OrcaManager, ScreenReaderAction, ScreenReaderMode};
use crate::wifi::{ use crate::wifi::{
get_wifi_backend, get_wifi_power_management_state, list_wifi_interfaces, WifiBackend, get_wifi_backend, get_wifi_power_management_state, list_wifi_interfaces, WifiBackend,
}; };
use crate::API_VERSION; use crate::{Service, API_VERSION};
pub(crate) const MANAGER_PATH: &str = "/com/steampowered/SteamOSManager1"; pub(crate) const MANAGER_PATH: &str = "/com/steampowered/SteamOSManager1";
@ -185,6 +186,11 @@ struct WifiPowerManagement1 {
proxy: Proxy<'static>, proxy: Proxy<'static>,
} }
pub(crate) struct SignalRelayService {
proxy: Proxy<'static>,
session: Connection,
}
impl SteamOSManager { impl SteamOSManager {
pub async fn new( pub async fn new(
system_conn: Connection, system_conn: Connection,
@ -275,13 +281,8 @@ impl BatteryChargeLimit1 {
} }
#[zbus(property)] #[zbus(property)]
async fn set_max_charge_level( async fn set_max_charge_level(&self, limit: i32) -> zbus::Result<()> {
&self, self.proxy.call("SetMaxChargeLevel", &(limit)).await
limit: i32,
#[zbus(signal_emitter)] ctx: SignalEmitter<'_>,
) -> zbus::Result<()> {
let _: () = self.proxy.call("SetMaxChargeLevel", &(limit)).await?;
self.max_charge_level_changed(&ctx).await
} }
#[zbus(property(emits_changed_signal = "const"))] #[zbus(property(emits_changed_signal = "const"))]
@ -885,6 +886,33 @@ impl WifiPowerManagement1 {
} }
} }
impl Service for SignalRelayService {
const NAME: &'static str = "signal-relay";
async fn run(&mut self) -> Result<()> {
let Ok(battery_charge_limit) = self
.session
.object_server()
.interface::<_, BatteryChargeLimit1>(MANAGER_PATH)
.await
else {
return Ok(());
};
let ctx = battery_charge_limit.signal_emitter();
let mut max_charge_level_changed =
self.proxy.receive_signal("MaxChargeLevelChanged").await?;
loop {
max_charge_level_changed.next().await;
battery_charge_limit
.get()
.await
.max_charge_level_changed(ctx)
.await?;
}
}
}
async fn create_platform_interfaces( async fn create_platform_interfaces(
proxy: &Proxy<'static>, proxy: &Proxy<'static>,
object_server: &ObjectServer, object_server: &ObjectServer,
@ -1025,7 +1053,7 @@ pub(crate) async fn create_interfaces(
daemon: Sender<Command>, daemon: Sender<Command>,
job_manager: UnboundedSender<JobManagerCommand>, job_manager: UnboundedSender<JobManagerCommand>,
tdp_manager: Option<UnboundedSender<TdpManagerCommand>>, tdp_manager: Option<UnboundedSender<TdpManagerCommand>>,
) -> Result<()> { ) -> Result<SignalRelayService> {
let proxy = Builder::<Proxy>::new(&system) let proxy = Builder::<Proxy>::new(&system)
.destination("com.steampowered.SteamOSManager1")? .destination("com.steampowered.SteamOSManager1")?
.path("/com/steampowered/SteamOSManager1")? .path("/com/steampowered/SteamOSManager1")?
@ -1122,7 +1150,7 @@ pub(crate) async fn create_interfaces(
.await?; .await?;
} }
Ok(()) Ok(SignalRelayService { session, proxy })
} }
#[cfg(test)] #[cfg(test)]

View file

@ -17,12 +17,13 @@ use std::ops::RangeInclusive;
use std::os::fd::OwnedFd; use std::os::fd::OwnedFd;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use strum::{Display, EnumString, VariantNames}; use strum::{Display, EnumString, VariantNames};
use tokio::fs::{self, try_exists, File}; use tokio::fs::{self, try_exists, File};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Interest}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Interest};
use tokio::net::unix::pipe; use tokio::net::unix::pipe;
use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot; use tokio::sync::{oneshot, Mutex, Notify, OnceCell};
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
use zbus::Connection; use zbus::Connection;
@ -61,6 +62,8 @@ lazy_static! {
Regex::new(r"^\s*(?<index>[0-9]+): (?<value>[0-9]+)Mhz").unwrap(); Regex::new(r"^\s*(?<index>[0-9]+): (?<value>[0-9]+)Mhz").unwrap();
} }
static SYSFS_WRITER: OnceCell<Arc<SysfsWriterQueue>> = OnceCell::const_new();
#[derive(Display, EnumString, PartialEq, Debug, Copy, Clone, TryFromPrimitive)] #[derive(Display, EnumString, PartialEq, Debug, Copy, Clone, TryFromPrimitive)]
#[strum(serialize_all = "snake_case")] #[strum(serialize_all = "snake_case")]
#[repr(u32)] #[repr(u32)]
@ -168,6 +171,79 @@ pub(crate) enum TdpManagerCommand {
ListDownloadModeHandles(oneshot::Sender<HashMap<String, u32>>), ListDownloadModeHandles(oneshot::Sender<HashMap<String, u32>>),
} }
#[derive(Debug)]
pub(crate) enum SysfsWritten {
Written(Result<()>),
Superseded,
}
#[derive(Debug)]
struct SysfsWriterQueue {
values: Mutex<HashMap<PathBuf, (Vec<u8>, oneshot::Sender<SysfsWritten>)>>,
notify: Notify,
}
impl SysfsWriterQueue {
fn new() -> SysfsWriterQueue {
SysfsWriterQueue {
values: Mutex::new(HashMap::new()),
notify: Notify::new(),
}
}
async fn send(&self, path: PathBuf, contents: Vec<u8>) -> oneshot::Receiver<SysfsWritten> {
let (tx, rx) = oneshot::channel();
if let Some((_, old_tx)) = self.values.lock().await.insert(path, (contents, tx)) {
let _ = old_tx.send(SysfsWritten::Superseded);
}
self.notify.notify_one();
rx
}
async fn recv(&self) -> Option<(PathBuf, Vec<u8>, oneshot::Sender<SysfsWritten>)> {
// Take an arbitrary file from the map
self.notify.notified().await;
let mut values = self.values.lock().await;
if let Some(path) = values.keys().next().cloned() {
values
.remove_entry(&path)
.map(|(path, (contents, tx))| (path, contents, tx))
} else {
None
}
}
}
#[derive(Debug)]
pub(crate) struct SysfsWriterService {
queue: Arc<SysfsWriterQueue>,
}
impl SysfsWriterService {
pub fn init() -> Result<SysfsWriterService> {
ensure!(!SYSFS_WRITER.initialized(), "sysfs writer already active");
let queue = Arc::new(SysfsWriterQueue::new());
SYSFS_WRITER.set(queue.clone())?;
Ok(SysfsWriterService { queue })
}
}
impl Service for SysfsWriterService {
const NAME: &'static str = "sysfs-writer";
async fn run(&mut self) -> Result<()> {
loop {
let Some((path, contents, tx)) = self.queue.recv().await else {
continue;
};
let res = write_synced(path, &contents)
.await
.inspect_err(|message| error!("Error writing to sysfs file: {message}"));
let _ = tx.send(SysfsWritten::Written(res));
}
}
}
async fn read_gpu_sysfs_contents<S: AsRef<Path>>(suffix: S) -> Result<String> { async fn read_gpu_sysfs_contents<S: AsRef<Path>>(suffix: S) -> Result<String> {
// Read a given suffix for the GPU // Read a given suffix for the GPU
let base = find_hwmon(GPU_HWMON_NAME).await?; let base = find_hwmon(GPU_HWMON_NAME).await?;
@ -605,7 +681,7 @@ pub(crate) async fn get_max_charge_level() -> Result<i32> {
.map_err(|e| anyhow!("Error parsing value: {e}")) .map_err(|e| anyhow!("Error parsing value: {e}"))
} }
pub(crate) async fn set_max_charge_level(limit: i32) -> Result<()> { pub(crate) async fn set_max_charge_level(limit: i32) -> Result<oneshot::Receiver<SysfsWritten>> {
ensure!((0..=100).contains(&limit), "Invalid limit"); ensure!((0..=100).contains(&limit), "Invalid limit");
let data = limit.to_string(); let data = limit.to_string();
let config = device_config().await?; let config = device_config().await?;
@ -615,9 +691,14 @@ pub(crate) async fn set_max_charge_level(limit: i32) -> Result<()> {
.ok_or(anyhow!("No battery charge limit configured"))?; .ok_or(anyhow!("No battery charge limit configured"))?;
let base = find_hwmon(config.hwmon_name.as_str()).await?; let base = find_hwmon(config.hwmon_name.as_str()).await?;
write_synced(base.join(config.attribute.as_str()), data.as_bytes()) Ok(SYSFS_WRITER
.await .get()
.inspect_err(|message| error!("Error writing to sysfs file: {message}")) .ok_or(anyhow!("sysfs writer not running"))?
.send(
base.join(config.attribute.clone()),
data.as_bytes().to_owned(),
)
.await)
} }
pub(crate) async fn get_available_platform_profiles(name: &str) -> Result<Vec<String>> { pub(crate) async fn get_available_platform_profiles(name: &str) -> Result<Vec<String>> {
@ -1574,11 +1655,11 @@ CCLK_RANGE in Core0:
assert_eq!(get_max_charge_level().await.unwrap(), 10); assert_eq!(get_max_charge_level().await.unwrap(), 10);
set_max_charge_level(99).await.expect("set"); write(base.join("max_battery_charge_level"), "99\n")
assert_eq!(get_max_charge_level().await.unwrap(), 99); .await
.expect("write");
set_max_charge_level(0).await.expect("set"); assert_eq!(get_max_charge_level().await.unwrap(), 99);
assert_eq!(get_max_charge_level().await.unwrap(), 0);
assert!(set_max_charge_level(101).await.is_err()); assert!(set_max_charge_level(101).await.is_err());
assert!(set_max_charge_level(-1).await.is_err()); assert!(set_max_charge_level(-1).await.is_err());