Tutorial 2: Motor Controller (Python)
Build a motor controller that subscribes to velocity commands, simulates a motor by integrating velocity into position, and publishes position/velocity feedback. The controller also stops the motor safely on shutdown.
What You'll Learn
- Subscribing to command topics
- Publishing state feedback
- Managing state between ticks
- Multiple topics per node (sub + pub)
- Safe shutdown for actuators
Step 1: Create the Project
horus new motor-tutorial -p
cd motor-tutorial
Step 2: Write the Code
Replace the contents of main.py with:
import horus
import math

# ── Commander Node ───────────────────────────────────────────
def make_commander():
    """Generates sine-wave velocity commands."""
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()
        velocity = 2.0 * math.sin(t[0] * 0.5)  # oscillate ±2 rad/s
        node.send("motor.cmd", {"velocity": velocity, "max_torque": 5.0})

    return horus.Node(name="Commander", tick=tick, rate=100, order=0,
                      pubs=["motor.cmd"])

# ── Motor Controller Node ───────────────────────────────────
def make_motor():
    """Simulates a motor: integrates velocity → position."""
    state = {"position": 0.0, "velocity": 0.0}

    def tick(node):
        cmd = node.recv("motor.cmd")
        if cmd is not None:
            state["velocity"] = cmd["velocity"]

        # Integrate velocity → position
        dt = horus.dt()
        state["position"] += state["velocity"] * dt

        # Publish state feedback
        node.send("motor.state", {
            "position": state["position"],
            "velocity": state["velocity"],
            "timestamp": horus.now(),
        })

    def shutdown(node):
        # SAFETY: stop the motor before exiting
        state["velocity"] = 0.0
        node.send("motor.cmd", {"velocity": 0.0, "max_torque": 0.0})
        print("Motor stopped safely")

    return horus.Node(name="MotorController", tick=tick, shutdown=shutdown,
                      rate=100, order=1,
                      subs=["motor.cmd"], pubs=["motor.state"])

# ── Display Node ────────────────────────────────────────────
def make_display():
    """Prints motor state every 50 samples."""
    count = [0]

    def tick(node):
        msg = node.recv("motor.state")
        if msg is not None:
            count[0] += 1
            if count[0] % 50 == 0:
                print(f"[#{count[0]}] pos={msg['position']:.2f} rad"
                      f" vel={msg['velocity']:.2f} rad/s")

    return horus.Node(name="StateDisplay", tick=tick, rate=100, order=2,
                      subs=["motor.state"])

# ── Main ────────────────────────────────────────────────────
print("Starting motor controller tutorial...\n")
horus.run(make_commander(), make_motor(), make_display())
Step 3: Run It
horus run
Starting motor controller tutorial...
[#50] pos=0.12 rad vel=0.50 rad/s
[#100] pos=0.95 rad vel=1.73 rad/s
[#150] pos=2.84 rad vel=1.98 rad/s
[#200] pos=4.76 rad vel=0.97 rad/s
Press Ctrl+C. You'll see "Motor stopped safely", confirming that the shutdown callback ran. The numbers shown above are illustrative, so your values may differ.
Understanding the Code
Safe Shutdown
The shutdown callback is critical for actuator nodes. When you press Ctrl+C, HORUS calls shutdown() on every node before exiting:
def shutdown(node):
    state["velocity"] = 0.0
    node.send("motor.cmd", {"velocity": 0.0, "max_torque": 0.0})
    print("Motor stopped safely")
Without this, a real motor would continue at its last commanded velocity.
Data Flow
Commander (order=0) → motor.cmd → MotorController (order=1) → motor.state → StateDisplay (order=2)
Lower order numbers run first each tick, ensuring the commander publishes before the controller reads.
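To see why the order values matter, here is a minimal sketch of order-based scheduling in plain Python, without HORUS. ToyNode, run_one_tick, and the topics dict are hypothetical stand-ins invented for this illustration; they are not part of the HORUS API and make no claim about how HORUS is implemented internally.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class ToyNode:
    """Hypothetical stand-in for a node: a name, an order, and a tick callback."""
    name: str
    order: int
    tick: Callable[[Dict[str, object]], None]


def run_one_tick(nodes, topics):
    """Run every node once, lowest order first; return the execution order."""
    executed = []
    for node in sorted(nodes, key=lambda n: n.order):
        node.tick(topics)
        executed.append(node.name)
    return executed


def commander_tick(topics):
    # Publish a command for this tick
    topics["motor.cmd"] = {"velocity": 1.0}


def motor_tick(topics):
    # Read the command (if any) and publish state derived from it
    cmd = topics.get("motor.cmd")
    if cmd is not None:
        topics["motor.state"] = {"velocity": cmd["velocity"]}


def display_tick(topics):
    # Record whether state was already available when the display ran
    topics["display.seen"] = topics.get("motor.state") is not None


# Registered deliberately out of order; sorting by `order` fixes the sequence
nodes = [
    ToyNode("StateDisplay", 2, display_tick),
    ToyNode("Commander", 0, commander_tick),
    ToyNode("MotorController", 1, motor_tick),
]
topics = {}
executed = run_one_tick(nodes, topics)
print(executed)
```

Because the commander runs first, the controller already sees a command on the very first tick, and the display already sees state. If the order values were reversed in this toy model, each stage would lag the previous one by a tick.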
State Between Ticks
The state dict persists across ticks via closure. The motor integrates velocity into position every tick:
state["position"] += state["velocity"] * dt
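The update above is a forward Euler step. As a rough check, the sketch below reproduces it in plain Python with closure-held state and compares the result against the closed-form integral of the commanded velocity, 4 * (1 - cos(0.5 * t)). It assumes a fixed dt of 0.01 s (one tick at 100 Hz); the value returned by horus.dt() in a real run may differ. make_integrator is a hypothetical helper, not a HORUS function.

```python
import math


def make_integrator():
    """Return a step function that keeps position in a closure, like make_motor()."""
    state = {"position": 0.0}

    def step(velocity, dt):
        state["position"] += velocity * dt  # forward Euler update
        return state["position"]

    return step


step = make_integrator()
dt = 0.01   # assumed fixed tick period for a 100 Hz node
t = 0.0
position = 0.0

for _ in range(200):  # 2 seconds of simulated time
    t += dt
    velocity = 2.0 * math.sin(0.5 * t)  # same command as the Commander node
    position = step(velocity, dt)

# Closed-form integral of 2*sin(0.5*t) from 0 to t
exact = 4.0 * (1.0 - math.cos(0.5 * t))
print(f"euler={position:.3f} exact={exact:.3f}")
```

The two values agree to within about dt times the peak velocity, which is the expected accuracy for a first-order method. The dict is used for state because the inner function can mutate it without a nonlocal declaration, which is also why the tutorial code wraps t and count in one-element lists.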
Using a Class for State
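For complex nodes, a class is cleaner than closures. Here is a motor controller written as a class. Note that it is a separate example: a proportional speed controller that uses its own topics (cmd_vel and motor_cmd) rather than the ones from the tutorial above: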
import horus

class MotorController:
    def __init__(self):
        self.kp = 1.5            # proportional gain
        self.target_speed = 0.0

    def tick(self, node):
        cmd = node.recv("cmd_vel")
        if cmd is not None:
            self.target_speed = cmd.linear

        # Proportional control: output scales with the speed error
        error = self.target_speed - self.current_speed()
        node.send("motor_cmd", {"rpm": error * self.kp})

    def shutdown(self, node):
        # SAFETY: zero the motor before exiting
        node.send("motor_cmd", {"rpm": 0.0})
        node.log_info("Motors zeroed")

    def current_speed(self):
        # In a real system, read from an encoder
        return 0.0

motor = MotorController()
node = horus.Node(
    name="motor",
    pubs=["motor_cmd"],
    subs=["cmd_vel"],
    tick=motor.tick,
    shutdown=motor.shutdown,
    rate=100,
)
horus.run(node)
Python automatically binds self when you pass motor.tick as the callback. The scheduler calls the callback with a single argument, node; because the callback is the bound method motor.tick, Python supplies self for you. Your tick method therefore receives both the instance state (self) and the HORUS node handle (node) without any extra wiring.
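The binding behavior is standard Python and can be checked without HORUS. In the sketch below, call_callback is a hypothetical stand-in for a scheduler that only knows how to call callback(node), and the string "fake-node" stands in for a real node handle.

```python
class Counter:
    def __init__(self):
        self.ticks = 0

    def tick(self, node):
        # `self` comes from the bound method; `node` comes from the caller
        self.ticks += 1
        return node


def call_callback(callback, node):
    """Stand-in for a scheduler: it passes exactly one argument."""
    return callback(node)


counter = Counter()
callback = counter.tick  # bound method: the instance is already attached

result = call_callback(callback, "fake-node")
call_callback(callback, "fake-node")

print(callback.__self__ is counter)  # the bound method remembers its instance
print(counter.ticks)                 # state accumulated across both calls
```

Each instance produces its own bound method, so two Counter objects passed as callbacks keep independent state.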
Lifecycle: Shutdown Guarantees
The scheduler calls shutdown(node) on Ctrl+C, on SIGTERM, or when the run duration expires. If a shutdown callback raises an exception, it is caught and logged, and the other nodes still shut down normally. This means you can safely send final commands (such as zeroing motors) without one node's failure preventing cleanup of the others.
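The isolation guarantee described above can be pictured with a small plain-Python sketch. This is only an illustration of the pattern, assuming a per-node try/except; shutdown_all and the three callbacks are hypothetical and do not show how HORUS actually implements it.

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("toy_scheduler")


def shutdown_all(nodes):
    """Call every shutdown callback; a failure in one does not skip the rest."""
    failed = []
    for name, shutdown in nodes:
        try:
            shutdown()
        except Exception:
            # Log the traceback and keep going so later nodes still clean up
            log.exception("shutdown of %s failed", name)
            failed.append(name)
    return failed


stopped = []


def stop_motor():
    stopped.append("motor")


def broken_sensor():
    raise RuntimeError("sensor already disconnected")


def stop_gripper():
    stopped.append("gripper")


failed = shutdown_all([
    ("motor", stop_motor),
    ("sensor", broken_sensor),
    ("gripper", stop_gripper),
])
print(stopped, failed)
```

The gripper is still stopped even though the sensor callback raised before it. Keeping each shutdown callback short and free of blocking calls makes it more likely to finish before the process exits.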
Next Steps
- Tutorial 3: Full Robot (Python) — combine sensors and motors into a complete system
- Tutorial 2 (Rust) — same tutorial in Rust
- Common Mistakes — avoid shutdown pitfalls
See Also
- Motor Controller (Rust) — Rust version
- Python Bindings — Python API reference