Diagnostics Messages
Diagnostics messages keep robots safe and observable in production. They report node health, trigger emergency stops, monitor resources, and track safety state. Every production robot needs these — even simple hobby robots benefit from battery monitoring and heartbeats.
from horus import (
    DiagnosticStatus, EmergencyStop, ResourceUsage, SafetyStatus,
    DiagnosticReport, DiagnosticValue, Heartbeat, NodeHeartbeat,
)
DiagnosticStatus
Node health reporting with severity-level factory methods. Instead of remembering that level 2 means ERROR, use the error() factory.
Constructor
ds = DiagnosticStatus(level=2, code=101, message="overheating", component="motor")
.ok(message) — Everything Is Fine
ds = DiagnosticStatus.ok("All systems nominal")
Level 0. Publish this periodically to confirm your node is alive and healthy. Monitoring dashboards show OK nodes in green. If a node stops publishing OK statuses, the watchdog knows something is wrong.
.warn(code, message) — Degraded But Functional
ds = DiagnosticStatus.warn(code=101, message="Temperature rising: 65°C")
Level 1. The node is still working but something needs attention. Examples: battery getting low, sensor noise increasing, CPU usage above 70%, communication latency above threshold.
When to use warn vs error: If the robot can still complete its mission, it's a warning. If the mission is compromised, it's an error.
.error(code, message) — Something Is Wrong
ds = DiagnosticStatus.error(code=201, message="Motor stalled on joint 3")
Level 2. The node cannot function correctly. Examples: motor stalled, sensor disconnected, localization lost, path blocked. An operator should investigate.
Common mistake: Using error() for recoverable conditions. If the motor stalls briefly then recovers, that's a warn(). error() should mean "this needs human intervention."
.fatal(code, message) — System Cannot Continue
ds = DiagnosticStatus.fatal(code=301, message="Hardware fault: CAN bus disconnected")
Level 3. Unrecoverable failure. The node should enter safe state and stop. Examples: hardware fault, firmware crash, safety violation. This often triggers an EmergencyStop.
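One way to act on these severity levels is a small escalation table. This is an illustrative policy sketch, not part of the horus API: only the level numbers (0-3) come from the text above, and the action names are assumptions.

```python
# Hypothetical escalation policy. The rule that only FATAL (level 3)
# triggers an e-stop mirrors the conventions described above; the action
# names are illustrative, not part of the horus API.
SEVERITY_ACTIONS = {
    0: "log",          # ok: routine record keeping
    1: "alert",        # warn: surface on the dashboard
    2: "investigate",  # error: an operator should look
    3: "estop",        # fatal: enter safe state immediately
}

def action_for_level(level: int) -> str:
    """Map a diagnostic severity level to an operator action.

    Unknown levels are treated as fatal, erring on the side of safety.
    """
    return SEVERITY_ACTIONS.get(level, "estop")
```

Treating unknown levels as fatal is a deliberate choice: in safety code, an unrecognized state should fail toward the safe side.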
.with_component(name) — Set Component Name
ds = DiagnosticStatus.error(code=201, message="Overheating") \
    .with_component("left_drive_motor")
Returns a new DiagnosticStatus with the component name set. Always set this — monitoring dashboards group statuses by component, and without it, operators can't tell which motor is overheating.
.message_str() / .component_str() — Read Back as Strings
print(ds.message_str()) # "Overheating"
print(ds.component_str()) # "left_drive_motor"
The message and component are stored as fixed-size byte arrays internally. These methods convert them to Python strings.
Example — Node Health Reporter:
from horus import Node, run, DiagnosticStatus, Topic
diag_topic = Topic(DiagnosticStatus)
cpu_percent = 0.0 # Updated elsewhere
def report_health(node):
    if cpu_percent > 90:
        status = DiagnosticStatus.error(code=100, message=f"CPU at {cpu_percent:.0f}%")
    elif cpu_percent > 70:
        status = DiagnosticStatus.warn(code=100, message=f"CPU at {cpu_percent:.0f}%")
    else:
        status = DiagnosticStatus.ok(f"CPU at {cpu_percent:.0f}%")
    diag_topic.send(status.with_component("controller"), node)
run(Node(tick=report_health, rate=1, pubs=["diagnostics"]))
EmergencyStop
The panic button. engage() triggers an immediate stop; release() clears it after an operator confirms safe conditions.
.engage(reason) — Trigger E-Stop
estop = EmergencyStop.engage("Obstacle detected at 0.1m")
Creates an engaged emergency stop with a reason string. Publish this on the e-stop topic and all nodes should immediately enter safe state — stop motors, lock brakes, disable actuators.
.release() — Clear E-Stop
release = EmergencyStop.release()
Creates a release command. Publish this to clear the e-stop and allow normal operation to resume.
Common mistake: Auto-releasing the e-stop programmatically. E-stop release should always require human confirmation — a physical button, operator console acknowledgment, or at minimum a deliberate command. Auto-release defeats the purpose of safety systems.
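The "human confirmation required" rule can be enforced with a small release gate. This is an illustrative sketch, not part of the horus API: the class and method names are assumptions, showing one way to ensure release only happens after an explicit operator acknowledgment that postdates the engage.

```python
# Illustrative release gate (not part of the horus API): release is allowed
# only when an operator has acknowledged since the most recent engage.
class EStopReleaseGate:
    def __init__(self):
        self._engaged = False
        self._operator_ack = False

    def engage(self):
        self._engaged = True
        self._operator_ack = False  # any earlier ack is invalidated

    def operator_acknowledge(self):
        # Called from a physical button handler or operator console.
        self._operator_ack = True

    def try_release(self) -> bool:
        """Release only if an operator acknowledged after the engage."""
        if self._engaged and self._operator_ack:
            self._engaged = False
            self._operator_ack = False
            return True
        return False
```

A node would call try_release() and publish EmergencyStop.release() only when it returns True, keeping the auto-release path structurally impossible.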
.with_source(source) — Identify Who Triggered It
estop = EmergencyStop.engage("Collision detected") \
    .with_source("lidar_safety_node")
Returns a new EmergencyStop with a source identifier. When multiple nodes can trigger e-stops, the source tells operators which node detected the problem.
.reason_str() — Read the Reason
print(estop.reason_str()) # "Collision detected"
Example — Safety Controller:
from horus import Node, run, EmergencyStop, LaserScan, CmdVel, Topic
scan_topic = Topic(LaserScan)
estop_topic = Topic(EmergencyStop)
cmd_topic = Topic(CmdVel)
def safety_check(node):
    scan = scan_topic.recv(node)
    if scan is None:
        return
    closest = scan.min_range()
    if closest is not None and closest < 0.15:
        estop = EmergencyStop.engage(f"Object at {closest:.2f}m") \
            .with_source("safety_monitor")
        estop_topic.send(estop, node)
        cmd_topic.send(CmdVel.zero(), node)
run(Node(tick=safety_check, rate=50, pubs=["estop", "cmd_vel"], subs=["scan"]))
ResourceUsage
System resource monitoring with threshold checks.
Constructor
ru = ResourceUsage(cpu_percent=85.0, memory_bytes=4_000_000_000)
.is_cpu_high(threshold) — CPU Overload Check
if ru.is_cpu_high(80.0):
    print("CPU overloaded!")
Returns True if cpu_percent exceeds the given threshold. Typical thresholds:
- 70%: Warning — consider reducing processing load
- 85%: Error — system may miss deadlines
- 95%: Critical — risk of dropped messages and missed ticks
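The threshold table above can be collapsed into a single classifier. A minimal sketch, using `>` to match the "exceeds" semantics of is_cpu_high(); the string labels are illustrative, and in a real node you would build the matching DiagnosticStatus instead.

```python
def cpu_severity(cpu_percent: float) -> str:
    """Classify CPU load using the thresholds from the table above.

    Uses strict comparison (>), matching is_cpu_high()'s "exceeds"
    semantics. Labels are illustrative, not a horus API.
    """
    if cpu_percent > 95.0:
        return "critical"  # risk of dropped messages and missed ticks
    if cpu_percent > 85.0:
        return "error"     # system may miss deadlines
    if cpu_percent > 70.0:
        return "warning"   # consider reducing processing load
    return "ok"
```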
.is_memory_high(threshold) — Memory Pressure
if ru.is_memory_high(90.0):
    print("Memory pressure! Consider releasing caches")
.is_temperature_high(threshold) — Thermal Check
if ru.is_temperature_high(75.0):
    print("Overheating! Reduce motor duty cycle")
Hardware-specific. Raspberry Pi throttles at 80°C. Jetson limits at 97°C. Industrial PCs vary.
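One way to handle this hardware variance is to derive the warning threshold from the platform's throttle point. A sketch: the throttle temperatures come from the text above, but the platform keys, the 10 °C margin, and the helper itself are assumptions, not horus features.

```python
# Throttle points mentioned above, with a hypothetical 10 degC safety
# margin; the platform names and margin are illustrative assumptions.
THROTTLE_POINT_C = {
    "raspberry_pi": 80.0,  # Pi throttles at 80 degC
    "jetson": 97.0,        # Jetson limits at 97 degC
}
SAFETY_MARGIN_C = 10.0

def temperature_threshold(platform: str, default: float = 75.0) -> float:
    """Pick a warning threshold below the platform's throttle point."""
    throttle = THROTTLE_POINT_C.get(platform)
    if throttle is None:
        return default  # unknown hardware: conservative fallback
    return throttle - SAFETY_MARGIN_C
```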
SafetyStatus
Safety system state machine with fault tracking.
Constructor
ss = SafetyStatus()
.is_safe() — All Clear?
if not ss.is_safe():
    print("Safety fault — entering safe state")
Returns True when no faults are active, e-stop is not engaged, and watchdog is healthy. Check this every tick — if it returns False, your node should stop actuators.
.set_fault(code) — Register a Fault
ss.set_fault(101) # Motor overcurrent fault
Registers a fault code. is_safe() will return False until all faults are cleared. Use fault codes consistently across your system — document what each code means.
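"Document what each code means" is easiest to enforce with a central registry. This is an illustrative sketch: horus does not prescribe a numbering scheme, and the specific codes and descriptions here are assumptions.

```python
# Hypothetical fault-code registry; the codes and names are assumptions.
FAULT_MOTOR_OVERCURRENT = 101
FAULT_ENCODER_TIMEOUT = 102
FAULT_ESTOP_ENGAGED = 201

FAULT_DESCRIPTIONS = {
    FAULT_MOTOR_OVERCURRENT: "Motor current exceeded the configured limit",
    FAULT_ENCODER_TIMEOUT: "No encoder update within the watchdog window",
    FAULT_ESTOP_ENGAGED: "Emergency stop is engaged",
}

def describe_fault(code: int) -> str:
    """Look up a fault code so operators never see a bare number."""
    return FAULT_DESCRIPTIONS.get(code, f"Unknown fault code {code}")
```

Calling ss.set_fault(FAULT_MOTOR_OVERCURRENT) instead of ss.set_fault(101) keeps the code self-documenting at every call site.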
.clear_faults() — Reset After Recovery
ss.clear_faults()
print(ss.is_safe()) # True (assuming no other issues)
Clears all registered faults. Call this only after the root cause has been fixed — not as a way to ignore problems.
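The fault-tracking semantics described above (set_fault, clear_faults, is_safe) can be mimicked with a minimal stand-in for testing node logic without hardware. A sketch under the stated assumptions: the real horus SafetyStatus also tracks e-stop and watchdog state, which this fake omits.

```python
# Minimal stand-in for the fault-tracking semantics described above.
# The real SafetyStatus also considers e-stop and watchdog health.
class FakeSafetyStatus:
    def __init__(self):
        self._faults = set()

    def set_fault(self, code: int):
        self._faults.add(code)

    def clear_faults(self):
        # Only call after the root cause is fixed, per the note above.
        self._faults.clear()

    def is_safe(self) -> bool:
        return not self._faults
```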
DiagnosticReport
Structured diagnostic data with typed key-value pairs. More organized than free-text messages — monitoring tools can parse and chart the values.
Constructor
report = DiagnosticReport(component="sensor_hub")
.add_string(key, value) — Text Data
report.add_string("firmware_version", "2.1.3")
report.add_string("status", "calibrating")
.add_int(key, value) — Integer Data
report.add_int("retry_count", 3)
report.add_int("messages_dropped", 0)
.add_float(key, value) — Float Data
report.add_float("temperature_c", 42.5)
report.add_float("voltage", 24.1)
.add_bool(key, value) — Boolean Data
report.add_bool("calibrated", True)
report.add_bool("firmware_update_available", False)
All add_* methods raise ValueError if the report is full (max 16 values).
Example — Periodic Diagnostic Report:
from horus import DiagnosticReport, Topic
diag_topic = Topic(DiagnosticReport)
def publish_diagnostics(node, temp, voltage, calibrated):
    report = DiagnosticReport(component="imu_driver")
    report.add_float("temperature_c", temp)
    report.add_float("supply_voltage", voltage)
    report.add_bool("calibrated", calibrated)
    report.add_int("tick_count", node.tick)
    diag_topic.send(report, node)
Heartbeat
Simple "I'm alive" signal from nodes.
.update(uptime) — Tick the Heartbeat
hb = Heartbeat(node_name="controller", node_id=1)
hb.update(uptime=120.5) # Increments sequence, sets uptime
Call once per tick and publish. The monitoring system watches for heartbeats — if a node stops publishing, it's considered dead.
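The "increments sequence, sets uptime" behavior can be sketched with a stand-in class, useful for unit-testing watchdog logic; the field names here are illustrative, not the real horus layout.

```python
# Stand-in mirroring the behaviour described above: each update() bumps a
# monotonically increasing sequence and records uptime. Field names are
# illustrative assumptions, not the real horus Heartbeat layout.
class FakeHeartbeat:
    def __init__(self, node_name: str, node_id: int):
        self.node_name = node_name
        self.node_id = node_id
        self.sequence = 0
        self.uptime = 0.0

    def update(self, uptime: float):
        self.sequence += 1  # watchdogs detect stalls via a frozen sequence
        self.uptime = uptime
```

A monitoring node that sees the same sequence number twice in a row knows the publisher has stalled, even if stale messages are still being delivered.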
NodeHeartbeat
Filesystem-based heartbeat for cross-process discovery. Written to shared memory, not published on topics.
.update_timestamp() — Refresh Timestamp
nhb = NodeHeartbeat(state=1, health=0)
nhb.update_timestamp() # Sets to current time
.is_fresh(max_age_secs) — Check Staleness
if not nhb.is_fresh(max_age_secs=5):
    print("Node heartbeat is stale — node may have crashed")
Returns True if the timestamp is within max_age_secs of the current time. Use this in monitoring tools to detect crashed nodes.
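The freshness logic can be expressed without the library. A sketch of the check as described above, assuming timestamps in seconds; the explicit `now` parameter is an assumption added here to make the logic testable, not part of the horus signature.

```python
import time

def is_fresh(timestamp_secs: float, max_age_secs: float,
             now: float = None) -> bool:
    """True if the timestamp is within max_age_secs of the current time.

    The `now` override is an assumption added for testability; the real
    is_fresh() takes only max_age_secs.
    """
    if now is None:
        now = time.time()
    return (now - timestamp_secs) <= max_age_secs
```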
Design Decisions
Why factory methods (ok(), warn(), error(), fatal()) instead of severity integers? Level numbers (0, 1, 2, 3) are meaningless without documentation. Factory methods are self-documenting: DiagnosticStatus.error(201, "Motor stalled") is immediately clear. The factories also set default fields correctly, reducing the chance of publishing a severity-2 status with level=0.
Why does EmergencyStop have a .with_source() method instead of a required field? Not all e-stop triggers are software nodes. A physical e-stop button, a hardware watchdog, or an operator console might trigger an e-stop. The source is optional metadata that helps debugging, not a required field that would complicate hardware integration.
Why DiagnosticReport with typed key-value pairs instead of free-text? Free-text diagnostics are human-readable but machine-unparseable. Typed values (add_float("temperature_c", 42.5)) can be charted, alerted on, and aggregated by monitoring tools. The 16-value limit keeps the message Pod-compatible (fixed-size, no heap allocation).
Why both Heartbeat (topic-based) and NodeHeartbeat (filesystem-based)? Topic-based heartbeats detect node crashes within the same horus instance. Filesystem-based heartbeats enable cross-process discovery (monitoring tools that are not part of the horus graph can still check if nodes are alive by reading shared memory). Different failure modes require different detection mechanisms.
Why is SafetyStatus.clear_faults() a manual operation? Auto-clearing faults is dangerous. If a motor overcurrent fault clears automatically, the motor re-engages immediately, potentially causing the same overcurrent condition. Manual clearing forces an operator (or a deliberate recovery procedure) to confirm the root cause is resolved before the system resumes.
See Also
- Navigation Messages — NavGoal, path following (often paired with diagnostics)
- Sensor Messages — BatteryState for power monitoring
- Force Messages — WrenchStamped.exceeds_limits() for force safety
- Control Messages — MotorCommand.stop() for actuator safety
- Python Message Library — All 55+ message types overview