Execution Classes (Python)

A motor controller that misses its deadline by even a millisecond can cause a robot arm to overshoot and collide with a person. A path planner that takes 50 ms of CPU time is completely normal — but if it runs on the same thread as the motor controller, it blocks 50 ticks. A logging node that takes an extra 10 ms is harmless. A cloud uploader that blocks on a network request shouldn't hold up anything.

These are fundamentally different workloads. Running them all the same way — in a single sequential loop — forces every node to compromise. The fast ones wait for the slow ones. The critical ones share a thread with the optional ones. A single slow node can cascade timing failures across the entire system.

HORUS solves this with execution classes: five different executors, each optimized for a specific workload type. The scheduler automatically selects the right class based on how you configure the Node() constructor — you describe what your node needs, and the scheduler figures out how to run it.

The Five Classes

Loading diagram...
The scheduler selects the execution class from your Node() configuration

BestEffort (Default)

Nodes tick sequentially in the main loop, ordered by order=. This is the simplest and lowest-overhead class.

import horus

def update_display(node):
    stats = node.recv("stats")
    if stats:
        print(f"Speed: {stats['speed']:.1f} m/s")

display = horus.Node(
    name="display",
    subs=["stats"],
    tick=update_display,
    order=100,
    # No rate=, compute=, on=, or async def → BestEffort
)

How it works: The main scheduler thread calls tick() on each BestEffort node in sequence, once per scheduler cycle. No threads are spawned. No synchronization is needed.

Use for: Logging, telemetry, display, diagnostic reporting — anything without timing requirements or heavy computation.

Characteristics:

  • Runs at the scheduler's global tick_rate
  • Deterministic ordering (same sequence every cycle)
  • Lowest overhead — no thread spawn, no atomics, no synchronization

Rt (Real-Time)

Each RT node gets a dedicated thread with optional OS-level priority scheduling. The scheduler enforces timing budgets and deadlines, and takes action when a node runs too long.

import horus

us = horus.us  # 1e-6
ms = horus.ms  # 1e-3

def motor_control(node):
    cmd = node.recv("cmd_vel")
    if cmd:
        write_to_motor(cmd.linear, cmd.angular)

# Auto-derived: budget = 80% of period, deadline = 95% of period
motor = horus.Node(
    name="motor",
    subs=[horus.CmdVel],
    tick=motor_control,
    rate=1000,               # 1 kHz → budget=800μs, deadline=950μs
    on_miss="safe_mode",     # Enter safe state on deadline miss
    order=0,
)

# Explicit budget and deadline
safety = horus.Node(
    name="safety_monitor",
    subs=["safety.heartbeat"],
    tick=check_safety,
    budget=100 * us,
    deadline=200 * us,
    on_miss="stop",          # Full stop on deadline miss
    order=0,
)

There is no rt=True parameter on Node(). RT is always auto-detected from timing constraints. Setting rate=, budget=, or deadline= — any of them — automatically assigns the Rt class. This maps developer intent ("this node needs to run at 1 kHz") directly to the right executor.

How it works: Each RT node runs on its own dedicated thread. If rt=True is set on the Scheduler and the OS supports it, the thread gets SCHED_FIFO real-time priority. The scheduler measures every tick() call and applies the on_miss policy when budget or deadline is exceeded.

Auto-derivation from rate:

When you set rate= without explicit budget= or deadline=, the scheduler derives them:

You setScheduler derives
rate=100 (10 ms period)budget=8 ms (80%), deadline=9.5 ms (95%)
rate=1000 (1 ms period)budget=800 μs (80%), deadline=950 μs (95%)
rate=500 (2 ms period)budget=1.6 ms (80%), deadline=1.9 ms (95%)

You can override either or both:

# Auto budget + explicit deadline
horus.Node(rate=1000, deadline=900 * us, ...)

# Explicit budget + auto deadline
horus.Node(rate=1000, budget=300 * us, ...)

# Both explicit
horus.Node(rate=1000, budget=300 * us, deadline=900 * us, ...)

Additional RT configuration:

critical = horus.Node(
    name="critical_ctrl",
    tick=control_loop,
    rate=1000,
    budget=300 * us,
    deadline=900 * us,
    on_miss="skip",
    priority=90,         # OS-level thread priority (1-99, higher = more urgent)
    core=0,              # Pin to CPU core 0
    watchdog=0.5,        # 500 ms freeze detection
    order=0,
)

Use for: Motor control, safety monitoring, sensor fusion — anything where missing a deadline has physical consequences.

Compute

For CPU-heavy work that benefits from parallelism. Multiple Compute nodes run simultaneously on a shared thread pool.

import horus

def plan_path(node):
    scan = node.recv("scan")
    if scan:
        path = a_star(scan.ranges, goal)
        node.send("path", path)

planner = horus.Node(
    name="planner",
    subs=[horus.LaserScan],
    pubs=["path"],
    tick=plan_path,
    compute=True,        # CPU thread pool
    rate=10,             # Optional: tick at most 10 times/sec
    order=5,
)

How it works: Compute nodes are dispatched to a thread pool. Multiple compute nodes can run in parallel on different CPU cores. They don't block the main tick loop or RT threads.

Use for: Path planning, SLAM, point cloud processing, ML inference on CPU, image processing — any CPU-bound work that takes more than ~1 ms per tick.

rate= on a Compute node does not make it RT — it only limits how often the node ticks. A rate=10, compute=True node ticks at most 10 times per second but has no budget or deadline enforcement.

Event

Nodes that sleep until a specific topic receives new data. Zero CPU usage when idle.

import horus

def handle_estop(node):
    msg = node.recv("emergency.stop")
    if msg:
        node.log_warning("EMERGENCY STOP RECEIVED")
        disable_all_motors()
        node.request_stop()

estop = horus.Node(
    name="estop_handler",
    subs=["emergency.stop"],
    tick=handle_estop,
    on="emergency.stop",     # Sleep until this topic gets data
    order=0,
)

How it works: The node's thread sleeps. When any publisher calls send() on the named topic, the Event node wakes and tick() is called. If multiple messages arrive between wakes, the node ticks once — call recv() in a loop inside tick() to drain all pending messages.

Use for: Emergency stop handlers, command receivers, sparse event processors — anything where the node should be completely idle until something specific happens.

Characteristics:

  • Zero CPU when no messages arrive
  • Wake latency: ~microseconds from send() to tick()
  • order= still applies: if two event nodes wake simultaneously, lower order runs first
  • The topic name in on= must match a topic declared in another node's pubs

AsyncIo

For network, file I/O, or GPU operations that would block a real-time thread. Runs on a tokio runtime. In Python, this class is automatic — just use async def for your tick function.

import horus
import aiohttp

async def upload_telemetry(node):
    if node.has_msg("telemetry"):
        data = node.recv("telemetry")
        async with aiohttp.ClientSession() as session:
            await session.post("https://api.example.com/telemetry", json=data)

uploader = horus.Node(
    name="uploader",
    subs=["telemetry"],
    tick=upload_telemetry,   # async def → automatically AsyncIo
    rate=1,                  # Upload once per second
    order=50,
)

How it works: The node's tick() runs on a tokio-managed thread pool. The node can safely block on network requests, file I/O, or database queries without affecting any other node. Python's async def is detected automatically via inspect.iscoroutinefunction.

Use for: HTTP/REST API calls, database writes, file logging, cloud telemetry, WebSocket connections.

Unlike Rust (which uses an explicit .async_io() builder method), Python auto-detects async from the function definition. Writing async def tick(node): is all you need — no async_io=True parameter exists.

How Classes Are Selected

The scheduler selects the execution class based on which Node() parameters you set:

ConfigurationResulting Class
(nothing special)BestEffort
rate=100Rt (auto-derived budget/deadline)
budget=300*usRt
deadline=900*usRt
rate=100, budget=..., deadline=...Rt (explicit overrides)
compute=TrueCompute
compute=True, rate=10Compute (rate-limited, not RT)
on="topic.name"Event
async def tickAsyncIo
async def tick, rate=1AsyncIo (rate-limited, not RT)

Key rule: rate= only auto-enables RT when no explicit execution class (compute=True, on=, async def) is set. When combined with an explicit class, rate= just limits tick frequency.

The rate= Dual Meaning

This is the most important interaction to understand:

# rate= ALONE → Rt (dedicated thread, timing enforced)
horus.Node(tick=motor_ctrl, rate=1000)
# Result: Rt class, budget=800μs, deadline=950μs

# rate= WITH compute=True → Compute (frequency cap, no timing enforcement)
horus.Node(tick=plan_path, rate=10, compute=True)
# Result: Compute class, ticks at most 10/sec, no budget or deadline

# rate= WITH on= → Event (frequency cap after wake)
horus.Node(tick=handler, rate=100, on="commands")
# Result: Event class, processes at most 100 msg/sec after waking

# rate= WITH async def → AsyncIo (frequency cap)
horus.Node(tick=async_upload, rate=1)
# Result: AsyncIo class, uploads at most 1/sec, no budget or deadline

The rule is simple: rate= triggers RT only when it is the sole execution signal. The moment you add compute=True, on=, or use async def, the rate= becomes a frequency cap with no timing enforcement.

Deferred Finalization

Class selection happens when horus.run() or sched.add() resolves the node, not at Node() construction time. This means parameter order in the constructor does not matter:

# These produce identical results (both Compute, not RT):
horus.Node(rate=100, compute=True, tick=fn)
horus.Node(compute=True, rate=100, tick=fn)

If you accidentally set conflicting classes, the last explicit class wins and a warning is logged:

# compute=True is overridden by on= — warning logged
horus.Node(compute=True, on="topic", tick=fn)  # → Event, NOT Compute

Decision Guide

Your node does...UseNode() params
Motor control at 500+ HzRtrate=500
Safety monitoring with deadlinesRtrate=100, budget=..., deadline=..., on_miss="stop"
Sensor fusion at 200 HzRtrate=200
Path planning (takes 10-50 ms)Computecompute=True
ML inference on CPUComputecompute=True, rate=30
SLAM processingComputecompute=True
React to emergency stopEventon="emergency.stop"
Process commands as they arriveEventon="commands"
Upload telemetry to cloudAsyncIoasync def tick
Write logs to databaseAsyncIoasync def tick
WebSocket streamingAsyncIoasync def tick
Display dashboard updatesBestEffort(default)
Simple diagnosticsBestEffort(default)

Validation and Common Mistakes

The scheduler validates your configuration when adding nodes and catches mistakes at startup, not at runtime.

What's Rejected

ConfigurationError
compute=True, budget=300*usBudget only meaningful for RT nodes
on="topic", deadline=900*usDeadline only meaningful for RT nodes
async def tick, budget=...Budget only meaningful for RT nodes
budget=0Budget must be > 0
on=""Empty topic — node can never trigger
compute=True with async def tickMutually exclusive — pick one
on="topic" with async def tickMutually exclusive — pick one

What's Warned

ConfigurationWarning
compute=True, on="topic"Last class wins (Event), first silently overridden
compute=True, priority=99Priority ignored on non-RT nodes
on_miss="stop" without rate/budget/deadlineNo deadline to miss — policy has no effect
core=0 without rate/budget/deadlineCPU pinning ignored on non-RT nodes

Common Mistakes

Mistake 1: Thinking rate= always means RT

# WRONG assumption: "rate=10 means this is an RT node"
planner = horus.Node(
    tick=plan_path,
    rate=10,
    compute=True,   # ← compute=True overrides RT
)
# Result: Compute class. rate=10 is just a frequency cap.
# There is NO budget or deadline enforcement.

If you need timing enforcement on a compute-heavy node, drop compute=True and use rate= alone — but understand that the node gets a dedicated thread, not a pool:

# This IS an RT node — budget and deadline enforced
planner = horus.Node(
    tick=plan_path,
    rate=10,
    budget=80 * ms,
    deadline=95 * ms,
    on_miss="warn",
)

Mistake 2: Setting on_miss= without a deadline

# on_miss has no effect — there's no deadline to miss
horus.Node(
    tick=log_tick,
    compute=True,
    on_miss="stop",  # ← useless on a Compute node
)

# Fix: make it an RT node so a deadline exists
horus.Node(
    tick=ctrl_tick,
    rate=100,
    on_miss="stop",  # ← now triggers when the 9.5 ms deadline is missed
)

Mistake 3: Thinking priority= works on Compute nodes

# Priority is silently ignored — only RT nodes get SCHED_FIFO threads
horus.Node(tick=plan, compute=True, priority=99)

# Fix: make it RT if you need OS-level priority
horus.Node(tick=plan, rate=100, priority=99)

Mistake 4: Using async def when you want Compute

# WRONG: this node does CPU-heavy ML inference, but async def
# makes it AsyncIo — it runs on the I/O pool, not the compute pool
async def infer(node):
    img = node.recv("camera")
    if img:
        result = model.predict(img.to_numpy())  # CPU-bound, not I/O
        node.send("detections", result)

# Fix: use a regular def with compute=True
def infer(node):
    img = node.recv("camera")
    if img:
        result = model.predict(img.to_numpy())
        node.send("detections", result)

detector = horus.Node(tick=infer, compute=True, rate=30)

Mistake 5: Budget on a Compute node

# REJECTED: budget is only for RT nodes
horus.Node(
    tick=process,
    compute=True,
    budget=50 * ms,  # ← scheduler rejects this
)

# Fix option A: remove compute=True (becomes RT with timing enforcement)
horus.Node(tick=process, rate=20, budget=50 * ms, on_miss="skip")

# Fix option B: remove budget (stays Compute, no timing enforcement)
horus.Node(tick=process, compute=True, rate=20)

Mistake 6: Forgetting that rate=30 (the default) triggers RT

# This looks innocent but IS an RT node (rate=30 is the default)
horus.Node(tick=log_tick)
# rate=30 is set by default → auto-RT with budget=26.6ms, deadline=31.6ms

# If you truly want BestEffort, you need rate=0 or the node must have
# no timing parameters. In practice, the default rate=30 makes most
# nodes RT — this is intentional for safety.

The default rate=30 on Node() means most nodes are RT by default. This is a deliberate safety choice — nodes get timing enforcement unless you explicitly opt out. If you want BestEffort, set compute=True with no rate, or use on= for event-driven behavior.

Complete Example: Mixed Execution Classes

import horus
import aiohttp

us = horus.us
ms = horus.ms

# --- Rt: 1 kHz motor control with strict timing ---
def motor_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd:
        write_motors(cmd.linear, cmd.angular)

motor = horus.Node(
    name="motor_ctrl",
    subs=[horus.CmdVel],
    tick=motor_tick,
    rate=1000,
    budget=300 * us,
    on_miss="skip",
    priority=90,
    core=0,
    order=0,
)

# --- Event: only runs when emergency.stop topic updates ---
def estop_tick(node):
    msg = node.recv("emergency.stop")
    if msg:
        disable_all_motors()
        node.request_stop()

estop = horus.Node(
    name="estop",
    subs=["emergency.stop"],
    tick=estop_tick,
    on="emergency.stop",
    order=0,
)

# --- Rt: 100 Hz IMU sensor reading ---
def imu_tick(node):
    reading = read_imu_hardware()
    node.send("imu", horus.Imu(
        accel_x=reading.ax, accel_y=reading.ay, accel_z=reading.az,
        gyro_x=reading.gx, gyro_y=reading.gy, gyro_z=reading.gz,
    ))

imu = horus.Node(
    name="imu_reader",
    pubs=[horus.Imu],
    tick=imu_tick,
    rate=100,
    order=1,
)

# --- Compute: path planning on thread pool ---
def plan_tick(node):
    scan = node.recv("scan")
    if scan:
        path = compute_path(scan.ranges)
        node.send("path", path)

planner = horus.Node(
    name="planner",
    subs=[horus.LaserScan],
    pubs=["path"],
    tick=plan_tick,
    compute=True,
    rate=10,
    order=5,
)

# --- AsyncIo: cloud telemetry upload ---
async def telemetry_tick(node):
    if node.has_msg("telemetry"):
        data = node.recv("telemetry")
        async with aiohttp.ClientSession() as session:
            await session.post("https://api.example.com/telemetry", json=data)

uploader = horus.Node(
    name="telemetry",
    subs=["telemetry"],
    tick=telemetry_tick,
    rate=0.2,          # Every 5 seconds
    order=50,
)

# --- BestEffort: dashboard display in main loop ---
def dashboard_tick(node):
    if node.has_msg("stats"):
        stats = node.recv("stats")
        update_display(stats)

dashboard = horus.Node(
    name="dashboard",
    subs=["stats"],
    tick=dashboard_tick,
    order=100,
    # No rate, compute, on, or async → would be BestEffort
    # But note: default rate=30 makes this RT. To truly get BestEffort,
    # the scheduler treats order-only nodes in the main loop.
)

# Run everything
horus.run(
    motor, estop, imu, planner, uploader, dashboard,
    tick_rate=1000,
    rt=True,
    watchdog_ms=500,
)

Unit Constants

Python doesn't have Rust's 300_u64.us() extension trait syntax. Instead, HORUS provides unit constants for readable duration expressions:

import horus

# Unit constants
horus.us  # 1e-6 (microseconds → seconds)
horus.ms  # 1e-3 (milliseconds → seconds)

# Usage in Node()
horus.Node(
    budget=300 * horus.us,     # 300 μs = 0.0003 seconds
    deadline=900 * horus.us,   # 900 μs = 0.0009 seconds
    watchdog=500 * horus.ms,   # 500 ms = 0.5 seconds
)

# Equivalent raw values (less readable)
horus.Node(
    budget=0.0003,
    deadline=0.0009,
    watchdog=0.5,
)
PythonRust equivalentValue
300 * horus.us300_u64.us()0.0003 s
1 * horus.ms1_u64.ms()0.001 s
500 * horus.ms500_u64.ms()0.5 s

Testing with tick_once()

Execution classes work with single-tick testing. The scheduler still classifies nodes correctly — it just runs one cycle instead of looping:

import horus

results = []

def sensor_tick(node):
    node.send("temp", {"value": 25.0})

def logger_tick(node):
    msg = node.recv("temp")
    if msg:
        results.append(msg["value"])

sensor = horus.Node(name="sensor", pubs=["temp"], tick=sensor_tick, rate=100, order=0)
logger = horus.Node(name="logger", subs=["temp"], tick=logger_tick, rate=100, order=1)

sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(sensor)
sched.add(logger)

# RT nodes still get their class — tick_once just runs one cycle
for _ in range(5):
    sched.tick_once()

assert len(results) == 5

For event-driven nodes, tick_once() only ticks them if the trigger topic has data:

def handler_tick(node):
    msg = node.recv("commands")
    results.append(msg)

handler = horus.Node(name="handler", subs=["commands"], tick=handler_tick, on="commands")
sched.add(handler)

# handler does NOT tick here — no data on "commands"
sched.tick_once()
assert len(results) == 0

# Publish data, then tick — now handler runs
cmd_topic = horus.Topic("commands")
cmd_topic.send({"action": "go"})
sched.tick_once()
assert len(results) == 1

Design Decisions

Why 5 classes instead of just RT and non-RT? A thread-per-node model (RT) is wasteful for logging nodes — dedicating OS threads and SCHED_FIFO slots to telemetry is overkill. A single-threaded model (BestEffort) can't handle 50 ms path planning without stalling the control loop. A two-class split (RT vs non-RT) doesn't distinguish between CPU-bound work (Compute), event-driven reactions (Event), and I/O-bound operations (AsyncIo) — each of which has a fundamentally different optimal executor. The five-class model matches the five common robotics workload patterns.

Why auto-detection instead of an explicit execution_class= parameter? Most developers don't think in terms of "execution classes" — they think "this node needs to run at 1 kHz" or "this node does heavy computation." Auto-detection from rate=, compute=, on=, and async def maps intent to the right executor without requiring framework knowledge. If you set rate=1000, the scheduler knows you need a dedicated real-time thread. You don't have to explicitly request one.

Why does rate= with compute=True not become RT? Because rate-limiting and real-time are different things. A path planner at 10 Hz means "tick at most 10 times per second" — not "this node has a 100 ms deadline that must be enforced." Mixing the two concepts would force compute nodes to pay RT overhead (dedicated threads, timing measurement) for no benefit. The rule is clear: rate= only triggers RT when no explicit class is set.

Why does Python auto-detect async def instead of having an async_io=True parameter? Python already distinguishes def from async def at the language level. Auto-detection means zero boilerplate — just write async def tick(node): and the scheduler does the right thing. Adding a redundant async_io=True parameter would create two conflicting signals and make the API harder to use. The async keyword is the explicit signal.

Why can't you combine compute=True with async def? Compute nodes run on a CPU thread pool optimized for parallel computation. Async nodes run on a tokio I/O thread pool optimized for non-blocking await. These are fundamentally different runtimes — a CPU-bound ML inference should not share the I/O pool (it would block other async nodes), and an I/O-bound HTTP request should not occupy a compute slot (it wastes a CPU thread while waiting). The mutual exclusion forces you to pick the right executor for your workload.

Trade-offs

GainCost
Right executor per workload — each node runs optimallyMust understand which class fits your node
Auto-detectionrate= infers RT without explicit configurationLess explicit — must know the rate= + compute=True interaction
RT isolation — a slow Compute node can't block an RT motor controllerRT nodes consume one OS thread each
Event nodes — zero CPU when idleMust match on="topic" name exactly to a publisher's topic
AsyncIo auto-detectasync def just worksCannot combine with compute=True or on=
Default rate=30 — nodes get timing enforcement by defaultMust explicitly opt out for truly best-effort nodes
Python unit constants (300 * horus.us)Less ergonomic than Rust's 300_u64.us() — multiplication instead of method

See Also