Servo Controller (Python)
Controls a bus of servos (Dynamixel, hobby PWM, etc.). Subscribes to ServoCommand messages, enforces joint limits, writes to hardware, publishes JointState feedback. Returns all servos to home position on shutdown.
Problem
You need to control multiple servos on a bus with joint limit enforcement and safe shutdown from Python.
When To Use
- Robot arms with 3-12 servos on a shared bus
- Pan-tilt camera mounts
- Any multi-actuator system requiring coordinated position control
Prerequisites
- HORUS installed (Installation Guide)
- Python 3.8+ with
horuspackage
horus.toml
[package]
name = "servo-controller-py"
version = "0.1.0"
description = "Multi-servo bus controller with safe shutdown"
language = "python"
Complete Code
#!/usr/bin/env python3
"""Multi-servo controller with joint limits and safe shutdown."""
import math
import horus
from horus import Node, ServoCommand, JointState, us, ms
# ── Configuration ────────────────────────────────────────────
NUM_SERVOS = 6
HOME_POSITION = 0.0 # radians — safe resting position
JOINT_LIMIT = math.pi # +/- pi radians
TEMP_WARNING = 70.0 # celsius
SERVO_NAMES = [f"joint_{i}" for i in range(NUM_SERVOS)]
# ── State ────────────────────────────────────────────────────
positions = [HOME_POSITION] * NUM_SERVOS
velocities = [0.0] * NUM_SERVOS
temperatures = [35.0] * NUM_SERVOS
# ── Hardware stubs ───────────────────────────────────────────
def write_servo(servo_id, position):
"""Write position to a single servo — replace with real driver.
For Dynamixel: use dynamixel_sdk
For hobby PWM: use RPi.GPIO or pigpio
"""
if 0 <= servo_id < NUM_SERVOS:
positions[servo_id] = position
def read_feedback():
"""Read feedback from all servos — replace with real driver."""
return positions[:], velocities[:], temperatures[:]
# ── Node callbacks ───────────────────────────────────────────
def servo_tick(node):
# IMPORTANT: always recv() every tick to drain the buffer
cmd = node.recv("servo.command")
if cmd is not None:
# SAFETY: enforce joint limits to prevent mechanical damage
clamped_pos = max(-JOINT_LIMIT, min(JOINT_LIMIT, cmd.position))
write_servo(cmd.servo_id, clamped_pos)
# Read current state from all servos
pos, vel, temp = read_feedback()
# WARNING: check for overheating servos
for i, t in enumerate(temp):
if t > TEMP_WARNING:
print(f"WARNING: servo {i} temperature {t:.1f}C exceeds limit")
# Publish joint state feedback
feedback = JointState(
names=SERVO_NAMES,
positions=pos,
velocities=vel,
efforts=[0.0] * NUM_SERVOS,
)
node.send("servo.feedback", feedback)
def servo_init(node):
print(f"ServoController: initialized {NUM_SERVOS} servos")
# Read current positions from hardware before accepting commands
pos, _, _ = read_feedback()
for i, p in enumerate(pos):
positions[i] = p
def servo_shutdown(node):
# SAFETY: return ALL servos to home position before exiting
for i in range(NUM_SERVOS):
write_servo(i, HOME_POSITION)
feedback = JointState(
names=SERVO_NAMES,
positions=[HOME_POSITION] * NUM_SERVOS,
velocities=[0.0] * NUM_SERVOS,
efforts=[0.0] * NUM_SERVOS,
)
node.send("servo.feedback", feedback)
print("ServoController: all servos returned to home position")
# ── Main ─────────────────────────────────────────────────────
servo_node = Node(
name="ServoController",
tick=servo_tick,
init=servo_init,
shutdown=servo_shutdown,
rate=100, # 100 Hz servo update rate
order=0,
subs=["servo.command"],
pubs=["servo.feedback"],
budget=800 * us, # 800 us budget for bus communication
on_miss="warn",
)
if __name__ == "__main__":
horus.run(servo_node)
Expected Output
[HORUS] Scheduler running — tick_rate: 1000 Hz
ServoController: initialized 6 servos
[HORUS] Node "ServoController" started (100 Hz)
^C
ServoController: all servos returned to home position
[HORUS] Shutting down...
[HORUS] Node "ServoController" shutdown complete
Key Points
ServoCommandhasservo_id,position,speed, andenablefields — one command per servo per tickJointStatepublishes all servo positions/velocities in a single message with named joints- Joint limit clamping in
tick()prevents hardware damage regardless of upstream commands - Temperature monitoring catches overheating before servo damage
shutdown()returns to home — critical for robot arms that hold pose under gravityinit()reads current positions — prevents jerking to an unexpected position on startup- 800 us budget accounts for serial bus latency (Dynamixel at 1Mbps takes ~500 us for 6 servos)
Variations
- Batch commands: Accept all servo positions in a single
JointCommandmessage instead of per-servoServoCommand - Velocity mode: Replace position commands with velocity targets for wheeled joints
- Trajectory interpolation: Accept waypoints and interpolate between them over time
- Current limiting: Track servo current draw and reduce torque if limits are exceeded
Common Errors
| Symptom | Cause | Fix |
|---|---|---|
| Servos jerk on startup | No initial position read before first command | Read current positions in init() before accepting commands |
| Overheating warnings constantly | Servo loaded beyond continuous rating | Reduce duty cycle or upgrade servos |
| Positions overshoot limits | Clamp range too wide for physical joint | Tighten clamp values to match hardware stops |
| Bus timeout errors | Baud rate mismatch or cable issue | Verify baud rate matches servo firmware |
| Servos do not return to home on Ctrl+C | shutdown() not implemented | Always implement shutdown() for actuator nodes |
See Also
- Servo Controller (Rust) — Rust version of this recipe
- JointState — Joint position feedback type