Nodes and Lifecycle

Key Takeaways

After reading this guide, you will understand:

  • How nodes are self-contained units of computation that run in the scheduler
  • The Node trait's lifecycle methods (init, tick, shutdown) and when each is called
  • How NodeInfo provides logging, metrics, and timing context to your nodes
  • When to use different priority levels (0 for safety-critical, 100 for background logging)
  • Communication patterns (publisher, subscriber, pipeline, aggregator) for building node graphs

Nodes are the fundamental building blocks of HORUS applications. Every component in your robotics system is a node - sensors, actuators, controllers, filters, and more.

What is a Node?

A node is a self-contained unit of computation that runs in the HORUS scheduler. Nodes communicate with each other through the Topic pub/sub system using shared memory IPC.

Key Characteristics

Lifecycle Management: Nodes have explicit initialization, execution, and shutdown phases

Priority-Based Execution: Nodes run in priority order every tick cycle

Zero Boilerplate: The node! macro generates all necessary boilerplate code

Type-Safe Communication: Compile-time guarantees for message passing

Memory Safety: Written in Rust with zero unsafe code in user-facing APIs

The Node Trait

Every HORUS node implements the Node trait. Here are the methods you'll use:

pub trait Node: Send {
    // Required
    fn tick(&mut self);

    // Name (defaults to struct type name, e.g. `MotorController`)
    fn name(&self) -> &str { /* derived from type name */ }

    // Optional lifecycle
    fn init(&mut self) -> Result<()> { Ok(()) }
    fn shutdown(&mut self) -> Result<()> { Ok(()) }
    fn on_error(&mut self, error: &str) { /* logs error */ }

    // Metadata (auto-generated by node! macro — rarely implemented manually)
    fn publishers(&self) -> Vec<TopicMetadata> { Vec::new() }
    fn subscribers(&self) -> Vec<TopicMetadata> { Vec::new() }

    // Safety (used by safety monitor for Miss::SafeMode)
    fn is_safe_state(&self) -> bool { true }
    fn enter_safe_state(&mut self) { /* no-op */ }
}

Note: See the API Reference for complete method documentation.

Required Methods

tick(): Main execution loop called repeatedly by the scheduler. This is the only truly required method.

fn tick(&mut self) {
    // Your node logic here
}

Optional Methods

name(): Returns a string identifying the node. The default implementation derives the name from the struct's type name (e.g., SensorNode"SensorNode"). You can override it:

fn name(&self) -> &str {
    "MyNode"
}

When using the node! macro, the name is auto-generated from the struct name.

init(): Called once during node startup (default: no-op)

fn init(&mut self) -> Result<()> {
    hlog!(info, "Node starting up");
    // Initialize resources, open files, etc.
    Ok(())
}

shutdown(): Called once during graceful shutdown (default: no-op)

fn shutdown(&mut self) -> Result<()> {
    hlog!(info, "Node shutting down");
    // Clean up resources, close connections, etc.
    Ok(())
}

is_safe_state(): Check if the node is in a safe state. Used by the safety monitor when Miss::SafeMode triggers. Override this to report your node's safety status:

fn is_safe_state(&self) -> bool {
    self.velocity == 0.0 && self.motor_disabled
}

enter_safe_state(): Transition the node to a safe state. Called by the scheduler when Miss::SafeMode is active and the node misses a deadline:

fn enter_safe_state(&mut self) {
    self.velocity = 0.0;
    self.disable_motor();
    hlog!(warn, "Entered safe state");
}

Note: Per-node tick rates are set via the scheduler builder (.rate(100_u64.hz())) when adding the node, not on the Node trait itself.

Node Lifecycle

Nodes transition through well-defined states during their lifetime:

Lifecycle States

Uninitialized: Node created but not added to scheduler

Initializing: Running init() method

Running: Executing tick() in main loop

Paused: Temporarily suspended (future feature)

Stopping: Running shutdown() method

Stopped: Clean shutdown complete

Error: Recoverable error occurred

Crashed: Unrecoverable error, node terminated

State Transitions

Loading diagram...
Node State Transitions

Lifecycle Example

use horus::prelude::*;

struct LifecycleDemo {
    counter: u32,
}

impl Node for LifecycleDemo {
    fn name(&self) -> &str {
        "LifecycleDemo"
    }

    fn init(&mut self) -> Result<()> {
        // Called ONCE when node starts
        hlog!(info, "Initializing resources");
        self.counter = 0;
        Ok(())
    }

    fn tick(&mut self) {
        // Called REPEATEDLY in main loop (~60 FPS default)
        self.counter += 1;

        hlog!(debug, "Tick #{}", self.counter);
    }

    fn shutdown(&mut self) -> Result<()> {
        // Called ONCE during graceful shutdown
        hlog!(info, "Shutting down after {} ticks", self.counter);
        Ok(())
    }
}

Logging and Metrics

The scheduler tracks node state, metrics, and lifecycle internally. You don't interact with internal tracking directly — instead, use the hlog! macro for logging and the scheduler API for metrics.

Logging with hlog!

Use the hlog! macro for structured logging:

Info: General information messages

hlog!(info, "Robot ready");

Warn: Warning messages that don't stop execution

hlog!(warn, "Battery low");

Error: Error messages

hlog!(error, "Sensor disconnected");

Debug: Detailed debugging information

hlog!(debug, "Position: ({}, {})", x, y);

Pub/Sub Logging

With the zero-overhead IPC, send() and recv() no longer take ctx. For introspection, enable logging at construction time with .with_logging():

// Construction: choose zero-overhead or with logging
let velocity_pub: Topic<f32> = Topic::new("cmd_vel")?;                  // Zero-overhead (default)
let velocity_pub_logged: Topic<f32> = Topic::new("cmd_vel")?.with_logging(); // With logging (requires T: LogSummary)

fn tick(&mut self) {
    // The only send method — logging depends on how the topic was constructed
    self.velocity_pub.send(1.5);
    // If constructed with .with_logging():
    // Output: [12:34:56.789] MyNode --PUB--> 'cmd_vel' = 1.5

    // The only recv method
    if let Some(scan) = self.lidar_sub.recv() {
        self.process(scan);
    }
}

For monitoring without code changes, use CLI tools: horus topic echo, horus topic hz, horus monitor.

Performance Metrics

NodeInfo tracks detailed performance metrics:

pub struct NodeMetrics {
    pub name: String,
    pub priority: u32,
    pub total_ticks: u64,
    pub successful_ticks: u64,
    pub failed_ticks: u64,
    pub avg_tick_duration_ms: f64,
    pub max_tick_duration_ms: f64,
    pub min_tick_duration_ms: f64,
    pub last_tick_duration_ms: f64,
    pub messages_sent: u64,
    pub messages_received: u64,
    pub errors_count: u64,
    pub warnings_count: u64,
    pub uptime_seconds: f64,
}

Access metrics via the scheduler:

fn init(&mut self) -> Result<()> {
    hlog!(info, "Node initializing");
    Ok(())
}

fn tick(&mut self) {
    // Track state internally if needed
    self.tick_count += 1;
}

Tick Timing

The scheduler automatically tracks tick duration and updates metrics for each node. You don't need to call any timing methods manually.

Node Priority

Nodes execute in priority order each tick cycle:

Priority Levels

Priorities are represented as u32 values where lower numbers = higher priority.

Common priority values:

// Recommended priority constants
const CRITICAL: u32    = 0;   // Highest priority
const HIGH: u32        = 10;
const NORMAL: u32      = 50;  // Default
const LOW: u32         = 80;
const BACKGROUND: u32  = 100; // Lowest priority

You can use any u32 value for fine-grained control (e.g., 5, 15, 25, 37, 42, etc.).

Priority Usage

use horus::prelude::*;

let mut scheduler = Scheduler::new();

// Safety monitor runs FIRST every tick (order 0)
scheduler.add(safety_node).order(0).build()?;

// Controller runs second (order 10)
scheduler.add(control_node).order(10).build()?;

// Sensors run third (order 50)
scheduler.add(sensor_node).order(50).build()?;

// Logging runs LAST (order 100)
scheduler.add(logger_node).order(100).build()?;

// Fine-grained priorities for complex systems
scheduler.add(emergency_stop).order(0).build()?;      // Highest
scheduler.add(motor_control).order(15).build()?;      // Between HIGH and NORMAL
scheduler.add(vision_processing).order(55).build()?;  // Slightly lower than normal
scheduler.add(telemetry).order(90).build()?;          // Between LOW and BACKGROUND

Priority Guidelines

0 (Critical): Safety monitors, emergency stops, fault detection

10 (High): Control loops, actuator commands, real-time feedback

50 (Normal): Sensor processing, state estimation, path planning

80 (Low): Non-critical computation, filtering, analysis

100 (Background): Logging, monitoring, diagnostics, data recording

Custom Values: Use any u32 value for fine-grained priority control in complex systems

Creating Nodes

Manual Implementation

use horus::prelude::*;

struct SensorNode {
    data_pub: Topic<f32>,
}

impl SensorNode {
    fn new() -> Result<Self> {
        Ok(Self {
            data_pub: Topic::new("sensor_data")?,
        })
    }
}

impl Node for SensorNode {
    fn name(&self) -> &str {
        "SensorNode"
    }

    fn tick(&mut self) {
        let data = 42.0; // Read sensor
        self.data_pub.send(data);
    }
}

Using the node! Macro

The node! macro eliminates boilerplate:

use horus::prelude::*;

node! {
    SensorNode {
        pub {
            sensor_data: f32 -> "sensor_data",
        }

        tick {
            let data = 42.0;
            self.sensor_data.send(data);
        }
    }
}

The macro generates:

  • Struct definition with Topic fields
  • Node trait implementation
  • Constructor function (SensorNode::new())
  • Topic metadata methods

The macro also supports sub {} (subscribers), data {} (internal state), init {}, shutdown {}, and impl {} blocks. See The node! Macro Guide for the full syntax including lifecycle hooks, custom names, and advanced patterns.

Node Communication Patterns

Publisher Pattern

struct Publisher {
    data_pub: Topic<f32>,
}

impl Node for Publisher {
    fn tick(&mut self) {
        let data = self.generate_data();
        self.data_pub.send(data);
    }
}

Subscriber Pattern

struct Subscriber {
    data_sub: Topic<f32>,
}

impl Node for Subscriber {
    fn tick(&mut self) {
        if let Some(data) = self.data_sub.recv() {
            self.process(data);
        }
    }
}

Pipeline Pattern

struct Filter {
    input_sub: Topic<f32>,
    output_pub: Topic<f32>,
}

impl Node for Filter {
    fn tick(&mut self) {
        if let Some(input) = self.input_sub.recv() {
            let output = input * 2.0;
            self.output_pub.send(output);
        }
    }
}

Aggregator Pattern

struct Aggregator {
    input_a: Topic<f32>,
    input_b: Topic<f32>,
    output_pub: Topic<f32>,
}

impl Node for Aggregator {
    fn tick(&mut self) {
        if let (Some(a), Some(b)) = (self.input_a.recv(), self.input_b.recv()) {
            let result = a + b;
            self.output_pub.send(result);
        }
    }
}

Best Practices

Keep tick() Fast

The tick method should complete quickly (ideally <1ms):

// GOOD: Fast computation
fn tick(&mut self) {
    let result = self.compute_quickly();
    self.pub.send(result);
}

// BAD: Blocking I/O
fn tick(&mut self) {
    let data = std::fs::read_to_string("file.txt").unwrap(); // Blocks!
    // ...
}

For slow operations, use async tasks or separate threads initialized in init().

What to Include in init()

The init() method runs once when your node starts. Use it to set up everything your node needs before tick() begins.

Always include in init():

CategoryExamplesWhy
Hardware connectionsSerial ports, I2C/SPI devices, GPIO pinsMust be opened before use
Network connectionsTCP/UDP sockets, WebSocket clientsEstablish before tick loop
File handlesConfig files, log files, data filesOpen once, use in tick
Pre-allocated buffersImage buffers, point cloud arraysAvoid allocation in tick
Calibration/setupSensor calibration, motor homingOne-time setup operations
Initial stateReset counters, clear flagsStart from known state
fn init(&mut self) -> Result<()> {
    hlog!(info, "Initializing MyMotorNode");

    // 1. Open hardware connections
    self.serial_port = serialport::new("/dev/ttyUSB0", 115200)
        .open()
        .map_err(|e| Error::node("MyMotorNode", format!("Failed to open serial: {}", e)))?;

    // 2. Pre-allocate buffers (avoid allocation in tick)
    self.command_buffer = vec![0u8; 256];

    // 3. Initialize hardware state
    self.send_init_sequence()?;

    // 4. Set initial values
    self.velocity = 0.0;
    self.is_armed = false;

    hlog!(info, "MyMotorNode initialized successfully");
    Ok(())
}

What to Include in shutdown()

The shutdown() method runs once when your application exits (Ctrl+C, SIGINT, SIGTERM). Use it to safely stop hardware and release resources.

Always include in shutdown():

CategoryExamplesWhy
Stop actuatorsMotors, servos, pumps, valvesCRITICAL SAFETY - prevent runaway
Disable hardwareDisable motor drivers, turn off outputsSafe state for power-off
Close connectionsSerial ports, network socketsRelease system resources
Release GPIOUnexport pins, set to input modeAllow other processes to use
Save stateLog final position, save calibrationPreserve data for next run
Flush buffersWrite pending data to diskPrevent data loss
fn shutdown(&mut self) -> Result<()> {
    hlog!(info, "MyMotorNode shutting down");

    // 1. CRITICAL: Stop all actuators FIRST
    self.velocity = 0.0;
    self.send_stop_command();

    // 2. Disable hardware (safe state)
    self.disable_motor_driver();

    // 3. Close hardware connections
    if let Some(port) = self.serial_port.take() {
        drop(port);  // Closes the port
    }

    // 4. Save any important state
    self.save_position_to_file()?;

    hlog!(info, "MyMotorNode shutdown complete");
    Ok(())
}

Complete Custom Node Example

Here's a complete example showing proper init() and shutdown() implementation:

use horus::prelude::*;

struct MyMotorController {
    // Hardware
    serial_port: Option<Box<dyn serialport::SerialPort>>,

    // Communication
    cmd_sub: Topic<MotorCommand>,
    status_pub: Topic<MotorStatus>,

    // State
    velocity: f64,
    position: i32,
    is_enabled: bool,
}

impl MyMotorController {
    fn new() -> Result<Self> {
        Ok(Self {
            serial_port: None,
            cmd_sub: Topic::new("motor.cmd")?,
            status_pub: Topic::new("motor.status")?,
            velocity: 0.0,
            position: 0,
            is_enabled: false,
        })
    }

    fn send_velocity(&mut self, vel: f64) {
        if let Some(ref mut port) = self.serial_port {
            let cmd = format!("V{}\n", vel);
            let _ = port.write(cmd.as_bytes());
        }
    }
}

impl Node for MyMotorController {
    fn name(&self) -> &str { "MyMotorController" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Opening serial connection to motor controller");

        // Open hardware connection
        self.serial_port = Some(
            serialport::new("/dev/ttyUSB0", 115200)
                .timeout(std::time::Duration::from_millis(100))
                .open()
                .map_err(|e| Error::node("MyMotorController", format!("Serial open failed: {}", e)))?
        );

        // Initialize motor to stopped state
        self.send_velocity(0.0);
        self.is_enabled = true;

        hlog!(info, "Motor controller ready");
        Ok(())
    }

    fn tick(&mut self) {
        // Process commands
        if let Some(cmd) = self.cmd_sub.recv() {
            self.velocity = cmd.velocity;
            self.send_velocity(self.velocity);
        }

        // Publish status
        let status = MotorStatus {
            velocity: self.velocity,
            position: self.position,
        };
        self.status_pub.send(status);
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Stopping motor for safe shutdown");

        // CRITICAL: Stop motor first!
        self.velocity = 0.0;
        self.send_velocity(0.0);

        // Close serial port
        self.serial_port = None;
        self.is_enabled = false;

        hlog!(info, "Motor stopped safely");
        Ok(())
    }
}

When init() and shutdown() Are NOT Optional

While the default implementations are no-ops, you should implement them when:

Scenarioinit() Requiredshutdown() Required
Controls motors/actuatorsSetupYES - SAFETY CRITICAL
Opens serial/I2C/SPI portsYESYES
Uses GPIO pinsYESYES
Opens network connectionsYESRecommended
Allocates large buffersYESNo
Reads config filesYESNo
Writes log/data filesOptionalYES (flush)
Pure computation nodeNoNo

Use Result Types

Return errors from init() and shutdown():

fn init(&mut self) -> Result<()> {
    if !self.sensor.is_available() {
        return Err(Error::node("MyNode", "Sensor not found"));
    }
    Ok(())
}

Use hlog! for Logging in tick()

Since tick() no longer receives ctx, use the hlog! macro for logging:

fn tick(&mut self) {
    hlog!(info, "Processing data");
    hlog!(debug, "Detailed debug info: {:?}", self.state);
}

Avoid State in Static Variables

Store state in the node struct, not static variables:

// GOOD
struct MyNode {
    counter: u32,  // Instance state
}

// BAD
static mut COUNTER: u32 = 0;  // Unsafe global state

Error Handling

Initialization Errors

fn init(&mut self) -> Result<()> {
    self.device = Device::open().map_err(|e| {
        Error::node("MyNode", format!("Failed to open device: {}", e))
    })?;

    hlog!(info, "Device opened successfully");
    Ok(())
}

If init() returns an error, the node transitions to Error state and won't run.

Runtime Errors

Handle errors in tick() without panicking:

fn tick(&mut self) {
    match self.data_sub.recv() {
        Some(data) => self.process(data),
        None => {
            // No data available - this is normal
        }
    }
}

Shutdown Errors

fn shutdown(&mut self) -> Result<()> {
    if let Err(e) = self.device.close() {
        hlog!(warn, "Failed to close device: {}", e);
        // Continue shutdown anyway
    }
    Ok(())
}

Advanced Topics

Conditional Execution

Run logic only under certain conditions:

fn tick(&mut self) {
    self.tick_count += 1;

    // Execute every 10 ticks
    if self.tick_count % 10 == 0 {
        self.slow_operation();
    }
}

State-Based Logic

Implement complex behavior with enum-based state patterns:

enum RobotState {
    Idle,
    Moving,
    Stopped,
}

struct RobotController {
    state: RobotState,
    cmd_sub: Topic<CmdVel>,
}

impl Node for RobotController {
    fn tick(&mut self) {
        match self.state {
            RobotState::Idle => {
                if let Some(cmd) = self.cmd_sub.recv() {
                    self.state = RobotState::Moving;
                }
            }
            RobotState::Moving => {
                // Execute movement
                if self.is_done() {
                    self.state = RobotState::Stopped;
                }
            }
            RobotState::Stopped => {
                self.state = RobotState::Idle;
            }
        }
    }
}

Multi-Topic Synchronization

Wait for data from multiple topics:

struct Synchronizer {
    topic_a: Topic<f32>,
    topic_b: Topic<f32>,
    last_a: Option<f32>,
    last_b: Option<f32>,
}

impl Node for Synchronizer {
    fn tick(&mut self) {
        // Update cached values
        if let Some(a) = self.topic_a.recv() {
            self.last_a = Some(a);
        }
        if let Some(b) = self.topic_b.recv() {
            self.last_b = Some(b);
        }

        // Process when both available
        if let (Some(a), Some(b)) = (self.last_a, self.last_b) {
            self.process(a, b);
        }
    }
}

Graceful Shutdown & Motor Safety

When a HORUS application receives a termination signal (Ctrl+C, SIGINT, SIGTERM), the scheduler automatically calls shutdown() on all registered nodes. This is critical for robotics safety.

Signal Handling

The scheduler intercepts termination signals and ensures proper cleanup:

Loading diagram...
Signal Handling Flow

Why shutdown() Matters for Motors

Without shutdown(): If you stop your robot with Ctrl+C while motors are running, they continue at their last commanded velocity - potentially dangerous for autonomous vehicles!

With shutdown(): Motors receive stop commands before the application exits:

fn shutdown(&mut self) -> Result<()> {
    hlog!(info, "Stopping all motors for safe shutdown");

    // Send stop command to all motors
    self.emergency_stop();

    // Optionally: disable motor drivers
    self.disable_all_drivers();

    hlog!(info, "Motors stopped safely");
    Ok(())
}

Python Node Shutdown Behavior

Python nodes also support shutdown callbacks. When the scheduler stops, your shutdown function runs automatically:

from horus import Node, Topic, Scheduler

class MotorController(Node):
    def __init__(self):
        super().__init__("MotorController")

    def init(self):
        self.cmd_sub = Topic("cmd_vel")
        self.velocity = (0.0, 0.0)

    def tick(self):
        msg = self.cmd_sub.recv()
        if msg is not None:
            self.velocity = msg

    def shutdown(self):
        self.velocity = (0.0, 0.0)
        print("Motors stopped safely")

scheduler = Scheduler()
scheduler.node(MotorController()).order(10).build()
scheduler.run()

See the Python Bindings documentation for details.

Implementing shutdown() in Custom Nodes

Always implement shutdown() for nodes that control actuators:

impl Node for MyMotorController {
    fn name(&self) -> &str { "MyMotorController" }

    fn tick(&mut self) {
        // Normal operation - motors running
        self.motor_pub.send(self.velocity);
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "MyMotorController shutting down");

        // CRITICAL: Stop all motors
        self.velocity = 0.0;
        self.motor_pub.send(0.0);

        // Close hardware connections
        if let Some(port) = self.serial_port.take() {
            port.close();
        }

        hlog!(info, "All motors stopped safely");
        Ok(())
    }
}

Testing Shutdown Behavior

Test your shutdown implementation before deploying:

# Start your application
horus run

# In another terminal, send SIGINT
kill -SIGINT <pid>

# Or simply press Ctrl+C in the application terminal

Verify in logs:

[12:34:56.789] [INFO] MyMotorController shutting down
[12:34:56.790] [INFO] All motors stopped safely

Best Practices for Shutdown

  1. Always stop actuators first - Motors, servos, and other actuators should receive stop commands
  2. Close hardware connections - Serial ports, I2C, SPI, CAN bus connections
  3. Release system resources - GPIO pins, file handles, network sockets
  4. Log shutdown progress - Helps debug shutdown issues
  5. Don't panic in shutdown - Handle errors gracefully, continue cleanup
fn shutdown(&mut self) -> Result<()> {
    // Always try to stop motors, even if other cleanup fails
    if let Err(e) = self.stop_motors() {
        hlog!(error, "Failed to stop motors: {}", e);
        // Continue with other cleanup anyway
    }

    // Close connections (non-critical)
    if let Err(e) = self.close_connection() {
        hlog!(warn, "Failed to close connection: {}", e);
    }

    Ok(())
}

Next Steps