Testing HORUS Applications

You need to verify that your HORUS nodes work correctly before deploying to a real robot. Here is how to write unit tests, integration tests, and simulation tests using Rust's built-in test framework and HORUS testing utilities.

When To Use This

  • Writing unit tests for individual node logic
  • Testing multi-node pipelines with real shared memory IPC
  • Using tick_once() for deterministic single-tick testing
  • Setting up CI/CD test pipelines with horus test
  • Recording and replaying sessions for regression testing

Use Debugging Workflows instead if you need to diagnose a problem in a running system rather than write tests.

Prerequisites

  • A HORUS project with horus.toml (see Quick Start)
  • Familiarity with Nodes and Topics
  • Rust testing basics (#[test], #[cfg(test)], assert!)

Testing Strategies

StrategyWhen to UseComplexity
Unit test (single node)Test node logic in isolationLow
Integration test (multi-node)Test nodes communicating through topicsMedium
Business logic extractionTest pure functions without topicsLowest
tick_once() testingDeterministic single-tick scheduler testsMedium
Record/replayReproduce bugs and regression testHigh

Unit Testing a Single Node

Test individual node behavior in isolation.

Example: Testing a Temperature Sensor

File: src/main.rs

// simplified
use horus::prelude::*;

// The node we want to test
pub struct TemperatureSensor {
    temp_pub: Topic<f32>,
    reading: f32,
}

impl TemperatureSensor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            temp_pub: Topic::new("temperature")?,
            reading: 20.0,
        })
    }

    // Make this public so tests can inspect it
    pub fn get_reading(&self) -> f32 {
        self.reading
    }
}

impl Node for TemperatureSensor {
    fn name(&self) -> &str {
        "TemperatureSensor"
    }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Sensor initialized");
        Ok(())
    }

    fn tick(&mut self) {
        // Increment reading each tick
        self.reading += 0.5;

        // Publish temperature
        self.temp_pub.send(self.reading);
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Sensor shutdown");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(TemperatureSensor::new()?).order(0).build()?;
    scheduler.run()?;
    Ok(())
}

// ============================================================================
// TESTS
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sensor_initialization() {
        let sensor = TemperatureSensor::new().unwrap();
        assert_eq!(sensor.get_reading(), 20.0);
    }

    #[test]
    fn test_sensor_tick_increments_reading() {
        let mut sensor = TemperatureSensor::new().unwrap();
        for i in 1..=5 {
            sensor.tick();
            let expected = 20.0 + (i as f32 * 0.5);
            assert_eq!(sensor.get_reading(), expected);
        }
    }
}

Run the Tests

horus test

Expected Output:

running 2 tests
test tests::test_sensor_initialization ... ok
test tests::test_sensor_tick_increments_reading ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured

Key Testing Patterns

1. Test Node Creation:

// simplified
#[test]
fn test_node_creation() {
    let node = MyNode::new().unwrap();
    assert_eq!(node.some_field, expected_value);
}

2. Test Initialization:

// simplified
#[test]
fn test_init() {
    let mut node = MyNode::new().unwrap();
    assert!(node.init().is_ok());
}

3. Test Tick Logic:

// simplified
#[test]
fn test_tick() {
    let mut node = MyNode::new().unwrap();
    node.tick();
    // Verify state changes
    assert_eq!(node.counter, 1);
}

4. Test Shutdown:

// simplified
#[test]
fn test_shutdown() {
    let mut node = MyNode::new().unwrap();
    assert!(node.shutdown().is_ok());
}

Testing Multiple Nodes Together

Test nodes communicating through topics.

Example: Publisher-Subscriber Test

File: src/main.rs

// simplified
use horus::prelude::*;
use std::sync::{Arc, Mutex};

// Publisher node
pub struct PublisherNode {
    data_pub: Topic<f32>,
}

impl PublisherNode {
    pub fn new() -> Result<Self> {
        Ok(Self {
            data_pub: Topic::new("test_data")?,
        })
    }
}

impl Node for PublisherNode {
    fn name(&self) -> &str {
        "PublisherNode"
    }

    fn tick(&mut self) {
        self.data_pub.send(42.0);
    }
}

// Subscriber node that stores received data
pub struct SubscriberNode {
    data_sub: Topic<f32>,
    received: Arc<Mutex<Vec<f32>>>,
}

impl SubscriberNode {
    pub fn new(received: Arc<Mutex<Vec<f32>>>) -> Result<Self> {
        Ok(Self {
            data_sub: Topic::new("test_data")?,
            received,
        })
    }
}

impl Node for SubscriberNode {
    fn name(&self) -> &str {
        "SubscriberNode"
    }

    fn tick(&mut self) {
        if let Some(data) = self.data_sub.recv() {
            self.received.lock().unwrap().push(data);
        }
    }
}

fn main() -> Result<()> {
    let received = Arc::new(Mutex::new(Vec::new()));

    let mut scheduler = Scheduler::new();
    scheduler.add(PublisherNode::new()?).order(0).build()?;
    scheduler.add(SubscriberNode::new(received)?).order(1).build()?;
    scheduler.run()?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_pubsub_communication() {
        // Shared storage for received messages
        let received = Arc::new(Mutex::new(Vec::new()));

        // Create publisher and subscriber
        let mut pub_node = PublisherNode::new().unwrap();
        let mut sub_node = SubscriberNode::new(Arc::clone(&received)).unwrap();

        // Publish a message
        pub_node.tick();

        // Small delay to allow IPC (shared memory needs time to propagate)
        thread::sleep(Duration::from_millis(10));

        // Subscriber receives the message
        sub_node.tick();

        // Verify message was received
        let data = received.lock().unwrap();
        assert_eq!(data.len(), 1);
        assert_eq!(data[0], 42.0);
    }

    #[test]
    fn test_multiple_messages() {
        let received = Arc::new(Mutex::new(Vec::new()));

        let mut pub_node = PublisherNode::new().unwrap();
        let mut sub_node = SubscriberNode::new(Arc::clone(&received)).unwrap();

        // Publish 5 messages
        for _ in 0..5 {
            pub_node.tick();
            thread::sleep(Duration::from_millis(5));
            sub_node.tick();
        }

        // Verify all messages received
        let data = received.lock().unwrap();
        assert_eq!(data.len(), 5);
        for value in data.iter() {
            assert_eq!(*value, 42.0);
        }
    }
}

Run Integration Tests

horus test test_pubsub_communication --test-threads 1

Why --test-threads 1?

  • Prevents tests from running in parallel (this is the default for horus test)
  • Avoids shared memory conflicts between tests
  • Ensures deterministic behavior

Expected Output:

running 2 tests
test tests::test_pubsub_communication ... ok
test tests::test_multiple_messages ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured

Testing Business Logic in Isolation

Test node logic without real Topic connections.

Example: Extracting Testable Logic

// simplified
use horus::prelude::*;

// Node that processes temperature data
pub struct TemperatureProcessor {
    input_sub: Topic<f32>,
    output_pub: Topic<f32>,
}

impl TemperatureProcessor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            input_sub: Topic::new("input_temp")?,
            output_pub: Topic::new("output_temp")?,
        })
    }

    // Public method for testing business logic
    pub fn process_temperature(&self, temp: f32) -> f32 {
        // Convert Celsius to Fahrenheit
        temp * 9.0 / 5.0 + 32.0
    }
}

impl Node for TemperatureProcessor {
    fn name(&self) -> &str {
        "TemperatureProcessor"
    }

    fn tick(&mut self) {
        if let Some(celsius) = self.input_sub.recv() {
            let fahrenheit = self.process_temperature(celsius);
            self.output_pub.send(fahrenheit);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_temperature_conversion_logic() {
        // Test business logic WITHOUT Topic
        let processor = TemperatureProcessor::new().unwrap();

        // Test known conversions
        assert_eq!(processor.process_temperature(0.0), 32.0);
        assert_eq!(processor.process_temperature(100.0), 212.0);
        assert_eq!(processor.process_temperature(-40.0), -40.0);
    }

    #[test]
    fn test_with_mock_data() {
        let mut processor = TemperatureProcessor::new().unwrap();

        // We can't easily mock Topic, but we can test the logic
        // by calling process_temperature directly
        let celsius_readings = vec![0.0, 10.0, 20.0, 30.0, 100.0];
        let expected_fahrenheit = vec![32.0, 50.0, 68.0, 86.0, 212.0];

        for (celsius, expected) in celsius_readings.iter().zip(expected_fahrenheit.iter()) {
            let result = processor.process_temperature(*celsius);
            assert_eq!(result, *expected);
        }
    }
}

Testing Strategy Without Topic Mocks

Since HORUS Topics use real shared memory, full mocking is complex. Instead:

1. Extract Business Logic:

// simplified
// Good: Business logic in testable method
pub fn process_temperature(&self, temp: f32) -> f32 {
    temp * 9.0 / 5.0 + 32.0
}

// Test this directly without Topic
#[test]
fn test_logic() {
    let node = TemperatureProcessor::new().unwrap();
    assert_eq!(node.process_temperature(0.0), 32.0);
}

2. Test Tick with Real Topics:

// simplified
// Topics are lightweight — use real ones in tests
#[test]
fn test_with_real_topic() {
    let mut node = MyNode::new().unwrap();
    node.tick();
    // Verify behavior
}

3. Use Shared State for Verification:

// simplified
// Store results in node for verification
pub struct TestNode {
    pub last_result: Option<f32>,
}

#[test]
fn test_result() {
    let mut node = TestNode::new();
    node.tick();
    assert_eq!(node.last_result, Some(42.0));
}

Complete Testing Example

A fully tested 3-node system.

File: src/main.rs

// simplified
use horus::prelude::*;
use std::sync::{Arc, Mutex};

// Node 1: Generate numbers
pub struct GeneratorNode {
    output_pub: Topic<u32>,
    counter: u32,
}

impl GeneratorNode {
    pub fn new() -> Result<Self> {
        Ok(Self {
            output_pub: Topic::new("numbers")?,
            counter: 0,
        })
    }
}

impl Node for GeneratorNode {
    fn name(&self) -> &str { "GeneratorNode" }

    fn tick(&mut self) {
        self.counter += 1;
        self.output_pub.send(self.counter);
    }
}

// Node 2: Double the numbers
pub struct DoublerNode {
    input_sub: Topic<u32>,
    output_pub: Topic<u32>,
}

impl DoublerNode {
    pub fn new() -> Result<Self> {
        Ok(Self {
            input_sub: Topic::new("numbers")?,
            output_pub: Topic::new("doubled")?,
        })
    }
}

impl Node for DoublerNode {
    fn name(&self) -> &str { "DoublerNode" }

    fn tick(&mut self) {
        if let Some(n) = self.input_sub.recv() {
            self.output_pub.send(n * 2);
        }
    }
}

// Node 3: Collect results
pub struct CollectorNode {
    input_sub: Topic<u32>,
    collected: Arc<Mutex<Vec<u32>>>,
}

impl CollectorNode {
    pub fn new(collected: Arc<Mutex<Vec<u32>>>) -> Result<Self> {
        Ok(Self {
            input_sub: Topic::new("doubled")?,
            collected,
        })
    }
}

impl Node for CollectorNode {
    fn name(&self) -> &str { "CollectorNode" }

    fn tick(&mut self) {
        if let Some(n) = self.input_sub.recv() {
            self.collected.lock().unwrap().push(n);
        }
    }
}

fn main() -> Result<()> {
    let collected = Arc::new(Mutex::new(Vec::new()));

    let mut scheduler = Scheduler::new();
    scheduler.add(GeneratorNode::new()?).order(0).build()?;
    scheduler.add(DoublerNode::new()?).order(1).build()?;
    scheduler.add(CollectorNode::new(collected)?).order(2).build()?;

    scheduler.run()?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_generator_node() {
        let mut node = GeneratorNode::new().unwrap();

        // Initial state
        assert_eq!(node.counter, 0);

        // After 3 ticks
        for _ in 0..3 {
            node.tick();
        }

        assert_eq!(node.counter, 3);
    }

    #[test]
    fn test_pipeline() {
        let collected = Arc::new(Mutex::new(Vec::new()));

        let mut gen = GeneratorNode::new().unwrap();
        let mut dbl = DoublerNode::new().unwrap();
        let mut col = CollectorNode::new(Arc::clone(&collected)).unwrap();

        // Run 5 iterations of the pipeline
        for _ in 0..5 {
            gen.tick();
            thread::sleep(Duration::from_millis(5));
            dbl.tick();
            thread::sleep(Duration::from_millis(5));
            col.tick();
        }

        // Verify results: 1*2=2, 2*2=4, 3*2=6, 4*2=8, 5*2=10
        let results = collected.lock().unwrap();
        assert_eq!(*results, vec![2, 4, 6, 8, 10]);
    }
}

Run All Tests

horus test --test-threads 1

Output:

running 2 tests
test tests::test_generator_node ... ok
test tests::test_pipeline ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured

Best Practices

1. Test Business Logic Separately

Extract pure functions for easy testing:

// simplified
// Good: Pure function (easy to test)
fn calculate_velocity(distance: f32, time: f32) -> f32 {
    distance / time
}

#[test]
fn test_velocity() {
    assert_eq!(calculate_velocity(100.0, 10.0), 10.0);
}

2. Use Arc for Shared Test Data

Share data between nodes for verification:

// simplified
let results = Arc::new(Mutex::new(Vec::new()));
let node = TestNode::new(Arc::clone(&results))?;

// Later in test
assert_eq!(results.lock().unwrap().len(), 5);

3. Add Small Delays for IPC

Shared memory needs time to propagate:

// simplified
pub_node.tick();
thread::sleep(Duration::from_millis(10));  // Allow IPC
sub_node.tick();

4. Run Tests Sequentially

horus test defaults to single-threaded execution to prevent shared memory conflicts. If you use --parallel, ensure each test uses unique topic names:

horus test    # Already single-threaded by default

5. Test Edge Cases

// simplified
#[test]
fn test_no_messages() {
    let mut node = SubscriberNode::new().unwrap();
    node.tick();  // Should handle gracefully
}

#[test]
fn test_invalid_data() {
    let result = node.process(-1.0);
    assert!(result.is_err());
}

Running Tests with horus test

Basic Commands

# Run all tests (defaults to single-threaded for shared memory safety)
horus test

# Run specific test by name filter
horus test test_sensor_initialization

# Run tests matching a pattern
horus test sensor

# Show test output (println!, hlog!, etc.)
horus test --nocapture

# Run with multiple threads (override default single-threaded mode)
horus test --parallel
horus test --test-threads 4

# Run in release mode (optimized build)
horus test --release

# Skip the build step (use existing build artifacts)
horus test --no-build

# Skip shared memory cleanup after tests
horus test --no-cleanup

# Verbose output
horus test -v

# Run integration tests (tests marked #[ignore])
horus test --integration

# Enable simulation drivers (no hardware required)
horus test --sim

Test Organization

// simplified
#[cfg(test)]
mod tests {
    use super::*;

    mod unit_tests {
        use super::*;

        #[test]
        fn test_creation() { /* ... */ }
    }

    mod integration_tests {
        use super::*;

        // Mark integration tests with #[ignore] — run with `horus test --integration`
        #[test]
        #[ignore]
        fn test_full_pipeline() { /* ... */ }
    }
}

Single-Tick Testing with tick_once()

For simulation and fine-grained testing, tick_once() executes exactly one scheduler tick cycle and returns. This gives you full control over the execution loop.

tick_once()

Execute all registered nodes exactly once in priority order:

// simplified
#[test]
fn test_single_tick_behavior() {
    let mut scheduler = Scheduler::new();
    scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
    scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();

    // Execute one tick — all nodes run once in priority order
    scheduler.tick_once().unwrap();

    // Verify state after exactly one tick
    // ...
}

tick()

Execute only specific nodes by name. Non-existent names are silently ignored:

// simplified
#[test]
fn test_selective_tick() {
    let mut scheduler = Scheduler::new();
    scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
    scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();
    scheduler.add(LoggerNode::new().unwrap()).order(2).build().unwrap();

    // Tick only the sensor — control and logger are skipped
    scheduler.tick(&["SensorNode"]).unwrap();

    // Tick sensor + control, skip logger
    scheduler.tick(&["SensorNode", "ControlNode"]).unwrap();
}

Simulation Loop Pattern

Use tick_once() to integrate HORUS nodes into a simulation loop:

// simplified
fn run_simulation() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(MotorController::new()?).order(0).build()?;
    scheduler.add(SensorFusion::new()?).order(1).build()?;

    let dt = Duration::from_millis(10); // 100 Hz simulation

    for step in 0..1000 {
        // 1. Step physics simulation
        sim.step(dt);

        // 2. Run HORUS nodes for this timestep
        scheduler.tick_once()?;

        // 3. Render or collect results
        sim.render();
    }

    Ok(())
}

Lazy Initialization

tick_once() and tick() lazily initialize nodes on the first call — you don't need to call run() or any separate init method. The first tick_once() call runs init() on all nodes, then executes the tick.

Time-Limited Test Runs

Use scheduler.run_for() to run tests for a fixed duration:

// simplified
#[test]
fn test_system_runs_for_one_second() {
    let mut scheduler = Scheduler::new();
    scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
    scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();

    // Run for exactly 1 second, then shutdown gracefully
    scheduler.run_for(std::time::Duration::from_secs(1)).unwrap();
}

Record/Replay Testing

HORUS supports recording node execution and replaying it later for deterministic debugging. This is useful for reproducing bugs and regression testing.

Recording a Session

Record all node inputs/outputs during a run:

# Record while running
horus run --record

Recordings are saved to ~/.horus/recordings/<session>/ in binary format (.horus files). Each node gets its own recording file:

~/.horus/recordings/my_session/
├── sensor_node@abc123.horus       # Node recording
├── control_node@def456.horus      # Node recording
└── scheduler@main789.horus        # Scheduler execution order

Replaying in Tests

Use scheduler.add_replay() to replay a recorded node:

// simplified
use std::path::PathBuf;

#[test]
fn test_replay_crash_scenario() {
    let mut scheduler = Scheduler::new();

    // Replay the motor node from a crash recording
    scheduler.add_replay(
        PathBuf::from("~/.horus/recordings/crash/motor_node@abc123.horus"),
        1,  // priority
    ).unwrap();

    // Add a live node to test against the recorded data
    scheduler.add(DiagnosticNode::new().unwrap()).order(2).build().unwrap();

    scheduler.run_for(std::time::Duration::from_secs(5)).unwrap();
}

Replay Modes

ModeDescription
Full replayReplay all nodes from a scheduler recording
Mixed replayReplay some nodes while others run live
Range replayReplay only a specific tick range

Managing Recordings

# List recording sessions
ls ~/.horus/recordings/

# Recordings auto-cap at 100MB per node
# Delete old recordings to free space

Common Errors

SymptomCauseFix
Tests fail randomlyShared memory conflicts from parallel testsUse horus test --test-threads 1 (default) or unique topic names per test
"Topic not found" errorsTopic created in one test leaks into anotherUse unique topic names: Topic::new("test_topic_1")?
Messages not receivedIPC needs time to propagate through shared memoryAdd thread::sleep(Duration::from_millis(10)) between send and recv
Test hangs foreverscheduler.run() blocks indefinitelyUse scheduler.run_for(Duration::from_secs(1)) or tick_once() instead
Compilation errors in testsMissing use super::* or use horus::prelude::*Ensure test module imports parent scope and prelude
Shared memory permission errorPrevious test run crashed without cleanupRun horus clean --shm before testing

See Also