Nodes — Full Reference
In every robotics system, software components fight over shared resources. The camera driver writes to a buffer while the vision system reads from it. The motor controller modifies velocity state while the safety monitor checks it. Traditional multithreaded programs manage this with locks — but locks introduce deadlocks, priority inversions, and subtle race conditions that only manifest when the robot is moving at full speed through a warehouse.
HORUS takes a different approach. Each component is a node: an isolated unit with its own state, running in a scheduler-controlled tick loop. Nodes don't share memory directly. They communicate through typed channels (topics) backed by lock-free shared memory. The scheduler controls when each node runs, how long it's allowed to take, and what happens when it misbehaves. The result is a system where you can reason about each node independently — test it in isolation, monitor its timing, and replace it without touching anything else.
This page covers the complete Node trait, lifecycle, communication patterns, and safety mechanisms. For a gentler introduction, start with Nodes: The Building Blocks.
The Node Trait
Every HORUS node implements the Node trait. The only required method is tick() — everything else has sensible defaults:
// simplified
pub trait Node: Send {
// Required — your main logic, called repeatedly by the scheduler
fn tick(&mut self);
// Identity — defaults to the struct's type name
fn name(&self) -> &str;
// Lifecycle — called once at startup and shutdown
fn init(&mut self) -> Result<()> { Ok(()) }
fn shutdown(&mut self) -> Result<()> { Ok(()) }
// Safety — used by the safety monitor during deadline misses
fn is_safe_state(&self) -> bool { true }
fn enter_safe_state(&mut self) {}
// Error recovery — called when tick() panics or errors
fn on_error(&mut self, error: &str) { /* logs error */ }
// Metadata — auto-generated by node! macro, rarely implemented manually
fn publishers(&self) -> Vec<TopicMetadata> { Vec::new() }
fn subscribers(&self) -> Vec<TopicMetadata> { Vec::new() }
}
The trait requires Send because the scheduler may move nodes between threads (e.g., when assigning a node to a dedicated RT thread). It does not require Sync — nodes are never accessed from multiple threads simultaneously. The scheduler owns each node exclusively and calls its methods sequentially.
tick() — Your Main Logic
tick() is the only required method. The scheduler calls it repeatedly — once per cycle at the configured rate:
// simplified
impl Node for MotorController {
fn tick(&mut self) {
if let Some(cmd) = self.commands.recv() {
self.motor.set_velocity(cmd.linear);
}
}
}
Critical rules for tick():
- Keep it fast. The scheduler monitors how long tick() takes. If it exceeds the node's budget, that's a deadline miss — which can trigger safety responses. For a 1 kHz node, tick() should complete in under 800 µs.
- Never block on I/O. Reading a file, making an HTTP request, or waiting on a socket blocks the entire tick cycle. Use the .async_io() or .compute() execution classes for blocking work.
- Never allocate in the hot path. Vec::new(), String::from(), and Box::new() call the allocator, which can take microseconds and introduce jitter. Pre-allocate in init().
- Always drain your topics. Call recv() on every subscribed topic every tick, even if you don't need the data. Unread messages pile up and consume buffer slots.
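A minimal sketch of a tick() that follows these rules, building on the MotorController example above (the estop_sub field is illustrative and not part of that example):
// simplified sketch: estop_sub is an assumed extra subscription, shown for illustration only
fn tick(&mut self) {
    // Drain every subscribed topic first, even if the data isn't used this tick
    let cmd = self.commands.recv();
    let estop = self.estop_sub.recv();

    if estop.is_some() {
        // React without blocking or allocating
        self.motor.set_velocity(0.0);
        return;
    }
    if let Some(cmd) = cmd {
        self.motor.set_velocity(cmd.linear);
    }
}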
init() — One-Time Setup
Called once at startup, before the first tick(). Use it for hardware connections, file handles, buffer allocation, and calibration:
// simplified
fn init(&mut self) -> Result<()> {
// Open hardware
self.serial = Some(serialport::new("/dev/ttyUSB0", 115200)
.open()
.horus_context("opening motor serial port")?);
// IMPORTANT: pre-allocate buffers here — allocation in tick() causes jitter
self.buffer = vec![0u8; 256];
// SAFETY: start with actuators in known-safe state
self.velocity = 0.0;
hlog!(info, "MotorController initialized");
Ok(())
}
If init() returns Err, the node is marked as failed and its FailurePolicy is applied (default: node is skipped, error logged). The scheduler continues running other nodes.
init() is called lazily — on the first call to scheduler.run() or scheduler.tick_once(), not at .add().build() time. This means topic connections, hardware, and resources are set up only when the system actually starts, not during configuration.
shutdown() — Graceful Cleanup
Called once when the scheduler stops (Ctrl+C, SIGINT, SIGTERM, or .stop()). Nodes shut down in reverse order — the last-added node shuts down first:
// simplified
// SAFETY: always stop actuators before releasing hardware connections
fn shutdown(&mut self) -> Result<()> {
// 1. Stop actuators FIRST
self.velocity = 0.0;
self.send_stop_command();
// 2. Disable hardware outputs
self.disable_motor_driver();
// 3. Close connections
self.serial = None;
hlog!(info, "MotorController shut down safely");
Ok(())
}
Always implement shutdown() for nodes that control physical hardware. Without it, motors keep spinning, grippers stay clamped, and heaters stay on when your program exits. The scheduler guarantees shutdown() is called even on Ctrl+C — but only if you implement it.
Rules for shutdown():
- Stop actuators before closing connections — send zero velocity before dropping the serial port
- Never panic — if one cleanup step fails, log the error and continue with the rest
- Don't assume tick() ran — init() may have succeeded but the system may be shutting down before the first tick
// simplified
// SAFETY: never panic in shutdown — always attempt all cleanup steps
fn shutdown(&mut self) -> Result<()> {
if let Err(e) = self.stop_motors() {
hlog!(error, "Failed to stop motors: {}", e);
// Continue cleanup — don't return early
}
if let Err(e) = self.close_connection() {
hlog!(warn, "Failed to close connection: {}", e);
}
Ok(())
}
is_safe_state() and enter_safe_state() — Safety Monitor Integration
When a node with Miss::SafeMode exceeds its deadline, the scheduler calls enter_safe_state() to bring the node to a known-safe condition. It then polls is_safe_state() each tick to check for recovery:
// simplified
impl Node for MotorController {
fn enter_safe_state(&mut self) {
// SAFETY: reduce to zero velocity — don't cut power abruptly,
// as that can cause mechanical shock
self.target_velocity = 0.0;
self.motor.set_velocity(0.0);
hlog!(warn, "Motor entering safe state");
}
fn is_safe_state(&self) -> bool {
// Report whether the node has reached a safe condition
self.motor.velocity().abs() < 0.01
}
}
The default is_safe_state() returns true (the node claims to always be safe). The default enter_safe_state() does nothing. Override both for any node that controls actuators.
on_error() — Error Recovery
Called when tick() panics (caught by the scheduler) or encounters an error. The default implementation logs the error. Override for custom recovery:
// simplified
fn on_error(&mut self, error: &str) {
hlog!(error, "Motor controller error: {}", error);
self.consecutive_errors += 1;
if self.consecutive_errors > 5 {
self.enter_safe_state();
}
}
publishers() and subscribers() — Topic Metadata
Return metadata about which topics this node uses. Used by horus monitor, graph visualization, and introspection CLI commands. The node! macro generates these automatically. When implementing Node manually, override them for accurate monitoring:
// simplified
fn publishers(&self) -> Vec<TopicMetadata> {
vec![TopicMetadata {
topic_name: "motor.status".to_string(),
type_name: std::any::type_name::<MotorStatus>().to_string(),
}]
}
Node Lifecycle
Every node transitions through well-defined states:
| State | Description | Transitions to |
|---|---|---|
| Uninitialized | Created but init() not yet called | Initializing |
| Initializing | init() is running | Running, Error |
| Running | Actively ticking | Stopping, Error, Crashed |
| Stopping | shutdown() is running | Stopped |
| Stopped | Clean shutdown complete | (terminal) |
| Error | Recoverable error — on_error() called | Running (recovery), Crashed |
| Crashed | Unrecoverable — node is removed from tick loop | (terminal) |
The scheduler monitors health via NodeHealthState, tracked per-node with lock-free atomics:
| Health | Meaning | Scheduler response |
|---|---|---|
| Healthy | Operating within budget | Normal ticking |
| Warning | 1× timeout elapsed | Log warning |
| Unhealthy | 2× timeout — tick skipped | Skip tick, log error |
| Isolated | 3× timeout on critical node | Isolate from scheduler, call enter_safe_state() |
Communication Patterns
Nodes communicate through Topics — named, typed, shared-memory channels. Here are the standard patterns:
Publisher
A node that produces data for others to consume:
// simplified
struct SensorNode {
data_pub: Topic<f32>,
}
impl Node for SensorNode {
fn name(&self) -> &str { "Sensor" }
fn tick(&mut self) {
let reading = self.read_hardware();
self.data_pub.send(reading);
}
}
Subscriber
A node that consumes data from others:
// simplified
struct DisplayNode {
data_sub: Topic<f32>,
}
impl Node for DisplayNode {
fn name(&self) -> &str { "Display" }
fn tick(&mut self) {
// IMPORTANT: always call recv() — even if you don't need data this tick
if let Some(value) = self.data_sub.recv() {
println!("Value: {:.1}", value);
}
}
}
Pipeline (Subscribe → Transform → Publish)
A node that transforms data between topics:
// simplified
struct Filter {
input: Topic<f32>,
output: Topic<f32>,
alpha: f32,
smoothed: f32,
}
impl Node for Filter {
fn name(&self) -> &str { "Filter" }
fn tick(&mut self) {
if let Some(raw) = self.input.recv() {
// Exponential moving average
self.smoothed = self.alpha * raw + (1.0 - self.alpha) * self.smoothed;
self.output.send(self.smoothed);
}
}
}
Multi-Topic Synchronization
When you need data from multiple topics, cache the latest from each and process when all are available:
// simplified
struct Fusion {
imu_sub: Topic<Imu>,
odom_sub: Topic<Odometry>,
pose_pub: Topic<Pose3D>,
last_imu: Option<Imu>,
last_odom: Option<Odometry>,
}
impl Node for Fusion {
fn name(&self) -> &str { "Fusion" }
fn tick(&mut self) {
// IMPORTANT: drain ALL topics every tick — never skip a recv() conditionally
if let Some(imu) = self.imu_sub.recv() { self.last_imu = Some(imu); }
if let Some(odom) = self.odom_sub.recv() { self.last_odom = Some(odom); }
if let (Some(imu), Some(odom)) = (&self.last_imu, &self.last_odom) {
let fused = self.fuse(imu, odom);
self.pose_pub.send(fused);
}
}
}
Always call recv() on every topic every tick. If you only call recv() inside a conditional branch (e.g., only when a state machine is in a certain state), messages pile up in the buffer. When the condition becomes true, you process stale data from potentially seconds ago.
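The pitfall and the fix look like this side by side (a hedged sketch; the mode field, Mode enum, cmd_sub topic, and apply() helper are hypothetical illustration names):
// simplified sketch: mode, Mode, cmd_sub, and apply() are hypothetical illustration names
// Anti-pattern: recv() only runs while active, so messages queue up while idle
fn tick(&mut self) {
    if self.mode == Mode::Active {
        if let Some(cmd) = self.cmd_sub.recv() {
            self.apply(cmd); // when Active resumes, this may be seconds-old data
        }
    }
}

// Correct: drain unconditionally, then decide whether to act on the data
fn tick(&mut self) {
    let cmd = self.cmd_sub.recv(); // always drain
    if self.mode == Mode::Active {
        if let Some(cmd) = cmd {
            self.apply(cmd); // fresh data from this tick
        }
    }
}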
The node! Macro
The node! macro eliminates boilerplate by generating the struct, Node implementation, constructor, and topic metadata:
// simplified
use horus::prelude::*;
node! {
SensorNode {
pub {
sensor_data: f32 -> "sensor.data",
}
tick {
let data = 42.0;
self.sensor_data.send(data);
}
}
}
This generates:
- A SensorNode struct with a Topic<f32> field named sensor_data
- A SensorNode::new() constructor that creates the topic
- A Node implementation with tick(), name(), and publishers()
- Topic metadata for monitoring and introspection
The macro also supports sub {} (subscribers), data {} (internal state), init {}, shutdown {}, and impl {} blocks. See The node! Macro Guide for the full syntax.
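As a rough illustration, a filter node using several of these blocks might look like the sketch below; the syntax for sub {}, data {}, and init {} is assumed here to mirror the pub {} example above, so treat the macro guide as the authoritative reference:
// simplified sketch: block syntax for sub/data/init assumed from the pub {} example above
node! {
    FilterNode {
        sub {
            raw: f32 -> "sensor.data",
        }
        pub {
            filtered: f32 -> "sensor.filtered",
        }
        data {
            smoothed: f32,
        }
        init {
            self.smoothed = 0.0;
        }
        tick {
            if let Some(raw) = self.raw.recv() {
                self.smoothed = 0.9 * self.smoothed + 0.1 * raw;
                self.filtered.send(self.smoothed);
            }
        }
    }
}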
Logging
Use the hlog! macro for structured logging inside nodes:
// simplified
fn tick(&mut self) {
hlog!(info, "Processing frame {}", self.frame_count);
hlog!(warn, "Battery at {}%", self.battery_level);
hlog!(error, "Sensor disconnected: {}", self.sensor_name);
hlog!(debug, "Position: ({:.2}, {:.2})", self.x, self.y);
}
For topic-level monitoring without code changes, use CLI tools:
horus topic echo sensor.data # Print messages on a topic
horus topic hz sensor.data # Show publish rate
horus monitor --tui # Interactive dashboard
Python Nodes
Python nodes use a callback-based API that mirrors the Rust pattern:
import horus
def motor_tick(node):
cmd = node.recv("cmd_vel")
if cmd is not None:
set_motor_velocity(cmd)
def motor_shutdown(node):
set_motor_velocity(0.0)
print("Motor stopped safely")
motor = horus.Node(
name="Motor",
tick=motor_tick,
shutdown=motor_shutdown,
subs=["cmd_vel"],
order=10,
)
horus.run(motor)
Key differences from Rust:
- tick, init, and shutdown are callback functions, not trait methods
- Topics are declared via pubs=["topic"] and subs=["topic"] in the constructor
- node.send("topic", data) and node.recv("topic") instead of self.topic.send(data) / self.topic.recv()
See Python Bindings for the complete Python Node API.
Design Decisions
Why tick() instead of run()?
A run() method gives each node full control — it can loop forever, block on I/O, or ignore shutdown signals. This makes it impossible for the scheduler to enforce execution order, monitor timing, or coordinate shutdown. A tick() method inverts the control: the scheduler decides when to call each node, measures how long it takes, and can force shutdown at any time. This enables deterministic execution, deadline monitoring, and safety enforcement.
Why a trait instead of callbacks or closures?
A trait lets each node hold its own state as struct fields — typed, owned, and scoped to the node's lifetime. Closures would require capturing state via Arc<Mutex<...>>, reintroducing the shared-state problems nodes are designed to eliminate. A trait also enables the compiler to verify that all required methods are implemented and that the node is Send.
Why Send but not Sync?
Send is required because the scheduler may move a node to a dedicated thread (e.g., for RT execution). Sync is not required because the scheduler never allows two threads to access the same node simultaneously — it owns each node exclusively and calls methods sequentially. This means your nodes can use Cell, RefCell, and other non-Sync types without restriction.
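A minimal sketch of what this allows (the TickCounter node is illustrative; a plain u64 would work just as well, Cell is used only to show that non-Sync fields compile):
// simplified sketch: illustrative node with a non-Sync field
use std::cell::Cell;

struct TickCounter {
    ticks: Cell<u64>, // Cell<u64> is Send but not Sync
}

impl Node for TickCounter {
    fn name(&self) -> &str { "TickCounter" }
    fn tick(&mut self) {
        // Safe: the scheduler never lets two threads touch this node at once
        self.ticks.set(self.ticks.get() + 1);
    }
}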
Why no context parameter in tick()?
Earlier versions of the HORUS API passed a NodeContext or NodeInfo to tick(). This was removed because it coupled every node to the scheduler's internal types, made nodes harder to test in isolation, and added overhead to every tick call. Instead, nodes use hlog! for logging (a global macro) and own their topics directly. Testing a node is now: create an instance, call tick(), check the output topics.
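A hedged sketch of that test pattern, using the SensorNode from the node! example above (this assumes the generated topic handle can be read back in the same process, which this page does not spell out):
// simplified sketch: assumes node.sensor_data can be read back in-process
#[test]
fn sensor_node_publishes_on_tick() {
    let mut node = SensorNode::new();
    node.tick();
    assert_eq!(node.sensor_data.recv(), Some(42.0));
}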
Why lazy initialization?
init() is called when the scheduler starts, not when the node is added. This means you can configure all your nodes, set up the scheduler, and only open hardware connections when the system is actually ready to run. It also means init() can depend on the scheduler's configuration (e.g., clock source) being finalized.
Trade-offs
| Gain | Cost |
|---|---|
| Isolation — each node has its own state, no shared memory | Communication through topics adds nanoseconds of latency |
| Testability — tick a node once and assert on output topics | More boilerplate than a bare function call |
| Deterministic ordering — scheduler controls execution sequence | Nodes can't call each other directly |
| Safety monitoring — budget/deadline/miss enforcement | Must implement shutdown() and enter_safe_state() for hardware |
| Hot-swappable — replace a node without touching others | Nodes must agree on topic names and message types |
See Also
- Nodes: The Building Blocks — Beginner introduction
- Topics: How Nodes Talk — How nodes communicate
- Scheduler: Running Your Nodes — Execution and timing
- Scheduler API — Node builder methods (.order(), .rate(), .budget(), etc.)
- node! Macro — Code generation for nodes
- Python Bindings — Python Node API
- Execution Classes — How different workloads run