# Scheduler API

The Scheduler orchestrates node execution with tick-rate control, real-time scheduling, watchdogs, and recording.

```python
import horus

sched = horus.Scheduler(tick_rate=100, rt=True)
sched.add(my_node)
sched.run()
```
## Constructor

```python
horus.Scheduler(
    *,                         # keyword-only
    tick_rate=1000.0,          # Global tick rate in Hz
    rt=False,                  # Enable RT scheduling (memory locking, SCHED_FIFO)
    deterministic=False,       # SimClock, fixed dt, seeded RNG
    blackbox_mb=0,             # Flight recorder size (0 = disabled)
    watchdog_ms=0,             # Global watchdog timeout (0 = disabled)
    recording=False,           # Enable session recording
    name=None,                 # Scheduler name for logging
    cores=None,                # CPU affinity list (e.g., [0, 1])
    max_deadline_misses=None,  # Escalation threshold
    verbose=False,             # Debug logging
    telemetry=None,            # Telemetry endpoint URL
)
```
## Lifecycle Methods

| Method | Signature | Returns | Description |
|---|---|---|---|
| `add` | `add(node: Node) -> Scheduler` | `self` (chaining) | Register a node |
| `run` | `run(duration: float = None)` | — | Start tick loop; `None` = run forever |
| `stop` | `stop()` | — | Signal graceful shutdown |
| `is_running` | `is_running() -> bool` | `bool` | Check if scheduler is running |
| `status` | `status() -> str` | `"idle"`, `"running"`, `"stopped"` | Current state |
| `current_tick` | `current_tick() -> int` | Tick count | Current tick number |
| `scheduler_name` | `scheduler_name() -> str` | Name | Scheduler name for logging |
### add()

```python
sched.add(node) -> Scheduler  # Returns self for chaining
```

Registers a node with the scheduler. Returns `self` for chaining: `sched.add(a).add(b).add(c)`.

Edge cases:

- Duplicate `name` raises an error — node names must be unique
- Can only be called before `run()` — adding nodes during `run()` is not supported
- Does NOT call `init()` — init happens lazily on first `run()` or `tick_once()`
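The chaining behavior is the standard fluent-builder pattern: `add()` returns the scheduler itself. A minimal pure-Python sketch of that pattern and the duplicate-name edge case, using a hypothetical stand-in class rather than the real `horus.Scheduler`:

```python
class MiniScheduler:
    """Stand-in illustrating add() chaining and duplicate-name rejection."""

    def __init__(self):
        self._nodes = {}

    def add(self, name):
        # Duplicate names are rejected, mirroring the documented edge case
        if name in self._nodes:
            raise ValueError(f"duplicate node name: {name!r}")
        self._nodes[name] = object()
        return self  # returning self is what enables sched.add(a).add(b).add(c)

sched = MiniScheduler()
sched.add("sensor").add("controller").add("logger")
print(sorted(sched._nodes))  # → ['controller', 'logger', 'sensor']
```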
### run()

```python
sched.run(duration: float = None)
```

Start the tick loop. Blocks until completion.

- `duration=None` — run forever (until Ctrl+C, SIGTERM, or `request_stop()`)
- `duration=10.0` — run for 10 seconds, then return
- GIL is released during the Rust scheduler loop — other Python threads run freely
- GIL is re-acquired only when calling Python tick/init/shutdown callbacks (~500ns per acquire)
- Ctrl+C triggers graceful shutdown: all nodes get `shutdown()` called
### stop()

```python
sched.stop()
```

Signals graceful shutdown. Call it from another thread, or from within a node via `request_stop()`.
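Cross-thread shutdown signals like this typically reduce to a flag the tick loop checks each iteration. A hedged pure-Python sketch of that pattern (a stand-in loop, not the real Rust scheduler):

```python
import threading
import time

stop_flag = threading.Event()  # stand-in for the scheduler's internal stop signal
tick_count = 0

def tick_loop():
    # Stand-in for sched.run(): tick until the stop flag is set
    global tick_count
    while not stop_flag.is_set():
        tick_count += 1
        time.sleep(0.001)

loop = threading.Thread(target=tick_loop)
loop.start()

time.sleep(0.05)   # let the loop run for a moment
stop_flag.set()    # the moral equivalent of sched.stop() from another thread
loop.join(timeout=1.0)
print(f"stopped after {tick_count} ticks")
```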
## Single-Tick Execution (Testing & Simulation)

| Method | Signature | Returns | Description |
|---|---|---|---|
| `tick_once` | `tick_once(node_names: list = None)` | — | Execute one tick cycle (lazy init on first call) |
| `tick_for` | `tick_for(duration: float, node_names: list = None)` | — | Run tick loop for a duration, then return |
### tick_once()

```python
sched.tick_once(node_names: list = None)
```

Execute exactly one tick cycle and return. Lazy init: on the first call, `init()` is called on all nodes.

- `node_names=None` — tick all nodes in order
- `node_names=["sensor", "controller"]` — tick only the named nodes (skip others)
- Each call advances the tick counter by 1
- In deterministic mode, `horus.dt()` returns a fixed `1/rate` per tick
- Async nodes are handled transparently (the async event loop runs internally)

Edge cases:

- Calling before `add()` does nothing (no nodes to tick)
- Filtered `node_names` that don't exist are silently ignored
- If a node's `init()` fails, `tick_once()` raises based on `failure_policy`
### tick_for()

```python
sched.tick_for(duration: float, node_names: list = None)
```

Run the tick loop for `duration` seconds, then return. Useful for bounded test runs:

```python
sched.tick_for(1.0)  # Run for 1 second at tick_rate, then return
```
### Example: Testing with tick_once()

Step through ticks manually for unit testing and simulation:

```python
import horus

results = []

def sensor_tick(node):
    node.send("temp", {"value": 25.0 + horus.tick() * 0.1})

def logger_tick(node):
    msg = node.recv("temp")
    if msg:
        results.append(msg["value"])

sensor = horus.Node(name="sensor", pubs=["temp"], tick=sensor_tick, rate=100, order=0)
logger = horus.Node(name="logger", subs=["temp"], tick=logger_tick, rate=100, order=1)

sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(sensor)
sched.add(logger)

# Step through 5 ticks
for _ in range(5):
    sched.tick_once()

assert len(results) == 5
assert results[0] == 25.0
print(f"Passed: {results}")
```
## Runtime Mutation

| Method | Signature | Returns | Description |
|---|---|---|---|
| `set_node_rate` | `set_node_rate(name: str, rate: float)` | — | Change a node's tick rate at runtime |
| `set_tick_budget` | `set_tick_budget(name: str, budget_us: int)` | — | Change a node's tick budget at runtime (microseconds) |
| `add_critical_node` | `add_critical_node(name: str, timeout_ms: int)` | — | Mark node as safety-critical with watchdog timeout |
| `remove_node` | `remove_node(name: str) -> bool` | `bool` | Exclude a node from stats and queries (node still ticks until next restart) |
### Example: Runtime Safety Configuration

Mark safety-critical nodes and adjust budgets at runtime:

```python
import horus

def motor_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd:
        node.send("motor_cmd", {"rpm": cmd.linear * 100})

motor = horus.Node(
    name="motor",
    subs=[horus.CmdVel],
    pubs=["motor_cmd"],
    tick=motor_tick,
    rate=1000,
    budget=300 * horus.us,
    on_miss="safe_mode",
)

sched = horus.Scheduler(tick_rate=1000, watchdog_ms=500)
sched.add(motor)

# Mark motor as critical — triggers enter_safe_state() on all nodes if motor
# exceeds 500ms without ticking
sched.add_critical_node("motor", timeout_ms=500)

# Adjust budget at runtime (e.g., after profiling shows headroom)
sched.set_tick_budget("motor", 200)  # tighten to 200μs

# Check RT capabilities
if sched.has_full_rt():
    print("Full RT: memory locked, SCHED_FIFO active")
else:
    for d in sched.degradations():
        print(f"RT degradation: {d['feature']} — {d['reason']}")

sched.run()

# After run, inspect safety stats
stats = sched.safety_stats()
if stats:
    print(f"Deadline misses: {stats.get('deadline_misses', 0)}")
    print(f"Watchdog expirations: {stats.get('watchdog_expirations', 0)}")
```
## Introspection

| Method | Signature | Returns | Description |
|---|---|---|---|
| `get_node_stats` | `get_node_stats(name: str) -> dict` | Metrics dict | Get node performance stats |
| `get_all_nodes` | `get_all_nodes() -> list` | Node info list | Get all registered nodes |
| `get_node_names` | `get_node_names() -> list[str]` | Name list | Get all node names |
| `get_node_count` | `get_node_count() -> int` | Count | Number of registered nodes |
| `has_node` | `has_node(name: str) -> bool` | `bool` | Check if a node exists |
| `get_node_info` | `get_node_info(name: str) -> Optional[int]` | Order or `None` | Get execution order of a node |
### Example: Introspection at Runtime

```python
sched = horus.Scheduler(tick_rate=100, name="my_robot")
sched.add(sensor)
sched.add(controller)

# After starting (e.g., in a monitoring thread)
print(f"Scheduler: {sched.scheduler_name()}")
print(f"Status: {sched.status()}")
print(f"Nodes: {sched.get_node_names()}")
print(f"Count: {sched.get_node_count()}")
print(f"Has motor? {sched.has_node('motor')}")

# Per-node stats
for name in sched.get_node_names():
    stats = sched.get_node_stats(name)
    print(f"  {name}: {stats['total_ticks']} ticks, avg {stats.get('avg_tick_duration_ms', 0):.2f}ms")
```
## RT & Safety

| Method | Signature | Returns | Description |
|---|---|---|---|
| `capabilities` | `capabilities() -> dict` | Capability dict | RT support, CPU features |
| `has_full_rt` | `has_full_rt() -> bool` | `bool` | Full RT capabilities available? |
| `degradations` | `degradations() -> list[dict]` | Degradation list | RT features requested but unavailable (dicts with `feature`, `reason`, `severity` keys) |
| `safety_stats` | `safety_stats() -> dict` | Stats dict | Watchdog stats, deadline misses, health states |
## Recording & Replay

| Method | Signature | Returns | Description |
|---|---|---|---|
| `is_recording` | `is_recording() -> bool` | `bool` | Session recording active? |
| `is_replaying` | `is_replaying() -> bool` | `bool` | Session replay active? |
| `stop_recording` | `stop_recording() -> list[str]` | File paths | Stop recording, return session files |
| `list_recordings` | `list_recordings() -> list[str]` | Session list | List available recordings |
| `delete_recording` | `delete_recording(name: str)` | `None` | Delete a recorded session |
### Example: Recording and Replay

```python
import horus

def sensor_tick(node):
    node.send("imu", horus.Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81))

sensor = horus.Node(name="imu", pubs=[horus.Imu], tick=sensor_tick, rate=100)

# Record a 5-second session
sched = horus.Scheduler(tick_rate=100, recording=True)
sched.add(sensor)
sched.run(duration=5.0)

# Stop recording and get file paths
files = sched.stop_recording()
print(f"Recorded to: {files}")

# List all recordings
for rec in sched.list_recordings():
    print(f"  Session: {rec}")

# Delete a recording
sched.delete_recording(files[0])
```
## Context Manager

```python
with horus.Scheduler(tick_rate=100) as sched:
    sched.add(sensor_node)
    sched.add(controller_node)
    sched.run(duration=10.0)  # Run for 10 seconds
# sched.stop() called automatically on exit
```
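This behavior corresponds to the standard `__enter__`/`__exit__` protocol. A stand-in sketch (not the real implementation) showing why `stop()` runs even when the body raises:

```python
class MiniScheduler:
    """Stand-in showing the context-manager protocol behind the example above."""

    def __init__(self):
        self.stopped = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.stop()      # runs even if the with-body raised
        return False     # do not swallow exceptions

    def stop(self):
        self.stopped = True

with MiniScheduler() as sched:
    pass

assert sched.stopped  # stop() was called on exit
```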
## Deterministic Mode

Deterministic mode uses `SimClock` (fixed dt), a seeded RNG, and sequential execution — every run produces identical output:

```python
import horus

outputs = []

def physics_tick(node):
    # horus.dt() returns a fixed 1/rate in deterministic mode
    # horus.rng_float() returns tick-seeded values (reproducible)
    noise = horus.rng_float() * 0.01
    position = horus.dt() * 10.0 + noise
    outputs.append(position)

node = horus.Node(name="physics", tick=physics_tick, rate=100)

# Run 1
sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(node)
sched.run(duration=1.0)
run1 = outputs.copy()

# Run 2 — identical output
outputs.clear()
sched2 = horus.Scheduler(tick_rate=100, deterministic=True)
sched2.add(horus.Node(name="physics", tick=physics_tick, rate=100))
sched2.run(duration=1.0)
run2 = outputs.copy()

assert run1 == run2, "Deterministic mode guarantees identical output"
```
## Multi-Node System

```python
import horus

def sensor_tick(node):
    reading = horus.Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81)
    node.send("imu", reading)

def controller_tick(node):
    imu = node.recv("imu")
    if imu:
        cmd = horus.CmdVel(linear=0.5 if imu.accel_z > 9.0 else 0.0, angular=0.0)
        node.send("cmd_vel", cmd)

sensor = horus.Node(name="imu_sensor", pubs=[horus.Imu], tick=sensor_tick, rate=100, order=0)
controller = horus.Node(name="nav", subs=[horus.Imu], pubs=[horus.CmdVel], tick=controller_tick, rate=50, order=1)

sched = horus.Scheduler(tick_rate=100, watchdog_ms=500)
sched.add(sensor)
sched.add(controller)
sched.run()
```
## run() Convenience Function

One-liner — creates a Scheduler, adds all nodes, and runs:

```python
horus.run(
    *nodes,                    # Node instances to run
    duration=None,             # Seconds to run (None = forever)
    tick_rate=1000.0,          # Global tick rate
    rt=False,                  # RT scheduling
    deterministic=False,       # Deterministic mode
    watchdog_ms=0,             # Watchdog timeout
    blackbox_mb=0,             # Flight recorder
    recording=False,           # Session recording
    name=None,                 # Scheduler name
    cores=None,                # CPU affinity
    max_deadline_misses=None,  # Miss threshold
    verbose=False,             # Debug logging
    telemetry=None,            # Telemetry endpoint
)
```
Equivalent to:

```python
sched = horus.Scheduler(tick_rate=tick_rate, rt=rt, ...)
for node in nodes:
    sched.add(node)
sched.run(duration=duration)
```
## Execution Classes

The scheduler automatically classifies each node into an execution class based on its configuration:

| Node Configuration | Execution Class | Thread | Timing |
|---|---|---|---|
| `rate=100` plus `budget=X` or `deadline=X` | `Rt` (auto-detected) | Dedicated RT thread (SCHED_FIFO if available) | Strict budget/deadline enforcement |
| `compute=True` | `Compute` | Worker thread pool | No timing guarantee |
| `on="topic_name"` | `Event` | Event-triggered | Tick only when topic has data |
| `async def tick` | `AsyncIo` | Async I/O thread pool | Async event loop scheduling |
| None of the above | `BestEffort` | Main tick thread | Best-effort timing |

**RT auto-detection:** Setting `rate` together with `budget` or `deadline` automatically classifies the node as RT — no explicit flag is needed on the node. The scheduler's `rt=True` enables the RT runtime (memory locking, SCHED_FIFO); individual nodes opt in via timing constraints.

**Mutual exclusion:** `compute=True`, `on="topic"`, and `async def tick` are mutually exclusive. Combining them raises an error.
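The classification rules above can be sketched as a small pure-Python function. This mirrors the table, not the real scheduler internals, and the parameter names here are hypothetical:

```python
def classify(rate=None, budget=None, deadline=None,
             compute=False, on=None, is_async=False):
    """Map a node configuration to its execution class, per the table above."""
    # compute=True, on="topic", and async tick are mutually exclusive
    if sum([compute, on is not None, is_async]) > 1:
        raise ValueError("compute, on, and async tick are mutually exclusive")
    if rate is not None and (budget is not None or deadline is not None):
        return "Rt"          # RT auto-detection: rate plus budget/deadline
    if compute:
        return "Compute"     # worker thread pool, no timing guarantee
    if on is not None:
        return "Event"       # tick only when the topic has data
    if is_async:
        return "AsyncIo"     # async I/O thread pool
    return "BestEffort"      # main tick thread

print(classify(rate=100, budget=300))  # → Rt
print(classify(on="cmd_vel"))          # → Event
print(classify(rate=100))              # → BestEffort (rate alone is not RT)
```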
## Design Decisions

**Why a keyword-only constructor?** All parameters are keyword-only (`*` in the signature) to prevent positional-argument mistakes. `Scheduler(100, True)` is ambiguous — is `100` the tick rate or the watchdog timeout? `Scheduler(tick_rate=100, rt=True)` is unambiguous.

**Why does `run()` block?** The scheduler's tick loop must own the execution thread for deterministic timing. Returning a future or running in a background thread would add jitter. For concurrent Python work (HTTP server, monitoring), use `async def` nodes or Python threads — the GIL is released during `run()`.

**Why `rt=True` on the scheduler, not per node?** RT requires system-level setup (memory locking, SCHED_FIFO). This is a process-level decision, not a per-node one. Individual nodes opt into RT timing via `budget`/`deadline` — the scheduler handles the runtime.

**Why `tick_once()` for testing?** Unit tests need deterministic, step-by-step execution. `tick_once()` executes one complete tick cycle (all nodes in order) and returns. This makes tests reproducible without timers or sleeps.

**Why release the GIL during `run()`?** The tick loop is Rust code — no Python objects are accessed between callbacks. Releasing the GIL lets other Python threads (Flask server, monitoring, logging) run concurrently. The GIL is re-acquired only for `tick()`/`init()`/`shutdown()` callbacks (~500ns per acquire).
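The effect of a GIL-releasing blocking call can be demonstrated without `horus`: here `time.sleep()` (which also releases the GIL while blocking) stands in for the Rust tick loop, and a background thread keeps making progress while the main thread blocks.

```python
import threading
import time

progress = []

def monitor():
    # Stand-in for a concurrent Python thread (Flask server, logger, ...)
    for i in range(5):
        progress.append(i)
        time.sleep(0.01)

t = threading.Thread(target=monitor)
t.start()

# Stand-in for sched.run(): a blocking call that releases the GIL,
# so the monitor thread keeps running concurrently.
time.sleep(0.2)
t.join(timeout=1.0)
print(progress)  # → [0, 1, 2, 3, 4]
```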
## Trade-offs

| Choice | Benefit | Cost |
|---|---|---|
| `run()` blocks | Deterministic timing, no jitter | Must use threads for concurrent Python work |
| GIL released during run | Other Python threads work | ~500ns GIL re-acquire per tick per node |
| Keyword-only constructor | No positional argument mistakes | More typing than positional args |
| `tick_once()` for testing | Step-by-step deterministic tests | Must manually loop for multi-tick scenarios |
| Auto RT classification | No manual RT flags | Must understand budget/deadline → RT mapping |
| Context manager support | Clean resource cleanup | Extra indentation |
## See Also
- Node API — Node constructor, methods, lifecycle
- Clock API — Framework clock functions
- Deterministic Mode — Full deterministic mode guide
- Safety Monitor — Graduated degradation
- Record & Replay — Session recording guide
- Scheduler Deep-Dive — Python scheduler patterns
- Rust Scheduler API — Rust equivalent