Advanced Examples
Advanced HORUS patterns for complex robotics systems. These examples demonstrate priority-based safety systems, multi-process architectures, and cross-language communication.
Prerequisites: Complete Basic Examples first.
1. State Machine Node
Implement complex behavior using state machines - ideal for autonomous robots with multiple operating modes.
File: state_machine.rs
use horus::prelude::*;
#[derive(Debug, Clone, Copy, PartialEq)]
enum RobotState {
Idle,
Moving,
ObstacleDetected,
Rotating,
Escaped,
}
struct StateMachineNode {
state: RobotState,
obstacle_sub: Topic<bool>,
cmd_pub: Topic<CmdVel>,
rotation_counter: u32,
}
impl StateMachineNode {
fn new() -> Result<Self> {
Ok(Self {
state: RobotState::Idle,
obstacle_sub: Topic::new("obstacle_detected")?,
cmd_pub: Topic::new("cmd_vel")?,
rotation_counter: 0,
})
}
}
impl Node for StateMachineNode {
fn name(&self) -> &str { "StateMachineNode" }
fn init(&mut self) -> Result<()> {
hlog!(info, "State machine initialized - starting in IDLE state");
Ok(())
}
fn tick(&mut self) {
// Check for obstacles
let obstacle = self.obstacle_sub.recv().unwrap_or(false);
// Store previous state for logging
let prev_state = self.state;
// State machine logic
self.state = match self.state {
RobotState::Idle => {
if !obstacle {
RobotState::Moving
} else {
RobotState::Idle
}
}
RobotState::Moving => {
if obstacle {
self.cmd_pub.send(CmdVel::zero()); // Stop
RobotState::ObstacleDetected
} else {
self.cmd_pub.send(CmdVel::new(1.0, 0.0)); // Forward
RobotState::Moving
}
}
RobotState::ObstacleDetected => {
self.rotation_counter = 0;
RobotState::Rotating
}
RobotState::Rotating => {
self.cmd_pub.send(CmdVel::new(0.0, 0.5)); // Rotate
self.rotation_counter += 1;
if self.rotation_counter > 50 {
RobotState::Escaped
} else {
RobotState::Rotating
}
}
RobotState::Escaped => {
RobotState::Moving // Resume moving
}
};
// Log state transitions
if self.state != prev_state {
hlog!(info, "State transition: {:?} -> {:?}",
prev_state, self.state);
}
}
fn shutdown(&mut self) -> Result<()> {
// Ensure robot is stopped
self.cmd_pub.send(CmdVel::zero());
hlog!(info, "State machine shutdown");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(StateMachineNode::new()?).order(0).build()?;
scheduler.run()?;
Ok(())
}
Run it:
horus run state_machine.rs
Key Concepts:
- Enum for states:
Idle,Moving,ObstacleDetected,Rotating,Escaped - Match expression handles state transitions
- Each state defines behavior and next state
- Log state transitions for debugging
2. Priority-Based Safety System
Use node priorities to ensure safety-critical tasks always run first - essential for production robotics.
File: safety_system.rs
use horus::prelude::*;
// CRITICAL PRIORITY: Emergency stop
struct EmergencyStopNode {
battery_sub: Topic<BatteryState>,
lidar_sub: Topic<f32>, // Min obstacle distance
estop_pub: Topic<bool>,
estop_active: bool,
}
impl EmergencyStopNode {
fn new() -> Result<Self> {
Ok(Self {
battery_sub: Topic::new("battery_state")?,
lidar_sub: Topic::new("min_distance")?,
estop_pub: Topic::new("emergency_stop")?,
estop_active: false,
})
}
}
impl Node for EmergencyStopNode {
fn name(&self) -> &str { "EmergencyStop" }
fn init(&mut self) -> Result<()> {
hlog!(info, " Emergency stop system online - CRITICAL priority");
Ok(())
}
fn tick(&mut self) {
let mut should_stop = false;
// Check battery
if let Some(battery) = self.battery_sub.recv() {
if battery.is_critical() { // Below 10%
should_stop = true;
hlog!(error, " CRITICAL: Battery at {:.0}% - EMERGENCY STOP!",
battery.percentage);
}
}
// Check obstacle distance
if let Some(min_dist) = self.lidar_sub.recv() {
if min_dist < 0.2 { // 20cm
should_stop = true;
hlog!(error, " CRITICAL: Obstacle at {:.2}m - EMERGENCY STOP!",
min_dist);
}
}
// Publish estop state
if should_stop != self.estop_active {
self.estop_pub.send(should_stop);
self.estop_active = should_stop;
}
}
fn shutdown(&mut self) -> Result<()> {
// Always activate estop on shutdown
self.estop_pub.send(true);
hlog!(warn, "Emergency stop system offline");
Ok(())
}
}
// HIGH PRIORITY: Motor controller
struct MotorController {
estop_sub: Topic<bool>,
cmd_sub: Topic<CmdVel>,
motor_pub: Topic<CmdVel>,
estop_active: bool,
}
impl MotorController {
fn new() -> Result<Self> {
Ok(Self {
estop_sub: Topic::new("emergency_stop")?,
cmd_sub: Topic::new("cmd_vel_request")?,
motor_pub: Topic::new("cmd_vel_actual")?,
estop_active: false,
})
}
}
impl Node for MotorController {
fn name(&self) -> &str { "MotorController" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Motor controller online - HIGH priority");
Ok(())
}
fn tick(&mut self) {
// Check emergency stop FIRST
if let Some(estop) = self.estop_sub.recv() {
if estop != self.estop_active {
self.estop_active = estop;
if estop {
hlog!(warn, "Motors DISABLED - emergency stop active");
} else {
hlog!(info, "Motors ENABLED - emergency stop cleared");
}
}
}
// Don't move if estop active
if self.estop_active {
self.motor_pub.send(CmdVel::zero());
return;
}
// Process normal commands
if let Some(cmd) = self.cmd_sub.recv() {
self.motor_pub.send(cmd);
hlog!(debug, "Motors: linear={:.2}, angular={:.2}",
cmd.linear, cmd.angular);
}
}
fn shutdown(&mut self) -> Result<()> {
// Stop motors
self.motor_pub.send(CmdVel::zero());
hlog!(info, "Motor controller stopped");
Ok(())
}
}
// BACKGROUND PRIORITY: Data logging
struct LoggerNode {
cmd_sub: Topic<CmdVel>,
battery_sub: Topic<BatteryState>,
}
impl LoggerNode {
fn new() -> Result<Self> {
Ok(Self {
cmd_sub: Topic::new("cmd_vel_actual")?,
battery_sub: Topic::new("battery_state")?,
})
}
}
impl Node for LoggerNode {
fn name(&self) -> &str { "Logger" }
fn init(&mut self) -> Result<()> {
hlog!(info, " Logger online - BACKGROUND priority");
Ok(())
}
fn tick(&mut self) {
// Log velocity commands
if let Some(cmd) = self.cmd_sub.recv() {
hlog!(debug, "LOG: cmd_vel({:.2}, {:.2})",
cmd.linear, cmd.angular);
}
// Log battery state
if let Some(battery) = self.battery_sub.recv() {
hlog!(debug, "LOG: battery({:.1}V, {:.0}%)",
battery.voltage, battery.percentage);
}
// In production: write to file, database, etc.
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Logger stopped");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
// Order 0 (Critical): Safety runs FIRST
scheduler.add(EmergencyStopNode::new()?).order(0).build()?;
// Order 1 (High): Control runs SECOND
scheduler.add(MotorController::new()?).order(1).build()?;
// Order 4 (Background): Logging runs LAST
scheduler.add(LoggerNode::new()?).order(4).build()?;
scheduler.run()?;
Ok(())
}
Run it:
horus run safety_system.rs
Key Concepts:
- Priority 0 (Critical): Emergency stop - runs first, always
- Priority 1 (High): Motor control - runs after safety checks
- Priority 4 (Background): Logging - runs last, non-critical
- Lower number = higher priority
- Safety systems should always check estop before acting
3. Python Multi-Process System
Build a complete sensor monitoring system with Python nodes running as independent processes.
Project Structure
mkdir multi_node_system
cd multi_node_system
mkdir nodes
Sensor Node
nodes/sensor.py:
#!/usr/bin/env python3
from horus import Node, Topic, Scheduler
import random
class SensorNode(Node):
def __init__(self):
super().__init__("SensorNode")
def init(self):
self.temp_pub = Topic("temperature")
def tick(self):
# Generate realistic temperature with noise
temperature = 20.0 + random.random() * 10.0
self.temp_pub.send(temperature)
print(f"Sensor: {temperature:.1f}°C")
scheduler = Scheduler()
scheduler.node(SensorNode()).order(0).rate(2).build()
scheduler.run()
Controller Node
nodes/controller.py:
#!/usr/bin/env python3
from horus import Node, Topic, Scheduler
class ControllerNode(Node):
def __init__(self):
super().__init__("ControllerNode")
def init(self):
self.temp_sub = Topic("temperature")
self.fan_pub = Topic("fan_control")
def tick(self):
temp = self.temp_sub.recv()
if temp is not None:
if temp > 25.0:
fan_speed = min(100, int((temp - 20) * 10))
self.fan_pub.send(fan_speed)
print(f"Controller: Fan at {fan_speed}%")
else:
self.fan_pub.send(0)
print(f"Controller: Temperature normal, fan off")
scheduler = Scheduler()
scheduler.node(ControllerNode()).order(1).rate(2).build()
scheduler.run()
Logger Node
nodes/logger.py:
#!/usr/bin/env python3
from horus import Node, Topic, Scheduler
import datetime
class LoggerNode(Node):
def __init__(self):
super().__init__("LoggerNode")
def init(self):
self.temp_sub = Topic("temperature")
self.fan_sub = Topic("fan_control")
def tick(self):
temp = self.temp_sub.recv()
fan = self.fan_sub.recv()
if temp is not None and fan is not None:
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
status = "COOLING" if fan > 0 else "NORMAL"
print(f"Logger [{timestamp}]: {temp:.1f}°C | Fan {fan}% | {status}")
scheduler = Scheduler()
scheduler.node(LoggerNode()).order(2).rate(1).build()
scheduler.run()
Run All Nodes Concurrently
# Make scripts executable
chmod +x nodes/*.py
# Run all nodes as separate processes
horus run "nodes/*.py"
Output:
Executing 3 files concurrently:
1. nodes/controller.py (python)
2. nodes/logger.py (python)
3. nodes/sensor.py (python)
Phase 1: Building all files...
Phase 2: Starting all processes...
Started [controller]
Started [logger]
Started [sensor]
All processes running. Press Ctrl+C to stop.
[sensor] Sensor: 23.4°C
[controller] Controller: Fan at 34%
[logger] Logger [15:30:45]: 23.4°C | Fan 34% | COOLING
[sensor] Sensor: 26.8°C
[controller] Controller: Fan at 68%
[sensor] Sensor: 21.2°C
[logger] Logger [15:30:46]: 21.2°C | Fan 12% | COOLING
Key Features:
- Independent Processes: Each node runs in its own process
- Shared Memory IPC: Nodes communicate via HORUS topics (
/dev/shm/horus/) - Color-Coded Output: Each node has a unique color
- Graceful Shutdown: Ctrl+C stops all processes cleanly
- Zero Configuration: No launch files needed
4. Rust + Python Cross-Language System
Mix Rust and Python nodes in the same application.
Rust Sensor Node
nodes/rust_sensor.rs:
use horus::prelude::*;
pub struct TempSensor {
temp_pub: Topic<f32>,
counter: f32,
}
impl TempSensor {
fn new() -> Result<Self> {
Ok(Self {
temp_pub: Topic::new("temperature")?,
counter: 0.0,
})
}
}
impl Node for TempSensor {
fn name(&self) -> &str { "RustTempSensor" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Rust sensor online - high performance mode");
Ok(())
}
fn tick(&mut self) {
// Fast sensor simulation
let temp = 20.0 + (self.counter.sin() * 5.0);
self.temp_pub.send(temp);
hlog!(debug, "Rust: {:.2}°C", temp);
self.counter += 0.1;
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Rust sensor offline");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(TempSensor::new()?).order(0).build()?;
scheduler.run()
}
Python Controller Node
nodes/py_controller.py:
#!/usr/bin/env python3
from horus import Node, Topic, Scheduler
class PyController(Node):
def __init__(self):
super().__init__("PyController")
def init(self):
self.temp_sub = Topic("temperature")
self.cmd_pub = Topic("actuator_cmd")
def tick(self):
temp = self.temp_sub.recv()
if temp is not None:
status = "HOT" if temp > 22.0 else "NORMAL"
print(f"Python controller: {temp:.2f}°C - {status}")
command = 1.0 if temp > 22.0 else 0.0
self.cmd_pub.send(command)
scheduler = Scheduler()
scheduler.node(PyController()).order(0).rate(10).build()
scheduler.run()
Rust Actuator Node
nodes/rust_actuator.rs:
use horus::prelude::*;
struct Actuator {
cmd_sub: Topic<f32>,
}
impl Node for Actuator {
fn name(&self) -> &str { "RustActuator" }
fn tick(&mut self) {
if let Some(cmd) = self.cmd_sub.recv() {
let action = if cmd > 0.5 { "COOLING" } else { "IDLE" };
hlog!(info, "Actuator: {}", action);
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Actuator stopped");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(Actuator {
cmd_sub: Topic::new("actuator_cmd")?,
}).order(0).build()?;
scheduler.run()
}
Run Mixed System
horus run "nodes/*"
HORUS automatically detects file types and compiles/runs appropriately!
Key Concepts:
- Rust nodes: High performance, type safety
- Python nodes: Rapid prototyping, easy scripting
- Shared memory IPC works across languages
- Zero-copy message passing
- Sub-microsecond latency even across language boundaries
5. Advanced Python Features
Per-node rates, timestamp checking, and staleness detection.
File: advanced_python.py
#!/usr/bin/env python3
from horus import Node, Topic, Scheduler, Imu
import time
class HighFreqSensor(Node):
"""100Hz IMU sensor"""
def __init__(self):
super().__init__("HighFreqSensor")
def init(self):
self.imu_pub = Topic(Imu)
def tick(self):
imu = Imu(
accel_x=1.0, accel_y=0.0, accel_z=9.8,
gyro_x=0.0, gyro_y=0.0, gyro_z=0.0
)
self.imu_pub.send(imu)
class Controller(Node):
"""50Hz controller"""
def __init__(self):
super().__init__("Controller")
def init(self):
self.imu_sub = Topic(Imu)
self.cmd_pub = Topic("cmd_vel")
def tick(self):
imu = self.imu_sub.recv()
if imu is not None:
# Process IMU data
accel_magnitude = (
imu.accel_x**2 +
imu.accel_y**2 +
imu.accel_z**2
) ** 0.5
print(f"Control: IMU magnitude = {accel_magnitude:.2f} m/s²")
# Send command
cmd = {"linear": 1.0, "angular": 0.5}
self.cmd_pub.send(cmd)
class Logger(Node):
"""10Hz logger"""
def __init__(self):
super().__init__("Logger")
def init(self):
self.cmd_sub = Topic("cmd_vel")
def tick(self):
msg = self.cmd_sub.recv()
if msg is not None:
print(f"Logger: Received command: {msg}")
# Create scheduler with per-node rates
scheduler = Scheduler()
scheduler.node(HighFreqSensor()).order(0).rate(100).build()
scheduler.node(Controller()).order(1).rate(50).build()
scheduler.node(Logger()).order(2).rate(10).build()
scheduler.run()
Run it:
horus run advanced_python.py
Key Concepts:
- Per-node rates: Each node runs at different frequency via
.rate(Hz) - Typed messages: Use
Topic(Imu)for cross-language compatible messages - Generic messages: Use
Topic("name")for Python-only dict/list data - Priorities: Lower order number = higher priority, runs first each tick
When to Use Multi-Process vs Single-Process
Multi-Process (Concurrent Execution)
Use when:
- Nodes need to run independently (like ROS)
- Want fault isolation (one crash doesn't kill others)
- Different nodes have vastly different rates
- Testing distributed system architectures
- Mixing languages (Rust + Python)
Command:
horus run "nodes/*.rs"
horus run "nodes/*.py"
horus run "nodes/*" # Mix Rust and Python!
Single-Process
Use when:
- All nodes in one file
- Need maximum performance
- Require deterministic execution order
- Simpler deployment
- Embedded systems with limited resources
Command:
horus run main.rs
Performance Notes
Multi-Process IPC Performance
- Latency: 300ns - 1μs (shared memory)
- Throughput: Thousands of messages/second
- Scalability: Tested with 50+ concurrent processes
- Memory: ~2-5MB overhead per process
Single-Process Performance
- Latency: 50-200ns (in-memory)
- Throughput: Millions of messages/second
- Scalability: Hundreds of nodes in one process
- Memory: Minimal overhead
Testing Multi-Node Systems
Create test script: test_system.py
#!/usr/bin/env python3
from horus import Node, Topic, Scheduler
class MockSensor(Node):
def __init__(self):
super().__init__("MockSensor")
def init(self):
self.temp_pub = Topic("temperature")
def tick(self):
self.temp_pub.send(30.0) # Hot!
class TestController(Node):
def __init__(self):
super().__init__("TestController")
self.fan_commands = []
def init(self):
self.temp_sub = Topic("temperature")
self.fan_pub = Topic("fan_control")
def tick(self):
temp = self.temp_sub.recv()
if temp is not None and temp > 25.0:
fan_speed = min(100, int((temp - 20) * 10))
self.fan_commands.append(fan_speed)
self.fan_pub.send(fan_speed)
# Run test
scheduler = Scheduler()
sensor = MockSensor()
controller = TestController()
scheduler.node(sensor).order(0).build()
scheduler.node(controller).order(1).build()
# Use tick_once for testing (runs one tick cycle)
scheduler.tick_once()
# Verify
assert len(controller.fan_commands) > 0, "Controller should have sent fan commands"
assert controller.fan_commands[0] == 100, f"Expected fan at 100%, got {controller.fan_commands[0]}%"
print("Test passed!")
Next Steps
- Performance Optimization - Tune for maximum throughput
- Python Bindings Reference - Complete Python API
- Package Management - Share and reuse nodes
Need help?
- Troubleshooting - Debug common issues
- Monitor - Monitor your system in real-time