Nodes — Full Reference

In every robotics system, software components fight over shared resources. The camera driver writes to a buffer while the vision system reads from it. The motor controller modifies velocity state while the safety monitor checks it. Traditional multithreaded programs manage this with locks — but locks introduce deadlocks, priority inversions, and subtle race conditions that only manifest when the robot is moving at full speed through a warehouse.

HORUS takes a different approach. Each component is a node: an isolated unit with its own state, running in a scheduler-controlled tick loop. Nodes don't share memory directly. They communicate through typed channels (topics) backed by lock-free shared memory. The scheduler controls when each node runs, how long it's allowed to take, and what happens when it misbehaves. The result is a system where you can reason about each node independently — test it in isolation, monitor its timing, and replace it without touching anything else.

This page covers the complete Node trait, lifecycle, communication patterns, and safety mechanisms. For a gentler introduction, start with Nodes: The Building Blocks.

The Node Trait

Every HORUS node implements the Node trait. The only required method is tick() — everything else has sensible defaults:

// simplified
pub trait Node: Send {
    // Required — your main logic, called repeatedly by the scheduler
    fn tick(&mut self);

    // Identity — defaults to the struct's type name
    fn name(&self) -> &str;

    // Lifecycle — called once at startup and shutdown
    fn init(&mut self) -> Result<()> { Ok(()) }
    fn shutdown(&mut self) -> Result<()> { Ok(()) }

    // Safety — used by the safety monitor during deadline misses
    fn is_safe_state(&self) -> bool { true }
    fn enter_safe_state(&mut self) {}

    // Error recovery — called when tick() panics or errors
    fn on_error(&mut self, error: &str) { /* logs error */ }

    // Metadata — auto-generated by node! macro, rarely implemented manually
    fn publishers(&self) -> Vec<TopicMetadata> { Vec::new() }
    fn subscribers(&self) -> Vec<TopicMetadata> { Vec::new() }
}

The trait requires Send because the scheduler may move nodes between threads (e.g., when assigning a node to a dedicated RT thread). It does not require Sync — nodes are never accessed from multiple threads simultaneously. The scheduler owns each node exclusively and calls its methods sequentially.

tick() — Your Main Logic

tick() is the only required method. The scheduler calls it repeatedly — once per cycle at the configured rate:

// simplified
impl Node for MotorController {
    fn tick(&mut self) {
        if let Some(cmd) = self.commands.recv() {
            self.motor.set_velocity(cmd.linear);
        }
    }
}

Critical rules for tick():

  • Keep it fast. The scheduler monitors how long tick() takes. If it exceeds the node's budget, that's a deadline miss — which can trigger safety responses. For a 1 kHz node, tick() should complete in under 800 µs.
  • Never block on I/O. Reading a file, making an HTTP request, or waiting on a socket blocks the entire tick cycle. Use .async_io() or .compute() execution classes for blocking work.
  • Never allocate in the hot path. Vec::new(), String::from(), and Box::new() call the allocator, which can take microseconds and introduces jitter. Pre-allocate in init().
  • Always drain your topics. Call recv() on every subscribed topic every tick, even if you don't need the data. Unread messages pile up and consume buffer slots.

init() — One-Time Setup

Called once at startup, before the first tick(). Use it for hardware connections, file handles, buffer allocation, and calibration:

// simplified
fn init(&mut self) -> Result<()> {
    // Open hardware
    self.serial = serialport::new("/dev/ttyUSB0", 115200)
        .open()
        .horus_context("opening motor serial port")?;

    // IMPORTANT: pre-allocate buffers here — allocation in tick() causes jitter
    self.buffer = vec![0u8; 256];

    // SAFETY: start with actuators in known-safe state
    self.velocity = 0.0;

    hlog!(info, "MotorController initialized");
    Ok(())
}

If init() returns Err, the node is marked as failed and its FailurePolicy is applied (default: node is skipped, error logged). The scheduler continues running other nodes.

init() is called lazily — on the first call to scheduler.run() or scheduler.tick_once(), not at .add().build() time. This means topic connections, hardware, and resources are set up only when the system actually starts, not during configuration.

shutdown() — Graceful Cleanup

Called once when the scheduler stops (Ctrl+C, SIGINT, SIGTERM, or .stop()). Nodes shut down in reverse order — the last-added node shuts down first:

// simplified
// SAFETY: always stop actuators before releasing hardware connections
fn shutdown(&mut self) -> Result<()> {
    // 1. Stop actuators FIRST
    self.velocity = 0.0;
    self.send_stop_command();

    // 2. Disable hardware outputs
    self.disable_motor_driver();

    // 3. Close connections
    self.serial = None;

    hlog!(info, "MotorController shut down safely");
    Ok(())
}

Always implement shutdown() for nodes that control physical hardware. Without it, motors keep spinning, grippers stay clamped, and heaters stay on when your program exits. The scheduler guarantees shutdown() is called even on Ctrl+C — but only if you implement it.

Rules for shutdown():

  • Stop actuators before closing connections — send zero velocity before dropping the serial port
  • Never panic — if one cleanup step fails, log the error and continue with the rest
  • Don't assume tick() ran — init() may have succeeded but the system may be shutting down before the first tick

// simplified
// SAFETY: never panic in shutdown — always attempt all cleanup steps
fn shutdown(&mut self) -> Result<()> {
    if let Err(e) = self.stop_motors() {
        hlog!(error, "Failed to stop motors: {}", e);
        // Continue cleanup — don't return early
    }
    if let Err(e) = self.close_connection() {
        hlog!(warn, "Failed to close connection: {}", e);
    }
    Ok(())
}

is_safe_state() and enter_safe_state() — Safety Monitor Integration

When a node with Miss::SafeMode exceeds its deadline, the scheduler calls enter_safe_state() to bring the node to a known-safe condition. It then polls is_safe_state() each tick to check for recovery:

// simplified
impl Node for MotorController {
    fn enter_safe_state(&mut self) {
        // SAFETY: reduce to zero velocity — don't cut power abruptly,
        // as that can cause mechanical shock
        self.target_velocity = 0.0;
        self.motor.set_velocity(0.0);
        hlog!(warn, "Motor entering safe state");
    }

    fn is_safe_state(&self) -> bool {
        // Report whether the node has reached a safe condition
        self.motor.velocity().abs() < 0.01
    }
}

The default is_safe_state() returns true (the node claims to always be safe). The default enter_safe_state() does nothing. Override both for any node that controls actuators.

on_error() — Error Recovery

Called when tick() panics (caught by the scheduler) or encounters an error. The default implementation logs the error. Override for custom recovery:

// simplified
fn on_error(&mut self, error: &str) {
    hlog!(error, "Motor controller error: {}", error);
    self.consecutive_errors += 1;
    if self.consecutive_errors > 5 {
        self.enter_safe_state();
    }
}

publishers() and subscribers() — Topic Metadata

Return metadata about which topics this node uses. Used by horus monitor, graph visualization, and introspection CLI commands. The node! macro generates these automatically. When implementing Node manually, override them for accurate monitoring:

// simplified
fn publishers(&self) -> Vec<TopicMetadata> {
    vec![TopicMetadata {
        topic_name: "motor.status".to_string(),
        type_name: std::any::type_name::<MotorStatus>().to_string(),
    }]
}

Node Lifecycle

Every node transitions through well-defined states:

Node state transitions — Error is recoverable, Crashed is not
State           Description                                       Transitions to
Uninitialized   Created but init() not yet called                 Initializing
Initializing    init() is running                                 Running, Error
Running         Actively ticking                                  Stopping, Error, Crashed
Stopping        shutdown() is running                             Stopped
Stopped         Clean shutdown complete                           (terminal)
Error           Recoverable error — on_error() called             Running (recovery), Crashed
Crashed         Unrecoverable — node is removed from tick loop    (terminal)

The scheduler monitors health via NodeHealthState, tracked per-node with lock-free atomics:

Health      Meaning                        Scheduler response
Healthy     Operating within budget        Normal ticking
Warning     1× timeout elapsed             Log warning
Unhealthy   2× timeout — tick skipped      Skip tick, log error
Isolated    3× timeout on critical node    Isolate from scheduler, call enter_safe_state()

Communication Patterns

Nodes communicate through Topics — named, typed, shared-memory channels. Here are the standard patterns:

Publisher

A node that produces data for others to consume:

// simplified
struct SensorNode {
    data_pub: Topic<f32>,
}

impl Node for SensorNode {
    fn name(&self) -> &str { "Sensor" }

    fn tick(&mut self) {
        let reading = self.read_hardware();
        self.data_pub.send(reading);
    }
}

Subscriber

A node that consumes data from others:

// simplified
struct DisplayNode {
    data_sub: Topic<f32>,
}

impl Node for DisplayNode {
    fn name(&self) -> &str { "Display" }

    fn tick(&mut self) {
        // IMPORTANT: always call recv() — even if you don't need data this tick
        if let Some(value) = self.data_sub.recv() {
            println!("Value: {:.1}", value);
        }
    }
}

Pipeline (Subscribe → Transform → Publish)

A node that transforms data between topics:

// simplified
struct Filter {
    input: Topic<f32>,
    output: Topic<f32>,
    alpha: f32,
    smoothed: f32,
}

impl Node for Filter {
    fn name(&self) -> &str { "Filter" }

    fn tick(&mut self) {
        if let Some(raw) = self.input.recv() {
            // Exponential moving average
            self.smoothed = self.alpha * raw + (1.0 - self.alpha) * self.smoothed;
            self.output.send(self.smoothed);
        }
    }
}

Multi-Topic Synchronization

When you need data from multiple topics, cache the latest from each and process when all are available:

// simplified
struct Fusion {
    imu_sub: Topic<Imu>,
    odom_sub: Topic<Odometry>,
    pose_pub: Topic<Pose3D>,
    last_imu: Option<Imu>,
    last_odom: Option<Odometry>,
}

impl Node for Fusion {
    fn name(&self) -> &str { "Fusion" }

    fn tick(&mut self) {
        // IMPORTANT: drain ALL topics every tick — never skip a recv() conditionally
        if let Some(imu) = self.imu_sub.recv() { self.last_imu = Some(imu); }
        if let Some(odom) = self.odom_sub.recv() { self.last_odom = Some(odom); }

        if let (Some(imu), Some(odom)) = (&self.last_imu, &self.last_odom) {
            let fused = self.fuse(imu, odom);
            self.pose_pub.send(fused);
        }
    }
}

Always call recv() on every topic every tick. If you only call recv() inside a conditional branch (e.g., only when a state machine is in a certain state), messages pile up in the buffer. When the condition becomes true, you process stale data from potentially seconds ago.

The node! Macro

The node! macro eliminates boilerplate by generating the struct, Node implementation, constructor, and topic metadata:

// simplified
use horus::prelude::*;

node! {
    SensorNode {
        pub {
            sensor_data: f32 -> "sensor.data",
        }

        tick {
            let data = 42.0;
            self.sensor_data.send(data);
        }
    }
}

This generates:

  • A SensorNode struct with a Topic<f32> field named sensor_data
  • A SensorNode::new() constructor that creates the topic
  • A Node implementation with tick(), name(), and publishers()
  • Topic metadata for monitoring and introspection

The macro also supports sub {} (subscribers), data {} (internal state), init {}, shutdown {}, and impl {} blocks. See The node! Macro Guide for the full syntax.

Logging

Use the hlog! macro for structured logging inside nodes:

// simplified
fn tick(&mut self) {
    hlog!(info, "Processing frame {}", self.frame_count);
    hlog!(warn, "Battery at {}%", self.battery_level);
    hlog!(error, "Sensor disconnected: {}", self.sensor_name);
    hlog!(debug, "Position: ({:.2}, {:.2})", self.x, self.y);
}

For topic-level monitoring without code changes, use CLI tools:

horus topic echo sensor.data    # Print messages on a topic
horus topic hz sensor.data      # Show publish rate
horus monitor --tui             # Interactive dashboard

Python Nodes

Python nodes use a callback-based API that mirrors the Rust pattern:

import horus

def motor_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd is not None:
        set_motor_velocity(cmd)

def motor_shutdown(node):
    set_motor_velocity(0.0)
    print("Motor stopped safely")

motor = horus.Node(
    name="Motor",
    tick=motor_tick,
    shutdown=motor_shutdown,
    subs=["cmd_vel"],
    order=10,
)
horus.run(motor)

Key differences from Rust:

  • tick, init, and shutdown are callback functions, not trait methods
  • Topics are declared via pubs=["topic"] and subs=["topic"] in the constructor
  • node.send("topic", data) and node.recv("topic") instead of self.topic.send(data) / self.topic.recv()

See Python Bindings for the complete Python Node API.

Design Decisions

Why tick() instead of run()? A run() method gives each node full control — it can loop forever, block on I/O, or ignore shutdown signals. This makes it impossible for the scheduler to enforce execution order, monitor timing, or coordinate shutdown. A tick() method inverts the control: the scheduler decides when to call each node, measures how long it takes, and can force shutdown at any time. This enables deterministic execution, deadline monitoring, and safety enforcement.

Why a trait instead of callbacks or closures? A trait lets each node hold its own state as struct fields — typed, owned, and scoped to the node's lifetime. Closures would require capturing state via Arc<Mutex<...>>, reintroducing the shared-state problems nodes are designed to eliminate. A trait also enables the compiler to verify that all required methods are implemented and that the node is Send.

Why Send but not Sync? Send is required because the scheduler may move a node to a dedicated thread (e.g., for RT execution). Sync is not required because the scheduler never allows two threads to access the same node simultaneously — it owns each node exclusively and calls methods sequentially. This means your nodes can use Cell, RefCell, and other non-Sync types without restriction.

Why no context parameter in tick()? Earlier versions of the HORUS API passed a NodeContext or NodeInfo to tick(). This was removed because it coupled every node to the scheduler's internal types, made nodes harder to test in isolation, and added overhead to every tick call. Instead, nodes use hlog! for logging (a global macro) and own their topics directly. Testing a node is now: create an instance, call tick(), check the output topics.

Why lazy initialization? init() is called when the scheduler starts, not when the node is added. This means you can configure all your nodes, set up the scheduler, and only open hardware connections when the system is actually ready to run. It also means init() can depend on the scheduler's configuration (e.g., clock source) being finalized.

Trade-offs

  • Gain: Isolation — each node has its own state, no shared memory. Cost: communication through topics adds nanoseconds of latency.
  • Gain: Testability — tick a node once and assert on output topics. Cost: more boilerplate than a bare function call.
  • Gain: Deterministic ordering — the scheduler controls the execution sequence. Cost: nodes can't call each other directly.
  • Gain: Safety monitoring — budget/deadline/miss enforcement. Cost: must implement shutdown() and enter_safe_state() for hardware.
  • Gain: Hot-swappable — replace a node without touching others. Cost: nodes must agree on topic names and message types.

See Also