Tutorial: Real-Time Control (Python)
Prerequisites
- Quick Start (Python) completed
- Basic familiarity with `horus.Node()` and `horus.run()`
What You'll Build
A robot controller with four nodes running at different rates and execution classes:
- IMU Sensor at 100 Hz --- real-time, reads accelerometer and gyroscope data
- PID Controller at 50 Hz --- real-time, computes velocity commands from sensor feedback
- Path Planner at 10 Hz --- compute class, runs on a thread pool
- Safety Monitor at 100 Hz --- real-time, highest priority, stops the system on dangerous commands
Time estimate: ~20 minutes
Step 1: Four Nodes, No Real-Time
Start with four basic nodes. All use the default configuration --- no budgets, no deadlines, no safety policies yet.
```python
import horus
import math

# ---- IMU Sensor (100 Hz) ----
def make_imu():
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()
        imu = horus.Imu(
            accel_x=0.1 * math.sin(t[0] * 2.0),
            accel_y=0.05 * math.cos(t[0] * 3.0),
            accel_z=9.81,
            gyro_x=0.01,
            gyro_y=0.0,
            gyro_z=0.05,
        )
        node.send("imu.data", imu)

    return horus.Node(
        name="imu_sensor",
        tick=tick,
        rate=100,
        order=0,
        pubs=[horus.Imu],
    )

# ---- PID Controller (50 Hz) ----
def make_controller():
    integral = [0.0]
    target_speed = 0.5

    def tick(node):
        imu = node.recv("imu.data")
        if imu is None:
            return
        error = target_speed - imu.accel_x
        integral[0] += error * horus.dt()
        command = 2.0 * error + 0.1 * integral[0]
        cmd = horus.CmdVel(linear=command, angular=0.0)
        node.send("cmd_vel", cmd)

    return horus.Node(
        name="pid_controller",
        tick=tick,
        rate=50,
        order=10,
        subs=[horus.Imu],
        pubs=[horus.CmdVel],
    )

# ---- Path Planner (10 Hz) ----
def make_planner():
    waypoints = [(1.0, 0.0), (2.0, 1.0), (3.0, 0.0)]
    idx = [0]

    def tick(node):
        odom = node.recv("odom")
        if odom is None:
            return
        # Pick the next waypoint
        wx, wy = waypoints[idx[0] % len(waypoints)]
        dx = wx - odom.x
        dy = wy - odom.y
        dist = math.sqrt(dx * dx + dy * dy)
        if dist < 0.2:
            idx[0] += 1
        heading = math.atan2(dy, dx)
        node.send("plan.target", {
            "heading": heading,
            "distance": dist,
            "waypoint": idx[0] % len(waypoints),
        })

    return horus.Node(
        name="path_planner",
        tick=tick,
        rate=10,
        order=50,
        subs=["odom"],
        pubs=["plan.target"],
    )

# ---- Safety Monitor (100 Hz) ----
def make_safety():
    def tick(node):
        cmd = node.recv("cmd_vel")
        if cmd is None:
            return
        if abs(cmd.linear) > 2.0:
            node.log_warning(f"Unsafe velocity: {cmd.linear:.2f} m/s")
            node.request_stop()
        if abs(cmd.angular) > 1.5:
            node.log_warning(f"Unsafe angular velocity: {cmd.angular:.2f} rad/s")
            node.request_stop()

    return horus.Node(
        name="safety_monitor",
        tick=tick,
        rate=100,
        order=1,
        subs=[horus.CmdVel],
    )

# ---- Run ----
horus.run(
    make_imu(),
    make_controller(),
    make_planner(),
    make_safety(),
    tick_rate=100,
)
```
Run this with `horus run`. All four nodes tick on the scheduler with no timing enforcement. The IMU publishes sensor data, the PID controller computes velocity commands, the planner picks waypoints, and the safety monitor watches for dangerous velocities. It works, but nothing prevents a slow tick from cascading delays across the system.
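Before adding timing enforcement, it is worth sanity-checking the controller math in isolation. This standalone sketch repeats one PID step from `make_controller` with a fixed 20 ms timestep standing in for `horus.dt()`; the gains and target speed mirror the code above, while `pid_step` itself is a helper invented here for illustration:

```python
# Standalone check of the PID step from make_controller.
# A fixed 20 ms timestep (the 50 Hz period) stands in for horus.dt().
DT = 0.02          # 50 Hz period in seconds
KP, KI = 2.0, 0.1  # same gains as the controller above
TARGET = 0.5       # target_speed

def pid_step(measured: float, integral: float) -> tuple[float, float]:
    """One controller tick: returns (command, updated integral)."""
    error = TARGET - measured
    integral += error * DT
    command = KP * error + KI * integral
    return command, integral

# First tick with the IMU reporting accel_x = 0.1:
cmd, integral = pid_step(0.1, 0.0)
print(round(cmd, 4))  # error 0.4 -> 2.0*0.4 + 0.1*0.008 = 0.8008
```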
Step 2: Add Rates and Auto-Derived Timing
Setting `rate=` on a node does three things automatically:
- Sets the tick frequency
- Derives a default budget (80% of the period)
- Derives a default deadline (95% of the period)
The node is also promoted to the Rt execution class --- it gets a dedicated thread with timing enforcement.
Our nodes already have `rate=` set, so they are already RT nodes with auto-derived budgets and deadlines:
| Node | Rate | Period | Auto Budget (80%) | Auto Deadline (95%) |
|---|---|---|---|---|
| IMU Sensor | 100 Hz | 10 ms | 8 ms | 9.5 ms |
| PID Controller | 50 Hz | 20 ms | 16 ms | 19 ms |
| Path Planner | 10 Hz | 100 ms | 80 ms | 95 ms |
| Safety Monitor | 100 Hz | 10 ms | 8 ms | 9.5 ms |
These defaults are reasonable starting points. You do not need to specify budgets and deadlines manually unless profiling shows the auto-derived values are too loose or too tight.
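The derivation behind the table is plain arithmetic on the period. A quick standalone check (no horus API involved; `derived_timing` is a throwaway helper for illustration):

```python
def derived_timing(rate_hz: float) -> tuple[float, float, float]:
    """Period, default budget (80% of period), default deadline (95%), in seconds."""
    period = 1.0 / rate_hz
    return period, 0.80 * period, 0.95 * period

for name, rate in [("imu_sensor", 100), ("pid_controller", 50), ("path_planner", 10)]:
    period, budget, deadline = derived_timing(rate)
    print(name,
          round(period * 1e3, 3), "ms period,",
          round(budget * 1e3, 3), "ms budget,",
          round(deadline * 1e3, 3), "ms deadline")
# Reproduces the table: 10/8/9.5 ms, 20/16/19 ms, 100/80/95 ms
```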
Step 3: Add Deadline Miss Policies
If the PID controller misses a deadline, the motors receive stale commands and the robot drifts. If the safety monitor misses a deadline, dangerous commands go unchecked. Each node needs a policy for what happens on overrun:
| Policy | String value | What happens | Best for |
|---|---|---|---|
| Warn | "warn" | Log a warning, continue normally | Development, non-critical nodes |
| Skip | "skip" | Drop the late tick, run next on schedule | Sensors where one missed reading is acceptable |
| SafeMode | "safe_mode" | Call enter_safe_state() on the node | Motor controllers, actuators |
| Stop | "stop" | Shut down the entire scheduler | Safety monitors --- last line of defense |
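To make the table concrete, here is a hypothetical sketch of the dispatch a scheduler could perform on a deadline miss. The policy strings come from the table; `FakeNode` and `handle_deadline_miss` are invented for illustration and are not horus API:

```python
# Hypothetical sketch of on_miss dispatch; not the horus implementation.
class FakeNode:
    def __init__(self, name, on_miss):
        self.name = name
        self.on_miss = on_miss
        self.events = []

    def enter_safe_state(self):
        self.events.append("safe_state")  # e.g. zero velocity outputs

def handle_deadline_miss(node, scheduler_events):
    if node.on_miss == "warn":
        node.events.append("warned")            # log and continue normally
    elif node.on_miss == "skip":
        node.events.append("tick_skipped")      # drop the late tick
    elif node.on_miss == "safe_mode":
        node.enter_safe_state()                 # call the node's safe-state hook
    elif node.on_miss == "stop":
        scheduler_events.append("stop_requested")  # halt the whole scheduler

events = []
ctrl = FakeNode("pid_controller", "safe_mode")
handle_deadline_miss(ctrl, events)
print(ctrl.events)  # ['safe_state']
```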
Update the node constructors with miss policies:
```python
import horus

us = horus.us  # 1e-6
ms = horus.ms  # 1e-3

# IMU: skip missed ticks --- one stale reading is fine
imu = horus.Node(
    name="imu_sensor",
    tick=imu_tick,
    rate=100,
    order=0,
    on_miss="skip",
    pubs=[horus.Imu],
)

# PID controller: enter safe state on deadline miss
controller = horus.Node(
    name="pid_controller",
    tick=controller_tick,
    rate=50,
    order=10,
    budget=10 * ms,
    deadline=18 * ms,
    on_miss="safe_mode",
    subs=[horus.Imu],
    pubs=[horus.CmdVel],
)

# Safety monitor: stop everything if it misses a deadline
safety = horus.Node(
    name="safety_monitor",
    tick=safety_tick,
    rate=100,
    order=1,
    budget=2 * ms,
    deadline=5 * ms,
    on_miss="stop",
    subs=[horus.CmdVel],
)
```
The PID controller now has an explicit `budget=10 * ms` and `deadline=18 * ms`, overriding the auto-derived values. The safety monitor has a tight `budget=2 * ms` --- it does minimal work per tick and must finish fast.
Step 4: Move the Planner to Compute
The path planner runs algorithms that can take 10-50 ms. If it runs on the RT thread, a slow planning cycle blocks the PID controller. Use `compute=True` to move it to a thread pool:
```python
planner = horus.Node(
    name="path_planner",
    tick=planner_tick,
    rate=10,
    order=50,
    compute=True,  # runs on worker thread pool, not RT thread
    subs=["odom"],
    pubs=["plan.target"],
)
```
With `compute=True`, the planner runs on a separate thread pool. It cannot block the 50 Hz PID controller or the 100 Hz safety monitor. Note that `rate=10` on a Compute node is a frequency cap --- it limits how often the planner ticks, but there is no budget or deadline enforcement. This is the right choice: path planning takes variable time, and a 50 ms planning cycle is normal, not a failure.
`rate=` on a Compute node does not make it RT. It only limits tick frequency. A `rate=10, compute=True` node ticks at most 10 times per second with no timing enforcement. Drop `compute=True` if you need deadline enforcement.
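A frequency cap is just a minimum interval between tick opportunities. The standalone sketch below illustrates the behavior described above; `FrequencyCap` is invented for illustration and is not horus internals:

```python
# Minimal sketch of a frequency cap: tick at most rate_hz times per second.
# Integer milliseconds keep the simulation exact.
class FrequencyCap:
    def __init__(self, rate_hz: int):
        self.min_interval_ms = 1000 // rate_hz
        self.last_tick_ms = None

    def should_tick(self, now_ms: int) -> bool:
        if self.last_tick_ms is None or now_ms - self.last_tick_ms >= self.min_interval_ms:
            self.last_tick_ms = now_ms
            return True
        return False  # too soon: the tick simply does not run (no deadline miss)

cap = FrequencyCap(rate_hz=10)  # at most one tick per 100 ms
# Simulate a 100 Hz scheduler offering a tick every 10 ms for one second:
ticks = sum(cap.should_tick(t * 10) for t in range(100))
print(ticks)  # 10 ticks in one second
```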
Step 5: Configure the Scheduler for Real-Time
Enable OS-level real-time scheduling on the scheduler:
```python
sched = horus.Scheduler(
    tick_rate=100,
    rt=True,                # request SCHED_FIFO + mlockall
    watchdog_ms=500,        # detect frozen nodes
    max_deadline_misses=5,  # stop after 5 consecutive misses
)
```
- `rt=True` requests real-time OS scheduling. It falls back gracefully if permissions are unavailable --- the same code runs on a developer laptop (without RT) and a production robot (with RT).
- `watchdog_ms=500` detects nodes that stop responding, with a graduated response: warning at 500 ms, unhealthy at 1000 ms, isolated at 1500 ms.
- `max_deadline_misses=5` stops the scheduler after 5 consecutive deadline misses --- a system-wide safety net.
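The graduated watchdog response scales with `watchdog_ms`. Here is a hedged sketch of the escalation logic, using the 500/1000/1500 ms thresholds from the text; `watchdog_state` is illustrative, not horus API:

```python
# Sketch of the graduated watchdog response described above.
# Thresholds are 1x, 2x, and 3x watchdog_ms; the function is illustrative only.
def watchdog_state(silent_ms: float, watchdog_ms: float = 500) -> str:
    if silent_ms < watchdog_ms:
        return "healthy"
    if silent_ms < 2 * watchdog_ms:
        return "warning"     # logged, node keeps running
    if silent_ms < 3 * watchdog_ms:
        return "unhealthy"   # flagged in monitoring
    return "isolated"        # node taken off the schedule

print([watchdog_state(t) for t in (100, 600, 1100, 1600)])
# ['healthy', 'warning', 'unhealthy', 'isolated']
```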
Step 6: Complete System
Here is the full program with all nodes configured, real-time scheduling, and safety policies:
```python
import horus
import math
import gc

us = horus.us
ms = horus.ms

# ---- IMU Sensor (100 Hz) ----------------------------------------
def make_imu():
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()
        imu = horus.Imu(
            accel_x=0.1 * math.sin(t[0] * 2.0),
            accel_y=0.05 * math.cos(t[0] * 3.0),
            accel_z=9.81,
            gyro_x=0.01,
            gyro_y=0.0,
            gyro_z=0.05,
        )
        node.send("imu.data", imu)

    return horus.Node(
        name="imu_sensor",
        tick=tick,
        rate=100,
        order=0,
        on_miss="skip",  # one stale reading is acceptable
        pubs=[horus.Imu],
    )

# ---- PID Controller (50 Hz) -------------------------------------
def make_controller():
    gc.disable()  # no GC pauses in the control loop
    integral = [0.0]
    target_speed = 0.5

    def tick(node):
        imu = node.recv("imu.data")
        if imu is None:
            return
        # PID compute --- no allocations in the fast path
        error = target_speed - imu.accel_x
        integral[0] += error * horus.dt()
        command = 2.0 * error + 0.1 * integral[0]
        cmd = horus.CmdVel(linear=command, angular=0.0)
        node.send("cmd_vel", cmd)
        # Collect GC only if budget allows
        if horus.budget_remaining() > 0.003:
            gc.collect(generation=0)

    def enter_safe_state(node):
        # Called when deadline is missed (on_miss="safe_mode")
        node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.0))
        node.log_warning("Controller entered safe state --- zero velocity")

    return horus.Node(
        name="pid_controller",
        tick=tick,
        shutdown=enter_safe_state,
        rate=50,
        order=10,
        budget=10 * ms,
        deadline=18 * ms,
        on_miss="safe_mode",  # zero velocity on deadline miss
        subs=[horus.Imu],
        pubs=[horus.CmdVel],
    )

# ---- Path Planner (10 Hz, Compute) ------------------------------
def make_planner():
    waypoints = [(1.0, 0.0), (2.0, 1.0), (3.0, 0.0)]
    idx = [0]

    def tick(node):
        odom = node.recv("odom")
        if odom is None:
            return
        wx, wy = waypoints[idx[0] % len(waypoints)]
        dx = wx - odom.x
        dy = wy - odom.y
        dist = math.sqrt(dx * dx + dy * dy)
        if dist < 0.2:
            idx[0] += 1
        heading = math.atan2(dy, dx)
        node.send("plan.target", {
            "heading": heading,
            "distance": dist,
            "waypoint": idx[0] % len(waypoints),
        })

    return horus.Node(
        name="path_planner",
        tick=tick,
        rate=10,
        order=50,
        compute=True,  # thread pool --- does not block RT nodes
        subs=["odom"],
        pubs=["plan.target"],
    )

# ---- Safety Monitor (100 Hz) ------------------------------------
def make_safety():
    def tick(node):
        cmd = node.recv("cmd_vel")
        if cmd is None:
            return
        if abs(cmd.linear) > 2.0:
            node.log_warning(f"Unsafe velocity: {cmd.linear:.2f} m/s")
            node.request_stop()
        if abs(cmd.angular) > 1.5:
            node.log_warning(f"Unsafe angular rate: {cmd.angular:.2f} rad/s")
            node.request_stop()

    return horus.Node(
        name="safety_monitor",
        tick=tick,
        rate=100,
        order=1,  # runs right after IMU, before controller
        budget=2 * ms,
        deadline=5 * ms,
        on_miss="stop",   # stop everything if safety check is late
        priority=95,      # highest priority RT thread
        subs=[horus.CmdVel],
    )

# ---- Main -------------------------------------------------------
print("Starting multi-rate robot controller")
print("  IMU Sensor:      100 Hz (Rt, on_miss=skip)")
print("  PID Controller:   50 Hz (Rt, on_miss=safe_mode)")
print("  Path Planner:     10 Hz (Compute)")
print("  Safety Monitor:  100 Hz (Rt, on_miss=stop, priority=95)")
print()

sched = horus.Scheduler(
    tick_rate=100,
    rt=True,
    watchdog_ms=500,
    max_deadline_misses=5,
)
sched.add(make_imu())
sched.add(make_controller())
sched.add(make_planner())
sched.add(make_safety())
sched.run()
```
Press Ctrl+C to stop. The timing report shows budget and deadline statistics for each node.
Step 7: Verify Topic Rates
After starting the system, open a second terminal and check that each node is publishing at the expected rate:
```bash
# Check IMU is publishing at ~100 Hz
horus topic hz imu.data
# Expected: ~100.0 Hz

# Check PID controller output at ~50 Hz
horus topic hz cmd_vel
# Expected: ~50.0 Hz

# Check planner output at ~10 Hz
horus topic hz plan.target
# Expected: ~10.0 Hz
```
If a topic's measured rate is significantly lower than expected, a node is missing deadlines or being throttled. Check the scheduler logs for deadline miss warnings.
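What `horus topic hz` reports is, in essence, the mean inter-arrival rate over a window of messages. A standalone sketch of that computation (the actual horus implementation may differ):

```python
def measured_hz(timestamps: list[float]) -> float:
    """Estimate publish rate from message arrival times (in seconds)."""
    if len(timestamps) < 2:
        return 0.0  # not enough samples to measure a rate
    span = timestamps[-1] - timestamps[0]
    return (len(timestamps) - 1) / span

# 51 messages spaced 20 ms apart -> a healthy 50 Hz cmd_vel stream
stamps = [i * 0.02 for i in range(51)]
print(round(measured_hz(stamps), 1))  # 50.0
```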
You can also echo a topic to see live data:
```bash
horus topic echo cmd_vel
horus topic echo imu.data
```
Or use the full monitoring dashboard:
```bash
horus monitor
```
Python Real-Time: What Works, What Doesn't
Python runs on CPython, which has a Global Interpreter Lock (GIL) and a garbage collector. Both affect timing predictability.
Practical Frequency Limits
| Frequency | Period | Python viable? | Notes |
|---|---|---|---|
| 1-10 Hz | 100-1000 ms | Yes | Huge budget, GIL overhead is negligible |
| 10-50 Hz | 20-100 ms | Yes | Plenty of time for Python + ML inference |
| 50-100 Hz | 10-20 ms | Yes, with care | Budget is tight but achievable for simple logic |
| 100-500 Hz | 2-10 ms | Marginal | GC pauses (1-5 ms) can blow the budget |
| 500+ Hz | <2 ms | No | GIL + GC make consistent timing impossible |
The practical ceiling for Python RT is about 100 Hz. At 100 Hz, your tick budget is 8 ms. A typical Python tick() doing sensor reads and simple math takes 0.1-2 ms, leaving plenty of margin. At 500 Hz, the budget drops to 1.6 ms, where a single garbage collection pause blows through the deadline.
GC Mitigation
The PID controller in Step 6 uses `gc.disable()` plus a manual `gc.collect(generation=0)` whenever there is budget headroom. This pattern eliminates GC pauses from the critical path:
```python
gc.disable()  # no automatic GC

def tick(node):
    # ... fast-path control logic, no allocations ...
    if horus.budget_remaining() > 0.003:  # 3 ms headroom
        gc.collect(generation=0)          # minor collection only
```
Only use this for nodes where GC pauses are unacceptable. For most nodes at 10-50 Hz, the default garbage collector works fine.
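You can observe the pattern with the `gc` module directly, independent of horus. This runnable snippet disables automatic collection, creates cyclic garbage that reference counting alone cannot free, then reclaims it with a manual generation-0 collection:

```python
import gc

gc.disable()  # automatic collection off, as in the controller above
assert not gc.isenabled()

# Create cyclic garbage: each pair of lists references the other,
# so refcounting alone cannot free them once rebound.
for _ in range(100):
    a, b = [], []
    a.append(b)
    b.append(a)
del a, b

unreachable = gc.collect(generation=0)  # manual minor collection
print(unreachable > 0)  # the cycles were found and freed
gc.enable()  # restore the default for the rest of the program
```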
When to Use Rust Instead
For this tutorial's architecture:
- IMU at 100 Hz --- Python works, but you are near the ceiling. If timing jitter is a problem, move to Rust.
- PID at 50 Hz --- Python is comfortable. 20 ms period gives plenty of budget.
- Planner at 10 Hz --- Python is ideal. Complex algorithms, ML inference, data structures --- Python's strengths.
- Safety at 100 Hz --- Python works for monitoring. For hard safety-critical nodes, consider Rust.
For motor control at 1 kHz+, use the Rust Real-Time Control tutorial. Python cannot reliably sustain sub-millisecond tick budgets.
Key Takeaways
- `rate=` implies RT --- it auto-derives the budget (80% of the period) and deadline (95%) and promotes the node to the Rt execution class. Override with explicit `budget=` and `deadline=` as needed.
- Safety is explicit --- always set `on_miss=` for safety-critical nodes. `"safe_mode"` zeros outputs; `"stop"` halts the scheduler.
- Separate by execution class --- keep control loops on RT threads and move heavy computation to `compute=True`.
- `rt=True` degrades gracefully --- request RT on the Scheduler and check `sched.has_full_rt()` in production.
- Budget and deadline are in seconds --- use `horus.us` (1e-6) and `horus.ms` (1e-3) for readability. `budget=300` means 300 seconds, not 300 microseconds.
- Python is fine for 10-100 Hz --- sensor fusion, navigation, ML inference, safety monitoring. Use Rust for 500+ Hz motor loops.
Next Steps
- Deterministic Mode --- reproducible simulations with fixed timesteps
- Safety Monitor --- graduated watchdog and health states
- Testing and Deterministic Mode (Python) --- `tick_once()`, deterministic mode, and pytest patterns
- Real-Time Control (Rust) --- same tutorial in Rust, with 1 kHz motor control
See Also
- Real-Time Systems (Python) --- budget, deadline, and miss policy reference
- Execution Classes (Python) --- how RT auto-detection works
- Scheduler Deep-Dive (Python) --- full scheduler reference
- Python Bindings --- complete API surface