Debugging & Introspection (Python)

When a robot misbehaves, you need answers fast. Is the sensor publishing? Is the controller receiving? Is the tick too slow? HORUS gives you three layers of debugging tools: CLI commands that work from any terminal, a live dashboard that shows everything at once, and a programmatic introspection API you can use inside your nodes. This guide covers all three, with step-by-step workflows for the most common problems.

Your First Debugging Tool

Before writing any debug code, open a second terminal. HORUS's CLI tools let you inspect a running system from the outside --- no code changes, no restarts.

Listing Active Topics

horus topic list
TOPICS (3 active)
  cmd_vel        CmdVel       1024 capacity   2 subscribers
  imu            Imu          1024 capacity   1 subscriber
  scan           LaserScan    1024 capacity   1 subscriber

This shows every topic in the current shared memory namespace: its name, message type, buffer capacity, and subscriber count. If a topic you expect is missing, the node that creates it is either not running or not started with horus run.

Always use horus run to start your Python nodes. Running python src/main.py directly bypasses the SHM namespace setup, so topics will not appear in horus topic list and the node cannot exchange messages with other processes.

Watching Messages in Real Time

horus topic echo cmd_vel
[0.001s] CmdVel { linear: 0.5, angular: 0.1 }
[0.011s] CmdVel { linear: 0.5, angular: 0.0 }
[0.021s] CmdVel { linear: 0.3, angular: -0.2 }

echo prints every message published to a topic, with a timestamp relative to when you started listening. Use this to verify:

  • That a publisher is actually sending data
  • That the data looks correct (no NaN values, no stale readings)
  • That messages arrive at the expected frequency

Press Ctrl+C to stop.
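
The same checks can run inside a subscriber when you want them automated. A minimal sketch, assuming dict messages with float fields (the "imu" topic matches the example above):

import math

def tick(node):
    msg = node.recv("imu")
    if msg is None:
        return
    # Flag NaN fields that would silently corrupt downstream math
    bad = [k for k, v in msg.items() if isinstance(v, float) and math.isnan(v)]
    if bad:
        node.log_warning(f"NaN fields in imu message: {bad}")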


Verifying Topics Are Publishing

A silent topic is one of the most common problems. horus topic hz measures the actual publish rate:

horus topic hz imu
average rate: 99.8 Hz
  min: 9.92ms  max: 10.15ms  std dev: 0.08ms

This tells you three things:

  1. The topic is active --- if nothing is publishing, you will see no messages received after a few seconds.
  2. The rate matches your configuration --- if you configured rate=100 but see 50 Hz, your tick is taking too long (more on this below).
  3. Jitter is acceptable --- a standard deviation over 1 ms on a 100 Hz topic suggests the tick is sometimes overrunning its budget.

Quick rate check across all topics:

# In one terminal, list topics
horus topic list

# In another, check rates one by one
horus topic hz cmd_vel
horus topic hz scan

If a topic shows a rate significantly lower than the node's configured rate, the node's tick() is taking too long. Jump to Debugging Slow tick() below.
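A similar measurement is possible from inside a subscriber, which helps when the CLI is unavailable. A minimal sketch using wall-clock inter-arrival times (note the measured rate is capped by the subscriber's own tick rate; the 100-message window is arbitrary):

import time

last_t = None
intervals = []

def tick(node):
    global last_t
    if not node.has_msg("imu"):
        return
    node.recv("imu")
    now = time.perf_counter()
    if last_t is not None:
        intervals.append(now - last_t)
    last_t = now
    # Report the average incoming rate every 100 messages
    if len(intervals) >= 100:
        avg = sum(intervals) / len(intervals)
        node.log_info(f"imu arriving at ~{1.0 / avg:.1f} Hz")
        intervals.clear()
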


Using horus monitor

For a live overview of the entire system, use the TUI dashboard:

horus monitor

The monitor shows all nodes and topics in a single view, updated in real time:

  • Node list with per-node tick rate, average tick duration, deadline misses, and health state
  • Topic list with publish rate, subscriber count, and buffer usage
  • System-wide stats: total ticks, total errors, scheduler status

This is the fastest way to find the bottleneck in a multi-node system. Look for:

Symptom in monitor                       Likely cause
Node tick duration climbing over time    Memory leak or unbounded data structure
Deadline misses incrementing             Tick is too slow for configured rate
Topic rate lower than expected           Publisher node is overloaded or blocked
Subscriber count is 0 on a topic         No node is subscribed, or subscriber crashed

The monitor works with both Python and mixed Python/Rust systems. It reads shared memory directly, so it sees all nodes regardless of language.


Debugging Slow tick()

The most common performance problem in Python nodes is a tick() that takes longer than its budget. When this happens, the scheduler applies the node's on_miss policy and the actual tick rate drops below the configured rate.

Detecting the Problem

From the CLI:

horus topic hz output_topic

If you configured rate=100 (10 ms period) but hz reports 30 Hz, your tick is averaging ~33 ms.

From inside the node, check horus.budget_remaining():

import horus
import time

def tick(node):
    start = time.perf_counter()

    # ... your processing ...
    do_work()

    elapsed_ms = (time.perf_counter() - start) * 1000
    remaining = horus.budget_remaining()

    if remaining < 0.001:  # Less than 1ms remaining
        node.log_warning(f"Tick took {elapsed_ms:.1f}ms, budget nearly exhausted")

Profiling with cProfile

For a detailed breakdown of where time is spent, use Python's built-in profiler. Since HORUS manages the tick loop, profile the tick function itself:

import horus
import cProfile
import pstats
import io

profiler = cProfile.Profile()
profile_active = False

def tick(node):
    global profile_active

    # Profile ticks 100-200 (skip warmup)
    tick_num = horus.tick()
    if tick_num == 100:
        profiler.enable()
        profile_active = True
    elif tick_num == 200 and profile_active:
        profiler.disable()
        profile_active = False

        # Print the top 20 time consumers
        s = io.StringIO()
        ps = pstats.Stats(profiler, stream=s).sort_stats("cumulative")
        ps.print_stats(20)
        node.log_info(f"Profile results:\n{s.getvalue()}")

    # Normal tick logic
    if node.has_msg("sensor"):
        data = node.recv("sensor")
        process(data)
        node.send("output", result)

node = horus.Node(name="processor", subs=["sensor"], pubs=["output"], tick=tick, rate=50)
horus.run(node)

Checking Budget Remaining

horus.budget_remaining() returns the time left in the current tick's budget, in seconds. Use it to skip expensive optional work when you are running out of time:

import horus

def tick(node):
    data = None

    # Always do critical work
    if node.has_msg("sensor"):
        data = node.recv("sensor")
        cmd = compute_command(data)
        node.send("cmd_vel", cmd)

    # Only do visualization if we have data and the budget allows
    if data is not None:
        if horus.budget_remaining() > 0.005:  # More than 5ms left
            update_visualization(data)
        else:
            node.log_debug("Skipping visualization, budget low")

Common Causes of Slow Ticks

Cause                                      Symptom                      Fix
NumPy/OpenCV doing work on every tick      Consistent high tick time    Downsample input, reduce resolution
ML model inference                         Consistent 20-50 ms ticks    Lower rate, use compute=True so it runs on the thread pool
Allocating large arrays every tick         Tick time grows over time    Pre-allocate in init(), reuse buffers
Calling recv_all() on a high-volume topic  Spiky tick times             Use recv() (one at a time) or increase tick rate
File I/O or network calls in tick          Occasional very long ticks   Move to an async node or a separate node

Debugging Dropped Messages

A message is "dropped" when the publisher writes to a full ring buffer, overwriting the oldest unread message. The subscriber never sees it.

What Causes Drops

Drops happen when the publisher sends faster than the subscriber reads. The ring buffer has a fixed capacity (default: 1024 messages). When it fills up, new messages overwrite the oldest ones.

Common scenarios:

  • Slow subscriber: A 10 Hz subscriber on a 1000 Hz topic will miss messages between ticks. This is usually intentional --- the subscriber only needs the latest reading.
  • Bursty publisher: A node that publishes 100 messages in a single tick can overflow the buffer before the subscriber ticks.
  • Subscriber stall: If a subscriber's tick takes 500 ms, messages published during that time accumulate and may overflow.
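
A back-of-the-envelope calculation shows when each scenario overflows. With the default capacity of 1024 and a 1000 Hz publisher:

capacity = 1024    # default ring buffer size
publish_hz = 1000  # publisher rate

# Backlog per tick for a 10 Hz subscriber: 100 messages -- well
# under capacity, so steady-state operation is safe.
print(publish_hz / 10)        # 100.0

# Longest subscriber stall before the oldest unread message is
# overwritten:
print(capacity / publish_hz)  # 1.024 seconds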

Detecting Drops

There is no automatic drop counter (the ring buffer is lock-free by design). Instead, use these techniques:

Sequence numbers: Add a counter to your messages and check for gaps on the subscriber side:

import horus

# Publisher
send_count = 0

def publisher_tick(node):
    global send_count
    send_count += 1
    node.send("data", {"seq": send_count, "value": read_sensor()})

# Subscriber
last_seq = 0

def subscriber_tick(node):
    global last_seq
    if node.has_msg("data"):
        msg = node.recv("data")
        if msg["seq"] != last_seq + 1:
            dropped = msg["seq"] - last_seq - 1
            node.log_warning(f"Dropped {dropped} messages (seq {last_seq} -> {msg['seq']})")
        last_seq = msg["seq"]

Rate comparison: Compare horus topic hz on the publisher and subscriber output topics. If the publisher runs at 100 Hz but the subscriber output is at 50 Hz, messages are being lost.

Preventing Drops

Increase buffer capacity for topics that carry bursty or high-frequency data:

node = horus.Node(
    name="fast_sub",
    subs={"sensor": {"type": "sensor", "capacity": 4096}},
    tick=process,
    rate=50,
)

Use recv_all() to drain the buffer when you need to process every message:

def tick(node):
    messages = node.recv_all("commands")
    for msg in messages:
        execute(msg)

Match rates: If both publisher and subscriber need to run at the same rate, set them explicitly:

sensor = horus.Node(name="sensor", pubs=["data"], tick=sense, rate=100, order=0)
processor = horus.Node(name="proc", subs=["data"], pubs=["out"], tick=process, rate=100, order=1)

Setting order ensures the sensor publishes before the processor reads in the same tick cycle.


Node Health Monitoring

The node.info API

During tick(), init(), and shutdown(), the node object exposes an info property with scheduler-managed metrics:

import horus

def tick(node):
    # Basic metrics
    ticks = node.info.tick_count()
    errors = node.info.error_count()
    avg_ms = node.info.avg_tick_duration_ms()
    uptime = node.info.get_uptime()
    state = node.info.state

    # Full metrics snapshot (dict)
    metrics = node.info.get_metrics()

    # Periodic health logging
    if ticks % 500 == 0:
        node.log_info(
            f"Health: {ticks} ticks, {errors} errors, "
            f"avg {avg_ms:.2f}ms, uptime {uptime:.1f}s, state={state}"
        )

Method/Property                        Returns  Description
node.info.tick_count()                 int      Total number of ticks executed
node.info.error_count()                int      Total tick errors (exceptions)
node.info.successful_ticks()           int      Ticks that completed without error
node.info.avg_tick_duration_ms()       float    Average tick execution time in milliseconds
node.info.get_uptime()                 float    Seconds since the node's init() was called
node.info.get_metrics()                dict     Full metrics snapshot with all available data
node.info.state                        str      Current node state (see NodeState below)
node.info.set_custom_data(key, value)  ---      Attach custom key-value metadata
node.info.get_custom_data(key)         value    Retrieve custom metadata
NodeState values: UNINITIALIZED, INITIALIZING, RUNNING, STOPPING, STOPPED, ERROR, CRASHED
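
These counters also support lightweight self-monitoring. A sketch that warns when the error rate between checks spikes (the 500-tick window and the threshold of 5 are arbitrary):

last_errors = 0

def tick(node):
    global last_errors
    if node.info.tick_count() % 500 == 0:
        errors = node.info.error_count()
        new = errors - last_errors
        if new > 5:
            node.log_warning(f"{new} tick errors in the last 500 ticks")
        last_errors = errors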

Custom data lets you attach arbitrary metadata visible to horus monitor and get_node_stats():

def tick(node):
    # Track application-specific metrics
    detections = run_detector(frame)
    node.info.set_custom_data("detections_this_tick", len(detections))
    node.info.set_custom_data("model_version", "yolov8n")

Scheduler-Level Monitoring with get_node_stats()

When using a Scheduler directly (instead of horus.run()), you can query stats for any node:

import horus

scheduler = horus.Scheduler(tick_rate=100, watchdog_ms=500)
scheduler.add(sensor)
scheduler.add(controller)
scheduler.run(duration=10.0)

# After the run completes
for name in scheduler.get_node_names():
    stats = scheduler.get_node_stats(name)
    print(f"{name}:")
    print(f"  Total ticks:  {stats['total_ticks']}")
    print(f"  Failed ticks: {stats['failed_ticks']}")
    print(f"  Avg tick:     {stats.get('avg_tick_duration_ms', 0):.2f} ms")
    print(f"  Max tick:     {stats.get('max_tick_duration_ms', 0):.2f} ms")
    print(f"  Errors:       {stats['errors_count']}")
    print(f"  Uptime:       {stats.get('uptime_seconds', 0):.1f}s")

Safety Stats

For systems with budgets, deadlines, or watchdogs, safety_stats() reports system-level safety events:

stats = scheduler.safety_stats()
if stats:
    print(f"Budget overruns:       {stats.get('budget_overruns', 0)}")
    print(f"Deadline misses:       {stats.get('deadline_misses', 0)}")
    print(f"Watchdog expirations:  {stats.get('watchdog_expirations', 0)}")

If any of these numbers are non-zero, the system is under stress. Budget overruns mean ticks are taking longer than expected. Deadline misses mean the on_miss policy is firing. Watchdog expirations mean a node was unresponsive for an extended period.
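
Because the counters are cumulative, they make a convenient regression gate in automated tests: run the system for a fixed duration and assert that no safety events fired. A minimal sketch (node setup omitted):

import horus

scheduler = horus.Scheduler(tick_rate=100, watchdog_ms=500)
# scheduler.add(...) your nodes here
scheduler.run(duration=5.0)

stats = scheduler.safety_stats() or {}
for key in ("budget_overruns", "deadline_misses", "watchdog_expirations"):
    count = stats.get(key, 0)
    assert count == 0, f"safety regression: {key}={count}"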

RT Degradations

If you requested RT features, check whether they were actually applied:

scheduler = horus.Scheduler(tick_rate=1000, rt=True)
# ... add nodes and run ...

if not scheduler.has_full_rt():
    for d in scheduler.degradations():
        print(f"Degraded: {d.get('feature')} -- {d.get('reason')}")

caps = scheduler.capabilities()
print(f"RT priority:  {caps.get('max_priority', 'N/A')}")
print(f"Memory lock:  {caps.get('memory_locking', False)}")

Logging from Nodes

HORUS provides structured logging methods on the node object. These integrate with the scheduler's log system and appear in horus monitor.

def tick(node):
    node.log_debug("Starting tick processing")

    if node.has_msg("sensor"):
        data = node.recv("sensor")
        node.log_info(f"Received sensor reading: {data}")

        if data.get("value", 0) > THRESHOLD:
            node.log_warning(f"Sensor value {data['value']} exceeds threshold {THRESHOLD}")
    else:
        node.log_debug("No sensor data this tick")

def init(node):
    node.log_info(f"Node initialized, publishing to: {node.publishers()}")
    node.log_info(f"Subscribed to: {node.subscribers()}")

def shutdown(node):
    node.log_info("Shutting down, cleaning up resources")

Method                 Use for
node.log_debug(msg)    Verbose tracing during development
node.log_info(msg)     Normal operational messages
node.log_warning(msg)  Anomalies that do not stop operation (stale data, high latency)
node.log_error(msg)    Failures that affect node behavior

Logging only works during callbacks. Calling node.log_info() outside of init(), tick(), or shutdown() issues a RuntimeWarning and the message is dropped. The scheduler context must be active for log routing to work.
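
Concretely, the same call works inside a callback and is dropped outside one:

import horus

def tick(node):
    node.log_info("OK: scheduler context is active inside tick()")

node = horus.Node(name="log_demo", tick=tick, rate=10)

# NOT OK: no scheduler context here -- issues a RuntimeWarning
# and the message is dropped.
# node.log_info("this message is lost")

horus.run(node)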

Logging performance tip: Avoid expensive string formatting in debug logs when they are not needed. Python evaluates f-string arguments before the function call:

# Bad: formats the string even if debug logging is suppressed
node.log_debug(f"Full state dump: {expensive_serialize(state)}")

# Better: guard expensive formatting
if DEBUG_ENABLED:
    node.log_debug(f"Full state dump: {expensive_serialize(state)}")

Common Debugging Workflows

"My subscriber is not getting messages"

Step 1: Verify the publisher is running.

horus topic list

If the topic is missing, the publisher node is not running or was not started with horus run.

Step 2: Verify messages are being published.

horus topic echo sensor_data

If no output appears, the publisher's tick() is not calling node.send(). Check that:

  • The publisher's pubs list includes the topic name
  • The send() call is actually reached (no early return, no exception swallowed)
  • The publisher node is not in an error state (horus monitor)

Step 3: Verify the subscriber is subscribed to the correct topic name.

Topic names must match exactly, including case. A common mistake:

# Publisher
pub = horus.Node(pubs=["sensor.data"], tick=pub_tick, rate=100)

# Subscriber -- WRONG: underscore instead of dot
sub = horus.Node(subs=["sensor_data"], tick=sub_tick, rate=100)

Step 4: Check execution order.

If both nodes run in the same scheduler, the publisher must have a lower order than the subscriber:

sensor = horus.Node(name="sensor", pubs=["data"], tick=sense, rate=100, order=0)
processor = horus.Node(name="proc", subs=["data"], tick=process, rate=100, order=1)

If order is wrong, the subscriber ticks before the publisher in every cycle and always sees an empty buffer.

Step 5: Check that recv() is called correctly.

def tick(node):
    # WRONG: recv() returns None if no message, not an exception
    data = node.recv("data")
    process(data)  # crashes here when data is None

    # CORRECT: always check for None
    data = node.recv("data")
    if data is not None:
        process(data)

    # ALSO CORRECT: use has_msg() first
    if node.has_msg("data"):
        data = node.recv("data")
        process(data)

"My node is too slow"

Step 1: Measure the actual rate.

horus topic hz output_topic

Compare to the configured rate. If the measured rate is lower, the tick is overrunning.

Step 2: Check tick duration in the monitor.

horus monitor

Look at the average and max tick duration for the slow node.

Step 3: Profile the tick function.

Add timing inside the tick to isolate the expensive section:

import time

def tick(node):
    t0 = time.perf_counter()

    data = node.recv("input")
    t1 = time.perf_counter()

    result = heavy_computation(data)
    t2 = time.perf_counter()

    node.send("output", result)
    t3 = time.perf_counter()

    node.log_debug(
        f"recv={1000*(t1-t0):.1f}ms "
        f"compute={1000*(t2-t1):.1f}ms "
        f"send={1000*(t3-t2):.1f}ms"
    )

Step 4: Apply the right fix.

Bottleneck                         Fix
Heavy computation (NumPy, OpenCV)  Use compute=True to run on the thread pool, or lower the rate
ML inference                       Lower rate to match inference time, or run on a dedicated node
Data serialization                 Use typed messages instead of dicts for zero-copy performance
File or network I/O                Move to an async node (async def tick)
Repeated allocation                Pre-allocate arrays in init(), reuse across ticks

Step 5: Set a budget to catch future regressions.

node = horus.Node(
    name="controller",
    tick=control_tick,
    rate=100,
    budget=0.008,        # 8ms budget (80% of 10ms period)
    on_miss="warn",      # Log a warning when exceeded
)

"My robot stops unexpectedly"

Step 1: Check for exceptions in node ticks.

If a node's tick() raises an unhandled exception, the default failure_policy is "fatal", which stops the entire scheduler. Check the terminal output for tracebacks.

Step 2: Check safety stats after the run.

scheduler = horus.Scheduler(tick_rate=100, watchdog_ms=500)
scheduler.add(sensor)
scheduler.add(controller)

try:
    scheduler.run()
except KeyboardInterrupt:
    pass

# Check what happened
stats = scheduler.safety_stats()
if stats:
    print(f"Budget overruns: {stats.get('budget_overruns', 0)}")
    print(f"Deadline misses: {stats.get('deadline_misses', 0)}")
    print(f"Watchdog expirations: {stats.get('watchdog_expirations', 0)}")

for name in scheduler.get_node_names():
    ns = scheduler.get_node_stats(name)
    if ns['failed_ticks'] > 0:
        print(f"Node '{name}' had {ns['failed_ticks']} failed ticks")

Step 3: Check for on_miss="stop" triggering.

If any node has on_miss="stop", a single deadline miss will halt the system. Change to "warn" during development:

# Development: warn on deadline miss
motor = horus.Node(tick=motor_fn, rate=500, budget=0.0016, on_miss="warn")

# Production: stop on deadline miss (safety-critical)
motor = horus.Node(tick=motor_fn, rate=500, budget=0.0016, on_miss="stop")

Step 4: Check for request_stop() calls.

Search your code for node.request_stop(). A node may be intentionally stopping the scheduler based on a condition:

def tick(node):
    if battery_voltage < CRITICAL_THRESHOLD:
        node.log_error(f"Battery critical: {battery_voltage}V")
        node.request_stop()  # This stops the whole system

Step 5: Use a non-fatal failure policy during development.

node = horus.Node(
    name="experimental",
    tick=experimental_tick,
    rate=30,
    failure_policy="skip",  # Skip failed ticks instead of crashing
)

Policy     Behavior                             When to use
"fatal"    Stop the entire scheduler (default)  Production safety-critical nodes
"restart"  Re-run init() and resume             Nodes that can recover from errors
"skip"     Skip the failed tick, continue       Development, non-critical nodes
"ignore"   Silently ignore the error            Logging, telemetry

Python-Specific Debugging

GIL and Threading

Python's Global Interpreter Lock (GIL) means only one thread executes Python bytecode at a time. This has specific implications for HORUS:

  • compute=True only helps if your code releases the GIL. NumPy, OpenCV, and PyTorch release the GIL during C/CUDA operations, so compute=True lets them run in parallel. Pure Python computation does not benefit.
  • Multiple Python nodes in the same scheduler share the GIL. If one node's tick runs pure Python for 10 ms, other Python nodes cannot tick during that time, even on separate threads.

Detecting GIL contention:

import horus
import time

def tick(node):
    wall_start = time.perf_counter()
    cpu_start = time.process_time()

    do_work()

    wall_elapsed = time.perf_counter() - wall_start
    cpu_elapsed = time.process_time() - cpu_start

    # If wall >> cpu, the thread was waiting (GIL or I/O)
    if wall_elapsed > cpu_elapsed * 1.5:
        node.log_warning(
            f"GIL contention: wall={wall_elapsed*1000:.1f}ms "
            f"cpu={cpu_elapsed*1000:.1f}ms"
        )

Mitigations:

  • Move GIL-heavy nodes to separate processes using HORUS multi-process mode
  • Use libraries that release the GIL (NumPy, OpenCV, SciPy, PyTorch)
  • Lower the rate of pure-Python nodes so they do not contend with critical nodes
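
To check whether a particular call releases the GIL, time it on one thread and on two concurrent threads: roughly equal wall time means the calls ran in parallel (GIL released); roughly double means they were serialized (GIL held). A standalone sketch, assuming NumPy is installed (BLAS's own internal threading can blur the result):

import threading
import time
import numpy as np

a = np.random.rand(2000, 2000)

def work():
    np.dot(a, a)  # BLAS call; NumPy releases the GIL here

def run_threads(n):
    threads = [threading.Thread(target=work) for _ in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

t1 = run_threads(1)
t2 = run_threads(2)
print(f"1 thread: {t1:.2f}s   2 threads: {t2:.2f}s")
# t2 ~ t1   -> the call released the GIL (ran in parallel)
# t2 ~ 2*t1 -> the GIL was held (calls serialized)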

Memory Leaks

Python's garbage collector handles most memory, but leaks still happen --- typically from:

  • Growing lists or dicts that are never cleared
  • Circular references with __del__ methods
  • C extension objects that are not properly released

Detecting leaks by tracking memory with tracemalloc:

import horus
import tracemalloc

def init(node):
    tracemalloc.start()
    node.info.set_custom_data("baseline_mem", 0)

def tick(node):
    if horus.tick() % 1000 == 0:
        current, peak = tracemalloc.get_traced_memory()
        node.log_info(f"Memory: current={current/1024:.0f}KB, peak={peak/1024:.0f}KB")

        # If memory keeps growing, take a snapshot
        if current > 100 * 1024 * 1024:  # Over 100MB
            snapshot = tracemalloc.take_snapshot()
            top = snapshot.statistics("lineno")[:5]
            for stat in top:
                node.log_warning(f"  {stat}")

Common leak patterns in HORUS nodes:

# LEAK: appending to a list every tick without bound
history = []

def tick(node):
    data = node.recv("sensor")
    if data:
        history.append(data)  # Grows forever

# FIX: use a fixed-size buffer
from collections import deque
history = deque(maxlen=1000)

def tick(node):
    data = node.recv("sensor")
    if data:
        history.append(data)  # Oldest entries automatically removed
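
For cycle-related leaks, the gc module provides another cheap signal: gc.collect() returns the number of unreachable objects it found. A sketch that polls it periodically (the 1000-tick cadence and threshold are arbitrary, and a full collection can itself take milliseconds on large heaps, so keep it off hot paths):

import gc

def tick(node):
    if node.info.tick_count() % 1000 == 0:
        unreachable = gc.collect()
        if unreachable > 10_000:
            node.log_warning(f"GC found {unreachable} unreachable objects; check for reference cycles")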

Exception Tracing

When a tick() raises an exception, HORUS catches it and applies the failure_policy. To get full tracebacks for debugging, add an on_error handler:

import horus
import traceback

def on_error(node, error):
    tb = traceback.format_exception(type(error), error, error.__traceback__)
    node.log_error(f"Exception in tick:\n{''.join(tb)}")

node = horus.Node(
    name="processor",
    tick=process_tick,
    on_error=on_error,
    rate=50,
    failure_policy="skip",  # Continue after errors
)

For intermittent errors that are hard to reproduce, log the node state alongside the traceback:

def on_error(node, error):
    context = {
        "tick": horus.tick(),
        "elapsed": horus.elapsed(),
        "has_sensor": node.has_msg("sensor"),
        "tick_count": node.info.tick_count() if node.info else "N/A",
        "error_count": node.info.error_count() if node.info else "N/A",
    }
    node.log_error(f"Error at {context}: {error}")

Debugging with pdb

For interactive debugging, you can use pdb inside a tick, but be aware that the scheduler pauses while you are in the debugger --- other nodes will not tick, and watchdogs may fire.

import horus

def tick(node):
    data = node.recv("sensor")
    if data and data.get("value", 0) < 0:
        # Drop into debugger on unexpected negative value
        import pdb; pdb.set_trace()
    process(data)

Use pdb only during development with a single node. In multi-node systems, pausing one node's tick in the debugger stalls the scheduler. Set watchdog_ms to a high value or disable it when using interactive debugging.
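
A non-blocking alternative is to snapshot the offending state to disk and keep running, then inspect the file offline. A sketch (the path is arbitrary and the message is assumed JSON-serializable):

import json
import time

def tick(node):
    data = node.recv("sensor")
    if data and data.get("value", 0) < 0:
        # Write a snapshot instead of pausing the scheduler
        path = f"/tmp/bad_state_{int(time.time())}.json"
        with open(path, "w") as f:
            json.dump({"tick": node.info.tick_count(), "data": data}, f)
        node.log_warning(f"Negative sensor value, snapshot written to {path}")
    if data is not None:
        process(data)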


Quick Reference

CLI Commands

Command                  What it shows
horus topic list         All active topics with type, capacity, and subscriber count
horus topic echo <name>  Live stream of messages on a topic
horus topic hz <name>    Measured publish rate with min/max/stddev timing
horus monitor            Full-system TUI dashboard with nodes, topics, and metrics

Python Introspection API

Call                              Returns  Description
node.info.tick_count()            int      Total ticks executed
node.info.error_count()           int      Total tick errors
node.info.avg_tick_duration_ms()  float    Average tick time in ms
node.info.get_uptime()            float    Seconds since init
node.info.get_metrics()           dict     Full metrics snapshot
node.info.state                   str      Current node state
node.info.set_custom_data(k, v)   ---      Attach custom metadata
node.info.get_custom_data(k)      value    Retrieve custom metadata
horus.budget_remaining()          float    Seconds left in tick budget
sched.get_node_stats(name)        dict     Per-node stats from scheduler
sched.get_all_nodes()             list     All nodes with config
sched.safety_stats()              dict     Budget overruns, deadline misses, watchdog expirations
sched.degradations()              list     RT features that could not be applied
sched.capabilities()              dict     Detected RT capabilities
sched.status()                    str      Scheduler state: idle, running, stopped

See Also