Rate & Params

Two utility types for timing control and dynamic configuration outside the scheduler's node lifecycle.


Rate — Fixed-Frequency Loop

horus.Rate provides drift-compensated rate limiting for loops that need to run at a fixed frequency. Use it for standalone scripts and background threads — inside nodes, the scheduler handles timing automatically.

import horus

rate = horus.Rate(100)  # 100 Hz

while True:
    do_work()
    rate.sleep()  # Blocks until next tick (drift-compensated)

Constructor

horus.Rate(hz: float)  # Target frequency in Hz

Methods

MethodReturnsDescription
rate.sleep()Block until next tick (compensates for work time)
rate.remaining()floatSeconds until next tick
rate.reset()Reset the timer (next sleep starts fresh)

Example: Hardware Driver Thread

import horus
import threading

def imu_reader_thread():
    rate = horus.Rate(100)  # 100 Hz
    topic = horus.Topic(horus.Imu)

    while True:
        reading = read_imu_hardware()
        topic.send(reading)
        rate.sleep()

thread = threading.Thread(target=imu_reader_thread, daemon=True)
thread.start()

When to Use Rate vs Scheduler

Use CaseUse
Nodes with tick callbacksScheduler (handles timing, RT, safety)
Standalone scriptsRate
Background threads alongside schedulerRate
One-shot toolsNeither — just run once

Params — Runtime Parameters

horus.Params is a typed key-value store for dynamic configuration. Change gains, thresholds, or feature flags at runtime without restarting nodes.

import horus

params = horus.Params()
params.set("pid.kp", 1.5)
params.set("pid.ki", 0.01)

kp = params.get("pid.kp")  # 1.5

Constructor

horus.Params()  # Empty parameter store

Methods

MethodReturnsDescription
params.get(key)valueGet parameter value (raises if missing)
params.get_or(key, default)valueGet with fallback
params.set(key, value)Set parameter value
params.has(key)boolCheck if key exists
params.list_keys()list[str]All parameter names
params.remove(key)Remove a parameter
params.reset()Clear all parameters

Example: Dynamic PID Tuning

import horus

params = horus.Params()
params.set("kp", 2.0)
params.set("ki", 0.1)
params.set("kd", 0.05)

def controller_tick(node):
    kp = params.get_or("kp", 2.0)
    ki = params.get_or("ki", 0.1)
    kd = params.get_or("kd", 0.05)

    error = get_setpoint() - get_measured()
    output = kp * error + ki * integral + kd * derivative
    node.send("control", {"output": output})

# Another thread or monitoring tool can update params at runtime:
# params.set("kp", 3.0)  # Takes effect on next tick

Example: Feature Flags

params = horus.Params()
params.set("enable_slam", True)
params.set("max_speed", 1.0)

def tick(node):
    if params.get_or("enable_slam", False):
        run_slam()

    speed = min(velocity, params.get_or("max_speed", 1.0))
    node.send("cmd_vel", horus.CmdVel(linear=speed, angular=0.0))

See Also