Tutorial 5: Hardware & Real-Time (Python)
Looking for the Rust versions? See Tutorial 5: Hardware Drivers and Tutorial: Real-Time Control.
Learn to load hardware drivers from horus.toml and configure real-time scheduling with budgets, deadlines, and miss policies.
What You'll Learn
- Loading hardware drivers with
horus.drivers.load() - Accessing driver configuration parameters
- Setting budgets and deadlines for timing guarantees
- Handling deadline misses with
on_miss - Using
compute=Truefor CPU-heavy nodes - Production scheduler configuration
Part 1: Hardware Drivers
Configure Drivers in horus.toml
# horus.toml
[package]
name = "my-robot"
version = "0.1.0"
[drivers]
arm = { type = "dynamixel", port = "/dev/ttyUSB0", baudrate = 1000000, servo_ids = [1, 2, 3] }
lidar = { type = "rplidar", port = "/dev/ttyUSB1", scan_mode = "standard" }
imu = { type = "i2c", bus = 1, address = "0x68" }
Load and Use Drivers
import horus
# Load drivers from horus.toml
hw = horus.drivers.load()
# Returns a HardwareSet with typed accessors (.dynamixel(), .rplidar(), .i2c(),
# .serial(), etc.) and introspection (.list(), .has(), .params()).
# See /rust/api/drivers for the full accessor list.
# Check what's available
print(f"Drivers: {hw.list()}") # ["arm", "lidar", "imu"]
if hw.has("arm"):
arm = hw.dynamixel("arm")
port = arm.get_or("port", "/dev/ttyUSB0")
servo_ids = arm.get_or("servo_ids", [1, 2, 3])
print(f"Arm on {port}, servos: {servo_ids}")
if hw.has("lidar"):
lidar = hw.rplidar("lidar")
scan_mode = lidar.get_or("scan_mode", "standard")
print(f"LiDAR in {scan_mode} mode")
def arm_tick(node):
cmd = node.recv("arm.command")
if cmd is not None:
# Send to hardware via driver params
node.send("arm.state", {"position": [0.0, 0.0, 0.0]})
arm_node = horus.Node(name="ArmController", tick=arm_tick, rate=100, order=0,
subs=["arm.command"], pubs=["arm.state"])
horus.run(arm_node)
Handling Missing Hardware
If horus.toml has no [drivers] section or hardware is unavailable, drivers.load() raises an exception. Wrap it for graceful fallback:
try:
hw = horus.drivers.load()
except Exception as e:
print(f"No hardware config: {e}")
print("Running in simulation mode")
hw = None
This lets the same codebase run on both real hardware and in simulation without changes.
Part 2: Real-Time Scheduling
Budget and Deadline
Set timing constraints to detect overruns:
import horus
us = horus.us # 1e-6 (microseconds → seconds)
ms = horus.ms # 1e-3 (milliseconds → seconds)
# horus.us = 1e-6 (microseconds), horus.ms = 1e-3 (milliseconds).
# These are plain float constants for readable duration math:
# budget=300 * horus.us means 300 microseconds.
# Motor controller: 1kHz with 800μs budget, 950μs deadline
motor = horus.Node(
name="MotorCtrl",
tick=motor_tick,
rate=1000,
order=0,
budget=800 * us, # must finish tick within 800μs
deadline=950 * us, # hard deadline at 950μs
on_miss="safe_mode", # enter safe state if deadline missed
core=0, # pin to CPU core 0
# Pins this node's thread to CPU core 0. Requires CAP_SYS_NICE or root
# on Linux. If the core doesn't exist or permissions are insufficient,
# HORUS logs a warning and continues with default scheduling.
)
# LiDAR driver: 100Hz, skip missed ticks
lidar = horus.Node(
name="LidarDriver",
tick=lidar_tick,
rate=100,
order=1,
on_miss="skip", # skip tick on deadline miss
)
# Path planner: CPU-heavy, runs on thread pool
planner = horus.Node(
name="PathPlanner",
tick=planner_tick,
rate=10,
order=5,
compute=True, # runs on worker thread pool, not main loop
on_miss="warn", # just log a warning
)
# Production scheduler with watchdog and RT
horus.run(
motor, lidar, planner,
tick_rate=1000,
rt=True, # request SCHED_FIFO (graceful fallback)
watchdog_ms=500, # detect frozen nodes
blackbox_mb=64, # flight recorder for debugging
)
Deadline Miss Policies
| Policy | Behavior | Use For |
|---|---|---|
"warn" | Log warning, continue normally | Non-critical nodes (logging, telemetry) |
"skip" | Skip this tick, resume next cycle | Sensor drivers that can miss a frame |
"safe_mode" | Call enter_safe_state() equivalent, continue ticking | Motor controllers, actuators |
"stop" | Stop the entire scheduler | Safety-critical nodes |
Adaptive Quality with Budget
Use horus.budget_remaining() to do more work when time permits:
def planner_tick(node):
# Always compute basic path
path = compute_basic_path()
# Only optimize if budget allows
if horus.budget_remaining() > 2 * ms:
path = optimize_path(path)
# Only smooth if still have time
if horus.budget_remaining() > 1 * ms:
path = smooth_path(path)
node.send("path", path)
Complete Production Example
import horus
us, ms = horus.us, horus.ms
def safety_tick(node):
"""Safety monitor — runs first every tick."""
if node.has_msg("emergency_stop"):
estop = node.recv("emergency_stop")
if estop.get("engaged"):
node.log_error("EMERGENCY STOP ENGAGED")
node.request_stop()
def motor_tick(node):
cmd = node.recv("cmd_vel")
if cmd is not None:
# Apply velocity command to motor
node.send("motor.state", {"velocity": cmd.get("linear", 0.0)})
def motor_shutdown(node):
node.send("cmd_vel", {"linear": 0.0, "angular": 0.0})
node.log_info("Motors stopped safely")
safety = horus.Node(name="Safety", tick=safety_tick, rate=100, order=0,
subs=["emergency_stop"])
motor = horus.Node(name="Motor", tick=motor_tick, shutdown=motor_shutdown,
rate=100, order=1, budget=500 * us, on_miss="safe_mode",
subs=["cmd_vel"], pubs=["motor.state"])
horus.run(safety, motor, tick_rate=100, rt=True, watchdog_ms=500)
Next Steps
- Real-Time Concepts — what real-time means for robotics
- Safety Monitor — graduated degradation and watchdog
- Choosing a Language — when to use Python vs Rust for RT
- Python API Reference — full scheduling kwargs
See Also
- Hardware Drivers (Rust) — Rust version
- Python Bindings — Python API reference