Testing HORUS Applications
Learn how to test your HORUS nodes and applications with complete, runnable examples using Rust's built-in test framework.
Why Test HORUS Nodes?
Testing ensures:
- Nodes work in isolation before integration
- Message passing is correct (right topics, right types)
- Lifecycle methods behave properly (init, tick, shutdown)
- Edge cases are handled (no messages, invalid data, etc.)
- Refactoring doesn't break functionality
Testing Strategies
1. Unit Testing a Single Node
Test node behavior without running the scheduler.
2. Integration Testing Multiple Nodes
Test nodes communicating through the Topic.
3. Testing Business Logic in Isolation
Extract and test business logic without Topic dependencies.
Unit Testing a Single Node
Test individual node behavior in isolation.
Example: Testing a Temperature Sensor
File: src/main.rs
use horus::prelude::*;
// The node we want to test
pub struct TemperatureSensor {
temp_pub: Topic<f32>,
reading: f32,
}
impl TemperatureSensor {
pub fn new() -> Result<Self> {
Ok(Self {
temp_pub: Topic::new("temperature")?,
reading: 20.0,
})
}
// Make this public so tests can inspect it
pub fn get_reading(&self) -> f32 {
self.reading
}
}
impl Node for TemperatureSensor {
fn name(&self) -> &str {
"TemperatureSensor"
}
fn init(&mut self) -> Result<()> {
hlog!(info, "Sensor initialized");
Ok(())
}
fn tick(&mut self) {
// Increment reading each tick
self.reading += 0.5;
// Publish temperature
self.temp_pub.send(self.reading);
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Sensor shutdown");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(TemperatureSensor::new()?).order(0).build()?;
scheduler.run()?;
Ok(())
}
// ============================================================================
// TESTS
// ============================================================================
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sensor_initialization() {
// Test that sensor initializes with correct default value
let sensor = TemperatureSensor::new().unwrap();
assert_eq!(sensor.get_reading(), 20.0);
}
#[test]
fn test_sensor_init_lifecycle() {
let mut sensor = TemperatureSensor::new().unwrap();
// Test init() method
let result = sensor.init();
assert!(result.is_ok());
}
#[test]
fn test_sensor_tick_increments_reading() {
let mut sensor = TemperatureSensor::new().unwrap();
// Run tick 5 times
for i in 1..=5 {
sensor.tick();
// Verify reading increments by 0.5 each tick
let expected = 20.0 + (i as f32 * 0.5);
assert_eq!(sensor.get_reading(), expected);
}
}
#[test]
fn test_sensor_shutdown() {
let mut sensor = TemperatureSensor::new().unwrap();
// Test shutdown() method
let result = sensor.shutdown();
assert!(result.is_ok());
}
}
Run the Tests
horus test
Expected Output:
running 4 tests
test tests::test_sensor_initialization ... ok
test tests::test_sensor_init_lifecycle ... ok
test tests::test_sensor_tick_increments_reading ... ok
test tests::test_sensor_shutdown ... ok
test result: ok. 4 passed; 0 failed; 0 ignored; 0 measured
Key Testing Patterns
1. Test Node Creation:
#[test]
fn test_node_creation() {
let node = MyNode::new().unwrap();
assert_eq!(node.some_field, expected_value);
}
2. Test Initialization:
#[test]
fn test_init() {
let mut node = MyNode::new().unwrap();
assert!(node.init().is_ok());
}
3. Test Tick Logic:
#[test]
fn test_tick() {
let mut node = MyNode::new().unwrap();
node.tick();
// Verify state changes
assert_eq!(node.counter, 1);
}
4. Test Shutdown:
#[test]
fn test_shutdown() {
let mut node = MyNode::new().unwrap();
assert!(node.shutdown().is_ok());
}
Testing Multiple Nodes Together
Test nodes communicating through topics.
Example: Publisher-Subscriber Test
File: src/main.rs
use horus::prelude::*;
use std::sync::{Arc, Mutex};
// Publisher node
pub struct PublisherNode {
data_pub: Topic<f32>,
}
impl PublisherNode {
pub fn new() -> Result<Self> {
Ok(Self {
data_pub: Topic::new("test_data")?,
})
}
}
impl Node for PublisherNode {
fn name(&self) -> &str {
"PublisherNode"
}
fn tick(&mut self) {
self.data_pub.send(42.0);
}
}
// Subscriber node that stores received data
pub struct SubscriberNode {
data_sub: Topic<f32>,
received: Arc<Mutex<Vec<f32>>>,
}
impl SubscriberNode {
pub fn new(received: Arc<Mutex<Vec<f32>>>) -> Result<Self> {
Ok(Self {
data_sub: Topic::new("test_data")?,
received,
})
}
}
impl Node for SubscriberNode {
fn name(&self) -> &str {
"SubscriberNode"
}
fn tick(&mut self) {
if let Some(data) = self.data_sub.recv() {
self.received.lock().unwrap().push(data);
}
}
}
fn main() -> Result<()> {
let received = Arc::new(Mutex::new(Vec::new()));
let mut scheduler = Scheduler::new();
scheduler.add(PublisherNode::new()?).order(0).build()?;
scheduler.add(SubscriberNode::new(received)?).order(1).build()?;
scheduler.run()?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
#[test]
fn test_pubsub_communication() {
// Shared storage for received messages
let received = Arc::new(Mutex::new(Vec::new()));
// Create publisher and subscriber
let mut pub_node = PublisherNode::new().unwrap();
let mut sub_node = SubscriberNode::new(Arc::clone(&received)).unwrap();
// Publish a message
pub_node.tick();
// Small delay to allow IPC (shared memory needs time to propagate)
thread::sleep(Duration::from_millis(10));
// Subscriber receives the message
sub_node.tick();
// Verify message was received
let data = received.lock().unwrap();
assert_eq!(data.len(), 1);
assert_eq!(data[0], 42.0);
}
#[test]
fn test_multiple_messages() {
let received = Arc::new(Mutex::new(Vec::new()));
let mut pub_node = PublisherNode::new().unwrap();
let mut sub_node = SubscriberNode::new(Arc::clone(&received)).unwrap();
// Publish 5 messages
for _ in 0..5 {
pub_node.tick();
thread::sleep(Duration::from_millis(5));
sub_node.tick();
}
// Verify all messages received
let data = received.lock().unwrap();
assert_eq!(data.len(), 5);
for value in data.iter() {
assert_eq!(*value, 42.0);
}
}
}
Run Integration Tests
horus test test_pubsub_communication --test-threads 1
Why --test-threads 1?
- Prevents tests from running in parallel (this is the default for
horus test) - Avoids shared memory conflicts between tests
- Ensures deterministic behavior
Expected Output:
running 2 tests
test tests::test_pubsub_communication ... ok
test tests::test_multiple_messages ... ok
test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured
Testing Business Logic in Isolation
Test node logic without real Topic connections.
Example: Extracting Testable Logic
use horus::prelude::*;
// Node that processes temperature data
pub struct TemperatureProcessor {
input_sub: Topic<f32>,
output_pub: Topic<f32>,
}
impl TemperatureProcessor {
pub fn new() -> Result<Self> {
Ok(Self {
input_sub: Topic::new("input_temp")?,
output_pub: Topic::new("output_temp")?,
})
}
// Public method for testing business logic
pub fn process_temperature(&self, temp: f32) -> f32 {
// Convert Celsius to Fahrenheit
temp * 9.0 / 5.0 + 32.0
}
}
impl Node for TemperatureProcessor {
fn name(&self) -> &str {
"TemperatureProcessor"
}
fn tick(&mut self) {
if let Some(celsius) = self.input_sub.recv() {
let fahrenheit = self.process_temperature(celsius);
self.output_pub.send(fahrenheit);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_temperature_conversion_logic() {
// Test business logic WITHOUT Topic
let processor = TemperatureProcessor::new().unwrap();
// Test known conversions
assert_eq!(processor.process_temperature(0.0), 32.0);
assert_eq!(processor.process_temperature(100.0), 212.0);
assert_eq!(processor.process_temperature(-40.0), -40.0);
}
#[test]
fn test_with_mock_data() {
let mut processor = TemperatureProcessor::new().unwrap();
// We can't easily mock Topic, but we can test the logic
// by calling process_temperature directly
let celsius_readings = vec![0.0, 10.0, 20.0, 30.0, 100.0];
let expected_fahrenheit = vec![32.0, 50.0, 68.0, 86.0, 212.0];
for (celsius, expected) in celsius_readings.iter().zip(expected_fahrenheit.iter()) {
let result = processor.process_temperature(*celsius);
assert_eq!(result, *expected);
}
}
}
Testing Strategy Without Topic Mocks
Since HORUS Topics use real shared memory, full mocking is complex. Instead:
1. Extract Business Logic:
// Good: Business logic in testable method
pub fn process_temperature(&self, temp: f32) -> f32 {
temp * 9.0 / 5.0 + 32.0
}
// Test this directly without Topic
#[test]
fn test_logic() {
let node = TemperatureProcessor::new().unwrap();
assert_eq!(node.process_temperature(0.0), 32.0);
}
2. Test Tick with Real Topics:
// Topics are lightweight — use real ones in tests
#[test]
fn test_with_real_topic() {
let mut node = MyNode::new().unwrap();
node.tick();
// Verify behavior
}
3. Use Shared State for Verification:
// Store results in node for verification
pub struct TestNode {
pub last_result: Option<f32>,
}
#[test]
fn test_result() {
let mut node = TestNode::new();
node.tick();
assert_eq!(node.last_result, Some(42.0));
}
Complete Testing Example
A fully tested 3-node system.
File: src/main.rs
use horus::prelude::*;
use std::sync::{Arc, Mutex};
// Node 1: Generate numbers
pub struct GeneratorNode {
output_pub: Topic<u32>,
counter: u32,
}
impl GeneratorNode {
pub fn new() -> Result<Self> {
Ok(Self {
output_pub: Topic::new("numbers")?,
counter: 0,
})
}
}
impl Node for GeneratorNode {
fn name(&self) -> &str { "GeneratorNode" }
fn tick(&mut self) {
self.counter += 1;
self.output_pub.send(self.counter);
}
}
// Node 2: Double the numbers
pub struct DoublerNode {
input_sub: Topic<u32>,
output_pub: Topic<u32>,
}
impl DoublerNode {
pub fn new() -> Result<Self> {
Ok(Self {
input_sub: Topic::new("numbers")?,
output_pub: Topic::new("doubled")?,
})
}
}
impl Node for DoublerNode {
fn name(&self) -> &str { "DoublerNode" }
fn tick(&mut self) {
if let Some(n) = self.input_sub.recv() {
self.output_pub.send(n * 2);
}
}
}
// Node 3: Collect results
pub struct CollectorNode {
input_sub: Topic<u32>,
collected: Arc<Mutex<Vec<u32>>>,
}
impl CollectorNode {
pub fn new(collected: Arc<Mutex<Vec<u32>>>) -> Result<Self> {
Ok(Self {
input_sub: Topic::new("doubled")?,
collected,
})
}
}
impl Node for CollectorNode {
fn name(&self) -> &str { "CollectorNode" }
fn tick(&mut self) {
if let Some(n) = self.input_sub.recv() {
self.collected.lock().unwrap().push(n);
}
}
}
fn main() -> Result<()> {
let collected = Arc::new(Mutex::new(Vec::new()));
let mut scheduler = Scheduler::new();
scheduler.add(GeneratorNode::new()?).order(0).build()?;
scheduler.add(DoublerNode::new()?).order(1).build()?;
scheduler.add(CollectorNode::new(collected)?).order(2).build()?;
scheduler.run()?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
#[test]
fn test_generator_node() {
let mut node = GeneratorNode::new().unwrap();
// Initial state
assert_eq!(node.counter, 0);
// After 3 ticks
for _ in 0..3 {
node.tick();
}
assert_eq!(node.counter, 3);
}
#[test]
fn test_pipeline() {
let collected = Arc::new(Mutex::new(Vec::new()));
let mut gen = GeneratorNode::new().unwrap();
let mut dbl = DoublerNode::new().unwrap();
let mut col = CollectorNode::new(Arc::clone(&collected)).unwrap();
// Run 5 iterations of the pipeline
for _ in 0..5 {
gen.tick();
thread::sleep(Duration::from_millis(5));
dbl.tick();
thread::sleep(Duration::from_millis(5));
col.tick();
}
// Verify results: 1*2=2, 2*2=4, 3*2=6, 4*2=8, 5*2=10
let results = collected.lock().unwrap();
assert_eq!(*results, vec![2, 4, 6, 8, 10]);
}
}
Run All Tests
horus test --test-threads 1
Output:
running 2 tests
test tests::test_generator_node ... ok
test tests::test_pipeline ... ok
test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured
Best Practices
1. Test Business Logic Separately
Extract pure functions for easy testing:
// Good: Pure function (easy to test)
fn calculate_velocity(distance: f32, time: f32) -> f32 {
distance / time
}
#[test]
fn test_velocity() {
assert_eq!(calculate_velocity(100.0, 10.0), 10.0);
}
2. Use Arc for Shared Test Data
Share data between nodes for verification:
let results = Arc::new(Mutex::new(Vec::new()));
let node = TestNode::new(Arc::clone(&results))?;
// Later in test
assert_eq!(results.lock().unwrap().len(), 5);
3. Add Small Delays for IPC
Shared memory needs time to propagate:
pub_node.tick();
thread::sleep(Duration::from_millis(10)); // Allow IPC
sub_node.tick();
4. Run Tests Sequentially
horus test defaults to single-threaded execution to prevent shared memory conflicts. If you use --parallel, ensure each test uses unique topic names:
horus test # Already single-threaded by default
5. Test Edge Cases
#[test]
fn test_no_messages() {
let mut node = SubscriberNode::new().unwrap();
node.tick(); // Should handle gracefully
}
#[test]
fn test_invalid_data() {
let result = node.process(-1.0);
assert!(result.is_err());
}
Running Tests with horus test
Basic Commands
# Run all tests (defaults to single-threaded for shared memory safety)
horus test
# Run specific test by name filter
horus test test_sensor_initialization
# Run tests matching a pattern
horus test sensor
# Show test output (println!, hlog!, etc.)
horus test --nocapture
# Run with multiple threads (override default single-threaded mode)
horus test --parallel
horus test --test-threads 4
# Run in release mode (optimized build)
horus test --release
# Skip the build step (use existing build artifacts)
horus test --no-build
# Skip shared memory cleanup after tests
horus test --no-cleanup
# Verbose output
horus test -v
# Run integration tests (tests marked #[ignore])
horus test --integration
# Enable simulation drivers (no hardware required)
horus test --sim
Test Organization
#[cfg(test)]
mod tests {
use super::*;
mod unit_tests {
use super::*;
#[test]
fn test_creation() { /* ... */ }
}
mod integration_tests {
use super::*;
// Mark integration tests with #[ignore] — run with `horus test --integration`
#[test]
#[ignore]
fn test_full_pipeline() { /* ... */ }
}
}
Single-Tick Testing with tick_once()
For simulation and fine-grained testing, tick_once() executes exactly one scheduler tick cycle and returns. This gives you full control over the execution loop.
tick_once()
Execute all registered nodes exactly once in priority order:
#[test]
fn test_single_tick_behavior() {
let mut scheduler = Scheduler::new();
scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();
// Execute one tick — all nodes run once in priority order
scheduler.tick_once().unwrap();
// Verify state after exactly one tick
// ...
}
tick_once_nodes()
Execute only specific nodes by name. Non-existent names are silently ignored:
#[test]
fn test_selective_tick() {
let mut scheduler = Scheduler::new();
scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();
scheduler.add(LoggerNode::new().unwrap()).order(2).build().unwrap();
// Tick only the sensor — control and logger are skipped
scheduler.tick_once_nodes(&["SensorNode"]).unwrap();
// Tick sensor + control, skip logger
scheduler.tick_once_nodes(&["SensorNode", "ControlNode"]).unwrap();
}
Simulation Loop Pattern
Use tick_once() to integrate HORUS nodes into a simulation loop:
fn run_simulation() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(MotorController::new()?).order(0).build()?;
scheduler.add(SensorFusion::new()?).order(1).build()?;
let dt = Duration::from_millis(10); // 100 Hz simulation
for step in 0..1000 {
// 1. Step physics simulation
sim.step(dt);
// 2. Run HORUS nodes for this timestep
scheduler.tick_once()?;
// 3. Render or collect results
sim.render();
}
Ok(())
}
Lazy Initialization
tick_once() and tick_once_nodes() lazily initialize nodes on the first call — you don't need to call run() or any separate init method. The first tick_once() call runs init() on all nodes, then executes the tick.
Time-Limited Test Runs
Use scheduler.run_for() to run tests for a fixed duration:
#[test]
fn test_system_runs_for_one_second() {
let mut scheduler = Scheduler::new();
scheduler.add(SensorNode::new().unwrap()).order(0).build().unwrap();
scheduler.add(ControlNode::new().unwrap()).order(1).build().unwrap();
// Run for exactly 1 second, then shutdown gracefully
scheduler.run_for(std::time::Duration::from_secs(1)).unwrap();
}
Record/Replay Testing
HORUS supports recording node execution and replaying it later for deterministic debugging. This is useful for reproducing bugs and regression testing.
Recording a Session
Record all node inputs/outputs during a run:
# Record while running
horus run --record
Recordings are saved to ~/.horus/recordings/<session>/ in binary format (.horus files). Each node gets its own recording file:
~/.horus/recordings/my_session/
├── sensor_node@abc123.horus # Node recording
├── control_node@def456.horus # Node recording
└── scheduler@main789.horus # Scheduler execution order
Replaying in Tests
Use scheduler.add_replay() to replay a recorded node:
use std::path::PathBuf;
#[test]
fn test_replay_crash_scenario() {
let mut scheduler = Scheduler::new();
// Replay the motor node from a crash recording
scheduler.add_replay(
PathBuf::from("~/.horus/recordings/crash/motor_node@abc123.horus"),
1, // priority
).unwrap();
// Add a live node to test against the recorded data
scheduler.add(DiagnosticNode::new().unwrap()).order(2).build().unwrap();
scheduler.run_for(std::time::Duration::from_secs(5)).unwrap();
}
Replay Modes
| Mode | Description |
|---|---|
| Full replay | Replay all nodes from a scheduler recording |
| Mixed replay | Replay some nodes while others run live |
| Range replay | Replay only a specific tick range |
Managing Recordings
# List recording sessions
ls ~/.horus/recordings/
# Recordings auto-cap at 100MB per node
# Delete old recordings to free space
Troubleshooting Tests
Issue: Tests Fail Randomly
Cause: Shared memory conflicts from parallel tests
Fix:
# horus test defaults to single-threaded, but if you used --parallel:
horus test --test-threads 1
Issue: "Topic not found" Errors
Cause: Topic created in one test affects another
Fix: Use unique topic names per test:
Topic::new("test_topic_1")? // Test 1
Topic::new("test_topic_2")? // Test 2
Issue: Messages Not Received
Cause: IPC needs time to propagate
Fix: Add small delay:
thread::sleep(Duration::from_millis(10));
Next Steps
- Examples - See complete tested applications
- Core Concepts - Understand node lifecycle
- Monitor - Debug with visual monitoring
- CLI Reference - Full
horus testcommand reference