Merge steamos-workerd in

This commit is contained in:
Vicki Pfau 2024-03-22 19:21:03 -07:00
parent 6f1f1c032c
commit 83e9de9bcb
10 changed files with 1445 additions and 139 deletions

286
src/sls/ftrace.rs Normal file
View file

@ -0,0 +1,286 @@
/* SPDX-License-Identifier: BSD-2-Clause */
use anyhow::{Error, Result};
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use tokio::fs;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::unix::pipe;
use tracing::{error, info};
use zbus::connection::Connection;
use zbus::zvariant;
use crate::{get_appid, read_comm, sysbase, Service};
#[zbus::proxy(
interface = "com.steampowered.SteamOSLogSubmitter.Trace",
default_service = "com.steampowered.SteamOSLogSubmitter",
default_path = "/com/steampowered/SteamOSLogSubmitter/helpers/Trace"
)]
trait TraceHelper {
async fn log_event(
&self,
trace: &str,
data: HashMap<&str, zvariant::Value<'_>>,
) -> zbus::Result<()>;
}
pub struct Ftrace
where
Self: 'static,
{
pipe: Option<BufReader<pipe::Receiver>>,
proxy: TraceHelperProxy<'static>,
}
async fn setup_traces(path: &Path) -> Result<()> {
fs::write(path.join("events/oom/mark_victim/enable"), "1").await?;
fs::write(path.join("set_ftrace_filter"), "split_lock_warn").await?;
fs::write(path.join("current_tracer"), "function").await?;
Ok(())
}
impl Ftrace {
pub async fn init(connection: Connection) -> Result<Ftrace> {
let base = Self::base();
let path = Path::new(base.as_str());
fs::create_dir_all(path).await?;
setup_traces(path).await?;
let file = pipe::OpenOptions::new()
.unchecked(true) // Thanks tracefs for making trace_pipe a "regular" file
.open_receiver(path.join("trace_pipe"))?;
Ok(Ftrace {
pipe: Some(BufReader::new(file)),
proxy: TraceHelperProxy::new(&connection).await?,
})
}
fn base() -> String {
sysbase() + "/sys/kernel/tracing/instances/steamos-log-submitter"
}
async fn handle_pid(data: &mut HashMap<&str, zvariant::Value<'_>>, pid: u32) -> Result<()> {
if let Ok(comm) = read_comm(pid) {
info!("├─ comm: {}", comm);
data.insert("comm", zvariant::Value::new(comm));
} else {
info!("├─ comm not found");
}
if let Ok(Some(appid)) = get_appid(pid) {
info!("└─ appid: {}", appid);
data.insert("appid", zvariant::Value::new(appid));
} else {
info!("└─ appid not found");
}
Ok(())
}
async fn handle_event(&mut self, line: &str) -> Result<()> {
info!("Forwarding line {}", line);
let mut data = HashMap::new();
let mut split = line.rsplit(' ');
if let Some(("pid", pid)) = split.next().and_then(|arg| arg.split_once('=')) {
let pid = pid.parse()?;
Ftrace::handle_pid(&mut data, pid).await?;
}
self.proxy.log_event(line, data).await?;
Ok(())
}
}
impl Service for Ftrace {
const NAME: &'static str = "ftrace";
async fn run(&mut self) -> Result<()> {
loop {
let mut string = String::new();
self.pipe
.as_mut()
.ok_or(Error::msg("BUG: trace_pipe missing"))?
.read_line(&mut string)
.await?;
if let Err(e) = self.handle_event(string.trim_end()).await {
error!("Encountered an error handling event: {}", e);
}
}
}
async fn shutdown(&mut self) -> Result<()> {
self.pipe.take();
fs::remove_dir(Self::base()).await?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::testing;
use nix::sys::stat::Mode;
use nix::unistd;
use std::cell::Cell;
use std::fs;
use std::path::PathBuf;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
struct MockTrace {
traces: UnboundedSender<(String, HashMap<String, zvariant::OwnedValue>)>,
}
#[zbus::interface(name = "com.steampowered.SteamOSLogSubmitter.Trace")]
impl MockTrace {
fn log_event(
&mut self,
trace: &str,
data: HashMap<&str, zvariant::Value<'_>>,
) -> zbus::fdo::Result<()> {
self.traces.send((
String::from(trace),
HashMap::from_iter(
data.iter()
.map(|(k, v)| (String::from(*k), v.try_to_owned().unwrap())),
),
));
Ok(())
}
}
#[tokio::test]
async fn handle_pid() {
let h = testing::start();
let path = h.test.path();
fs::create_dir_all(path.join("proc/1234")).expect("create_dir_all");
fs::write(path.join("proc/1234/comm"), "ftrace\n").expect("write comm");
fs::write(path.join("proc/1234/environ"), "SteamGameId=5678").expect("write environ");
fs::create_dir_all(path.join("proc/1235")).expect("create_dir_all");
fs::write(path.join("proc/1235/comm"), "ftrace\n").expect("write comm");
fs::create_dir_all(path.join("proc/1236")).expect("create_dir_all");
fs::write(path.join("proc/1236/environ"), "SteamGameId=5678").expect("write environ");
let mut map = HashMap::new();
assert!(Ftrace::handle_pid(&mut map, 1234).await.is_ok());
assert_eq!(
*map.get("comm").expect("comm"),
zvariant::Value::new("ftrace")
);
assert_eq!(
*map.get("appid").expect("appid"),
zvariant::Value::new(5678 as u64)
);
let mut map = HashMap::new();
assert!(Ftrace::handle_pid(&mut map, 1235).await.is_ok());
assert_eq!(
*map.get("comm").expect("comm"),
zvariant::Value::new("ftrace")
);
assert!(map.get("appid").is_none());
let mut map = HashMap::new();
assert!(Ftrace::handle_pid(&mut map, 1236).await.is_ok());
assert!(map.get("comm").is_none());
assert_eq!(
*map.get("appid").expect("appid"),
zvariant::Value::new(5678 as u64)
);
}
#[tokio::test]
async fn ftrace_init() {
let h = testing::start();
let path = h.test.path();
let tracefs = PathBuf::from(Ftrace::base());
fs::create_dir_all(tracefs.join("events/oom/mark_victim")).expect("create_dir_all");
unistd::mkfifo(
tracefs.join("trace_pipe").as_path(),
Mode::S_IRUSR | Mode::S_IWUSR,
)
.expect("trace_pipe");
let dbus = Connection::session().await.expect("dbus");
let ftrace = Ftrace::init(dbus).await.expect("ftrace");
assert_eq!(
fs::read_to_string(tracefs.join("events/oom/mark_victim/enable")).unwrap(),
"1"
);
}
#[tokio::test]
async fn ftrace_relay() {
let h = testing::start();
let path = h.test.path();
let tracefs = PathBuf::from(Ftrace::base());
fs::create_dir_all(tracefs.join("events/oom/mark_victim")).expect("create_dir_all");
unistd::mkfifo(
tracefs.join("trace_pipe").as_path(),
Mode::S_IRUSR | Mode::S_IWUSR,
)
.expect("trace_pipe");
fs::create_dir_all(path.join("proc/14351")).expect("create_dir_all");
fs::write(path.join("proc/14351/comm"), "ftrace\n").expect("write comm");
fs::write(path.join("proc/14351/environ"), "SteamGameId=5678").expect("write environ");
let (sender, mut receiver) = unbounded_channel();
let trace = MockTrace { traces: sender };
let dbus = zbus::connection::Builder::session()
.unwrap()
.name("com.steampowered.SteamOSLogSubmitter")
.unwrap()
.serve_at("/com/steampowered/SteamOSLogSubmitter/helpers/Trace", trace)
.unwrap()
.build()
.await
.expect("dbus");
let mut ftrace = Ftrace::init(dbus).await.expect("ftrace");
assert!(match receiver.try_recv() {
Empty => true,
_ => false,
});
ftrace
.handle_event(
" GamepadUI Input-4886 [003] .N.1. 23828.572941: mark_victim: pid=14351",
)
.await
.expect("event");
let (line, data) = match receiver.try_recv() {
Ok((line, data)) => (line, data),
_ => panic!("Test failed"),
};
assert_eq!(
line,
" GamepadUI Input-4886 [003] .N.1. 23828.572941: mark_victim: pid=14351"
);
assert_eq!(data.len(), 2);
assert_eq!(
data.get("appid").map(|v| v.downcast_ref()),
Some(Ok(5678 as u64))
);
assert_eq!(
data.get("comm").map(|v| v.downcast_ref()),
Some(Ok("ftrace"))
);
ftrace
.handle_event(" GamepadUI Input-4886 [003] .N.1. 23828.572941: split_lock_warn <-")
.await
.expect("event");
let (line, data) = match receiver.try_recv() {
Ok((line, data)) => (line, data),
_ => panic!("Test failed"),
};
assert_eq!(
line,
" GamepadUI Input-4886 [003] .N.1. 23828.572941: split_lock_warn <-"
);
assert_eq!(data.len(), 0);
}
}

137
src/sls/mod.rs Normal file
View file

@ -0,0 +1,137 @@
/* SPDX-License-Identifier: BSD-2-Clause */
pub mod ftrace;
use anyhow::Result;
use std::fmt::Debug;
use std::time::SystemTime;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tracing::field::{Field, Visit};
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;
use zbus::connection::Connection;
use crate::Service;
#[zbus::proxy(
interface = "com.steampowered.SteamOSLogSubmitter.Manager",
default_service = "com.steampowered.SteamOSLogSubmitter",
default_path = "/com/steampowered/SteamOSLogSubmitter/Manager"
)]
trait Daemon {
async fn log(
&self,
timestamp: f64,
module: &str,
level: u32,
message: &str,
) -> zbus::Result<()>;
}
struct StringVisitor {
string: String,
}
struct LogLine {
timestamp: f64,
module: String,
level: u32,
message: String,
}
pub struct LogReceiver
where
Self: 'static,
{
receiver: UnboundedReceiver<LogLine>,
sender: UnboundedSender<LogLine>,
proxy: DaemonProxy<'static>,
}
pub struct LogLayer {
queue: UnboundedSender<LogLine>,
}
impl Visit for StringVisitor {
fn record_debug(&mut self, _: &Field, value: &dyn Debug) {
self.string.push_str(format!("{value:?}").as_str());
}
}
impl LogReceiver {
pub async fn new(connection: Connection) -> Result<LogReceiver> {
let proxy = DaemonProxy::new(&connection).await?;
let (sender, receiver) = unbounded_channel();
Ok(LogReceiver {
receiver,
sender,
proxy,
})
}
}
impl Service for LogReceiver {
const NAME: &'static str = "SLS log receiver";
async fn run(&mut self) -> Result<()> {
while let Some(message) = self.receiver.recv().await {
let _ = self
.proxy
.log(
message.timestamp,
message.module.as_ref(),
message.level,
message.message.as_ref(),
)
.await;
}
Ok(())
}
}
impl LogLayer {
pub async fn new(receiver: &LogReceiver) -> LogLayer {
LogLayer {
queue: receiver.sender.clone(),
}
}
}
impl<S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>> Layer<S> for LogLayer {
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
let target = event.metadata().target();
if !target.starts_with("steamos_workerd::sls") {
// Don't forward non-SLS-related logs to SLS
return;
}
let target = target
.split("::")
.skip(2)
.fold(String::from("steamos_workerd"), |prefix, suffix| {
prefix + "." + suffix
});
let level = match *event.metadata().level() {
Level::TRACE => 10,
Level::DEBUG => 10,
Level::INFO => 20,
Level::WARN => 30,
Level::ERROR => 40,
};
let mut builder = StringVisitor {
string: String::new(),
};
event.record(&mut builder);
let text = builder.string;
let now = SystemTime::now();
let time = match now.duration_since(SystemTime::UNIX_EPOCH) {
Ok(duration) => duration.as_secs_f64(),
Err(_) => 0.0,
};
let _ = self.queue.send(LogLine {
timestamp: time,
module: target,
level,
message: text,
});
}
}