Testing HORUS Applications

Learn how to test your HORUS nodes and applications with complete, runnable examples using Rust's built-in test framework.

Why Test HORUS Nodes?

Testing ensures:

  • Nodes work in isolation before integration
  • Message passing is correct (right topics, right types)
  • Lifecycle methods behave properly (init, tick, shutdown)
  • Edge cases are handled (no messages, invalid data, etc.)
  • Refactoring doesn't break functionality

Testing Strategies

1. Unit Testing a Single Node

Test node behavior without running the scheduler.

2. Integration Testing Multiple Nodes

Test nodes communicating through topics.

3. Testing Business Logic in Isolation

Extract and test business logic without Topic dependencies.

Unit Testing a Single Node

Test individual node behavior in isolation.

Example: Testing a Temperature Sensor

File: src/main.rs

use horus::prelude::*;

// The node we want to test
pub struct TemperatureSensor {
    temp_pub: Topic<f32>,
    reading: f32,
}

impl TemperatureSensor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            temp_pub: Topic::new("temperature")?,
            reading: 20.0,
        })
    }

    // Make this public so tests can inspect it
    pub fn get_reading(&self) -> f32 {
        self.reading
    }
}

impl Node for TemperatureSensor {
    fn name(&self) -> &str {
        "TemperatureSensor"
    }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Sensor initialized");
        Ok(())
    }

    fn tick(&mut self) {
        // Increment reading each tick
        self.reading += 0.5;

        // Publish temperature
        self.temp_pub.send(self.reading);
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Sensor shutdown");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(TemperatureSensor::new()?).order(0).build()?;
    scheduler.run()?;
    Ok(())
}

// ============================================================================
// TESTS
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sensor_initialization() {
        // Test that sensor initializes with correct default value
        let sensor = TemperatureSensor::new().unwrap();
        assert_eq!(sensor.get_reading(), 20.0);
    }

    #[test]
    fn test_sensor_init_lifecycle() {
        let mut sensor = TemperatureSensor::new().unwrap();

        // Test init() method
        let result = sensor.init();
        assert!(result.is_ok());
    }

    #[test]
    fn test_sensor_tick_increments_reading() {
        let mut sensor = TemperatureSensor::new().unwrap();

        // Run tick 5 times
        for i in 1..=5 {
            sensor.tick();

            // Verify reading increments by 0.5 each tick
            let expected = 20.0 + (i as f32 * 0.5);
            assert_eq!(sensor.get_reading(), expected);
        }
    }

    #[test]
    fn test_sensor_shutdown() {
        let mut sensor = TemperatureSensor::new().unwrap();

        // Test shutdown() method
        let result = sensor.shutdown();
        assert!(result.is_ok());
    }
}

Run the Tests

horus test

Expected Output:

running 4 tests
test tests::test_sensor_initialization ... ok
test tests::test_sensor_init_lifecycle ... ok
test tests::test_sensor_tick_increments_reading ... ok
test tests::test_sensor_shutdown ... ok

test result: ok. 4 passed; 0 failed; 0 ignored; 0 measured

Key Testing Patterns

1. Test Node Creation:

#[test]
fn test_node_creation() {
    let node = MyNode::new().unwrap();
    assert_eq!(node.some_field, expected_value);
}

2. Test Initialization:

#[test]
fn test_init() {
    let mut node = MyNode::new().unwrap();
    assert!(node.init().is_ok());
}

3. Test Tick Logic:

#[test]
fn test_tick() {
    let mut node = MyNode::new().unwrap();
    node.tick();
    // Verify state changes
    assert_eq!(node.counter, 1);
}

4. Test Shutdown:

#[test]
fn test_shutdown() {
    let mut node = MyNode::new().unwrap();
    assert!(node.shutdown().is_ok());
}

Testing Multiple Nodes Together

Test nodes communicating through topics.

Example: Publisher-Subscriber Test

File: src/main.rs

use horus::prelude::*;
use std::sync::{Arc, Mutex};

// Publisher node
pub struct PublisherNode {
    data_pub: Topic<f32>,
}

impl PublisherNode {
    pub fn new() -> Result<Self> {
        Ok(Self {
            data_pub: Topic::new("test_data")?,
        })
    }
}

impl Node for PublisherNode {
    fn name(&self) -> &str {
        "PublisherNode"
    }

    fn tick(&mut self) {
        self.data_pub.send(42.0);
    }
}

// Subscriber node that stores received data
pub struct SubscriberNode {
    data_sub: Topic<f32>,
    received: Arc<Mutex<Vec<f32>>>,
}

impl SubscriberNode {
    pub fn new(received: Arc<Mutex<Vec<f32>>>) -> Result<Self> {
        Ok(Self {
            data_sub: Topic::new("test_data")?,
            received,
        })
    }
}

impl Node for SubscriberNode {
    fn name(&self) -> &str {
        "SubscriberNode"
    }

    fn tick(&mut self) {
        if let Some(data) = self.data_sub.recv() {
            self.received.lock().unwrap().push(data);
        }
    }
}

fn main() -> Result<()> {
    let received = Arc::new(Mutex::new(Vec::new()));

    let mut scheduler = Scheduler::new();
    scheduler.add(PublisherNode::new()?).order(0).build()?;
    scheduler.add(SubscriberNode::new(received)?).order(1).build()?;
    scheduler.run()?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_pubsub_communication() {
        // Shared storage for received messages
        let received = Arc::new(Mutex::new(Vec::new()));

        // Create publisher and subscriber
        let mut pub_node = PublisherNode::new().unwrap();
        let mut sub_node = SubscriberNode::new(Arc::clone(&received)).unwrap();

        // Publish a message
        pub_node.tick();

        // Small delay to allow IPC (shared memory needs time to propagate)
        thread::sleep(Duration::from_millis(10));

        // Subscriber receives the message
        sub_node.tick();

        // Verify message was received
        let data = received.lock().unwrap();
        assert_eq!(data.len(), 1);
        assert_eq!(data[0], 42.0);
    }

    #[test]
    fn test_multiple_messages() {
        let received = Arc::new(Mutex::new(Vec::new()));

        let mut pub_node = PublisherNode::new().unwrap();
        let mut sub_node = SubscriberNode::new(Arc::clone(&received)).unwrap();

        // Publish 5 messages
        for _ in 0..5 {
            pub_node.tick();
            thread::sleep(Duration::from_millis(5));
            sub_node.tick();
        }

        // Verify all messages received
        let data = received.lock().unwrap();
        assert_eq!(data.len(), 5);
        for value in data.iter() {
            assert_eq!(*value, 42.0);
        }
    }
}

Run Integration Tests

horus test --test-threads 1

Why --test-threads 1?

  • Prevents tests from running in parallel (this is the default for horus test)
  • Avoids shared memory conflicts between tests
  • Ensures deterministic behavior

Expected Output:

running 2 tests
test tests::test_pubsub_communication ... ok
test tests::test_multiple_messages ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured

Testing Business Logic in Isolation

Test node logic directly, without sending or receiving messages on a Topic.

Example: Extracting Testable Logic

use horus::prelude::*;

// Node that processes temperature data
pub struct TemperatureProcessor {
    input_sub: Topic<f32>,
    output_pub: Topic<f32>,
}

impl TemperatureProcessor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            input_sub: Topic::new("input_temp")?,
            output_pub: Topic::new("output_temp")?,
        })
    }

    // Public method for testing business logic
    pub fn process_temperature(&self, temp: f32) -> f32 {
        // Convert Celsius to Fahrenheit
        temp * 9.0 / 5.0 + 32.0
    }
}

impl Node for TemperatureProcessor {
    fn name(&self) -> &str {
        "TemperatureProcessor"
    }

    fn tick(&mut self) {
        if let Some(celsius) = self.input_sub.recv() {
            let fahrenheit = self.process_temperature(celsius);
            self.output_pub.send(fahrenheit);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_temperature_conversion_logic() {
        // Test the conversion directly; no messages are sent or received
        let processor = TemperatureProcessor::new().unwrap();

        // Test known conversions
        assert_eq!(processor.process_temperature(0.0), 32.0);
        assert_eq!(processor.process_temperature(100.0), 212.0);
        assert_eq!(processor.process_temperature(-40.0), -40.0);
    }

    #[test]
    fn test_with_mock_data() {
        let processor = TemperatureProcessor::new().unwrap();

        // We can't easily mock Topic, but we can test the logic
        // by calling process_temperature directly
        let celsius_readings = vec![0.0, 10.0, 20.0, 30.0, 100.0];
        let expected_fahrenheit = vec![32.0, 50.0, 68.0, 86.0, 212.0];

        for (celsius, expected) in celsius_readings.iter().zip(expected_fahrenheit.iter()) {
            let result = processor.process_temperature(*celsius);
            assert_eq!(result, *expected);
        }
    }
}
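
The conversions tested above all happen to be exactly representable in f32, so assert_eq! passes. For inputs where the arithmetic is not exact, a tolerance-based comparison is usually safer. The following is a minimal sketch; the helper approx_eq, the free function celsius_to_fahrenheit, and the tolerance value are illustrative assumptions, not part of HORUS:

```rust
/// Same formula as `process_temperature`, written as a free function
/// so it can be tested without constructing a node.
fn celsius_to_fahrenheit(celsius: f32) -> f32 {
    celsius * 9.0 / 5.0 + 32.0
}

/// Hypothetical helper: true when `a` and `b` differ by at most `tolerance`.
fn approx_eq(a: f32, b: f32, tolerance: f32) -> bool {
    (a - b).abs() <= tolerance
}

#[test]
fn test_conversion_with_tolerance() {
    // 36.6 has no exact f32 representation, so compare with a tolerance
    let fahrenheit = celsius_to_fahrenheit(36.6);
    assert!(approx_eq(fahrenheit, 97.88, 1e-3));
}
```

Pick the tolerance from the precision your application actually needs rather than from the smallest value that makes the test pass.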

Testing Strategy Without Topic Mocks

Since HORUS Topics use real shared memory, full mocking is complex. Instead:

1. Extract Business Logic:

// Good: Business logic in testable method
pub fn process_temperature(&self, temp: f32) -> f32 {
    temp * 9.0 / 5.0 + 32.0
}

// Test this directly without Topic
#[test]
fn test_logic() {
    let node = TemperatureProcessor::new().unwrap();
    assert_eq!(node.process_temperature(0.0), 32.0);
}

2. Test Tick with Real Topics:

// Topics are lightweight — use real ones in tests
#[test]
fn test_with_real_topic() {
    let mut node = MyNode::new().unwrap();
    node.tick();
    // Verify behavior
}

3. Use Shared State for Verification:

// Store results in node for verification
pub struct TestNode {
    pub last_result: Option<f32>,
}

#[test]
fn test_result() {
    let mut node = TestNode::new();
    node.tick();
    assert_eq!(node.last_result, Some(42.0));
}

Complete Testing Example

A fully tested 3-node system.

File: src/main.rs

use horus::prelude::*;
use std::sync::{Arc, Mutex};

// Node 1: Generate numbers
pub struct GeneratorNode {
    output_pub: Topic<u32>,
    counter: u32,
}

impl GeneratorNode {
    pub fn new() -> Result<Self> {
        Ok(Self {
            output_pub: Topic::new("numbers")?,
            counter: 0,
        })
    }
}

impl Node for GeneratorNode {
    fn name(&self) -> &str { "GeneratorNode" }

    fn tick(&mut self) {
        self.counter += 1;
        self.output_pub.send(self.counter);
    }
}

// Node 2: Double the numbers
pub struct DoublerNode {
    input_sub: Topic<u32>,
    output_pub: Topic<u32>,
}

impl DoublerNode {
    pub fn new() -> Result<Self> {
        Ok(Self {
            input_sub: Topic::new("numbers")?,
            output_pub: Topic::new("doubled")?,
        })
    }
}

impl Node for DoublerNode {
    fn name(&self) -> &str { "DoublerNode" }

    fn tick(&mut self) {
        if let Some(n) = self.input_sub.recv() {
            self.output_pub.send(n * 2);
        }
    }
}

// Node 3: Collect results
pub struct CollectorNode {
    input_sub: Topic<u32>,
    collected: Arc<Mutex<Vec<u32>>>,
}

impl CollectorNode {
    pub fn new(collected: Arc<Mutex<Vec<u32>>>) -> Result<Self> {
        Ok(Self {
            input_sub: Topic::new("doubled")?,
            collected,
        })
    }
}

impl Node for CollectorNode {
    fn name(&self) -> &str { "CollectorNode" }

    fn tick(&mut self) {
        if let Some(n) = self.input_sub.recv() {
            self.collected.lock().unwrap().push(n);
        }
    }
}

fn main() -> Result<()> {
    let collected = Arc::new(Mutex::new(Vec::new()));

    let mut scheduler = Scheduler::new();
    scheduler.add(GeneratorNode::new()?).order(0).build()?;
    scheduler.add(DoublerNode::new()?).order(1).build()?;
    scheduler.add(CollectorNode::new(collected)?).order(2).build()?;

    scheduler.run()?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_generator_node() {
        let mut node = GeneratorNode::new().unwrap();

        // Initial state
        assert_eq!(node.counter, 0);

        // After 3 ticks
        for _ in 0..3 {
            node.tick();
        }

        assert_eq!(node.counter, 3);
    }

    #[test]
    fn test_pipeline() {
        let collected = Arc::new(Mutex::new(Vec::new()));

        let mut gen = GeneratorNode::new().unwrap();
        let mut dbl = DoublerNode::new().unwrap();
        let mut col = CollectorNode::new(Arc::clone(&collected)).unwrap();

        // Run 5 iterations of the pipeline
        for _ in 0..5 {
            gen.tick();
            thread::sleep(Duration::from_millis(5));
            dbl.tick();
            thread::sleep(Duration::from_millis(5));
            col.tick();
        }

        // Verify results: 1*2=2, 2*2=4, 3*2=6, 4*2=8, 5*2=10
        let results = collected.lock().unwrap();
        assert_eq!(*results, vec![2, 4, 6, 8, 10]);
    }
}

Run All Tests

horus test --test-threads 1

Expected Output:

running 2 tests
test tests::test_generator_node ... ok
test tests::test_pipeline ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured

Best Practices

1. Test Business Logic Separately

Extract pure functions for easy testing:

// Good: Pure function (easy to test)
fn calculate_velocity(distance: f32, time: f32) -> f32 {
    distance / time
}

#[test]
fn test_velocity() {
    assert_eq!(calculate_velocity(100.0, 10.0), 10.0);
}

2. Use Arc<Mutex<...>> for Shared Test Data

Share data between a node and the test so the test can verify what the node did:

let results = Arc::new(Mutex::new(Vec::new()));
let node = TestNode::new(Arc::clone(&results))?;

// Later in test
assert_eq!(results.lock().unwrap().len(), 5);
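
The reason for the Arc is ownership: once a node is handed to the scheduler, the test can no longer reach its fields, so it keeps its own clone of the buffer. The pattern can be sketched with the standard library alone; RecordingNode and record below are hypothetical stand-ins for a real HORUS node and its tick():

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a HORUS node: stores every value it is given.
struct RecordingNode {
    results: Arc<Mutex<Vec<u32>>>,
}

impl RecordingNode {
    fn new(results: Arc<Mutex<Vec<u32>>>) -> Self {
        Self { results }
    }

    /// Plays the role of tick(): push one value into the shared buffer.
    fn record(&mut self, value: u32) {
        self.results.lock().unwrap().push(value);
    }
}

#[test]
fn test_shared_buffer_is_visible_to_test() {
    let results = Arc::new(Mutex::new(Vec::new()));
    // The node gets one handle; the test keeps the other
    let mut node = RecordingNode::new(Arc::clone(&results));

    for n in 1..=5 {
        node.record(n);
    }

    assert_eq!(*results.lock().unwrap(), vec![1, 2, 3, 4, 5]);
}
```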

3. Add Small Delays for IPC

Shared memory needs time to propagate:

pub_node.tick();
thread::sleep(Duration::from_millis(10));  // Allow IPC
sub_node.tick();
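
A fixed sleep can be too short on a loaded machine and needlessly long elsewhere. One alternative is to poll until the expected state appears or a timeout expires. This is a sketch using only the standard library; wait_until is a hypothetical helper, not a HORUS API:

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical test helper: call `condition` repeatedly until it returns
/// true or `timeout` elapses. Returns whether the condition was met.
fn wait_until<F: FnMut() -> bool>(
    timeout: Duration,
    poll_interval: Duration,
    mut condition: F,
) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if condition() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(poll_interval);
    }
}

// Possible use in a pub/sub test (names taken from the example above):
//
// pub_node.tick();
// let delivered = wait_until(Duration::from_millis(100), Duration::from_millis(1), || {
//     sub_node.tick();
//     !received.lock().unwrap().is_empty()
// });
// assert!(delivered);
```

The test then finishes as soon as the message arrives and fails only if the timeout expires first.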

4. Run Tests Sequentially

horus test defaults to single-threaded execution to prevent shared memory conflicts, so no extra flag is needed. If you opt into --parallel, give each test its own topic names.

horus test    # Already single-threaded by default

5. Test Edge Cases

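#[test]
fn test_no_messages() {
    // SubscriberNode::new takes the shared buffer (see the pub/sub example)
    let received = Arc::new(Mutex::new(Vec::new()));
    let mut node = SubscriberNode::new(received).unwrap();
    node.tick();  // Should handle gracefully
}

#[test]
fn test_invalid_data() {
    // MyNode::process is a placeholder for a method that validates its input
    let node = MyNode::new().unwrap();
    let result = node.process(-1.0);
    assert!(result.is_err());
}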
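
For test_invalid_data to be meaningful, the method under test has to report bad input as an error instead of passing it along. A minimal sketch of such a check, written as a free function so it needs no Topic; validate_distance and its rules are illustrative assumptions:

```rust
/// Hypothetical validator: accept only finite, non-negative distances.
fn validate_distance(meters: f32) -> Result<f32, String> {
    if !meters.is_finite() {
        return Err(format!("distance must be finite, got {}", meters));
    }
    if meters < 0.0 {
        return Err(format!("distance must be non-negative, got {}", meters));
    }
    Ok(meters)
}

#[test]
fn test_rejects_invalid_distances() {
    assert!(validate_distance(-1.0).is_err());
    assert!(validate_distance(f32::NAN).is_err());
    assert!(validate_distance(f32::INFINITY).is_err());
}

#[test]
fn test_accepts_valid_distance() {
    assert_eq!(validate_distance(2.5), Ok(2.5));
}
```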

Running Tests with horus test

Basic Commands

# Run all tests (defaults to single-threaded for shared memory safety)
horus test

# Run specific test by name filter
horus test test_sensor_initialization

# Run tests matching a pattern
horus test sensor

# Show test output (println!, hlog!, etc.)
horus test --nocapture

# Run with multiple threads (override default single-threaded mode)
horus test --parallel
horus test --test-threads 4

# Run in release mode (optimized build)
horus test --release

# Skip the build step (use existing build artifacts)
horus test --no-build

# Skip shared memory cleanup after tests
horus test --no-cleanup

# Verbose output
horus test -v

# Run integration tests (tests marked #[ignore])
horus test --integration

# Enable simulation drivers (no hardware required)
horus test --sim

Test Organization

#[cfg(test)]
mod tests {
    use super::*;

    mod unit_tests {
        use super::*;

        #[test]
        fn test_creation() { /* ... */ }
    }

    mod integration_tests {
        use super::*;

        // Mark integration tests with #[ignore] — run with `horus test --integration`
        #[test]
        #[ignore]
        fn test_full_pipeline() { /* ... */ }
    }
}

Single-Tick Testing with tick_once()

For simulation and fine-grained testing, tick_once() executes exactly one scheduler tick cycle and returns. This gives you full control over the execution loop.

tick_once()

Execute all registered nodes exactly once in priority order:

#[test]
fn test_single_tick_behavior() {
    let mut scheduler = Scheduler::new();
    scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
    scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();

    // Execute one tick — all nodes run once in priority order
    scheduler.tick_once().unwrap();

    // Verify state after exactly one tick
    // ...
}

tick_once_nodes()

Execute only specific nodes by name. Non-existent names are silently ignored:

#[test]
fn test_selective_tick() {
    let mut scheduler = Scheduler::new();
    scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
    scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();
    scheduler.add(LoggerNode::new().unwrap()).order(2).build().unwrap();

    // Tick only the sensor — control and logger are skipped
    scheduler.tick_once_nodes(&["SensorNode"]).unwrap();

    // Tick sensor + control, skip logger
    scheduler.tick_once_nodes(&["SensorNode", "ControlNode"]).unwrap();
}

Simulation Loop Pattern

Use tick_once() to integrate HORUS nodes into a simulation loop:

use std::time::Duration;

// `sim` stands in for your physics simulator; `PhysicsSim` is a placeholder type, not part of HORUS
fn run_simulation(sim: &mut PhysicsSim) -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(MotorController::new()?).order(0).build()?;
    scheduler.add(SensorFusion::new()?).order(1).build()?;

    let dt = Duration::from_millis(10); // 100 Hz simulation

    for _step in 0..1000 {
        // 1. Step physics simulation
        sim.step(dt);

        // 2. Run HORUS nodes for this timestep
        scheduler.tick_once()?;

        // 3. Render or collect results
        sim.render();
    }

    Ok(())
}

Lazy Initialization

tick_once() and tick_once_nodes() lazily initialize nodes on the first call — you don't need to call run() or any separate init method. The first tick_once() call runs init() on all nodes, then executes the tick.

Time-Limited Test Runs

Use scheduler.run_for() to run tests for a fixed duration:

#[test]
fn test_system_runs_for_one_second() {
    let mut scheduler = Scheduler::new();
    scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
    scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();

    // Run for exactly 1 second, then shutdown gracefully
    scheduler.run_for(std::time::Duration::from_secs(1)).unwrap();
}

Record/Replay Testing

HORUS supports recording node execution and replaying it later for deterministic debugging. This is useful for reproducing bugs and regression testing.

Recording a Session

Record all node inputs/outputs during a run:

# Record while running
horus run --record

Recordings are saved to ~/.horus/recordings/<session>/ in binary format (.horus files). Each node gets its own recording file:

~/.horus/recordings/my_session/
├── sensor_node@abc123.horus       # Node recording
├── control_node@def456.horus      # Node recording
└── scheduler@main789.horus        # Scheduler execution order
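
Given that layout, a test can find the files in a session directory before replaying them. The sketch below uses only the standard library and assumes nothing beyond the .horus extension shown above; list_recordings is a hypothetical helper:

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// List the `.horus` files directly inside `session_dir`, sorted by path.
fn list_recordings(session_dir: &Path) -> io::Result<Vec<PathBuf>> {
    let mut files = Vec::new();
    for entry in fs::read_dir(session_dir)? {
        let path = entry?.path();
        // Keep only regular files whose extension is exactly "horus"
        let is_recording = path.is_file()
            && path.extension().and_then(|ext| ext.to_str()) == Some("horus");
        if is_recording {
            files.push(path);
        }
    }
    files.sort();
    Ok(files)
}
```

Pass an absolute path when calling it. Rust does not expand ~ in paths, so build the path from the HOME environment variable rather than writing "~/.horus/..." literally.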

Replaying in Tests

Use scheduler.add_replay() to replay a recorded node:

use std::path::PathBuf;

#[test]
fn test_replay_crash_scenario() {
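    let mut scheduler = Scheduler::new();

    // `~` is expanded by the shell, not by PathBuf, so resolve the home directory explicitly
    let home = std::env::var("HOME").expect("HOME is not set");
    let recording = PathBuf::from(home)
        .join(".horus/recordings/crash/motor_node@abc123.horus");

    // Replay the motor node from a crash recording
    scheduler.add_replay(
        recording,
        1,  // priority
    ).unwrap();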

    // Add a live node to test against the recorded data
    scheduler.add(DiagnosticNode::new().unwrap()).order(2).build().unwrap();

    scheduler.run_for(std::time::Duration::from_secs(5)).unwrap();
}

Replay Modes

Mode           Description
Full replay    Replay all nodes from a scheduler recording
Mixed replay   Replay some nodes while others run live
Range replay   Replay only a specific tick range

Managing Recordings

# List recording sessions
ls ~/.horus/recordings/

# Recordings auto-cap at 100MB per node
# Delete old recordings to free space

Troubleshooting Tests

Issue: Tests Fail Randomly

Cause: Shared memory conflicts from parallel tests

Fix:

# horus test defaults to single-threaded, but if you used --parallel:
horus test --test-threads 1

Issue: "Topic not found" Errors

Cause: Topic created in one test affects another

Fix: Use unique topic names per test:

Topic::new("test_topic_1")?  // Test 1
Topic::new("test_topic_2")?  // Test 2
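
Hand-numbered names are easy to duplicate by accident. One option is to generate them instead. The sketch below uses only the standard library; unique_topic is a hypothetical helper, and it assumes topic names may contain underscores and digits:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Process-wide counter so every call yields a different suffix
static NEXT_TOPIC_ID: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical helper: build a topic name that is unique per call and
/// per process, e.g. "test_data_12345_0".
fn unique_topic(prefix: &str) -> String {
    let id = NEXT_TOPIC_ID.fetch_add(1, Ordering::Relaxed);
    format!("{}_{}_{}", prefix, std::process::id(), id)
}

#[test]
fn test_names_do_not_collide() {
    let first = unique_topic("test_data");
    let second = unique_topic("test_data");
    assert_ne!(first, second);
}
```

A test would then pass the generated name to Topic::new (assuming it accepts a &str, as the string literals above suggest) and hand the same name to both publisher and subscriber.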

Issue: Messages Not Received

Cause: IPC needs time to propagate

Fix: Add small delay:

thread::sleep(Duration::from_millis(10));

Next Steps