Advanced Examples

Advanced HORUS patterns for complex robotics systems. These examples demonstrate priority-based safety systems, multi-process architectures, and cross-language communication.

Prerequisites: Complete Basic Examples first.


1. State Machine Node

Implement complex behavior using state machines - ideal for autonomous robots with multiple operating modes.

File: state_machine.rs

use horus::prelude::*;

#[derive(Debug, Clone, Copy, PartialEq)]
enum RobotState {
    Idle,
    Moving,
    ObstacleDetected,
    Rotating,
    Escaped,
}

struct StateMachineNode {
    state: RobotState,
    obstacle_sub: Topic<bool>,
    cmd_pub: Topic<CmdVel>,
    rotation_counter: u32,
}

impl StateMachineNode {
    fn new() -> Result<Self> {
        Ok(Self {
            state: RobotState::Idle,
            obstacle_sub: Topic::new("obstacle_detected")?,
            cmd_pub: Topic::new("cmd_vel")?,
            rotation_counter: 0,
        })
    }
}

impl Node for StateMachineNode {
    fn name(&self) -> &str { "StateMachineNode" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "State machine initialized - starting in IDLE state");
        Ok(())
    }

    fn tick(&mut self) {
        // Check for obstacles
        let obstacle = self.obstacle_sub.recv().unwrap_or(false);

        // Store previous state for logging
        let prev_state = self.state;

        // State machine logic
        self.state = match self.state {
            RobotState::Idle => {
                if !obstacle {
                    RobotState::Moving
                } else {
                    RobotState::Idle
                }
            }

            RobotState::Moving => {
                if obstacle {
                    self.cmd_pub.send(CmdVel::zero());  // Stop
                    RobotState::ObstacleDetected
                } else {
                    self.cmd_pub.send(CmdVel::new(1.0, 0.0));  // Forward
                    RobotState::Moving
                }
            }

            RobotState::ObstacleDetected => {
                self.rotation_counter = 0;
                RobotState::Rotating
            }

            RobotState::Rotating => {
                self.cmd_pub.send(CmdVel::new(0.0, 0.5));  // Rotate
                self.rotation_counter += 1;

                if self.rotation_counter > 50 {
                    RobotState::Escaped
                } else {
                    RobotState::Rotating
                }
            }

            RobotState::Escaped => {
                RobotState::Moving  // Resume moving
            }
        };

        // Log state transitions
        if self.state != prev_state {
            hlog!(info, "State transition: {:?} -> {:?}",
                prev_state, self.state);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        // Ensure robot is stopped
        self.cmd_pub.send(CmdVel::zero());
        hlog!(info, "State machine shutdown");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(StateMachineNode::new()?).order(0).build()?;
    scheduler.run()?;
    Ok(())
}

Run it:

horus run state_machine.rs

Key Concepts:

  • Enum for states: Idle, Moving, ObstacleDetected, Rotating, Escaped
  • Match expression handles state transitions
  • Each state defines behavior and next state
  • Log state transitions for debugging

2. Priority-Based Safety System

Use node priorities to ensure safety-critical tasks always run first - essential for production robotics.

File: safety_system.rs

use horus::prelude::*;

// CRITICAL PRIORITY: Emergency stop
struct EmergencyStopNode {
    battery_sub: Topic<BatteryState>,
    lidar_sub: Topic<f32>,  // Min obstacle distance
    estop_pub: Topic<bool>,
    estop_active: bool,
}

impl EmergencyStopNode {
    fn new() -> Result<Self> {
        Ok(Self {
            battery_sub: Topic::new("battery_state")?,
            lidar_sub: Topic::new("min_distance")?,
            estop_pub: Topic::new("emergency_stop")?,
            estop_active: false,
        })
    }
}

impl Node for EmergencyStopNode {
    fn name(&self) -> &str { "EmergencyStop" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, " Emergency stop system online - CRITICAL priority");
        Ok(())
    }

    fn tick(&mut self) {
        let mut should_stop = false;

        // Check battery
        if let Some(battery) = self.battery_sub.recv() {
            if battery.is_critical() {  // Below 10%
                should_stop = true;
                hlog!(error, " CRITICAL: Battery at {:.0}% - EMERGENCY STOP!",
                    battery.percentage);
            }
        }

        // Check obstacle distance
        if let Some(min_dist) = self.lidar_sub.recv() {
            if min_dist < 0.2 {  // 20cm
                should_stop = true;
                hlog!(error, " CRITICAL: Obstacle at {:.2}m - EMERGENCY STOP!",
                    min_dist);
            }
        }

        // Publish estop state
        if should_stop != self.estop_active {
            self.estop_pub.send(should_stop);
            self.estop_active = should_stop;
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        // Always activate estop on shutdown
        self.estop_pub.send(true);
        hlog!(warn, "Emergency stop system offline");
        Ok(())
    }
}

// HIGH PRIORITY: Motor controller
struct MotorController {
    estop_sub: Topic<bool>,
    cmd_sub: Topic<CmdVel>,
    motor_pub: Topic<CmdVel>,
    estop_active: bool,
}

impl MotorController {
    fn new() -> Result<Self> {
        Ok(Self {
            estop_sub: Topic::new("emergency_stop")?,
            cmd_sub: Topic::new("cmd_vel_request")?,
            motor_pub: Topic::new("cmd_vel_actual")?,
            estop_active: false,
        })
    }
}

impl Node for MotorController {
    fn name(&self) -> &str { "MotorController" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Motor controller online - HIGH priority");
        Ok(())
    }

    fn tick(&mut self) {
        // Check emergency stop FIRST
        if let Some(estop) = self.estop_sub.recv() {
            if estop != self.estop_active {
                self.estop_active = estop;
                if estop {
                    hlog!(warn, "Motors DISABLED - emergency stop active");
                } else {
                    hlog!(info, "Motors ENABLED - emergency stop cleared");
                }
            }
        }

        // Don't move if estop active
        if self.estop_active {
            self.motor_pub.send(CmdVel::zero());
            return;
        }

        // Process normal commands
        if let Some(cmd) = self.cmd_sub.recv() {
            self.motor_pub.send(cmd);
            hlog!(debug, "Motors: linear={:.2}, angular={:.2}",
                cmd.linear, cmd.angular);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        // Stop motors
        self.motor_pub.send(CmdVel::zero());
        hlog!(info, "Motor controller stopped");
        Ok(())
    }
}

// BACKGROUND PRIORITY: Data logging
struct LoggerNode {
    cmd_sub: Topic<CmdVel>,
    battery_sub: Topic<BatteryState>,
}

impl LoggerNode {
    fn new() -> Result<Self> {
        Ok(Self {
            cmd_sub: Topic::new("cmd_vel_actual")?,
            battery_sub: Topic::new("battery_state")?,
        })
    }
}

impl Node for LoggerNode {
    fn name(&self) -> &str { "Logger" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, " Logger online - BACKGROUND priority");
        Ok(())
    }

    fn tick(&mut self) {
        // Log velocity commands
        if let Some(cmd) = self.cmd_sub.recv() {
            hlog!(debug, "LOG: cmd_vel({:.2}, {:.2})",
                cmd.linear, cmd.angular);
        }

        // Log battery state
        if let Some(battery) = self.battery_sub.recv() {
            hlog!(debug, "LOG: battery({:.1}V, {:.0}%)",
                battery.voltage, battery.percentage);
        }

        // In production: write to file, database, etc.
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Logger stopped");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    // Order 0 (Critical): Safety runs FIRST
    scheduler.add(EmergencyStopNode::new()?).order(0).build()?;

    // Order 1 (High): Control runs SECOND
    scheduler.add(MotorController::new()?).order(1).build()?;

    // Order 4 (Background): Logging runs LAST
    scheduler.add(LoggerNode::new()?).order(4).build()?;

    scheduler.run()?;
    Ok(())
}

Run it:

horus run safety_system.rs

Key Concepts:

  • Priority 0 (Critical): Emergency stop - runs first, always
  • Priority 1 (High): Motor control - runs after safety checks
  • Priority 4 (Background): Logging - runs last, non-critical
  • Lower number = higher priority
  • Safety systems should always check estop before acting

3. Python Multi-Process System

Build a complete sensor monitoring system with Python nodes running as independent processes.

Project Structure

mkdir multi_node_system
cd multi_node_system
mkdir nodes

Sensor Node

nodes/sensor.py:

#!/usr/bin/env python3
from horus import Node, Topic, Scheduler
import random

class SensorNode(Node):
    def __init__(self):
        super().__init__("SensorNode")

    def init(self):
        self.temp_pub = Topic("temperature")

    def tick(self):
        # Generate realistic temperature with noise
        temperature = 20.0 + random.random() * 10.0
        self.temp_pub.send(temperature)
        print(f"Sensor: {temperature:.1f}°C")

scheduler = Scheduler()
scheduler.node(SensorNode()).order(0).rate(2).build()
scheduler.run()

Controller Node

nodes/controller.py:

#!/usr/bin/env python3
from horus import Node, Topic, Scheduler

class ControllerNode(Node):
    def __init__(self):
        super().__init__("ControllerNode")

    def init(self):
        self.temp_sub = Topic("temperature")
        self.fan_pub = Topic("fan_control")

    def tick(self):
        temp = self.temp_sub.recv()
        if temp is not None:
            if temp > 25.0:
                fan_speed = min(100, int((temp - 20) * 10))
                self.fan_pub.send(fan_speed)
                print(f"Controller: Fan at {fan_speed}%")
            else:
                self.fan_pub.send(0)
                print(f"Controller: Temperature normal, fan off")

scheduler = Scheduler()
scheduler.node(ControllerNode()).order(1).rate(2).build()
scheduler.run()

Logger Node

nodes/logger.py:

#!/usr/bin/env python3
from horus import Node, Topic, Scheduler
import datetime

class LoggerNode(Node):
    def __init__(self):
        super().__init__("LoggerNode")

    def init(self):
        self.temp_sub = Topic("temperature")
        self.fan_sub = Topic("fan_control")

    def tick(self):
        temp = self.temp_sub.recv()
        fan = self.fan_sub.recv()
        if temp is not None and fan is not None:
            timestamp = datetime.datetime.now().strftime("%H:%M:%S")
            status = "COOLING" if fan > 0 else "NORMAL"
            print(f"Logger [{timestamp}]: {temp:.1f}°C | Fan {fan}% | {status}")

scheduler = Scheduler()
scheduler.node(LoggerNode()).order(2).rate(1).build()
scheduler.run()

Run All Nodes Concurrently

# Make scripts executable
chmod +x nodes/*.py

# Run all nodes as separate processes
horus run "nodes/*.py"

Output:

Executing 3 files concurrently:
  1. nodes/controller.py (python)
  2. nodes/logger.py (python)
  3. nodes/sensor.py (python)

Phase 1: Building all files...
Phase 2: Starting all processes...
  Started [controller]
  Started [logger]
  Started [sensor]

All processes running. Press Ctrl+C to stop.

[sensor] Sensor: 23.4°C
[controller] Controller: Fan at 34%
[logger] Logger [15:30:45]: 23.4°C | Fan 34% | COOLING
[sensor] Sensor: 26.8°C
[controller] Controller: Fan at 68%
[sensor] Sensor: 21.2°C
[logger] Logger [15:30:46]: 21.2°C | Fan 12% | COOLING

Key Features:

  • Independent Processes: Each node runs in its own process
  • Shared Memory IPC: Nodes communicate via HORUS topics (/dev/shm/horus/)
  • Color-Coded Output: Each node has a unique color
  • Graceful Shutdown: Ctrl+C stops all processes cleanly
  • Zero Configuration: No launch files needed

4. Rust + Python Cross-Language System

Mix Rust and Python nodes in the same application.

Rust Sensor Node

nodes/rust_sensor.rs:

use horus::prelude::*;

pub struct TempSensor {
    temp_pub: Topic<f32>,
    counter: f32,
}

impl TempSensor {
    fn new() -> Result<Self> {
        Ok(Self {
            temp_pub: Topic::new("temperature")?,
            counter: 0.0,
        })
    }
}

impl Node for TempSensor {
    fn name(&self) -> &str { "RustTempSensor" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Rust sensor online - high performance mode");
        Ok(())
    }

    fn tick(&mut self) {
        // Fast sensor simulation
        let temp = 20.0 + (self.counter.sin() * 5.0);
        self.temp_pub.send(temp);

        hlog!(debug, "Rust: {:.2}°C", temp);

        self.counter += 0.1;
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Rust sensor offline");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(TempSensor::new()?).order(0).build()?;
    scheduler.run()
}

Python Controller Node

nodes/py_controller.py:

#!/usr/bin/env python3
from horus import Node, Topic, Scheduler

class PyController(Node):
    def __init__(self):
        super().__init__("PyController")

    def init(self):
        self.temp_sub = Topic("temperature")
        self.cmd_pub = Topic("actuator_cmd")

    def tick(self):
        temp = self.temp_sub.recv()
        if temp is not None:
            status = "HOT" if temp > 22.0 else "NORMAL"
            print(f"Python controller: {temp:.2f}°C - {status}")

            command = 1.0 if temp > 22.0 else 0.0
            self.cmd_pub.send(command)

scheduler = Scheduler()
scheduler.node(PyController()).order(0).rate(10).build()
scheduler.run()

Rust Actuator Node

nodes/rust_actuator.rs:

use horus::prelude::*;

struct Actuator {
    cmd_sub: Topic<f32>,
}

impl Node for Actuator {
    fn name(&self) -> &str { "RustActuator" }

    fn tick(&mut self) {
        if let Some(cmd) = self.cmd_sub.recv() {
            let action = if cmd > 0.5 { "COOLING" } else { "IDLE" };
            hlog!(info, "Actuator: {}", action);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Actuator stopped");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(Actuator {
        cmd_sub: Topic::new("actuator_cmd")?,
    }).order(0).build()?;
    scheduler.run()
}

Run Mixed System

horus run "nodes/*"

HORUS automatically detects file types and compiles/runs appropriately!

Key Concepts:

  • Rust nodes: High performance, type safety
  • Python nodes: Rapid prototyping, easy scripting
  • Shared memory IPC works across languages
  • Zero-copy message passing
  • Sub-microsecond latency even across language boundaries

5. Advanced Python Features

Per-node rates, timestamp checking, and staleness detection.

File: advanced_python.py

#!/usr/bin/env python3
from horus import Node, Topic, Scheduler, Imu
import time

class HighFreqSensor(Node):
    """100Hz IMU sensor"""
    def __init__(self):
        super().__init__("HighFreqSensor")

    def init(self):
        self.imu_pub = Topic(Imu)

    def tick(self):
        imu = Imu(
            accel_x=1.0, accel_y=0.0, accel_z=9.8,
            gyro_x=0.0, gyro_y=0.0, gyro_z=0.0
        )
        self.imu_pub.send(imu)

class Controller(Node):
    """50Hz controller"""
    def __init__(self):
        super().__init__("Controller")

    def init(self):
        self.imu_sub = Topic(Imu)
        self.cmd_pub = Topic("cmd_vel")

    def tick(self):
        imu = self.imu_sub.recv()
        if imu is not None:
            # Process IMU data
            accel_magnitude = (
                imu.accel_x**2 +
                imu.accel_y**2 +
                imu.accel_z**2
            ) ** 0.5

            print(f"Control: IMU magnitude = {accel_magnitude:.2f} m/s²")

            # Send command
            cmd = {"linear": 1.0, "angular": 0.5}
            self.cmd_pub.send(cmd)

class Logger(Node):
    """10Hz logger"""
    def __init__(self):
        super().__init__("Logger")

    def init(self):
        self.cmd_sub = Topic("cmd_vel")

    def tick(self):
        msg = self.cmd_sub.recv()
        if msg is not None:
            print(f"Logger: Received command: {msg}")

# Create scheduler with per-node rates
scheduler = Scheduler()
scheduler.node(HighFreqSensor()).order(0).rate(100).build()
scheduler.node(Controller()).order(1).rate(50).build()
scheduler.node(Logger()).order(2).rate(10).build()
scheduler.run()

Run it:

horus run advanced_python.py

Key Concepts:

  • Per-node rates: Each node runs at different frequency via .rate(Hz)
  • Typed messages: Use Topic(Imu) for cross-language compatible messages
  • Generic messages: Use Topic("name") for Python-only dict/list data
  • Priorities: Lower order number = higher priority, runs first each tick

When to Use Multi-Process vs Single-Process

Multi-Process (Concurrent Execution)

Use when:

  • Nodes need to run independently (like ROS)
  • Want fault isolation (one crash doesn't kill others)
  • Different nodes have vastly different rates
  • Testing distributed system architectures
  • Mixing languages (Rust + Python)

Command:

horus run "nodes/*.rs"
horus run "nodes/*.py"
horus run "nodes/*"  # Mix Rust and Python!

Single-Process

Use when:

  • All nodes in one file
  • Need maximum performance
  • Require deterministic execution order
  • Simpler deployment
  • Embedded systems with limited resources

Command:

horus run main.rs

Performance Notes

Multi-Process IPC Performance

  • Latency: 300ns - 1μs (shared memory)
  • Throughput: Thousands of messages/second
  • Scalability: Tested with 50+ concurrent processes
  • Memory: ~2-5MB overhead per process

Single-Process Performance

  • Latency: 50-200ns (in-memory)
  • Throughput: Millions of messages/second
  • Scalability: Hundreds of nodes in one process
  • Memory: Minimal overhead

Testing Multi-Node Systems

Create test script: test_system.py

#!/usr/bin/env python3
from horus import Node, Topic, Scheduler

class MockSensor(Node):
    def __init__(self):
        super().__init__("MockSensor")

    def init(self):
        self.temp_pub = Topic("temperature")

    def tick(self):
        self.temp_pub.send(30.0)  # Hot!

class TestController(Node):
    def __init__(self):
        super().__init__("TestController")
        self.fan_commands = []

    def init(self):
        self.temp_sub = Topic("temperature")
        self.fan_pub = Topic("fan_control")

    def tick(self):
        temp = self.temp_sub.recv()
        if temp is not None and temp > 25.0:
            fan_speed = min(100, int((temp - 20) * 10))
            self.fan_commands.append(fan_speed)
            self.fan_pub.send(fan_speed)

# Run test
scheduler = Scheduler()
sensor = MockSensor()
controller = TestController()
scheduler.node(sensor).order(0).build()
scheduler.node(controller).order(1).build()

# Use tick_once for testing (runs one tick cycle)
scheduler.tick_once()

# Verify
assert len(controller.fan_commands) > 0, "Controller should have sent fan commands"
assert controller.fan_commands[0] == 100, f"Expected fan at 100%, got {controller.fan_commands[0]}%"
print("Test passed!")

Next Steps

Need help?