PID Controller (Python)

A reusable PID controller node that subscribes to a setpoint and a measured value, computes a control output, and publishes the result. It includes integral anti-windup and output clamping.

Problem

You need a closed-loop controller (speed, position, temperature, etc.) that runs at a fixed rate with deterministic timesteps.

When To Use

  • Motor speed control (RPM tracking)
  • Position holding (arm joints, pan-tilt)
  • Temperature regulation
  • Any setpoint-tracking control loop

Prerequisites

horus.toml

[package]
name = "pid-controller-py"
version = "0.1.0"
language = "python"

Complete Code

import horus
from horus import Node, run, us

# ── PID gains ─────────────────────────────────────────
KP = 2.0
KI = 0.5
KD = 0.1
OUTPUT_MIN = -1.0
OUTPUT_MAX = 1.0
INTEGRAL_MAX = 10.0  # anti-windup limit

# ── State ─────────────────────────────────────────────
integral = [0.0]
prev_error = [0.0]

def pid_init(node):
    integral[0] = 0.0
    prev_error[0] = 0.0
    node.log_info(f"PID: kp={KP}, ki={KI}, kd={KD}")

def pid_tick(node):
    setpoint = node.recv("setpoint")
    measured = node.recv("measured")

    if setpoint is None or measured is None:
        return

    sp = setpoint.get("value", 0.0) if isinstance(setpoint, dict) else float(setpoint)
    mv = measured.get("value", 0.0) if isinstance(measured, dict) else float(measured)

    # Error
    error = sp - mv

    # dt from framework clock — fixed in deterministic mode, real in production
    dt = horus.dt()

    # Proportional
    p_term = KP * error

    # Integral with anti-windup
    integral[0] += error * dt
    integral[0] = max(-INTEGRAL_MAX, min(INTEGRAL_MAX, integral[0]))
    i_term = KI * integral[0]

    # Derivative on error: simple, but spikes whenever the setpoint steps
    d_term = KD * (error - prev_error[0]) / dt if dt > 0 else 0.0
    prev_error[0] = error

    # IMPORTANT: clamp output to actuator limits
    output = p_term + i_term + d_term
    output = max(OUTPUT_MIN, min(OUTPUT_MAX, output))

    node.send("control", {
        "output": output,
        "error": error,
        "p": p_term,
        "i": i_term,
        "d": d_term,
    })

pid = Node(
    name="PID",
    init=pid_init,
    tick=pid_tick,
    rate=100,
    order=5,
    budget=200 * us,
    subs=["setpoint", "measured"],
    pubs=["control"],
)

run(pid, tick_rate=100)

Expected Output

[HORUS] Scheduler running — tick_rate: 100 Hz
[HORUS] Node "PID" started (BestEffort, 100 Hz, budget: 200μs)
[HORUS] PID: kp=2.0, ki=0.5, kd=0.1

Key Points

  • horus.dt() gives the actual timestep — adapts to rate changes, fixed in deterministic mode for reproducible behavior
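  • Anti-windup clamps the integral accumulator to ±INTEGRAL_MAX, so a sustained error cannot build up a large integral that keeps the output saturated long after the error is gone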
  • Output clamping prevents actuator damage (motors, heaters, etc.)
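  • Derivative on error is simple but spikes whenever the setpoint changes (derivative kick); to avoid this, use derivative on measurement: d_term = -KD * (mv - prev_measured[0]) / dt. Measurement noise is amplified either way, so low-pass filter a noisy signal first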
  • Deterministic mode (deterministic=True) makes dt() return a fixed 1/rate, so PID behavior is identical across runs
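
The anti-windup and clamping points above can be checked without the framework. The sketch below is a minimal, framework-free restatement of the tick logic. `PIDState`, `clamp`, and `pid_step` are illustrative names, not part of horus, and the default gains simply mirror the constants in Complete Code.

```python
from dataclasses import dataclass


@dataclass
class PIDState:
    """Illustrative container for the controller memory (not a horus type)."""
    integral: float = 0.0
    prev_error: float = 0.0


def clamp(value, low, high):
    """Limit value to the closed interval [low, high]."""
    return max(low, min(high, value))


def pid_step(state, sp, mv, dt, kp=2.0, ki=0.5, kd=0.1,
             out_min=-1.0, out_max=1.0, integral_max=10.0):
    """Run one PID update and return the clamped output."""
    error = sp - mv
    # Anti-windup: bound the accumulator itself, not only the final output.
    state.integral = clamp(state.integral + error * dt,
                           -integral_max, integral_max)
    derivative = (error - state.prev_error) / dt if dt > 0 else 0.0
    state.prev_error = error
    raw = kp * error + ki * state.integral + kd * derivative
    # Output clamping: never command more than the actuator range.
    return clamp(raw, out_min, out_max)


# Hold a constant error of 5.0 for 1000 ticks at 100 Hz (10 s).
# Unclamped, the accumulator would reach 50; here it stops at integral_max.
state = PIDState()
for _ in range(1000):
    output = pid_step(state, sp=5.0, mv=0.0, dt=0.01)
```

After the loop, `state.integral` sits at the anti-windup limit and `output` at `out_max`, which is the behavior the bullets above describe.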

Variations

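Derivative on Measurement (no derivative kick on setpoint changes):

prev_measured = [0.0]  # module-level state, next to integral and prev_error

# inside pid_tick, replacing the derivative-on-error lines:
d_term = -KD * (mv - prev_measured[0]) / dt if dt > 0 else 0.0
prev_measured[0] = mv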
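
To see what this variation changes, the sketch below compares the two derivative forms on a single tick where the setpoint steps but the measurement has not moved yet. It runs without the framework. The helper names `d_on_error` and `d_on_measurement` are made up for illustration, and a 100 Hz tick is assumed.

```python
KD = 0.1
DT = 0.01  # assumed 100 Hz tick


def d_on_error(error, prev_error, dt=DT):
    """Derivative term computed from the change in error."""
    return KD * (error - prev_error) / dt if dt > 0 else 0.0


def d_on_measurement(mv, prev_mv, dt=DT):
    """Derivative term computed from the change in measurement only."""
    return -KD * (mv - prev_mv) / dt if dt > 0 else 0.0


# The setpoint steps from 0.0 to 1.0; the measurement is still at 0.0.
prev_sp, sp = 0.0, 1.0
prev_mv, mv = 0.0, 0.0

kick = d_on_error(sp - mv, prev_sp - prev_mv)  # reacts to the setpoint step
no_kick = d_on_measurement(mv, prev_mv)        # sees no change in measurement
```

The error-based term jumps by roughly KD / DT times the step size for one tick, while the measurement-based term stays at zero. When the setpoint is constant, the two forms give the same value.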

Runtime Gain Tuning (via Params):

import horus

params = horus.Params()
params.set("kp", 2.0)
params.set("ki", 0.5)

def pid_tick(node):
    kp = params.get_or("kp", 2.0)
    ki = params.get_or("ki", 0.5)
    # ... use kp, ki in calculation

Cascaded PID (position → velocity → torque):

# Outer loop: position → velocity setpoint (10 Hz)
pos_pid = Node(name="pos_pid", tick=pos_tick, rate=10, order=0,
               subs=["pos_setpoint", "pos_measured"], pubs=["vel_setpoint"])

# Inner loop: velocity → torque command (100 Hz)
vel_pid = Node(name="vel_pid", tick=vel_tick, rate=100, order=1,
               subs=["vel_setpoint", "vel_measured"], pubs=["torque_cmd"])

run(pos_pid, vel_pid, tick_rate=100)
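
The rate relationship between the two loops can be sketched without the framework. The toy simulation below assumes that a 10 Hz node under a 100 Hz scheduler effectively runs on every tenth tick, which the text above does not confirm for horus. The proportional-only gains and the frictionless unit-mass plant are hypothetical and chosen only to keep the example short.

```python
BASE_RATE = 100                    # Hz, inner loop and scheduler rate
OUTER_RATE = 10                    # Hz, outer loop rate
DIVIDER = BASE_RATE // OUTER_RATE  # outer loop runs every DIVIDER ticks
DT = 1.0 / BASE_RATE

KP_POS = 2.0   # hypothetical outer-loop gain (position -> velocity setpoint)
KP_VEL = 5.0   # hypothetical inner-loop gain (velocity -> torque)

pos, vel = 0.0, 0.0   # toy plant state: unit mass, no friction
pos_setpoint = 1.0
vel_setpoint = 0.0
outer_updates = 0

for tick in range(1000):                      # 10 s of simulated time
    if tick % DIVIDER == 0:                   # outer loop: every 10th tick
        vel_setpoint = KP_POS * (pos_setpoint - pos)
        outer_updates += 1
    torque = KP_VEL * (vel_setpoint - vel)    # inner loop: every tick
    vel += torque * DT                        # torque accelerates the mass
    pos += vel * DT                           # velocity moves the position
```

Between outer updates the inner loop keeps tracking the last velocity setpoint, which is why the inner loop is usually run several times faster than the outer one.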

Common Errors

| Symptom | Cause | Fix |
|---|---|---|
| Output oscillates wildly | Kd too high or noisy measurement | Lower Kd, use derivative-on-measurement, or low-pass filter the measurement |
| Output saturates at max/min | Ki too high or sustained error | Lower Ki or reduce INTEGRAL_MAX |
| Slow response to setpoint changes | Kp too low | Increase Kp (but watch for oscillation) |
| Different behavior in deterministic mode | Using time.time() instead of horus.dt() | Always use horus.dt() for the timestep |
| Integral windup after long error | INTEGRAL_MAX too large | Reduce the anti-windup limit |
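
The low-pass filter mentioned as a fix for oscillation can be as small as a first-order exponential filter applied to the measurement before it reaches the derivative term. The sketch below is one common form, not something horus provides. `LowPass` and `lowpass_alpha` are illustrative names, and the 5 Hz cutoff is an arbitrary example value.

```python
import math


def lowpass_alpha(cutoff_hz, dt):
    """Smoothing factor of a first-order low-pass filter for a given cutoff."""
    tau = 1.0 / (2.0 * math.pi * cutoff_hz)
    return dt / (tau + dt)


class LowPass:
    """First-order exponential filter; the first sample initializes the state."""

    def __init__(self, alpha):
        self.alpha = alpha
        self.value = None

    def update(self, sample):
        if self.value is None:
            self.value = sample
        else:
            self.value += self.alpha * (sample - self.value)
        return self.value


DT = 0.01  # assumed 100 Hz tick
filt = LowPass(lowpass_alpha(cutoff_hz=5.0, dt=DT))

# A constant signal of 1.0 with alternating +/-0.2 noise on top.
raw = [1.0 + (0.2 if i % 2 == 0 else -0.2) for i in range(200)]
filtered = [filt.update(x) for x in raw]

raw_ripple = max(raw[-20:]) - min(raw[-20:])
filtered_ripple = max(filtered[-20:]) - min(filtered[-20:])
```

In a tick function, the filtered value would take the place of `mv` in the derivative calculation. A lower cutoff removes more noise but adds lag, which can itself destabilize the loop, so tune the cutoff together with Kd.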

See Also