Tutorial 3: Full Robot System (Python)
Looking for the Rust version? See Tutorial 3: Full Robot Integration.
Combine an IMU sensor, velocity commander, motor controller, and state estimator into a complete robot system with multi-rate scheduling.
What You'll Learn
- Composing 4+ nodes into a working system
- Multi-rate scheduling (different nodes at different frequencies)
- Data fusion (subscribing to multiple topics)
- Monitoring with horus monitor and horus topic echo
Architecture
ImuSensor (100Hz) ──→ imu.data ──→ StateEstimator (100Hz) ──→ robot.pose
Commander (10Hz) ──→ motor.cmd ──→ MotorController (50Hz) ──→ motor.state ──→ StateEstimator
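The wiring in the diagram follows from each node's pubs and subs lists. As a minimal sketch, the plain-Python snippet below derives the same edges from those lists. It does not use horus at all; the NODES table and the topic_edges helper are hypothetical names introduced only for illustration.

```python
# Node name -> (published topics, subscribed topics), copied from the tutorial.
NODES = {
    "ImuSensor": (["imu.data"], []),
    "Commander": (["motor.cmd"], []),
    "MotorController": (["motor.state"], ["motor.cmd"]),
    "StateEstimator": (["robot.pose"], ["imu.data", "motor.state"]),
}


def topic_edges(nodes):
    """Return sorted (publisher, topic, subscriber) triples."""
    edges = []
    for pub_name, (pubs, _) in nodes.items():
        for topic in pubs:
            for sub_name, (_, subs) in nodes.items():
                if topic in subs:
                    edges.append((pub_name, topic, sub_name))
    return sorted(edges)


edges = topic_edges(NODES)
for publisher, topic, subscriber in edges:
    print(f"{publisher} --{topic}--> {subscriber}")
```

robot.pose produces no edge because no node in this system subscribes to it; it is consumed externally, for example by horus topic echo.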
The Code
import horus
import math
us = horus.us # microsecond constant for budget/deadline
# ── IMU Sensor (100 Hz) ─────────────────────────────────────
def make_imu():
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()
        node.send("imu.data", {
            "accel_x": 0.1 * math.sin(t[0] * 2.0),
            "accel_y": 0.05 * math.cos(t[0] * 3.0),
            "accel_z": 9.81,
            "gyro_yaw": 0.05,  # constant rotation
            "timestamp": horus.now(),
        })

    return horus.Node(name="ImuSensor", tick=tick, rate=100, order=0,
                      pubs=["imu.data"])
# ── Commander (10 Hz) ────────────────────────────────────────
def make_commander():
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()
        node.send("motor.cmd", {
            "velocity": 1.0 + 0.5 * math.sin(t[0] * 0.3),
        })

    return horus.Node(name="Commander", tick=tick, rate=10, order=1,
                      pubs=["motor.cmd"])
# ── Motor Controller (50 Hz) ────────────────────────────────
def make_motor():
    state = {"velocity": 0.0, "position": 0.0}

    def tick(node):
        cmd = node.recv("motor.cmd")
        if cmd is not None:
            state["velocity"] = cmd["velocity"]
        state["position"] += state["velocity"] * horus.dt()
        node.send("motor.state", {
            "position": state["position"],
            "velocity": state["velocity"],
        })

    def shutdown(node):
        state["velocity"] = 0.0
        print("Motor stopped safely")

    return horus.Node(name="MotorController", tick=tick, shutdown=shutdown,
                      rate=50, order=2,
                      subs=["motor.cmd"], pubs=["motor.state"])
# ── State Estimator (100 Hz) ────────────────────────────────
def make_estimator():
    """Fuses IMU gyro + motor velocity into a robot pose estimate."""
    pose = {"x": 0.0, "y": 0.0, "heading": 0.0}

    def tick(node):
        dt = horus.dt()
        # Fuse IMU data (heading from gyro)
        imu = node.recv("imu.data")
        if imu is not None:
            pose["heading"] += imu["gyro_yaw"] * dt
        # Fuse motor state (position from velocity)
        motor = node.recv("motor.state")
        if motor is not None:
            pose["x"] += motor["velocity"] * math.cos(pose["heading"]) * dt
            pose["y"] += motor["velocity"] * math.sin(pose["heading"]) * dt
        node.send("robot.pose", {
            "x": pose["x"],
            "y": pose["y"],
            "heading": pose["heading"],
            "timestamp": horus.now(),
        })

    return horus.Node(name="StateEstimator", tick=tick, rate=100, order=3,
                      subs=["imu.data", "motor.state"], pubs=["robot.pose"])
# ── Main ────────────────────────────────────────────────────
print("Starting full robot system...")
print(" ImuSensor: 100 Hz (order 0)")
print(" Commander: 10 Hz (order 1)")
print(" MotorController: 50 Hz (order 2)")
print(" StateEstimator: 100 Hz (order 3)")
print()
horus.run(
    make_imu(),
    make_commander(),
    make_motor(),
    make_estimator(),
    tick_rate=100,
    watchdog_ms=500,
)
Run and Monitor
# Terminal 1: Run the robot
horus run
# Terminal 2: Monitor topics
horus topic echo robot.pose
# Terminal 3: Full monitoring dashboard
horus monitor
Key Concepts
Multi-Rate Scheduling
Each node runs at its own rate. The scheduler handles the timing:
make_imu() # rate=100 — ticks 100 times per second
make_commander() # rate=10 — ticks 10 times per second
make_motor() # rate=50 — ticks 50 times per second
make_estimator() # rate=100 — ticks 100 times per second
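To get a feel for what running each node at its own rate means in practice, here is a minimal sketch of rate division on top of a fixed 100 Hz base loop. This is not the horus scheduler and makes no claim about how it works internally. It assumes every node rate divides the base rate evenly, and count_ticks is a hypothetical helper.

```python
# Illustration only: a fixed-rate base loop that fires slower nodes
# every N-th step. Rates are the ones used in the tutorial.
BASE_RATE_HZ = 100  # matches tick_rate=100

NODE_RATES = {
    "ImuSensor": 100,
    "Commander": 10,
    "MotorController": 50,
    "StateEstimator": 100,
}


def count_ticks(base_rate_hz, node_rates, seconds=1):
    """Count how often each node fires when driven by the base loop."""
    counts = {name: 0 for name in node_rates}
    for step in range(base_rate_hz * seconds):
        for name, rate in node_rates.items():
            divider = base_rate_hz // rate  # base steps between node ticks
            if step % divider == 0:
                counts[name] += 1
    return counts


ticks_per_second = count_ticks(BASE_RATE_HZ, NODE_RATES)
print(ticks_per_second)
```

With these rates the Commander fires on every 10th base step and the MotorController on every 2nd, so each motor command is reused for about five motor ticks before the next one arrives.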
Data Fusion
The state estimator subscribes to two topics and fuses them:
imu = node.recv("imu.data") # gyro → heading
motor = node.recv("motor.state") # velocity → position
Always call recv() on every subscribed topic every tick — even if you don't need the data yet. This prevents stale data accumulation.
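The fusion step itself is plain dead reckoning: integrate the gyro rate into a heading, then project the velocity along that heading. The sketch below isolates that arithmetic from horus so it can be checked on its own. integrate_pose is a hypothetical helper, and the inputs are the constant gyro rate from the tutorial plus an assumed constant velocity of 1.0.

```python
import math


def integrate_pose(pose, gyro_yaw, velocity, dt):
    """Advance (x, y, heading) by one dead-reckoning step.

    Heading is updated first and the new heading is used for the
    position update, mirroring the order in the estimator's tick().
    """
    heading = pose["heading"] + gyro_yaw * dt
    return {
        "x": pose["x"] + velocity * math.cos(heading) * dt,
        "y": pose["y"] + velocity * math.sin(heading) * dt,
        "heading": heading,
    }


pose = {"x": 0.0, "y": 0.0, "heading": 0.0}
for _ in range(100):  # one simulated second at 100 Hz
    pose = integrate_pose(pose, gyro_yaw=0.05, velocity=1.0, dt=0.01)

print(pose)
```

If recv() returns None on ticks where no new motor.state message has arrived (an assumption about horus semantics, not something this tutorial states), a 100 Hz estimator fed by a 50 Hz motor node would integrate velocity on only half of its ticks. Caching the last received velocity is one way to avoid that.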
Watchdog
watchdog_ms=500 detects frozen nodes: if any node's tick takes longer than 500 ms, the safety monitor triggers.
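The idea of a tick budget can be illustrated without horus. The sketch below times a single tick and compares it to a budget. It is a simplification under stated assumptions: run_with_watchdog is a hypothetical helper, and it only notices an overrun after the tick returns, whereas a real watchdog would have to run concurrently to catch a tick that never returns.

```python
import time


def run_with_watchdog(tick, watchdog_ms):
    """Run tick() once; return (tripped, elapsed_ms)."""
    start = time.monotonic()
    tick()
    elapsed_ms = (time.monotonic() - start) * 1000.0
    return elapsed_ms > watchdog_ms, elapsed_ms


def fast_tick():
    pass  # returns immediately, well inside any budget


def slow_tick():
    time.sleep(0.05)  # 50 ms, deliberately over the 20 ms budget below


fast_tripped, _ = run_with_watchdog(fast_tick, watchdog_ms=20)
slow_tripped, slow_ms = run_with_watchdog(slow_tick, watchdog_ms=20)
print(fast_tripped, slow_tripped)
```

time.monotonic() is used rather than time.time() so that system clock adjustments cannot distort the measured duration.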
Using the Scheduler Directly
For production configurations that need real-time (RT) features, build the scheduler yourself and add each node explicitly:
scheduler = horus.Scheduler(tick_rate=100, watchdog_ms=500, rt=True)
scheduler.add(make_imu())
scheduler.add(make_commander())
scheduler.add(make_motor())
scheduler.add(make_estimator())
scheduler.run()
Next Steps
- Tutorial 4: Custom Messages (Python) — define your own message types
- Tutorial 3 (Rust) — same tutorial in Rust
- Scheduler API — advanced scheduling
See Also
- Full Robot (Rust) — Rust version
- Python Bindings — Python API reference