PID Controller (Python)
A reusable PID controller node that subscribes to a setpoint and measured value, computes a control output, and publishes the result. Includes anti-windup and output clamping.
Problem
You need a closed-loop controller (speed, position, temperature, etc.) that runs at a fixed rate with deterministic timesteps.
When To Use
- Motor speed control (RPM tracking)
- Position holding (arm joints, pan-tilt)
- Temperature regulation
- Any setpoint-tracking control loop
Prerequisites
- HORUS installed (Installation Guide)
- Familiarity with Nodes
horus.toml
[package]
name = "pid-controller-py"
version = "0.1.0"
language = "python"
Complete Code
import horus
from horus import Node, run, us, ms
# ── PID gains ─────────────────────────────────────────
KP = 2.0
KI = 0.5
KD = 0.1
OUTPUT_MIN = -1.0
OUTPUT_MAX = 1.0
INTEGRAL_MAX = 10.0 # anti-windup limit
# ── State ─────────────────────────────────────────────
integral = [0.0]
prev_error = [0.0]
def pid_init(node):
    integral[0] = 0.0
    prev_error[0] = 0.0
    node.log_info(f"PID: kp={KP}, ki={KI}, kd={KD}")
def pid_tick(node):
    setpoint = node.recv("setpoint")
    measured = node.recv("measured")
    if setpoint is None or measured is None:
        return
    sp = setpoint.get("value", 0.0) if isinstance(setpoint, dict) else float(setpoint)
    mv = measured.get("value", 0.0) if isinstance(measured, dict) else float(measured)

    # Error
    error = sp - mv

    # dt from framework clock: fixed in deterministic mode, real in production
    dt = horus.dt()

    # Proportional
    p_term = KP * error

    # Integral with anti-windup
    integral[0] += error * dt
    integral[0] = max(-INTEGRAL_MAX, min(INTEGRAL_MAX, integral[0]))
    i_term = KI * integral[0]

    # Derivative (on error, not measurement; simpler but noisier)
    d_term = KD * (error - prev_error[0]) / dt if dt > 0 else 0.0
    prev_error[0] = error

    # IMPORTANT: clamp output to actuator limits
    output = p_term + i_term + d_term
    output = max(OUTPUT_MIN, min(OUTPUT_MAX, output))

    node.send("control", {
        "output": output,
        "error": error,
        "p": p_term,
        "i": i_term,
        "d": d_term,
    })
pid = Node(
    name="PID",
    init=pid_init,
    tick=pid_tick,
    rate=100,
    order=5,
    budget=200 * us,
    subs=["setpoint", "measured"],
    pubs=["control"],
)
run(pid, tick_rate=100)
Expected Output
[HORUS] Scheduler running — tick_rate: 100 Hz
[HORUS] Node "PID" started (BestEffort, 100 Hz, budget: 200μs)
[HORUS] PID: kp=2.0, ki=0.5, kd=0.1
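To check the arithmetic of pid_tick without starting the scheduler, the same update can be written as a plain function. The following is a minimal sketch and not part of HORUS: PIDState, clamp, and pid_step are hypothetical names, and dt is passed in explicitly instead of coming from horus.dt().

```python
from dataclasses import dataclass

# Same constants as the node above.
KP, KI, KD = 2.0, 0.5, 0.1
OUTPUT_MIN, OUTPUT_MAX = -1.0, 1.0
INTEGRAL_MAX = 10.0


@dataclass
class PIDState:
    """Hypothetical container for the controller's memory between ticks."""
    integral: float = 0.0
    prev_error: float = 0.0


def clamp(x, lo, hi):
    """Limit x to the closed interval [lo, hi]."""
    return max(lo, min(hi, x))


def pid_step(state, sp, mv, dt):
    """One PID update; mirrors pid_tick but takes dt as an argument."""
    error = sp - mv
    p_term = KP * error
    # Integral with the same anti-windup clamp as the node.
    state.integral = clamp(state.integral + error * dt, -INTEGRAL_MAX, INTEGRAL_MAX)
    i_term = KI * state.integral
    # Derivative on error, guarded against a zero timestep.
    d_term = KD * (error - state.prev_error) / dt if dt > 0 else 0.0
    state.prev_error = error
    raw = p_term + i_term + d_term
    return {
        "output": clamp(raw, OUTPUT_MIN, OUTPUT_MAX),
        "error": error,
        "p": p_term,
        "i": i_term,
        "d": d_term,
    }


state = PIDState()
first = pid_step(state, sp=0.1, mv=0.0, dt=0.01)
second = pid_step(state, sp=0.1, mv=0.0, dt=0.01)
print(first)
print(second)
```

On the first call the derivative term is about 1.0 because prev_error starts at zero, so the output clamps at OUTPUT_MAX. On the second call the error is unchanged, the derivative term drops to zero, and the output is set by the proportional and integral terms alone.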
Key Points
- horus.dt() gives the actual timestep: it adapts to rate changes and is fixed in deterministic mode for reproducible behavior
- Anti-windup clamps the integral term to prevent saturation after long errors
- Output clamping prevents actuator damage (motors, heaters, etc.)
- Derivative on error is simple, but it spikes whenever the setpoint jumps (derivative kick). Derivative on measurement avoids the kick: d_term = -KD * (mv - prev_mv) / dt. Either form amplifies measurement noise, so low-pass filter the derivative if the sensor is noisy
- Deterministic mode (deterministic=True) makes dt() return a fixed 1/rate, so PID behavior is identical across runs
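The effect of the anti-windup clamp can be seen by counting how long the integral takes to unwind after a sustained error. The sketch below is plain Python with no HORUS dependency; ticks_to_unwind is a hypothetical helper, and the 50 s stall, unit error, and 100 Hz timestep are assumed values for illustration.

```python
def ticks_to_unwind(integral_max, stuck_ticks=5000, error=1.0, dt=0.01):
    """Accumulate a sustained error, then count the ticks of opposite
    error needed for the integral to return to zero."""
    integral = 0.0
    for _ in range(stuck_ticks):
        integral += error * dt
        # Same clamp as in pid_tick.
        integral = max(-integral_max, min(integral_max, integral))
    ticks = 0
    while integral > 0.0:
        integral -= error * dt
        ticks += 1
    return ticks


clamped = ticks_to_unwind(integral_max=10.0)
unclamped = ticks_to_unwind(integral_max=float("inf"))
print(f"clamped at 10.0: {clamped} ticks, no clamp: {unclamped} ticks")
```

With the clamp at 10.0 the integral recovers in roughly 1000 ticks (10 s at 100 Hz); without it, recovery takes roughly 5000 ticks. Note that with the gains in this recipe KI * INTEGRAL_MAX is 5.0, five times OUTPUT_MAX, so the integral term alone can still hold the output saturated. One heuristic is to start with a limit near OUTPUT_MAX / KI and adjust from there.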
Variations
Derivative on Measurement (avoids setpoint kick):
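prev_measured = [0.0]  # module-level state, next to prev_error

# Inside pid_tick, replacing the derivative-on-error lines:
d_term = -KD * (mv - prev_measured[0]) / dt if dt > 0 else 0.0
prev_measured[0] = mv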
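The difference between the two derivative forms shows up most clearly on a setpoint step. The sketch below is plain Python with hypothetical names (derivative_terms and the sample sequences); it assumes a fixed 100 Hz timestep and computes both forms side by side.

```python
KD = 0.1
DT = 0.01  # assumed fixed timestep (100 Hz)


def derivative_terms(setpoints, measurements):
    """Return (on_error, on_measurement) derivative terms for each tick."""
    prev_error = setpoints[0] - measurements[0]
    prev_mv = measurements[0]
    on_error, on_measurement = [], []
    for sp, mv in zip(setpoints, measurements):
        error = sp - mv
        on_error.append(KD * (error - prev_error) / DT)
        # Equivalent to -KD * (mv - prev_mv) / DT.
        on_measurement.append(KD * (prev_mv - mv) / DT)
        prev_error, prev_mv = error, mv
    return on_error, on_measurement


# Setpoint steps from 0 to 1 at tick 2 while the measurement stays at 0.
sp = [0.0, 0.0, 1.0, 1.0, 1.0]
mv = [0.0, 0.0, 0.0, 0.0, 0.0]
d_err, d_meas = derivative_terms(sp, mv)
print("on error:      ", d_err)
print("on measurement:", d_meas)
```

The derivative-on-error term jumps to about 10 on the tick where the setpoint changes, ten times OUTPUT_MAX, while the derivative-on-measurement term stays at zero. When the setpoint is constant the two forms give the same value.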
Runtime Gain Tuning (via Params):
import horus
params = horus.Params()
params.set("kp", 2.0)
params.set("ki", 0.5)
def pid_tick(node):
    kp = params.get_or("kp", 2.0)
    ki = params.get_or("ki", 0.5)
    # ... use kp, ki in calculation
Cascaded PID (position → velocity → torque):
# Outer loop: position → velocity setpoint (10 Hz)
pos_pid = Node(name="pos_pid", tick=pos_tick, rate=10, order=0,
               subs=["pos_setpoint", "pos_measured"], pubs=["vel_setpoint"])
# Inner loop: velocity → torque command (100 Hz)
vel_pid = Node(name="vel_pid", tick=vel_tick, rate=100, order=1,
               subs=["vel_setpoint", "vel_measured"], pubs=["torque_cmd"])
run(pos_pid, vel_pid, tick_rate=100)
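The two-rate structure can be tried offline before wiring up the nodes. The sketch below is plain Python and makes several assumptions that are not from HORUS: the outer loop is taken to run on every tenth tick of the 100 Hz schedule, both loops are proportional-only for brevity, the gains KP_POS and KP_VEL are hypothetical, and the plant is an idealized unit inertia with no friction.

```python
DT = 0.01          # inner-loop period (100 Hz)
OUTER_EVERY = 10   # assumed: outer loop runs on every 10th tick (10 Hz)
KP_POS = 2.0       # hypothetical outer gain: position error -> velocity setpoint
KP_VEL = 20.0      # hypothetical inner gain: velocity error -> torque


def simulate(pos_setpoint, ticks):
    """Run the cascade against an idealized unit-inertia plant.

    Returns the final position and the number of outer-loop updates.
    """
    pos, vel, vel_setpoint = 0.0, 0.0, 0.0
    outer_updates = 0
    for tick in range(ticks):
        # Outer loop (order=0): refresh the velocity setpoint at 10 Hz.
        if tick % OUTER_EVERY == 0:
            vel_setpoint = KP_POS * (pos_setpoint - pos)
            outer_updates += 1
        # Inner loop (order=1): track the latest velocity setpoint at 100 Hz.
        torque = KP_VEL * (vel_setpoint - vel)
        # Plant: torque accelerates a unit inertia, velocity moves the position.
        vel += torque * DT
        pos += vel * DT
    return pos, outer_updates


final_pos, updates = simulate(pos_setpoint=1.0, ticks=500)
print(f"position after 5 s: {final_pos:.4f} ({updates} outer-loop updates)")
```

The inner loop holds the most recent velocity setpoint between outer updates, which is why the inner loop should be noticeably faster than the outer one. With these assumed gains the position settles close to the setpoint within the 5 s simulated.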
Common Errors
| Symptom | Cause | Fix |
|---|---|---|
| Output oscillates wildly | Kd too high or noisy measurement | Lower Kd, use derivative-on-measurement, or low-pass filter |
| Output saturates at max/min | Ki too high or sustained error | Lower Ki, or reduce INTEGRAL_MAX so that KI * INTEGRAL_MAX stays near the output limits |
| Slow response to setpoint changes | Kp too low | Increase Kp (but watch for oscillation) |
| Different behavior in deterministic mode | Using time.time() instead of horus.dt() | Always use horus.dt() for timestep |
| Integral windup after long error | INTEGRAL_MAX too large | Reduce anti-windup limit |
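The low-pass filter mentioned in the first row can be as simple as a first-order filter applied to the raw derivative term. The following is a minimal sketch in plain Python: LowPass and derivative_series are hypothetical names, and the 0.05 s time constant and the alternating test noise are assumed values chosen for illustration.

```python
class LowPass:
    """First-order low-pass filter: y += alpha * (x - y)."""

    def __init__(self, tau, dt):
        # tau is the filter time constant in seconds (a tuning assumption).
        self.alpha = dt / (tau + dt)
        self.y = 0.0

    def update(self, x):
        self.y += self.alpha * (x - self.y)
        return self.y


KD = 0.1
DT = 0.01  # assumed fixed timestep (100 Hz)


def derivative_series(measurements, tau=0.05):
    """Return (raw, filtered) derivative-on-measurement terms per tick."""
    lp = LowPass(tau, DT)
    prev_mv = measurements[0]
    raw, filtered = [], []
    for mv in measurements:
        d_raw = KD * (prev_mv - mv) / DT
        prev_mv = mv
        raw.append(d_raw)
        filtered.append(lp.update(d_raw))
    return raw, filtered


# Worst-case noise: the measurement alternates by +/-0.01 on every tick.
noisy = [0.01 if k % 2 == 0 else -0.01 for k in range(50)]
raw, filtered = derivative_series(noisy)
print("raw peak:     ", max(abs(v) for v in raw))
print("filtered peak:", max(abs(v) for v in filtered))
```

A larger tau suppresses more noise but adds lag to the derivative term, so it trades noise rejection against damping. In the node, the filter state would live next to integral and prev_error and be reset in pid_init.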
See Also
- PID Controller (Rust) — Rust version
- Differential Drive (Python) — Uses PID for wheel speed control
- Clock API — horus.dt() and deterministic mode
- Rate & Params — Runtime gain tuning with Params