Testing HORUS Applications
You need to verify that your HORUS nodes work correctly before deploying to a real robot. Here is how to write unit tests, integration tests, and simulation tests using Rust's built-in test framework and HORUS testing utilities.
When To Use This
- Writing unit tests for individual node logic
- Testing multi-node pipelines with real shared memory IPC
- Using
tick_once()for deterministic single-tick testing - Setting up CI/CD test pipelines with
horus test - Recording and replaying sessions for regression testing
Use Debugging Workflows instead if you need to diagnose a problem in a running system rather than write tests.
Prerequisites
- A HORUS project with
horus.toml(see Quick Start) - Familiarity with Nodes and Topics
- Rust testing basics (
#[test],#[cfg(test)],assert!)
Testing Strategies
| Strategy | When to Use | Complexity |
|---|---|---|
| Unit test (single node) | Test node logic in isolation | Low |
| Integration test (multi-node) | Test nodes communicating through topics | Medium |
| Business logic extraction | Test pure functions without topics | Lowest |
tick_once() testing | Deterministic single-tick scheduler tests | Medium |
| Record/replay | Reproduce bugs and regression test | High |
Unit Testing a Single Node
Test individual node behavior in isolation.
Example: Testing a Temperature Sensor
File: src/main.rs
// simplified
use horus::prelude::*;
// The node we want to test
pub struct TemperatureSensor {
temp_pub: Topic<f32>,
reading: f32,
}
impl TemperatureSensor {
pub fn new() -> Result<Self> {
Ok(Self {
temp_pub: Topic::new("temperature")?,
reading: 20.0,
})
}
// Make this public so tests can inspect it
pub fn get_reading(&self) -> f32 {
self.reading
}
}
impl Node for TemperatureSensor {
fn name(&self) -> &str {
"TemperatureSensor"
}
fn init(&mut self) -> Result<()> {
hlog!(info, "Sensor initialized");
Ok(())
}
fn tick(&mut self) {
// Increment reading each tick
self.reading += 0.5;
// Publish temperature
self.temp_pub.send(self.reading);
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Sensor shutdown");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(TemperatureSensor::new()?).order(0).build()?;
scheduler.run()?;
Ok(())
}
// ============================================================================
// TESTS
// ============================================================================
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sensor_initialization() {
let sensor = TemperatureSensor::new().unwrap();
assert_eq!(sensor.get_reading(), 20.0);
}
#[test]
fn test_sensor_tick_increments_reading() {
let mut sensor = TemperatureSensor::new().unwrap();
for i in 1..=5 {
sensor.tick();
let expected = 20.0 + (i as f32 * 0.5);
assert_eq!(sensor.get_reading(), expected);
}
}
}
Run the Tests
horus test
Expected Output:
running 2 tests
test tests::test_sensor_initialization ... ok
test tests::test_sensor_tick_increments_reading ... ok
test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured
Key Testing Patterns
1. Test Node Creation:
// simplified
#[test]
fn test_node_creation() {
let node = MyNode::new().unwrap();
assert_eq!(node.some_field, expected_value);
}
2. Test Initialization:
// simplified
#[test]
fn test_init() {
let mut node = MyNode::new().unwrap();
assert!(node.init().is_ok());
}
3. Test Tick Logic:
// simplified
#[test]
fn test_tick() {
let mut node = MyNode::new().unwrap();
node.tick();
// Verify state changes
assert_eq!(node.counter, 1);
}
4. Test Shutdown:
// simplified
#[test]
fn test_shutdown() {
let mut node = MyNode::new().unwrap();
assert!(node.shutdown().is_ok());
}
Testing Multiple Nodes Together
Test nodes communicating through topics.
Example: Publisher-Subscriber Test
File: src/main.rs
// simplified
use horus::prelude::*;
use std::sync::{Arc, Mutex};
// Publisher node
pub struct PublisherNode {
data_pub: Topic<f32>,
}
impl PublisherNode {
pub fn new() -> Result<Self> {
Ok(Self {
data_pub: Topic::new("test_data")?,
})
}
}
impl Node for PublisherNode {
fn name(&self) -> &str {
"PublisherNode"
}
fn tick(&mut self) {
self.data_pub.send(42.0);
}
}
// Subscriber node that stores received data
pub struct SubscriberNode {
data_sub: Topic<f32>,
received: Arc<Mutex<Vec<f32>>>,
}
impl SubscriberNode {
pub fn new(received: Arc<Mutex<Vec<f32>>>) -> Result<Self> {
Ok(Self {
data_sub: Topic::new("test_data")?,
received,
})
}
}
impl Node for SubscriberNode {
fn name(&self) -> &str {
"SubscriberNode"
}
fn tick(&mut self) {
if let Some(data) = self.data_sub.recv() {
self.received.lock().unwrap().push(data);
}
}
}
fn main() -> Result<()> {
let received = Arc::new(Mutex::new(Vec::new()));
let mut scheduler = Scheduler::new();
scheduler.add(PublisherNode::new()?).order(0).build()?;
scheduler.add(SubscriberNode::new(received)?).order(1).build()?;
scheduler.run()?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
#[test]
fn test_pubsub_communication() {
// Shared storage for received messages
let received = Arc::new(Mutex::new(Vec::new()));
// Create publisher and subscriber
let mut pub_node = PublisherNode::new().unwrap();
let mut sub_node = SubscriberNode::new(Arc::clone(&received)).unwrap();
// Publish a message
pub_node.tick();
// Small delay to allow IPC (shared memory needs time to propagate)
thread::sleep(Duration::from_millis(10));
// Subscriber receives the message
sub_node.tick();
// Verify message was received
let data = received.lock().unwrap();
assert_eq!(data.len(), 1);
assert_eq!(data[0], 42.0);
}
#[test]
fn test_multiple_messages() {
let received = Arc::new(Mutex::new(Vec::new()));
let mut pub_node = PublisherNode::new().unwrap();
let mut sub_node = SubscriberNode::new(Arc::clone(&received)).unwrap();
// Publish 5 messages
for _ in 0..5 {
pub_node.tick();
thread::sleep(Duration::from_millis(5));
sub_node.tick();
}
// Verify all messages received
let data = received.lock().unwrap();
assert_eq!(data.len(), 5);
for value in data.iter() {
assert_eq!(*value, 42.0);
}
}
}
Run Integration Tests
horus test test_pubsub_communication --test-threads 1
Why --test-threads 1?
- Prevents tests from running in parallel (this is the default for
horus test) - Avoids shared memory conflicts between tests
- Ensures deterministic behavior
Expected Output:
running 2 tests
test tests::test_pubsub_communication ... ok
test tests::test_multiple_messages ... ok
test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured
Testing Business Logic in Isolation
Test node logic without real Topic connections.
Example: Extracting Testable Logic
// simplified
use horus::prelude::*;
// Node that processes temperature data
pub struct TemperatureProcessor {
input_sub: Topic<f32>,
output_pub: Topic<f32>,
}
impl TemperatureProcessor {
pub fn new() -> Result<Self> {
Ok(Self {
input_sub: Topic::new("input_temp")?,
output_pub: Topic::new("output_temp")?,
})
}
// Public method for testing business logic
pub fn process_temperature(&self, temp: f32) -> f32 {
// Convert Celsius to Fahrenheit
temp * 9.0 / 5.0 + 32.0
}
}
impl Node for TemperatureProcessor {
fn name(&self) -> &str {
"TemperatureProcessor"
}
fn tick(&mut self) {
if let Some(celsius) = self.input_sub.recv() {
let fahrenheit = self.process_temperature(celsius);
self.output_pub.send(fahrenheit);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_temperature_conversion_logic() {
// Test business logic WITHOUT Topic
let processor = TemperatureProcessor::new().unwrap();
// Test known conversions
assert_eq!(processor.process_temperature(0.0), 32.0);
assert_eq!(processor.process_temperature(100.0), 212.0);
assert_eq!(processor.process_temperature(-40.0), -40.0);
}
#[test]
fn test_with_mock_data() {
let mut processor = TemperatureProcessor::new().unwrap();
// We can't easily mock Topic, but we can test the logic
// by calling process_temperature directly
let celsius_readings = vec![0.0, 10.0, 20.0, 30.0, 100.0];
let expected_fahrenheit = vec![32.0, 50.0, 68.0, 86.0, 212.0];
for (celsius, expected) in celsius_readings.iter().zip(expected_fahrenheit.iter()) {
let result = processor.process_temperature(*celsius);
assert_eq!(result, *expected);
}
}
}
Testing Strategy Without Topic Mocks
Since HORUS Topics use real shared memory, full mocking is complex. Instead:
1. Extract Business Logic:
// simplified
// Good: Business logic in testable method
pub fn process_temperature(&self, temp: f32) -> f32 {
temp * 9.0 / 5.0 + 32.0
}
// Test this directly without Topic
#[test]
fn test_logic() {
let node = TemperatureProcessor::new().unwrap();
assert_eq!(node.process_temperature(0.0), 32.0);
}
2. Test Tick with Real Topics:
// simplified
// Topics are lightweight — use real ones in tests
#[test]
fn test_with_real_topic() {
let mut node = MyNode::new().unwrap();
node.tick();
// Verify behavior
}
3. Use Shared State for Verification:
// simplified
// Store results in node for verification
pub struct TestNode {
pub last_result: Option<f32>,
}
#[test]
fn test_result() {
let mut node = TestNode::new();
node.tick();
assert_eq!(node.last_result, Some(42.0));
}
Complete Testing Example
A fully tested 3-node system.
File: src/main.rs
// simplified
use horus::prelude::*;
use std::sync::{Arc, Mutex};
// Node 1: Generate numbers
pub struct GeneratorNode {
output_pub: Topic<u32>,
counter: u32,
}
impl GeneratorNode {
pub fn new() -> Result<Self> {
Ok(Self {
output_pub: Topic::new("numbers")?,
counter: 0,
})
}
}
impl Node for GeneratorNode {
fn name(&self) -> &str { "GeneratorNode" }
fn tick(&mut self) {
self.counter += 1;
self.output_pub.send(self.counter);
}
}
// Node 2: Double the numbers
pub struct DoublerNode {
input_sub: Topic<u32>,
output_pub: Topic<u32>,
}
impl DoublerNode {
pub fn new() -> Result<Self> {
Ok(Self {
input_sub: Topic::new("numbers")?,
output_pub: Topic::new("doubled")?,
})
}
}
impl Node for DoublerNode {
fn name(&self) -> &str { "DoublerNode" }
fn tick(&mut self) {
if let Some(n) = self.input_sub.recv() {
self.output_pub.send(n * 2);
}
}
}
// Node 3: Collect results
pub struct CollectorNode {
input_sub: Topic<u32>,
collected: Arc<Mutex<Vec<u32>>>,
}
impl CollectorNode {
pub fn new(collected: Arc<Mutex<Vec<u32>>>) -> Result<Self> {
Ok(Self {
input_sub: Topic::new("doubled")?,
collected,
})
}
}
impl Node for CollectorNode {
fn name(&self) -> &str { "CollectorNode" }
fn tick(&mut self) {
if let Some(n) = self.input_sub.recv() {
self.collected.lock().unwrap().push(n);
}
}
}
fn main() -> Result<()> {
let collected = Arc::new(Mutex::new(Vec::new()));
let mut scheduler = Scheduler::new();
scheduler.add(GeneratorNode::new()?).order(0).build()?;
scheduler.add(DoublerNode::new()?).order(1).build()?;
scheduler.add(CollectorNode::new(collected)?).order(2).build()?;
scheduler.run()?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
#[test]
fn test_generator_node() {
let mut node = GeneratorNode::new().unwrap();
// Initial state
assert_eq!(node.counter, 0);
// After 3 ticks
for _ in 0..3 {
node.tick();
}
assert_eq!(node.counter, 3);
}
#[test]
fn test_pipeline() {
let collected = Arc::new(Mutex::new(Vec::new()));
let mut gen = GeneratorNode::new().unwrap();
let mut dbl = DoublerNode::new().unwrap();
let mut col = CollectorNode::new(Arc::clone(&collected)).unwrap();
// Run 5 iterations of the pipeline
for _ in 0..5 {
gen.tick();
thread::sleep(Duration::from_millis(5));
dbl.tick();
thread::sleep(Duration::from_millis(5));
col.tick();
}
// Verify results: 1*2=2, 2*2=4, 3*2=6, 4*2=8, 5*2=10
let results = collected.lock().unwrap();
assert_eq!(*results, vec![2, 4, 6, 8, 10]);
}
}
Run All Tests
horus test --test-threads 1
Output:
running 2 tests
test tests::test_generator_node ... ok
test tests::test_pipeline ... ok
test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured
Best Practices
1. Test Business Logic Separately
Extract pure functions for easy testing:
// simplified
// Good: Pure function (easy to test)
fn calculate_velocity(distance: f32, time: f32) -> f32 {
distance / time
}
#[test]
fn test_velocity() {
assert_eq!(calculate_velocity(100.0, 10.0), 10.0);
}
2. Use Arc for Shared Test Data
Share data between nodes for verification:
// simplified
let results = Arc::new(Mutex::new(Vec::new()));
let node = TestNode::new(Arc::clone(&results))?;
// Later in test
assert_eq!(results.lock().unwrap().len(), 5);
3. Add Small Delays for IPC
Shared memory needs time to propagate:
// simplified
pub_node.tick();
thread::sleep(Duration::from_millis(10)); // Allow IPC
sub_node.tick();
4. Run Tests Sequentially
horus test defaults to single-threaded execution to prevent shared memory conflicts. If you use --parallel, ensure each test uses unique topic names:
horus test # Already single-threaded by default
5. Test Edge Cases
// simplified
#[test]
fn test_no_messages() {
let mut node = SubscriberNode::new().unwrap();
node.tick(); // Should handle gracefully
}
#[test]
fn test_invalid_data() {
let result = node.process(-1.0);
assert!(result.is_err());
}
Running Tests with horus test
Basic Commands
# Run all tests (defaults to single-threaded for shared memory safety)
horus test
# Run specific test by name filter
horus test test_sensor_initialization
# Run tests matching a pattern
horus test sensor
# Show test output (println!, hlog!, etc.)
horus test --nocapture
# Run with multiple threads (override default single-threaded mode)
horus test --parallel
horus test --test-threads 4
# Run in release mode (optimized build)
horus test --release
# Skip the build step (use existing build artifacts)
horus test --no-build
# Skip shared memory cleanup after tests
horus test --no-cleanup
# Verbose output
horus test -v
# Run integration tests (tests marked #[ignore])
horus test --integration
# Enable simulation drivers (no hardware required)
horus test --sim
Test Organization
// simplified
#[cfg(test)]
mod tests {
use super::*;
mod unit_tests {
use super::*;
#[test]
fn test_creation() { /* ... */ }
}
mod integration_tests {
use super::*;
// Mark integration tests with #[ignore] — run with `horus test --integration`
#[test]
#[ignore]
fn test_full_pipeline() { /* ... */ }
}
}
Single-Tick Testing with tick_once()
For simulation and fine-grained testing, tick_once() executes exactly one scheduler tick cycle and returns. This gives you full control over the execution loop.
tick_once()
Execute all registered nodes exactly once in priority order:
// simplified
#[test]
fn test_single_tick_behavior() {
let mut scheduler = Scheduler::new();
scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();
// Execute one tick — all nodes run once in priority order
scheduler.tick_once().unwrap();
// Verify state after exactly one tick
// ...
}
tick()
Execute only specific nodes by name. Non-existent names are silently ignored:
// simplified
#[test]
fn test_selective_tick() {
let mut scheduler = Scheduler::new();
scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();
scheduler.add(LoggerNode::new().unwrap()).order(2).build().unwrap();
// Tick only the sensor — control and logger are skipped
scheduler.tick(&["SensorNode"]).unwrap();
// Tick sensor + control, skip logger
scheduler.tick(&["SensorNode", "ControlNode"]).unwrap();
}
Simulation Loop Pattern
Use tick_once() to integrate HORUS nodes into a simulation loop:
// simplified
fn run_simulation() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(MotorController::new()?).order(0).build()?;
scheduler.add(SensorFusion::new()?).order(1).build()?;
let dt = Duration::from_millis(10); // 100 Hz simulation
for step in 0..1000 {
// 1. Step physics simulation
sim.step(dt);
// 2. Run HORUS nodes for this timestep
scheduler.tick_once()?;
// 3. Render or collect results
sim.render();
}
Ok(())
}
Lazy Initialization
tick_once() and tick() lazily initialize nodes on the first call — you don't need to call run() or any separate init method. The first tick_once() call runs init() on all nodes, then executes the tick.
Time-Limited Test Runs
Use scheduler.run_for() to run tests for a fixed duration:
// simplified
#[test]
fn test_system_runs_for_one_second() {
let mut scheduler = Scheduler::new();
scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();
// Run for exactly 1 second, then shutdown gracefully
scheduler.run_for(std::time::Duration::from_secs(1)).unwrap();
}
Record/Replay Testing
HORUS supports recording node execution and replaying it later for deterministic debugging. This is useful for reproducing bugs and regression testing.
Recording a Session
Record all node inputs/outputs during a run:
# Record while running
horus run --record
Recordings are saved to ~/.horus/recordings/<session>/ in binary format (.horus files). Each node gets its own recording file:
~/.horus/recordings/my_session/
├── sensor_node@abc123.horus # Node recording
├── control_node@def456.horus # Node recording
└── scheduler@main789.horus # Scheduler execution order
Replaying in Tests
Use scheduler.add_replay() to replay a recorded node:
// simplified
use std::path::PathBuf;
#[test]
fn test_replay_crash_scenario() {
let mut scheduler = Scheduler::new();
// Replay the motor node from a crash recording
scheduler.add_replay(
PathBuf::from("~/.horus/recordings/crash/motor_node@abc123.horus"),
1, // priority
).unwrap();
// Add a live node to test against the recorded data
scheduler.add(DiagnosticNode::new().unwrap()).order(2).build().unwrap();
scheduler.run_for(std::time::Duration::from_secs(5)).unwrap();
}
Replay Modes
| Mode | Description |
|---|---|
| Full replay | Replay all nodes from a scheduler recording |
| Mixed replay | Replay some nodes while others run live |
| Range replay | Replay only a specific tick range |
Managing Recordings
# List recording sessions
ls ~/.horus/recordings/
# Recordings auto-cap at 100MB per node
# Delete old recordings to free space
Common Errors
| Symptom | Cause | Fix |
|---|---|---|
| Tests fail randomly | Shared memory conflicts from parallel tests | Use horus test --test-threads 1 (default) or unique topic names per test |
| "Topic not found" errors | Topic created in one test leaks into another | Use unique topic names: Topic::new("test_topic_1")? |
| Messages not received | IPC needs time to propagate through shared memory | Add thread::sleep(Duration::from_millis(10)) between send and recv |
| Test hangs forever | scheduler.run() blocks indefinitely | Use scheduler.run_for(Duration::from_secs(1)) or tick_once() instead |
| Compilation errors in tests | Missing use super::* or use horus::prelude::* | Ensure test module imports parent scope and prelude |
| Shared memory permission error | Previous test run crashed without cleanup | Run horus clean --shm before testing |
See Also
- CLI Reference: horus test — Full test command options and flags
- Scheduler API: tick_once() — Single-tick execution for deterministic testing
- Deterministic Mode — Reproducible test runs with fixed ordering
- Debugging Workflows — Step-by-step diagnosis for runtime issues
- Record/Replay — Record sessions for regression testing