Tutorial 5: Hardware & Real-Time (Python)

Looking for the Rust versions? See Tutorial 5: Hardware Drivers and Tutorial: Real-Time Control.

Learn to load hardware drivers from horus.toml and configure real-time scheduling with budgets, deadlines, and miss policies.

What You'll Learn

  • Loading hardware drivers with horus.drivers.load()
  • Accessing driver configuration parameters
  • Setting budgets and deadlines for timing guarantees
  • Handling deadline misses with on_miss
  • Using compute=True for CPU-heavy nodes
  • Production scheduler configuration

Part 1: Hardware Drivers

Configure Drivers in horus.toml

# horus.toml
[package]
name = "my-robot"
version = "0.1.0"

[drivers]
arm = { type = "dynamixel", port = "/dev/ttyUSB0", baudrate = 1000000, servo_ids = [1, 2, 3] }
lidar = { type = "rplidar", port = "/dev/ttyUSB1", scan_mode = "standard" }
imu = { type = "i2c", bus = 1, address = "0x68" }

Load and Use Drivers

import horus

# Load drivers from horus.toml
hw = horus.drivers.load()
# Returns a HardwareSet with typed accessors (.dynamixel(), .rplidar(), .i2c(),
# .serial(), etc.) and introspection (.list(), .has(), .params()).
# See /rust/api/drivers for the full accessor list.

# Check what's available
print(f"Drivers: {hw.list()}")  # ["arm", "lidar", "imu"]

if hw.has("arm"):
    arm = hw.dynamixel("arm")
    port = arm.get_or("port", "/dev/ttyUSB0")
    servo_ids = arm.get_or("servo_ids", [1, 2, 3])
    print(f"Arm on {port}, servos: {servo_ids}")

if hw.has("lidar"):
    lidar = hw.rplidar("lidar")
    scan_mode = lidar.get_or("scan_mode", "standard")
    print(f"LiDAR in {scan_mode} mode")

def arm_tick(node):
    cmd = node.recv("arm.command")
    if cmd is not None:
        # Send to hardware via driver params
        node.send("arm.state", {"position": [0.0, 0.0, 0.0]})

arm_node = horus.Node(name="ArmController", tick=arm_tick, rate=100, order=0,
                      subs=["arm.command"], pubs=["arm.state"])
horus.run(arm_node)

Handling Missing Hardware

If horus.toml has no [drivers] section or hardware is unavailable, drivers.load() raises an exception. Wrap it for graceful fallback:

try:
    hw = horus.drivers.load()
except Exception as e:
    print(f"No hardware config: {e}")
    print("Running in simulation mode")
    hw = None

This lets the same codebase run on both real hardware and in simulation without changes.

Part 2: Real-Time Scheduling

Budget and Deadline

Set timing constraints to detect overruns:

import horus

us = horus.us  # 1e-6 (microseconds → seconds)
ms = horus.ms  # 1e-3 (milliseconds → seconds)
# horus.us = 1e-6 (microseconds), horus.ms = 1e-3 (milliseconds).
# These are plain float constants for readable duration math:
# budget=300 * horus.us means 300 microseconds.

# Motor controller: 1kHz with 800μs budget, 950μs deadline
motor = horus.Node(
    name="MotorCtrl",
    tick=motor_tick,
    rate=1000,
    order=0,
    budget=800 * us,       # must finish tick within 800μs
    deadline=950 * us,     # hard deadline at 950μs
    on_miss="safe_mode",   # enter safe state if deadline missed
    core=0,                # pin to CPU core 0
    # Pins this node's thread to CPU core 0. Requires CAP_SYS_NICE or root
    # on Linux. If the core doesn't exist or permissions are insufficient,
    # HORUS logs a warning and continues with default scheduling.
)

# LiDAR driver: 100Hz, skip missed ticks
lidar = horus.Node(
    name="LidarDriver",
    tick=lidar_tick,
    rate=100,
    order=1,
    on_miss="skip",        # skip tick on deadline miss
)

# Path planner: CPU-heavy, runs on thread pool
planner = horus.Node(
    name="PathPlanner",
    tick=planner_tick,
    rate=10,
    order=5,
    compute=True,          # runs on worker thread pool, not main loop
    on_miss="warn",        # just log a warning
)

# Production scheduler with watchdog and RT
horus.run(
    motor, lidar, planner,
    tick_rate=1000,
    rt=True,               # request SCHED_FIFO (graceful fallback)
    watchdog_ms=500,        # detect frozen nodes
    blackbox_mb=64,         # flight recorder for debugging
)

Deadline Miss Policies

PolicyBehaviorUse For
"warn"Log warning, continue normallyNon-critical nodes (logging, telemetry)
"skip"Skip this tick, resume next cycleSensor drivers that can miss a frame
"safe_mode"Call enter_safe_state() equivalent, continue tickingMotor controllers, actuators
"stop"Stop the entire schedulerSafety-critical nodes

Adaptive Quality with Budget

Use horus.budget_remaining() to do more work when time permits:

def planner_tick(node):
    # Always compute basic path
    path = compute_basic_path()

    # Only optimize if budget allows
    if horus.budget_remaining() > 2 * ms:
        path = optimize_path(path)

    # Only smooth if still have time
    if horus.budget_remaining() > 1 * ms:
        path = smooth_path(path)

    node.send("path", path)

Complete Production Example

import horus

us, ms = horus.us, horus.ms

def safety_tick(node):
    """Safety monitor — runs first every tick."""
    if node.has_msg("emergency_stop"):
        estop = node.recv("emergency_stop")
        if estop.get("engaged"):
            node.log_error("EMERGENCY STOP ENGAGED")
            node.request_stop()

def motor_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd is not None:
        # Apply velocity command to motor
        node.send("motor.state", {"velocity": cmd.get("linear", 0.0)})

def motor_shutdown(node):
    node.send("cmd_vel", {"linear": 0.0, "angular": 0.0})
    node.log_info("Motors stopped safely")

safety = horus.Node(name="Safety", tick=safety_tick, rate=100, order=0,
                    subs=["emergency_stop"])
motor = horus.Node(name="Motor", tick=motor_tick, shutdown=motor_shutdown,
                   rate=100, order=1, budget=500 * us, on_miss="safe_mode",
                   subs=["cmd_vel"], pubs=["motor.state"])

horus.run(safety, motor, tick_rate=100, rt=True, watchdog_ms=500)

Next Steps


See Also