From 69076acedc56d9aa0431c5d1c236290549f99534 Mon Sep 17 00:00:00 2001 From: Vicki Pfau Date: Thu, 25 Jul 2024 21:26:23 -0700 Subject: [PATCH] job: Add mirror relay test --- src/job.rs | 108 ++++++++++++++++++++++++++++++++++++++++++++++++- src/testing.rs | 16 +++++++- 2 files changed, 121 insertions(+), 3 deletions(-) diff --git a/src/job.rs b/src/job.rs index 6da15ac..8bbcc0e 100644 --- a/src/job.rs +++ b/src/job.rs @@ -381,10 +381,16 @@ impl Service for JobManagerService { #[cfg(test)] pub(crate) mod test { use super::*; + use crate::proxy::JobProxy; use crate::testing; + use anyhow::anyhow; use nix::sys::signal::Signal; - use tokio::sync::oneshot; + use std::time::Duration; + use tokio::sync::{mpsc, oneshot}; + use tokio::task::JoinHandle; + use tokio::time::sleep; + use zbus::names::BusName; use zbus::ConnectionBuilder; #[tokio::test] @@ -489,4 +495,104 @@ pub(crate) mod test { -(Signal::SIGTERM as i32) ); } + + struct MockJob {} + + #[zbus::interface(name = "com.steampowered.SteamOSManager1.Job")] + impl MockJob { + pub async fn pause(&mut self) -> fdo::Result<()> { + Err(fdo::Error::Failed(String::from("pause"))) + } + + pub async fn resume(&mut self) -> fdo::Result<()> { + Err(fdo::Error::Failed(String::from("resume"))) + } + + pub async fn cancel(&mut self, _force: bool) -> fdo::Result<()> { + Err(fdo::Error::Failed(String::from("cancel"))) + } + + pub async fn wait(&mut self) -> fdo::Result { + Ok(-1) + } + } + + #[tokio::test] + async fn test_job_mirror_relay() { + let mut handle = testing::start(); + + let connection = handle.new_dbus().await.expect("connection"); + let address = handle.dbus_address().await.unwrap(); + connection + .request_name("com.steampowered.SteamOSManager1") + .await + .expect("reserve"); + + connection + .object_server() + .at(format!("{JOB_PREFIX}/0"), MockJob {}) + .await + .expect("at"); + + //sleep(Duration::from_millis(10)).await; + + let (tx, mut rx) = mpsc::channel(3); + let (fin_tx, fin_rx) = oneshot::channel(); + + let job: JoinHandle> = tokio::spawn(async move { + let connection = ConnectionBuilder::address(address) + .expect("address") + .build() + .await + .expect("build"); + let mut jm = JobManager::new(connection.clone()).await.expect("jm"); + + sleep(Duration::from_millis(10)).await; + + let path = jm + .mirror_job(&connection, format!("{JOB_PREFIX}/0")) + .await + .expect("mirror_job"); + let name = connection.unique_name().unwrap().clone(); + let proxy = JobProxy::builder(&connection) + .destination(BusName::Unique(name.into())) + .expect("destination") + .path(path) + .expect("path") + .build() + .await + .expect("build"); + + match proxy.pause().await.unwrap_err() { + zbus::Error::MethodError(_, Some(text), _) => tx.send(text).await?, + _ => bail!("pause"), + }; + match proxy.resume().await.unwrap_err() { + zbus::Error::MethodError(_, Some(text), _) => tx.send(text).await?, + _ => bail!("resume"), + }; + match proxy.cancel(false).await.unwrap_err() { + zbus::Error::MethodError(_, Some(text), _) => tx.send(text).await?, + _ => bail!("cancel"), + }; + + Ok(fin_rx.await?) + }); + + assert_eq!( + rx.recv().await.expect("rx"), + "org.freedesktop.DBus.Error.Failed: pause" + ); + assert_eq!( + rx.recv().await.expect("rx"), + "org.freedesktop.DBus.Error.Failed: resume" + ); + assert_eq!( + rx.recv().await.expect("rx"), + "org.freedesktop.DBus.Error.Failed: cancel" + ); + + fin_tx.send(()).expect("fin"); + job.await.expect("job").expect("job2"); + } } diff --git a/src/testing.rs b/src/testing.rs index 303e030..5a66052 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -8,11 +8,13 @@ use std::ffi::OsStr; use std::path::Path; use std::process::Stdio; use std::rc::Rc; +use std::str::FromStr; use std::time::Duration; use tempfile::{tempdir, TempDir}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; -use zbus::connection::{Builder, Connection}; +use tokio::sync::Mutex; +use zbus::{Address, Connection, ConnectionBuilder}; thread_local! { static TEST: RefCell>> = RefCell::new(None); @@ -57,6 +59,7 @@ pub fn start() -> TestHandle { base: tempdir().expect("Couldn't create test directory"), process_cb: Cell::new(|_, _| Err(anyhow!("No current process_cb"))), mock_dbus: Cell::new(None), + dbus_address: Mutex::new(None), }); *lock.borrow_mut() = Some(test.clone()); TestHandle { test } @@ -80,6 +83,7 @@ pub fn current() -> Rc { pub struct MockDBus { pub connection: Connection, + address: Address, process: Child, } @@ -87,6 +91,7 @@ pub struct Test { base: TempDir, pub process_cb: Cell Result<(i32, String)>>, pub mock_dbus: Cell>, + pub dbus_address: Mutex>, } pub struct TestHandle { @@ -113,10 +118,12 @@ impl MockDBus { .await? .ok_or(anyhow!("Failed to read address"))?; - let connection = Builder::address(address.trim_end())?.build().await?; + let address = Address::from_str(address.trim_end())?; + let connection = ConnectionBuilder::address(address.clone())?.build().await?; Ok(MockDBus { connection, + address, process, }) } @@ -152,9 +159,14 @@ impl TestHandle { pub async fn new_dbus(&mut self) -> Result { let dbus = MockDBus::new().await?; let connection = dbus.connection.clone(); + *self.test.dbus_address.lock().await = Some(dbus.address.clone()); self.test.mock_dbus.set(Some(dbus)); Ok(connection) } + + pub async fn dbus_address(&self) -> Option
{ + (*self.test.dbus_address.lock().await).clone() + } } impl Drop for TestHandle {