Diagnostics Messages

Diagnostics messages keep robots safe and observable in production. They report node health, trigger emergency stops, monitor resources, and track safety state. Every production robot needs these, and even simple hobby robots benefit from battery monitoring and heartbeats.

from horus import (
    DiagnosticStatus, EmergencyStop, ResourceUsage, SafetyStatus,
    DiagnosticReport, DiagnosticValue, Heartbeat, NodeHeartbeat,
)

DiagnosticStatus

Node health reporting with severity-level factory methods. Instead of remembering that level 2 means ERROR, use the error() factory.

Constructor

ds = DiagnosticStatus(level=2, code=101, message="overheating", component="motor")

.ok(message) — Everything Is Fine

ds = DiagnosticStatus.ok("All systems nominal")

Level 0. Publish this periodically to confirm your node is alive and healthy. Monitoring dashboards show OK nodes in green. If a node stops publishing OK statuses, the watchdog knows something is wrong.

.warn(code, message) — Degraded But Functional

ds = DiagnosticStatus.warn(code=101, message="Temperature rising: 65°C")

Level 1. The node is still working but something needs attention. Examples: battery getting low, sensor noise increasing, CPU usage above 70%, communication latency above threshold.

When to use warn vs error: If the robot can still complete its mission, it's a warning. If the mission is compromised, it's an error.

.error(code, message) — Something Is Wrong

ds = DiagnosticStatus.error(code=201, message="Motor stalled on joint 3")

Level 2. The node cannot function correctly. Examples: motor stalled, sensor disconnected, localization lost, path blocked. An operator should investigate.

Common mistake: Using error() for recoverable conditions. If the motor stalls briefly then recovers, that's a warn(). error() should mean "this needs human intervention."

.fatal(code, message) — System Cannot Continue

ds = DiagnosticStatus.fatal(code=301, message="Hardware fault: CAN bus disconnected")

Level 3. Unrecoverable failure. The node should enter safe state and stop. Examples: hardware fault, firmware crash, safety violation. This often triggers an EmergencyStop.
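
The four levels and the warn-versus-error rule of thumb can be mirrored in application code. The sketch below is plain Python and does not import horus. `Severity` and `classify` are hypothetical names introduced for illustration, and the numeric values simply follow the levels described above.

```python
from enum import IntEnum


class Severity(IntEnum):
    """Hypothetical mirror of the levels described above (0 to 3)."""
    OK = 0
    WARN = 1
    ERROR = 2
    FATAL = 3


def classify(degraded: bool, mission_compromised: bool, unrecoverable: bool) -> Severity:
    """Pick a severity using the rule of thumb from the text.

    Checks run from most to least severe so the worst condition wins.
    """
    if unrecoverable:
        return Severity.FATAL
    if mission_compromised:
        return Severity.ERROR
    if degraded:
        return Severity.WARN
    return Severity.OK


# A brief motor stall that recovered: degraded, but the mission continues.
print(classify(degraded=True, mission_compromised=False, unrecoverable=False).name)
```

Keeping the decision in one helper like this makes it easier to apply the same severity policy across nodes.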

.with_component(name) — Set Component Name

ds = DiagnosticStatus.error(code=201, message="Overheating") \
    .with_component("left_drive_motor")

Returns a new DiagnosticStatus with the component name set. Always set this — monitoring dashboards group statuses by component, and without it, operators can't tell which motor is overheating.

.message_str() / .component_str() — Read Back as Strings

print(ds.message_str())     # "Overheating"
print(ds.component_str())   # "left_drive_motor"

The message and component are stored as fixed-size byte arrays internally. These methods convert them to Python strings.
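
To illustrate what such a conversion typically involves, here is a standalone sketch that packs a string into a fixed-size, NUL-padded buffer and reads it back. The 32-byte capacity, the NUL padding, the UTF-8 encoding, and the helper names are all assumptions made for illustration. The source does not state the buffer sizes or encoding horus actually uses.

```python
FIELD_SIZE = 32  # Assumed capacity for illustration; not taken from horus.


def pack_fixed(text: str, size: int = FIELD_SIZE) -> bytes:
    """Encode text as UTF-8, truncate to `size` bytes, and pad with NULs."""
    raw = text.encode("utf-8")[:size]
    return raw.ljust(size, b"\x00")


def unpack_fixed(buf: bytes) -> str:
    """Decode up to the first NUL byte, dropping a partially truncated character."""
    end = buf.find(b"\x00")
    raw = buf if end == -1 else buf[:end]
    return raw.decode("utf-8", errors="ignore")


buf = pack_fixed("left_drive_motor")
print(len(buf), unpack_fixed(buf))
```

Under a layout like this, text longer than the buffer is silently truncated, which is a good reason to keep messages and component names short.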

Example — Node Health Reporter:

from horus import Node, run, DiagnosticStatus, Topic

diag_topic = Topic(DiagnosticStatus)
cpu_percent = 0.0  # Updated elsewhere

def report_health(node):
    if cpu_percent > 90:
        status = DiagnosticStatus.error(code=100, message=f"CPU at {cpu_percent:.0f}%")
    elif cpu_percent > 70:
        status = DiagnosticStatus.warn(code=100, message=f"CPU at {cpu_percent:.0f}%")
    else:
        status = DiagnosticStatus.ok(f"CPU at {cpu_percent:.0f}%")
    diag_topic.send(status.with_component("controller"), node)

run(Node(tick=report_health, rate=1, pubs=["diagnostics"]))

EmergencyStop

The panic button. engage() triggers an immediate stop; release() clears it after an operator confirms safe conditions.

.engage(reason) — Trigger E-Stop

estop = EmergencyStop.engage("Obstacle detected at 0.1m")

Creates an engaged emergency stop with a reason string. Publish this on the e-stop topic and all nodes should immediately enter safe state — stop motors, lock brakes, disable actuators.

.release() — Clear E-Stop

release = EmergencyStop.release()

Creates a release command. Publish this to clear the e-stop and allow normal operation to resume.

Common mistake: Auto-releasing the e-stop programmatically. E-stop release should always require human confirmation — a physical button, operator console acknowledgment, or at minimum a deliberate command. Auto-release defeats the purpose of safety systems.
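
One way to enforce this in application code is a latch that any node can engage but that only an explicit operator acknowledgment can release. The sketch below is plain Python and independent of horus. `EStopLatch`, `request_release`, and the `operator_confirmed` flag are hypothetical names introduced for illustration.

```python
class EStopLatch:
    """Latched e-stop state: engaging is automatic, releasing is deliberate."""

    def __init__(self) -> None:
        self.engaged = False
        self.reason = ""

    def engage(self, reason: str) -> None:
        """Any node may engage; the first reason is kept for the operator."""
        if not self.engaged:
            self.reason = reason
        self.engaged = True

    def request_release(self, operator_confirmed: bool) -> bool:
        """Release only when a human has explicitly confirmed safe conditions.

        Returns True if the latch is released after this call.
        """
        if self.engaged and not operator_confirmed:
            return False  # Refuse programmatic auto-release.
        self.engaged = False
        self.reason = ""
        return True


latch = EStopLatch()
latch.engage("Obstacle detected at 0.1m")
print(latch.request_release(operator_confirmed=False))  # Stays engaged.
print(latch.request_release(operator_confirmed=True))   # Operator cleared it.
```

In a real system the confirmation flag would come from a physical button or an operator console, never from the node that detected the hazard.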

.with_source(source) — Identify Who Triggered It

estop = EmergencyStop.engage("Collision detected") \
    .with_source("lidar_safety_node")

Returns a new EmergencyStop with a source identifier. When multiple nodes can trigger e-stops, the source tells operators which node detected the problem.

.reason_str() — Read the Reason

print(estop.reason_str())  # "Collision detected"

Example — Safety Controller:

from horus import Node, run, EmergencyStop, LaserScan, CmdVel, Topic

scan_topic = Topic(LaserScan)
estop_topic = Topic(EmergencyStop)
cmd_topic = Topic(CmdVel)

def safety_check(node):
    scan = scan_topic.recv(node)
    if scan is None:
        return
    closest = scan.min_range()
    if closest is not None and closest < 0.15:
        estop = EmergencyStop.engage(f"Object at {closest:.2f}m") \
            .with_source("safety_monitor")
        estop_topic.send(estop, node)
        cmd_topic.send(CmdVel.zero(), node)

run(Node(tick=safety_check, rate=50, pubs=["estop", "cmd_vel"], subs=["scan"]))

ResourceUsage

System resource monitoring with threshold checks.

Constructor

ru = ResourceUsage(cpu_percent=85.0, memory_bytes=4_000_000_000)

.is_cpu_high(threshold) — CPU Overload Check

if ru.is_cpu_high(80.0):
    print("CPU overloaded!")

Returns True if cpu_percent exceeds the given threshold. Typical thresholds:

  • 70%: Warning — consider reducing processing load
  • 85%: Error — system may miss deadlines
  • 95%: Critical — risk of dropped messages and missed ticks
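
The tiers above can be turned into a small lookup. This is a standalone sketch in plain Python. `CPU_TIERS` and `cpu_tier` are hypothetical names, and the strict "greater than" comparison is an assumption based on the word "exceeds".

```python
from typing import Optional

# Tiers from the list above, checked from most to least severe.
CPU_TIERS = [(95.0, "critical"), (85.0, "error"), (70.0, "warning")]


def cpu_tier(cpu_percent: float) -> Optional[str]:
    """Return the most severe tier whose threshold is exceeded, or None."""
    for threshold, name in CPU_TIERS:
        if cpu_percent > threshold:
            return name
    return None


for sample in (42.0, 72.5, 90.0, 99.0):
    print(sample, cpu_tier(sample))
```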

.is_memory_high(threshold) — Memory Pressure

if ru.is_memory_high(90.0):
    print("Memory pressure! Consider releasing caches")

.is_temperature_high(threshold) — Thermal Check

if ru.is_temperature_high(75.0):
    print("Overheating! Reduce motor duty cycle")

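The right threshold is hardware-specific. A Raspberry Pi starts throttling at around 80°C, Jetson modules limit at roughly 97°C, and industrial PCs vary, so check the datasheet for your platform.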


SafetyStatus

Safety system state machine with fault tracking.

Constructor

ss = SafetyStatus()

.is_safe() — All Clear?

if not ss.is_safe():
    print("Safety fault — entering safe state")

Returns True when no faults are active, e-stop is not engaged, and watchdog is healthy. Check this every tick — if it returns False, your node should stop actuators.

.set_fault(code) — Register a Fault

ss.set_fault(101)  # Motor overcurrent fault

Registers a fault code. is_safe() will return False until all faults are cleared. Use fault codes consistently across your system — document what each code means.
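
One way to keep fault codes consistent and documented is to define them once in a shared module. The sketch below is plain Python and does not import horus. `FaultCode` and `describe` are hypothetical names, and every code except 101 is invented for illustration.

```python
from enum import IntEnum


class FaultCode(IntEnum):
    """Hypothetical system-wide fault codes; define yours in one shared module."""
    MOTOR_OVERCURRENT = 101   # Matches the example code used above.
    ENCODER_TIMEOUT = 102     # Invented for illustration.
    BATTERY_CRITICAL = 201    # Invented for illustration.


def describe(code: int) -> str:
    """Turn a raw fault code into a readable label for logs and dashboards."""
    try:
        return FaultCode(code).name.replace("_", " ").lower()
    except ValueError:
        return f"unknown fault {code}"


print(describe(101))
print(describe(999))
```

With a registry like this, passing `int(FaultCode.MOTOR_OVERCURRENT)` to `set_fault` reads more clearly than a bare 101, assuming `set_fault` accepts a plain integer as shown above.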

.clear_faults() — Reset After Recovery

ss.clear_faults()
print(ss.is_safe())  # True (assuming no other issues)

Clears all registered faults. Call this only after the root cause has been fixed — not as a way to ignore problems.


DiagnosticReport

Structured diagnostic data with typed key-value pairs. More organized than free-text messages — monitoring tools can parse and chart the values.

Constructor

report = DiagnosticReport(component="sensor_hub")

.add_string(key, value) — Text Data

report.add_string("firmware_version", "2.1.3")
report.add_string("status", "calibrating")

.add_int(key, value) — Integer Data

report.add_int("retry_count", 3)
report.add_int("messages_dropped", 0)

.add_float(key, value) — Float Data

report.add_float("temperature_c", 42.5)
report.add_float("voltage", 24.1)

.add_bool(key, value) — Boolean Data

report.add_bool("calibrated", True)
report.add_bool("firmware_update_available", False)

All add_* methods raise ValueError if the report is full (max 16 values).
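
When the number of values is not known in advance, the caller may want to handle the full-report case rather than let the exception propagate. The sketch below shows that pattern with a stand-in class. `BoundedReport` and `add_all` are hypothetical and are not the horus implementation; only the 16-value limit and the ValueError come from the text above.

```python
from typing import Dict, List, Tuple

MAX_VALUES = 16  # Limit stated above.


class BoundedReport:
    """Stand-in for a fixed-capacity report; not the horus class."""

    def __init__(self) -> None:
        self.values: List[Tuple[str, float]] = []

    def add_float(self, key: str, value: float) -> None:
        """Append a value, raising ValueError once the report is full."""
        if len(self.values) >= MAX_VALUES:
            raise ValueError("report is full")
        self.values.append((key, value))


def add_all(report: BoundedReport, readings: Dict[str, float]) -> List[str]:
    """Add readings until the report is full; return the keys that did not fit."""
    dropped = []
    for key, value in readings.items():
        try:
            report.add_float(key, value)
        except ValueError:
            dropped.append(key)
    return dropped


readings = {f"cell_{i}_voltage": 3.7 for i in range(20)}
report = BoundedReport()
print(add_all(report, readings))
```

Returning the dropped keys lets the caller log them or send a second report instead of losing data silently.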

Example — Periodic Diagnostic Report:

from horus import DiagnosticReport, Topic

diag_topic = Topic(DiagnosticReport)

def publish_diagnostics(node, temp, voltage, calibrated):
    report = DiagnosticReport(component="imu_driver")
    report.add_float("temperature_c", temp)
    report.add_float("supply_voltage", voltage)
    report.add_bool("calibrated", calibrated)
    report.add_int("tick_count", node.tick)
    diag_topic.send(report, node)

Heartbeat

Simple "I'm alive" signal from nodes.

.update(uptime) — Tick the Heartbeat

hb = Heartbeat(node_name="controller", node_id=1)
hb.update(uptime=120.5)  # Increments sequence, sets uptime

Call once per tick and publish. The monitoring system watches for heartbeats — if a node stops publishing, it's considered dead.
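
The receiving side of this pattern can be sketched as a small monitor that records when each node was last heard from and flags the ones that have gone quiet. This is plain Python and does not use horus. `HeartbeatMonitor` and its methods are hypothetical, and timestamps are passed in explicitly so the logic is easy to test.

```python
from typing import Dict, List


class HeartbeatMonitor:
    """Tracks when each node was last heard from (hypothetical helper)."""

    def __init__(self, timeout_secs: float) -> None:
        self.timeout_secs = timeout_secs
        self.last_seen: Dict[str, float] = {}

    def on_heartbeat(self, node_name: str, now: float) -> None:
        """Record the arrival time of a heartbeat from `node_name`."""
        self.last_seen[node_name] = now

    def dead_nodes(self, now: float) -> List[str]:
        """Return nodes whose last heartbeat is older than the timeout."""
        return sorted(
            name for name, seen in self.last_seen.items()
            if now - seen > self.timeout_secs
        )


monitor = HeartbeatMonitor(timeout_secs=2.0)
monitor.on_heartbeat("controller", now=0.0)
monitor.on_heartbeat("lidar", now=0.0)
monitor.on_heartbeat("controller", now=3.0)  # lidar has gone quiet
print(monitor.dead_nodes(now=3.5))
```

In a running system, `now` would typically come from `time.monotonic()` so that wall-clock adjustments do not cause false alarms.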


NodeHeartbeat

Filesystem-based heartbeat for cross-process discovery. Written to shared memory, not published on topics.

.update_timestamp() — Refresh Timestamp

nhb = NodeHeartbeat(state=1, health=0)
nhb.update_timestamp()  # Sets to current time

.is_fresh(max_age_secs) — Check Staleness

if not nhb.is_fresh(max_age_secs=5):
    print("Node heartbeat is stale — node may have crashed")

Returns True if the timestamp is within max_age_secs of the current time. Use this in monitoring tools to detect crashed nodes.
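
The freshness check described above amounts to comparing a stored timestamp against the current time. The standalone function below is a sketch of that comparison, not the horus implementation. The parameter names and the inclusive boundary are assumptions.

```python
import time
from typing import Optional


def is_fresh(timestamp_secs: float, max_age_secs: float, now: Optional[float] = None) -> bool:
    """Return True if `timestamp_secs` is at most `max_age_secs` old.

    `now` defaults to the current wall-clock time; pass it explicitly in tests.
    """
    if now is None:
        now = time.time()
    return (now - timestamp_secs) <= max_age_secs


stamp = time.time()
print(is_fresh(stamp, max_age_secs=5))        # Just written, so fresh.
print(is_fresh(stamp - 60, max_age_secs=5))   # A minute old, so stale.
```

Choose `max_age_secs` to be a few multiples of the node's update period so that a single late update does not mark a healthy node as crashed.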


Design Decisions

Why factory methods (ok(), warn(), error(), fatal()) instead of severity integers? Level numbers (0, 1, 2, 3) are meaningless without documentation. Factory methods are self-documenting: DiagnosticStatus.error(201, "Motor stalled") is immediately clear. The factories also set the level field for you, reducing the chance of publishing an error status that mistakenly carries level=0.

Why does EmergencyStop have a .with_source() method instead of a required field? Not all e-stop triggers are software nodes. A physical e-stop button, a hardware watchdog, or an operator console might trigger an e-stop. The source is optional metadata that helps debugging, not a required field that would complicate hardware integration.

Why DiagnosticReport with typed key-value pairs instead of free-text? Free-text diagnostics are human-readable but machine-unparseable. Typed values (add_float("temperature_c", 42.5)) can be charted, alerted on, and aggregated by monitoring tools. The 16-value limit keeps the message Pod-compatible (fixed-size, no heap allocation).

Why both Heartbeat (topic-based) and NodeHeartbeat (filesystem-based)? Topic-based heartbeats detect node crashes within the same horus instance. Filesystem-based heartbeats enable cross-process discovery (monitoring tools that are not part of the horus graph can still check if nodes are alive by reading shared memory). Different failure modes require different detection mechanisms.

Why is SafetyStatus.clear_faults() a manual operation? Auto-clearing faults is dangerous. If a motor overcurrent fault clears automatically, the motor re-engages immediately, potentially causing the same overcurrent condition. Manual clearing forces an operator (or a deliberate recovery procedure) to confirm the root cause is resolved before the system resumes.


See Also