From f741627fc295fdcafe38eea18e746416a4656548 Mon Sep 17 00:00:00 2001 From: Vicki Pfau Date: Thu, 15 May 2025 23:22:03 -0700 Subject: [PATCH] inputplumber: Handle interface adding in spawned task The receiver in the poller could fill up if several interfaces were added before we finished testing one, which could cause the bus to deadlock. This appears as one of the proxies hanging on a reply, but that's specifically because the bus was backlogged waiting for the receiver to be drained. By splitting the tasks, we can continue draining the receiver while waiting for the proxy separately. --- src/inputplumber.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/inputplumber.rs b/src/inputplumber.rs index a3544ef..92fb23e 100644 --- a/src/inputplumber.rs +++ b/src/inputplumber.rs @@ -6,10 +6,12 @@ */ use anyhow::Result; +use tokio::spawn; use tokio_stream::StreamExt; use tracing::{debug, info}; use zbus::fdo::{InterfacesAdded, ObjectManagerProxy}; use zbus::names::OwnedInterfaceName; +use zbus::proxy::CacheProperties; use zbus::zvariant::ObjectPath; use zbus::Connection; @@ -36,6 +38,7 @@ trait Target { fn device_type(&self) -> Result; } +#[derive(Clone, Debug)] pub struct DeckService { connection: Connection, composite_device_iface_name: OwnedInterfaceName, @@ -101,6 +104,7 @@ impl DeckService { return Ok(()); } let proxy = CompositeDeviceProxy::builder(&self.connection) + .cache_properties(CacheProperties::No) .path(path)? .build() .await?; @@ -132,7 +136,15 @@ impl Service for DeckService { loop { tokio::select! { - Some(iface) = iface_added.next() => self.make_deck_from_ifaces_added(iface).await?, + Some(iface) = iface_added.next() => { + // This needs to be done in a separate task to prevent the + // signal listener from filling up. We just clone `self` + // for this since it doesn't hold any state. + let ctx = self.clone(); + spawn(async move { + ctx.make_deck_from_ifaces_added(iface).await + }); + } } } }