Nodes and Lifecycle
Key Takeaways
After reading this guide, you will understand:
- How nodes are self-contained units of computation that run in the scheduler
- The Node trait's lifecycle methods (init, tick, shutdown) and when each is called
- How to use the hlog! macro for logging, and how the scheduler tracks metrics and tick timing for your nodes
- When to use different priority levels (0 for safety-critical, 100 for background logging)
- Communication patterns (publisher, subscriber, pipeline, aggregator) for building node graphs
Nodes are the fundamental building blocks of HORUS applications. Every component in your robotics system is a node - sensors, actuators, controllers, filters, and more.
What is a Node?
A node is a self-contained unit of computation that runs in the HORUS scheduler. Nodes communicate with each other through the Topic pub/sub system using shared memory IPC.
Key Characteristics
Lifecycle Management: Nodes have explicit initialization, execution, and shutdown phases
Priority-Based Execution: Nodes run in priority order every tick cycle
Zero Boilerplate: The node! macro generates all necessary boilerplate code
Type-Safe Communication: Compile-time guarantees for message passing
Memory Safety: Written in Rust with zero unsafe code in user-facing APIs
The Node Trait
Every HORUS node implements the Node trait. Here are the methods you'll use:
pub trait Node: Send {
// Required
fn tick(&mut self);
// Name (defaults to struct type name, e.g. `MotorController`)
fn name(&self) -> &str { /* derived from type name */ }
// Optional lifecycle
fn init(&mut self) -> Result<()> { Ok(()) }
fn shutdown(&mut self) -> Result<()> { Ok(()) }
fn on_error(&mut self, error: &str) { /* logs error */ }
// Metadata (auto-generated by node! macro — rarely implemented manually)
fn publishers(&self) -> Vec<TopicMetadata> { Vec::new() }
fn subscribers(&self) -> Vec<TopicMetadata> { Vec::new() }
// Safety (used by safety monitor for Miss::SafeMode)
fn is_safe_state(&self) -> bool { true }
fn enter_safe_state(&mut self) { /* no-op */ }
}
Note: See the API Reference for complete method documentation.
Required Methods
tick(): The node's main logic, called repeatedly by the scheduler. This is the only required method.
fn tick(&mut self) {
// Your node logic here
}
Optional Methods
name(): Returns a string identifying the node. The default implementation derives the name from the struct's type name (e.g., SensorNode → "SensorNode"). You can override it:
fn name(&self) -> &str {
"MyNode"
}
When using the node! macro, the name is auto-generated from the struct name.
init(): Called once during node startup (default: no-op)
fn init(&mut self) -> Result<()> {
hlog!(info, "Node starting up");
// Initialize resources, open files, etc.
Ok(())
}
shutdown(): Called once during graceful shutdown (default: no-op)
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Node shutting down");
// Clean up resources, close connections, etc.
Ok(())
}
is_safe_state(): Check if the node is in a safe state. Used by the safety monitor when Miss::SafeMode triggers. Override this to report your node's safety status:
fn is_safe_state(&self) -> bool {
self.velocity == 0.0 && self.motor_disabled
}
enter_safe_state(): Transition the node to a safe state. Called by the scheduler when Miss::SafeMode is active and the node misses a deadline:
fn enter_safe_state(&mut self) {
self.velocity = 0.0;
self.disable_motor();
hlog!(warn, "Entered safe state");
}
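To make the interaction between these two hooks concrete, here is a minimal sketch of how a monitor could react to a missed deadline. The `SafeNode` trait, the `Motor` struct, and the `handle_deadline_miss` function are hypothetical stand-ins defined only for this example; they are assumptions and do not describe the HORUS scheduler's actual internals.

```rust
// Local stand-in for the two safety hooks on the Node trait (not the real HORUS trait).
trait SafeNode {
    fn is_safe_state(&self) -> bool;
    fn enter_safe_state(&mut self);
}

// Hypothetical actuator node with just enough state to report safety.
struct Motor {
    velocity: f64,
    motor_disabled: bool,
}

impl SafeNode for Motor {
    fn is_safe_state(&self) -> bool {
        self.velocity == 0.0 && self.motor_disabled
    }

    fn enter_safe_state(&mut self) {
        self.velocity = 0.0;
        self.motor_disabled = true;
    }
}

// Hypothetical monitor step: ask the node to enter its safe state if it is
// not already there, then report whether it is safe afterwards.
fn handle_deadline_miss(node: &mut dyn SafeNode) -> bool {
    if !node.is_safe_state() {
        node.enter_safe_state();
    }
    node.is_safe_state()
}
```

The point of the split is that `is_safe_state()` is a cheap, read-only query, while `enter_safe_state()` does the actual work, so a monitor can check first and act only when needed.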
Note: Per-node tick rates are set via the scheduler builder (.rate(100_u64.hz())) when adding the node, not on the Node trait itself.
Node Lifecycle
Nodes transition through well-defined states during their lifetime:
Lifecycle States
Uninitialized: Node created but not added to scheduler
Initializing: Running init() method
Running: Executing tick() in main loop
Paused: Temporarily suspended (future feature)
Stopping: Running shutdown() method
Stopped: Clean shutdown complete
Error: Recoverable error occurred
Crashed: Unrecoverable error, node terminated
State Transitions
Lifecycle Example
use horus::prelude::*;
struct LifecycleDemo {
counter: u32,
}
impl Node for LifecycleDemo {
fn name(&self) -> &str {
"LifecycleDemo"
}
fn init(&mut self) -> Result<()> {
// Called ONCE when node starts
hlog!(info, "Initializing resources");
self.counter = 0;
Ok(())
}
fn tick(&mut self) {
// Called REPEATEDLY in main loop (~60 FPS default)
self.counter += 1;
hlog!(debug, "Tick #{}", self.counter);
}
fn shutdown(&mut self) -> Result<()> {
// Called ONCE during graceful shutdown
hlog!(info, "Shutting down after {} ticks", self.counter);
Ok(())
}
}
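The call order described in the comments above (init once, tick repeatedly, shutdown once) can be illustrated without the HORUS crate. The following is a sketch under assumptions: the `Node` trait here is a local stand-in with `String` errors, and `run_lifecycle` is a hypothetical driver, not the real scheduler.

```rust
// Local stand-in for the lifecycle part of the Node trait (not the real HORUS trait).
trait Node {
    fn init(&mut self) -> Result<(), String> { Ok(()) }
    fn tick(&mut self);
    fn shutdown(&mut self) -> Result<(), String> { Ok(()) }
}

// Hypothetical node that records which lifecycle methods were called.
#[derive(Default)]
struct Recorder {
    events: Vec<&'static str>,
}

impl Node for Recorder {
    fn init(&mut self) -> Result<(), String> {
        self.events.push("init");
        Ok(())
    }

    fn tick(&mut self) {
        self.events.push("tick");
    }

    fn shutdown(&mut self) -> Result<(), String> {
        self.events.push("shutdown");
        Ok(())
    }
}

// Hypothetical driver: init once, tick `ticks` times, shutdown once.
// If init fails, the node never ticks.
fn run_lifecycle(node: &mut dyn Node, ticks: usize) -> Result<(), String> {
    node.init()?;
    for _ in 0..ticks {
        node.tick();
    }
    node.shutdown()
}
```

Calling `run_lifecycle(&mut recorder, 2)` on a fresh `Recorder` leaves `events` as `["init", "tick", "tick", "shutdown"]`.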
Logging and Metrics
The scheduler tracks node state, metrics, and lifecycle internally. You don't interact with internal tracking directly — instead, use the hlog! macro for logging and the scheduler API for metrics.
Logging with hlog!
Use the hlog! macro for structured logging:
Info: General information messages
hlog!(info, "Robot ready");
Warn: Warning messages that don't stop execution
hlog!(warn, "Battery low");
Error: Error messages
hlog!(error, "Sensor disconnected");
Debug: Detailed debugging information
hlog!(debug, "Position: ({}, {})", x, y);
Pub/Sub Logging
With the zero-overhead IPC, send() and recv() no longer take ctx. For introspection, enable logging at construction time with .with_logging():
// Construction: choose zero-overhead or with logging
let velocity_pub: Topic<f32> = Topic::new("cmd_vel")?; // Zero-overhead (default)
let velocity_pub_logged: Topic<f32> = Topic::new("cmd_vel")?.with_logging(); // With logging (requires T: LogSummary)
fn tick(&mut self) {
// The only send method — logging depends on how the topic was constructed
self.velocity_pub.send(1.5);
// If constructed with .with_logging():
// Output: [12:34:56.789] MyNode --PUB--> 'cmd_vel' = 1.5
// The only recv method
if let Some(scan) = self.lidar_sub.recv() {
self.process(scan);
}
}
For monitoring without code changes, use CLI tools: horus topic echo, horus topic hz, horus monitor.
Performance Metrics
The scheduler tracks detailed performance metrics for each node:
pub struct NodeMetrics {
pub name: String,
pub priority: u32,
pub total_ticks: u64,
pub successful_ticks: u64,
pub failed_ticks: u64,
pub avg_tick_duration_ms: f64,
pub max_tick_duration_ms: f64,
pub min_tick_duration_ms: f64,
pub last_tick_duration_ms: f64,
pub messages_sent: u64,
pub messages_received: u64,
pub errors_count: u64,
pub warnings_count: u64,
pub uptime_seconds: f64,
}
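These metrics are collected by, and accessed through, the scheduler. Inside the node itself, you only track the state your own logic needs: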
fn init(&mut self) -> Result<()> {
hlog!(info, "Node initializing");
Ok(())
}
fn tick(&mut self) {
// Track state internally if needed
self.tick_count += 1;
}
Tick Timing
The scheduler automatically tracks tick duration and updates metrics for each node. You don't need to call any timing methods manually.
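For intuition, the bookkeeping behind duration metrics such as the average, minimum, and maximum tick time can be sketched with the standard library alone. `TickStats` and `timed_tick` are hypothetical names chosen for this example; the field names loosely follow the NodeMetrics struct shown earlier, but this is an assumption about the approach, not the scheduler's actual code.

```rust
use std::time::{Duration, Instant};

// Hypothetical accumulator for per-node tick timing.
#[derive(Debug)]
struct TickStats {
    total_ticks: u64,
    total_ms: f64,
    min_ms: f64,
    max_ms: f64,
    last_ms: f64,
}

impl TickStats {
    fn new() -> Self {
        Self {
            total_ticks: 0,
            total_ms: 0.0,
            min_ms: f64::INFINITY,
            max_ms: 0.0,
            last_ms: 0.0,
        }
    }

    // Record the duration of one tick.
    fn record(&mut self, d: Duration) {
        let ms = d.as_secs_f64() * 1000.0;
        self.total_ticks += 1;
        self.total_ms += ms;
        self.min_ms = self.min_ms.min(ms);
        self.max_ms = self.max_ms.max(ms);
        self.last_ms = ms;
    }

    // Mean tick duration in milliseconds (0.0 before the first tick).
    fn avg_ms(&self) -> f64 {
        if self.total_ticks == 0 {
            0.0
        } else {
            self.total_ms / self.total_ticks as f64
        }
    }
}

// Time a single call to `tick` and record it.
fn timed_tick(stats: &mut TickStats, tick: impl FnOnce()) {
    let start = Instant::now();
    tick();
    stats.record(start.elapsed());
}
```

Because the timing wraps the call from the outside, the node's own tick() body stays free of measurement code.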
Node Priority
Nodes execute in priority order each tick cycle.
Priority Levels
Priorities are represented as u32 values where lower numbers = higher priority.
Common priority values:
// Recommended priority constants
const CRITICAL: u32 = 0; // Highest priority
const HIGH: u32 = 10;
const NORMAL: u32 = 50; // Default
const LOW: u32 = 80;
const BACKGROUND: u32 = 100; // Lowest priority
You can use any u32 value for fine-grained control (e.g., 5, 15, 25, 37, 42, etc.).
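The "lower number runs first" rule amounts to sorting nodes by their order value in ascending order. The sketch below shows that idea with a hypothetical `Entry` type and `execution_order` function. How HORUS breaks ties between equal values is not stated here, so the stable ordering used in the sketch is an assumption.

```rust
// Hypothetical registry entry: a node name plus its `order` value.
struct Entry {
    name: &'static str,
    order: u32,
}

// Return node names in the order they would run within one tick cycle:
// ascending `order`, so 0 runs first. The sort is stable, so nodes with
// equal `order` keep their registration order (an assumption of this sketch).
fn execution_order(mut entries: Vec<Entry>) -> Vec<&'static str> {
    entries.sort_by_key(|e| e.order);
    entries.into_iter().map(|e| e.name).collect()
}
```

Registering logger (100), control (10), safety (0), and sensor (50) in that order yields the execution order safety, control, sensor, logger.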
Priority Usage
use horus::prelude::*;
let mut scheduler = Scheduler::new();
// Safety monitor runs FIRST every tick (order 0)
scheduler.add(safety_node).order(0).build()?;
// Controller runs second (order 10)
scheduler.add(control_node).order(10).build()?;
// Sensors run third (order 50)
scheduler.add(sensor_node).order(50).build()?;
// Logging runs LAST (order 100)
scheduler.add(logger_node).order(100).build()?;
// Fine-grained priorities for complex systems
scheduler.add(emergency_stop).order(0).build()?; // Highest
scheduler.add(motor_control).order(15).build()?; // Between HIGH and NORMAL
scheduler.add(vision_processing).order(55).build()?; // Slightly lower than normal
scheduler.add(telemetry).order(90).build()?; // Between LOW and BACKGROUND
Priority Guidelines
0 (Critical): Safety monitors, emergency stops, fault detection
10 (High): Control loops, actuator commands, real-time feedback
50 (Normal): Sensor processing, state estimation, path planning
80 (Low): Non-critical computation, filtering, analysis
100 (Background): Logging, monitoring, diagnostics, data recording
Custom Values: Use any u32 value for fine-grained priority control in complex systems
Creating Nodes
Manual Implementation
use horus::prelude::*;
struct SensorNode {
data_pub: Topic<f32>,
}
impl SensorNode {
fn new() -> Result<Self> {
Ok(Self {
data_pub: Topic::new("sensor_data")?,
})
}
}
impl Node for SensorNode {
fn name(&self) -> &str {
"SensorNode"
}
fn tick(&mut self) {
let data = 42.0; // Read sensor
self.data_pub.send(data);
}
}
Using the node! Macro
The node! macro eliminates boilerplate:
use horus::prelude::*;
node! {
SensorNode {
pub {
sensor_data: f32 -> "sensor_data",
}
tick {
let data = 42.0;
self.sensor_data.send(data);
}
}
}
The macro generates:
- Struct definition with Topic fields
- Node trait implementation
- Constructor function (SensorNode::new())
- Topic metadata methods
The macro also supports sub {} (subscribers), data {} (internal state), init {}, shutdown {}, and impl {} blocks. See The node! Macro Guide for the full syntax including lifecycle hooks, custom names, and advanced patterns.
Node Communication Patterns
Publisher Pattern
struct Publisher {
data_pub: Topic<f32>,
}
impl Node for Publisher {
fn tick(&mut self) {
let data = self.generate_data();
self.data_pub.send(data);
}
}
Subscriber Pattern
struct Subscriber {
data_sub: Topic<f32>,
}
impl Node for Subscriber {
fn tick(&mut self) {
if let Some(data) = self.data_sub.recv() {
self.process(data);
}
}
}
Pipeline Pattern
struct Filter {
input_sub: Topic<f32>,
output_pub: Topic<f32>,
}
impl Node for Filter {
fn tick(&mut self) {
if let Some(input) = self.input_sub.recv() {
let output = input * 2.0;
self.output_pub.send(output);
}
}
}
Aggregator Pattern
struct Aggregator {
input_a: Topic<f32>,
input_b: Topic<f32>,
output_pub: Topic<f32>,
}
impl Node for Aggregator {
fn tick(&mut self) {
if let (Some(a), Some(b)) = (self.input_a.recv(), self.input_b.recv()) {
let result = a + b;
self.output_pub.send(result);
}
}
}
Best Practices
Keep tick() Fast
The tick method should complete quickly (ideally <1ms):
// GOOD: Fast computation
fn tick(&mut self) {
let result = self.compute_quickly();
self.data_pub.send(result); // `pub` is a reserved keyword, so the field needs another name
}
// BAD: Blocking I/O
fn tick(&mut self) {
let data = std::fs::read_to_string("file.txt").unwrap(); // Blocks!
// ...
}
For slow operations, use async tasks or separate threads initialized in init().
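One way to follow this advice is to start a worker thread during initialization and poll a channel from tick() without blocking. The sketch below uses only the standard library. `FileWatcher`, `start`, and `poll` are hypothetical names; in a real node, `start` would be called from init() and `poll` from tick().

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;

// Hypothetical node state: a channel to the worker plus the latest result.
struct FileWatcher {
    rx: Option<Receiver<String>>,
    latest: Option<String>,
}

impl FileWatcher {
    fn new() -> Self {
        Self { rx: None, latest: None }
    }

    // Would be called from init(): spawn the worker once.
    // `load` stands in for any blocking operation (file read, network call).
    fn start(&mut self, load: impl FnOnce() -> String + Send + 'static) {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            // Ignore the send error if the node was dropped before the result arrived.
            let _ = tx.send(load());
        });
        self.rx = Some(rx);
    }

    // Would be called from tick(): never blocks, only polls the channel.
    fn poll(&mut self) {
        let result = match &self.rx {
            Some(rx) => rx.try_recv(),
            None => return,
        };
        match result {
            Ok(data) => self.latest = Some(data),
            Err(TryRecvError::Empty) => {} // nothing ready yet
            Err(TryRecvError::Disconnected) => self.rx = None, // worker has finished
        }
    }
}
```

The design choice is that tick() only ever calls `try_recv`, which returns immediately, so a slow read delays when the data becomes available but never delays the tick itself.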
What to Include in init()
The init() method runs once when your node starts. Use it to set up everything your node needs before tick() begins.
Always include in init():
| Category | Examples | Why |
|---|---|---|
| Hardware connections | Serial ports, I2C/SPI devices, GPIO pins | Must be opened before use |
| Network connections | TCP/UDP sockets, WebSocket clients | Establish before tick loop |
| File handles | Config files, log files, data files | Open once, use in tick |
| Pre-allocated buffers | Image buffers, point cloud arrays | Avoid allocation in tick |
| Calibration/setup | Sensor calibration, motor homing | One-time setup operations |
| Initial state | Reset counters, clear flags | Start from known state |
fn init(&mut self) -> Result<()> {
hlog!(info, "Initializing MyMotorNode");
// 1. Open hardware connections (serial_port is an Option, so wrap the handle in Some)
self.serial_port = Some(
serialport::new("/dev/ttyUSB0", 115200)
.open()
.map_err(|e| Error::node("MyMotorNode", format!("Failed to open serial: {}", e)))?
);
// 2. Pre-allocate buffers (avoid allocation in tick)
self.command_buffer = vec![0u8; 256];
// 3. Initialize hardware state
self.send_init_sequence()?;
// 4. Set initial values
self.velocity = 0.0;
self.is_armed = false;
hlog!(info, "MyMotorNode initialized successfully");
Ok(())
}
What to Include in shutdown()
The shutdown() method runs once when your application exits (SIGINT, for example from Ctrl+C, or SIGTERM). Use it to safely stop hardware and release resources.
Always include in shutdown():
| Category | Examples | Why |
|---|---|---|
| Stop actuators | Motors, servos, pumps, valves | CRITICAL SAFETY - prevent runaway |
| Disable hardware | Disable motor drivers, turn off outputs | Safe state for power-off |
| Close connections | Serial ports, network sockets | Release system resources |
| Release GPIO | Unexport pins, set to input mode | Allow other processes to use |
| Save state | Log final position, save calibration | Preserve data for next run |
| Flush buffers | Write pending data to disk | Prevent data loss |
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "MyMotorNode shutting down");
// 1. CRITICAL: Stop all actuators FIRST
self.velocity = 0.0;
self.send_stop_command();
// 2. Disable hardware (safe state)
self.disable_motor_driver();
// 3. Close hardware connections
if let Some(port) = self.serial_port.take() {
drop(port); // Closes the port
}
// 4. Save any important state
self.save_position_to_file()?;
hlog!(info, "MyMotorNode shutdown complete");
Ok(())
}
Complete Custom Node Example
Here's a complete example showing proper init() and shutdown() implementation:
use horus::prelude::*;
struct MyMotorController {
// Hardware
serial_port: Option<Box<dyn serialport::SerialPort>>,
// Communication
cmd_sub: Topic<MotorCommand>,
status_pub: Topic<MotorStatus>,
// State
velocity: f64,
position: i32,
is_enabled: bool,
}
impl MyMotorController {
fn new() -> Result<Self> {
Ok(Self {
serial_port: None,
cmd_sub: Topic::new("motor.cmd")?,
status_pub: Topic::new("motor.status")?,
velocity: 0.0,
position: 0,
is_enabled: false,
})
}
fn send_velocity(&mut self, vel: f64) {
if let Some(ref mut port) = self.serial_port {
let cmd = format!("V{}\n", vel);
let _ = port.write(cmd.as_bytes());
}
}
}
impl Node for MyMotorController {
fn name(&self) -> &str { "MyMotorController" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Opening serial connection to motor controller");
// Open hardware connection
self.serial_port = Some(
serialport::new("/dev/ttyUSB0", 115200)
.timeout(std::time::Duration::from_millis(100))
.open()
.map_err(|e| Error::node("MyMotorController", format!("Serial open failed: {}", e)))?
);
// Initialize motor to stopped state
self.send_velocity(0.0);
self.is_enabled = true;
hlog!(info, "Motor controller ready");
Ok(())
}
fn tick(&mut self) {
// Process commands
if let Some(cmd) = self.cmd_sub.recv() {
self.velocity = cmd.velocity;
self.send_velocity(self.velocity);
}
// Publish status
let status = MotorStatus {
velocity: self.velocity,
position: self.position,
};
self.status_pub.send(status);
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Stopping motor for safe shutdown");
// CRITICAL: Stop motor first!
self.velocity = 0.0;
self.send_velocity(0.0);
// Close serial port
self.serial_port = None;
self.is_enabled = false;
hlog!(info, "Motor stopped safely");
Ok(())
}
}
When init() and shutdown() Are NOT Optional
While the default implementations are no-ops, you should implement them when:
| Scenario | init() Required | shutdown() Required |
|---|---|---|
| Controls motors/actuators | Setup | YES - SAFETY CRITICAL |
| Opens serial/I2C/SPI ports | YES | YES |
| Uses GPIO pins | YES | YES |
| Opens network connections | YES | Recommended |
| Allocates large buffers | YES | No |
| Reads config files | YES | No |
| Writes log/data files | Optional | YES (flush) |
| Pure computation node | No | No |
Use Result Types
Return errors from init() and shutdown():
fn init(&mut self) -> Result<()> {
if !self.sensor.is_available() {
return Err(Error::node("MyNode", "Sensor not found"));
}
Ok(())
}
Use hlog! for Logging in tick()
Since tick() no longer receives ctx, use the hlog! macro for logging:
fn tick(&mut self) {
hlog!(info, "Processing data");
hlog!(debug, "Detailed debug info: {:?}", self.state);
}
Avoid State in Static Variables
Store state in the node struct, not static variables:
// GOOD
struct MyNode {
counter: u32, // Instance state
}
// BAD
static mut COUNTER: u32 = 0; // Unsafe global state
Error Handling
Initialization Errors
fn init(&mut self) -> Result<()> {
self.device = Device::open().map_err(|e| {
Error::node("MyNode", format!("Failed to open device: {}", e))
})?;
hlog!(info, "Device opened successfully");
Ok(())
}
If init() returns an error, the node transitions to Error state and won't run.
Runtime Errors
Handle errors in tick() without panicking:
fn tick(&mut self) {
match self.data_sub.recv() {
Some(data) => self.process(data),
None => {
// No data available - this is normal
}
}
}
Shutdown Errors
fn shutdown(&mut self) -> Result<()> {
if let Err(e) = self.device.close() {
hlog!(warn, "Failed to close device: {}", e);
// Continue shutdown anyway
}
Ok(())
}
Advanced Topics
Conditional Execution
Run logic only under certain conditions:
fn tick(&mut self) {
self.tick_count += 1;
// Execute every 10 ticks
if self.tick_count % 10 == 0 {
self.slow_operation();
}
}
State-Based Logic
Implement complex behavior with enum-based state patterns:
enum RobotState {
Idle,
Moving,
Stopped,
}
struct RobotController {
state: RobotState,
cmd_sub: Topic<CmdVel>,
}
impl Node for RobotController {
fn tick(&mut self) {
match self.state {
RobotState::Idle => {
if let Some(cmd) = self.cmd_sub.recv() {
self.state = RobotState::Moving;
}
}
RobotState::Moving => {
// Execute movement
if self.is_done() {
self.state = RobotState::Stopped;
}
}
RobotState::Stopped => {
self.state = RobotState::Idle;
}
}
}
}
Multi-Topic Synchronization
Wait for data from multiple topics:
struct Synchronizer {
topic_a: Topic<f32>,
topic_b: Topic<f32>,
last_a: Option<f32>,
last_b: Option<f32>,
}
impl Node for Synchronizer {
fn tick(&mut self) {
// Update cached values
if let Some(a) = self.topic_a.recv() {
self.last_a = Some(a);
}
if let Some(b) = self.topic_b.recv() {
self.last_b = Some(b);
}
// Process when both available
if let (Some(a), Some(b)) = (self.last_a, self.last_b) {
self.process(a, b);
}
}
}
Graceful Shutdown & Motor Safety
When a HORUS application receives a termination signal (SIGINT, for example from Ctrl+C, or SIGTERM), the scheduler automatically calls shutdown() on all registered nodes. This is critical for robotics safety.
Signal Handling
The scheduler intercepts termination signals and ensures proper cleanup.
Why shutdown() Matters for Motors
Without shutdown(): If you stop your robot with Ctrl+C while motors are running, they continue at their last commanded velocity - potentially dangerous for autonomous vehicles!
With shutdown(): Motors receive stop commands before the application exits:
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Stopping all motors for safe shutdown");
// Send stop command to all motors
self.emergency_stop();
// Optionally: disable motor drivers
self.disable_all_drivers();
hlog!(info, "Motors stopped safely");
Ok(())
}
Python Node Shutdown Behavior
Python nodes also support shutdown callbacks. When the scheduler stops, your shutdown function runs automatically:
from horus import Node, Topic, Scheduler
class MotorController(Node):
def __init__(self):
super().__init__("MotorController")
def init(self):
self.cmd_sub = Topic("cmd_vel")
self.velocity = (0.0, 0.0)
def tick(self):
msg = self.cmd_sub.recv()
if msg is not None:
self.velocity = msg
def shutdown(self):
self.velocity = (0.0, 0.0)
print("Motors stopped safely")
scheduler = Scheduler()
scheduler.node(MotorController()).order(10).build()
scheduler.run()
See the Python Bindings documentation for details.
Implementing shutdown() in Custom Nodes
Always implement shutdown() for nodes that control actuators:
impl Node for MyMotorController {
fn name(&self) -> &str { "MyMotorController" }
fn tick(&mut self) {
// Normal operation - motors running
self.motor_pub.send(self.velocity);
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "MyMotorController shutting down");
// CRITICAL: Stop all motors
self.velocity = 0.0;
self.motor_pub.send(0.0);
// Close hardware connections
if let Some(port) = self.serial_port.take() {
drop(port); // Dropping the handle closes the port
}
hlog!(info, "All motors stopped safely");
Ok(())
}
}
Testing Shutdown Behavior
Test your shutdown implementation before deploying:
# Start your application
horus run
# In another terminal, send SIGINT
kill -SIGINT <pid>
# Or simply press Ctrl+C in the application terminal
Verify in logs:
[12:34:56.789] [INFO] MyMotorController shutting down
[12:34:56.790] [INFO] All motors stopped safely
Best Practices for Shutdown
- Always stop actuators first - Motors, servos, and other actuators should receive stop commands
- Close hardware connections - Serial ports, I2C, SPI, CAN bus connections
- Release system resources - GPIO pins, file handles, network sockets
- Log shutdown progress - Helps debug shutdown issues
- Don't panic in shutdown - Handle errors gracefully, continue cleanup
fn shutdown(&mut self) -> Result<()> {
// Always try to stop motors, even if other cleanup fails
if let Err(e) = self.stop_motors() {
hlog!(error, "Failed to stop motors: {}", e);
// Continue with other cleanup anyway
}
// Close connections (non-critical)
if let Err(e) = self.close_connection() {
hlog!(warn, "Failed to close connection: {}", e);
}
Ok(())
}
Next Steps
- Learn about Topic and Pub/Sub for inter-node communication
- Understand the Scheduler for orchestrating nodes
- Explore Message Types for standard robotics data
- Read the API Reference for complete Node trait documentation