Node Lifecycle (Python)
A warehouse robot runs an ML detector at 30 Hz, a motor controller at 100 Hz, and a safety monitor that must stop the wheels within one cycle if the detector flags an obstacle. The motor controller needs a serial port opened before the first tick. The detector needs a model loaded into GPU memory. The safety monitor must send a zero-velocity command before the serial port closes. And when someone presses Ctrl+C, all of this must happen in the right order -- motors stop before sensors disconnect, the model releases GPU memory, and the log file flushes before the process exits.
In HORUS, every Python node follows the same lifecycle: construct, initialize, tick, shut down. The scheduler controls when each phase runs, guarantees the order, and catches exceptions at every stage. This page covers the complete lifecycle, the rules for each phase, and the patterns for managing state across them.
Lifecycle Overview
Every node transitions through well-defined states, managed by the scheduler:
 Construction          Scheduler.run()                   Ctrl+C / stop()
      |                      |                                 |
      v                      v                                 v
+--------+       +--------------+       +---------+       +----------+
| UNINIT |------>| INITIALIZING |------>| RUNNING |------>| STOPPING |---> STOPPED
+--------+       +--------------+       +---------+       +----------+
                        |                    |
                        | (init error)       | (unrecoverable)
                        v                    v
                   +-------+            +---------+
                   | ERROR |<---------->| CRASHED |
                   +-------+            +---------+
                        |
                        | (recovery)
                        v
                   +---------+
                   | RUNNING |
                   +---------+
Phase summary:
| Phase | Callback | Called | Purpose |
|---|---|---|---|
| Construction | Node(...) | Once, by your code | Declare topics, set rate, wire callbacks |
| Initialization | init(node) | Once, by the scheduler | Open hardware, load models, allocate buffers |
| Tick loop | tick(node) | Repeatedly, at configured rate | Read sensors, compute, publish results |
| Error handling | on_error(node, exception) | On each exception in tick() | Log, recover, or escalate |
| Shutdown | shutdown(node) | Once, by the scheduler | Stop motors, close files, release resources |
NodeState Enum
The NodeState enum tracks the current lifecycle phase. Import and compare it directly:
from horus import NodeState
# Values
NodeState.UNINITIALIZED # Created but scheduler hasn't started
NodeState.INITIALIZING # init() is executing
NodeState.RUNNING # Actively ticking
NodeState.STOPPING # shutdown() is executing
NodeState.STOPPED # Clean shutdown complete
NodeState.ERROR # Recoverable error -- on_error() was called
NodeState.CRASHED # Unrecoverable -- node removed from tick loop
NodeState values are strings, so direct comparison works:
if node.info.state == "running":
node.log_info("Node is active")
Construction
Construction declares what the node does. No hardware is opened, no connections are made, no resources are allocated. That happens in init().
Function-Based (Closures)
The simplest pattern. Use module-level or closure variables for state:
import horus
count = 0
def my_init(node):
node.log_info("Sensor starting")
def my_tick(node):
global count
count += 1
node.send("heartbeat", {"tick": count})
def my_shutdown(node):
node.log_info(f"Sensor stopped after {count} ticks")
sensor = horus.Node(
name="sensor",
tick=my_tick,
init=my_init,
shutdown=my_shutdown,
pubs=["heartbeat"],
rate=10,
)
horus.run(sensor)
This works well for simple nodes. The count variable lives in module scope and persists across ticks.
Class-Based (Bound Methods)
For nodes with complex state, use a class and pass its methods as callbacks:
import horus
class MotorController:
def __init__(self):
self.serial = None
self.velocity = 0.0
self.error_count = 0
def init(self, node):
import serial
self.serial = serial.Serial("/dev/ttyUSB0", 115200)
node.log_info("Motor serial port opened")
def tick(self, node):
if node.has_msg("cmd_vel"):
cmd = node.recv("cmd_vel")
self.velocity = cmd["linear"]
self.serial.write(f"V{self.velocity:.2f}\n".encode())
def on_error(self, node, exception):
self.error_count += 1
node.log_error(f"Motor error #{self.error_count}: {exception}")
if self.error_count > 5:
self.velocity = 0.0
node.log_warning("Too many errors -- zeroing velocity")
def shutdown(self, node):
self.velocity = 0.0
if self.serial:
self.serial.write(b"V0.00\n")
self.serial.close()
node.log_info("Motor stopped safely")
motor = MotorController()
node = horus.Node(
name="motor",
tick=motor.tick,
init=motor.init,
shutdown=motor.shutdown,
on_error=motor.on_error,
subs=["cmd_vel"],
rate=100,
order=10,
)
horus.run(node)
The class instance (motor) owns all state. Bound methods (motor.tick) automatically have access to self, so no globals are needed.
When to Use Which
| Pattern | Best for | Drawback |
|---|---|---|
| Functions + closures | Simple nodes, prototyping, stateless transforms | Global variables, harder to test in isolation |
| Functions + mutable container | Moderate state (a dict, a counter) | Manual discipline -- nothing enforces structure |
| Class with bound methods | Complex state, hardware drivers, ML models | More boilerplate |
A practical rule: if your node needs more than two mutable variables across ticks, use a class.
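The middle row of the table -- a mutable container shared across callbacks -- avoids `global` declarations while staying function-based. A minimal sketch of the pattern; the `StubNode` class here is a hypothetical stand-in for the node object the scheduler passes to your callbacks:

```python
# Mutable-container pattern: one dict holds all cross-tick state,
# so callbacks mutate it in place without `global` declarations.
state = {"count": 0, "last_value": None}

def my_tick(node):
    state["count"] += 1
    state["last_value"] = node.recv("data")

def my_shutdown(node):
    node.log_info(f"Stopped after {state['count']} ticks")

# --- Hypothetical stub standing in for the scheduler-provided node ---
class StubNode:
    def recv(self, topic):
        return {"topic": topic}
    def log_info(self, msg):
        print(msg)

stub = StubNode()
for _ in range(3):
    my_tick(stub)
my_shutdown(stub)  # prints "Stopped after 3 ticks"
```

Nothing enforces the dict's structure -- which is exactly the drawback the table notes. Once the dict grows past a couple of keys, a class makes the state explicit.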
Initialization: init(node)
Called once by the scheduler, before the first tick(). This is where you do work that is too slow or too risky for the tick loop.
def init(node):
# Open hardware connections
node.serial = serial.Serial("/dev/ttyUSB0", 115200)
# Load ML models (may take seconds)
node.model = load_yolo_model("yolov8n.pt")
# Pre-allocate buffers (avoid allocation in tick)
node.buffer = bytearray(4096)
# Set actuators to known-safe state
node.velocity = 0.0
node.log_info("Controller initialized")
Rules for init():
- Do open hardware here. Serial ports, cameras, GPIO pins -- all should be opened in init(), not during construction. Construction happens at import time; init() happens when the scheduler is ready to run.
- Do pre-allocate. Create lists, arrays, buffers, and model inference sessions here. Allocation during tick() causes jitter.
- Do set safe defaults. Motors at zero velocity, grippers open, heaters off. If the system shuts down before the first tick, shutdown() should find everything in a safe state.
- Do not start background threads. The scheduler manages execution. Background threads fight the scheduler for CPU time and break deterministic ordering.
- If init() raises an exception, the node enters the ERROR state and its failure_policy is applied. Other nodes continue to initialize and run.
Lazy Initialization
init() is called lazily -- when scheduler.run() or horus.run() starts, not when the node is constructed. This means:
# Construction happens here -- no hardware touched
motor = horus.Node(name="motor", tick=motor_tick, init=motor_init, ...)
# ... possibly minutes later ...
# init() runs here -- hardware is opened
horus.run(motor)
This is deliberate. You can construct and configure all nodes before any hardware is touched. The scheduler finalizes its own configuration (clock, RT settings, recording) before calling any node's init().
The Tick Loop: tick(node)
The scheduler calls tick() repeatedly at the configured rate. This is your main logic -- read sensors, compute, publish results.
def tick(node):
# Read all subscribed topics
if node.has_msg("scan"):
scan = node.recv("scan")
obstacles = detect_obstacles(scan)
if obstacles:
cmd = compute_avoidance(obstacles)
else:
cmd = {"linear": 1.0, "angular": 0.0}
node.send("cmd_vel", cmd)
Rules for tick()
Keep it fast. The scheduler monitors how long tick() takes. If it exceeds the node's budget, that is a deadline miss -- which can trigger safety responses. For a 100 Hz node, each tick has 10 ms. Stay well under.
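The budget is just the inverse of the rate. A quick sketch -- these helpers are illustrative, not part of the HORUS API; the scheduler already tracks tick duration for you via node.info.avg_tick_duration_ms():

```python
import time

def tick_budget_ms(rate_hz: float) -> float:
    """Time available per tick, in milliseconds."""
    return 1000.0 / rate_hz

def fits_budget(rate_hz: float, work) -> bool:
    """Run `work` once and report whether it finished inside the tick budget."""
    t0 = time.monotonic()
    work()
    elapsed_ms = (time.monotonic() - t0) * 1000.0
    return elapsed_ms <= tick_budget_ms(rate_hz)

print(tick_budget_ms(100))                         # 10.0 -- ms per tick at 100 Hz
print(fits_budget(100, lambda: sum(range(1000))))  # True for a trivial workload
```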
Never block on I/O. Do not read files, make HTTP requests, or wait on sockets inside tick(). These operations can take milliseconds to seconds, stalling the entire tick cycle. For I/O-heavy work, use an async tick function -- it auto-detects and runs on the async I/O executor:
import aiohttp
async def fetch_tick(node):
async with aiohttp.ClientSession() as session:
async with session.get("http://api.weather.com/data") as resp:
data = await resp.json()
node.send("weather", data)
# async def auto-detected -- runs on async executor, not the main tick loop
node = horus.Node(tick=fetch_tick, pubs=["weather"], rate=0.1)
Never call time.sleep(). The scheduler controls timing. Sleeping inside tick() wastes the node's entire time budget and delays every node that runs after it.
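If you need an action slower than the tick rate, count ticks instead of sleeping. A sketch of the pattern; the `StubNode` class is a hypothetical stand-in for the scheduler-provided node:

```python
# Fire a slow action every N ticks instead of calling time.sleep().
# At 100 Hz, every 100th tick is once per second.
RATE_HZ = 100
TICKS_PER_HEARTBEAT = int(RATE_HZ * 1.0)  # 1.0 s heartbeat period

tick_count = 0

def tick(node):
    global tick_count
    tick_count += 1
    if tick_count % TICKS_PER_HEARTBEAT == 0:
        node.send("heartbeat", {"tick": tick_count})

# --- Hypothetical stub standing in for the scheduler-provided node ---
class StubNode:
    def __init__(self):
        self.sent = []
    def send(self, topic, data):
        self.sent.append((topic, data))

stub = StubNode()
for _ in range(250):
    tick(stub)
print(len(stub.sent))  # 2 -- heartbeats fired at ticks 100 and 200
```

Each call to tick() still returns within its budget; the slow behavior emerges from the count, not from blocking the loop.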
Always drain your topics. Call recv() on every subscribed topic every tick, even if you do not need the data. Unread messages pile up in the ring buffer, and when you finally read them, you get stale data from potentially seconds ago:
def tick(node):
# CORRECT: always drain, even if you only act on the latest
scan = node.recv("scan")
odom = node.recv("odom")
if scan is not None:
process(scan)
Do not allocate in the hot path. Creating lists, dicts, or large objects in tick() triggers Python's allocator, which can add milliseconds of jitter. Pre-allocate in init() and reuse:
class Processor:
def __init__(self):
self.buffer = [0.0] * 360 # Pre-allocate in constructor
def init(self, node):
self.result = {"ranges": self.buffer, "count": 0}
def tick(self, node):
scan = node.recv("scan")
if scan:
# Reuse pre-allocated buffer
for i, r in enumerate(scan["ranges"]):
self.buffer[i] = r * 0.01
self.result["count"] += 1
node.send("processed", self.result)
Shutdown: shutdown(node)
Called once when the scheduler stops -- whether from Ctrl+C, SIGINT, SIGTERM, node.request_stop(), or scheduler.stop(). Nodes shut down in reverse order: the last-added node shuts down first.
def shutdown(node):
# 1. Stop actuators FIRST
node.send("cmd_vel", {"linear": 0.0, "angular": 0.0})
# 2. Close hardware connections
if hasattr(node, "serial") and node.serial:
node.serial.close()
# 3. Flush and close files
if hasattr(node, "log_file") and node.log_file:
node.log_file.flush()
node.log_file.close()
node.log_info("Motor controller shut down safely")
Rules for shutdown():
- Stop actuators before closing connections. Send zero velocity before dropping the serial port. Otherwise, the motor holds its last commanded velocity.
- Never raise exceptions. If one cleanup step fails, log and continue. Do not let one failure prevent other cleanup:
def shutdown(node):
try:
send_stop_command()
except Exception as e:
node.log_error(f"Failed to stop motors: {e}")
# Continue cleanup -- don't return early
try:
close_serial_port()
except Exception as e:
node.log_warning(f"Failed to close serial: {e}")
- Do not assume tick() ran. The system may shut down between init() and the first tick(). Your shutdown code must handle hardware that was opened but never used.
Reverse-Order Shutdown
Nodes are typically added in dependency order: sensors first, then controllers, then loggers. Reverse-order shutdown means controllers stop motors before sensors disconnect:
sensor = horus.Node(name="sensor", ...) # Added first, shuts down last
controller = horus.Node(name="controller", ...) # Shuts down second
logger = horus.Node(name="logger", ...) # Added last, shuts down first
horus.run(sensor, controller, logger)
# Shutdown order: logger -> controller -> sensor
This prevents the scenario where a sensor disconnects while the motor controller is still running with its last received command.
Error Handling: on_error(node, exception)
When tick() raises an exception, the scheduler catches it and calls on_error() if provided. The callback receives both the node and the exception:
def on_error(node, exception):
node.log_error(f"Tick failed: {exception}")
# Track consecutive errors
if not hasattr(node, "_error_count"):
node._error_count = 0
node._error_count += 1
if node._error_count > 10:
node.log_error("Too many consecutive errors -- requesting stop")
node.request_stop()
How error escalation works:
1. tick() raises an exception
2. If on_error is set, it is called with (node, exception)
3. If on_error handles the exception (does not re-raise), the node continues ticking
4. If on_error itself raises, or if no on_error is set, the exception propagates to the Rust FailurePolicy
5. The FailurePolicy decides what happens next:
| Policy | Behavior |
|---|---|
| "fatal" (default) | Stop the entire scheduler |
| "restart" | Re-initialize and resume the node |
| "skip" | Skip this tick, continue with next |
| "ignore" | Silently continue |
# Node with custom error handling + skip policy as fallback
node = horus.Node(
name="detector",
tick=detect_objects,
on_error=handle_detection_error,
failure_policy="skip", # If on_error also fails, skip this tick
rate=30,
)
When to use on_error vs failure_policy: Use on_error when you want to inspect the exception, track error counts, or take corrective action in Python. Use failure_policy as a safety net for unhandled exceptions. They compose -- on_error runs first, and failure_policy only fires if the exception is not handled.
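That composition can be modeled in a few lines of plain Python. This is a simplified sketch of the escalation order described above, not the actual Rust-side implementation:

```python
def run_tick(node, tick, on_error=None, failure_policy="fatal"):
    """Simplified model of one scheduled tick with error escalation."""
    try:
        tick(node)
        return "ok"
    except Exception as exc:
        if on_error is not None:
            try:
                on_error(node, exc)  # handler swallowed it -> keep ticking
                return "handled"
            except Exception:
                pass                 # handler re-raised -> fall through to policy
        # Unhandled: the failure_policy decides
        if failure_policy == "fatal":
            return "stop_scheduler"
        elif failure_policy == "restart":
            return "reinit_node"
        elif failure_policy == "skip":
            return "skip_tick"
        return "ignored"

def bad_tick(node):
    raise RuntimeError("boom")

def swallow(node, exc):
    pass  # handle the error: do not re-raise

print(run_tick(None, bad_tick, on_error=swallow))        # handled
print(run_tick(None, bad_tick, failure_policy="skip"))   # skip_tick
print(run_tick(None, bad_tick))                          # stop_scheduler
```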
Topic Operations
All topic communication happens through the node object passed to your callbacks. Topics must be declared at construction time via pubs and subs.
node.send(topic, data)
Publish a message. Non-blocking. If the ring buffer is full, the oldest unread message is overwritten.
# Dict (generic -- MessagePack serialization)
node.send("status", {"battery": 85.2, "state": "running"})
# Typed message (zero-copy POD transport)
from horus import CmdVel
node.send("cmd_vel", CmdVel(linear=1.0, angular=0.5))
# Primitive
node.send("temperature", 22.5)
node.recv(topic)
Receive one message (FIFO order). Returns None if no messages are available.
msg = node.recv("scan")
if msg is not None:
process(msg)
node.recv_all(topic)
Drain all available messages as a list. Returns an empty list if none are available. Use this for batch processing when every message matters:
def tick(node):
commands = node.recv_all("commands")
for cmd in commands:
execute(cmd)
node.log_debug(f"Processed {len(commands)} commands this tick")
node.has_msg(topic)
Check if at least one message is available without consuming it. The message is buffered internally and returned by the next recv() call.
def tick(node):
if node.has_msg("emergency_stop"):
stop = node.recv("emergency_stop")
node.log_warning("Emergency stop received")
node.request_stop()
Multi-Topic Pattern
When your node subscribes to multiple topics, always drain all of them every tick. Cache the latest value from each and process when all are available:
class SensorFusion:
def __init__(self):
self.last_imu = None
self.last_odom = None
def tick(self, node):
# ALWAYS drain both topics -- even if you only act when both are present
imu = node.recv("imu")
odom = node.recv("odom")
if imu is not None:
self.last_imu = imu
if odom is not None:
self.last_odom = odom
if self.last_imu and self.last_odom:
fused = self.fuse(self.last_imu, self.last_odom)
node.send("pose", fused)
Logging
HORUS provides structured logging through the node object. Log messages are routed to the HORUS logging system, which integrates with the TUI monitor and CLI tools.
def tick(node):
node.log_info("Processing frame")
node.log_warning("Sensor reading is stale")
node.log_error("Failed to connect to motor")
node.log_debug(f"Raw value: {value}")
| Method | Level | Use for |
|---|---|---|
| node.log_debug(msg) | DEBUG | Verbose diagnostics, variable dumps |
| node.log_info(msg) | INFO | Normal operation milestones |
| node.log_warning(msg) | WARNING | Degraded operation, stale data |
| node.log_error(msg) | ERROR | Failures that affect behavior |
Logging only works during lifecycle callbacks -- inside init(), tick(), shutdown(), and on_error(). Calling logging methods outside the scheduler emits a RuntimeWarning and the message is dropped:
node = horus.Node(name="test", tick=my_tick, pubs=["data"], rate=10)
# This is dropped with a RuntimeWarning -- scheduler is not running yet
node.log_info("This will not appear")
# This works -- called inside tick() by the scheduler
def my_tick(node):
node.log_info("This appears in the log")
This restriction exists because the HORUS logging system requires the scheduler's runtime context (timestamps, node identity, routing) to function. Outside the scheduler, that context does not exist.
NodeInfo: Runtime Metrics
During init(), tick(), and shutdown(), the scheduler attaches a NodeInfo object to node.info. It provides scheduler-managed metrics:
| Method/Property | Returns | Description |
|---|---|---|
| node.info.name | str | Node name |
| node.info.state | str | Current NodeState value |
| node.info.tick_count() | int | Total ticks executed |
| node.info.error_count() | int | Total errors encountered |
| node.info.successful_ticks() | int | Ticks that completed without error |
| node.info.avg_tick_duration_ms() | float | Average tick execution time |
| node.info.get_uptime() | float | Seconds since init() completed |
| node.info.get_metrics() | dict | Full metrics snapshot |
| node.info.request_stop() | -- | Signal the scheduler to stop |
def tick(node):
# Periodic health report
if node.info.tick_count() % 1000 == 0:
metrics = node.info.get_metrics()
node.log_info(
f"Health: {node.info.tick_count()} ticks, "
f"avg={node.info.avg_tick_duration_ms():.2f}ms, "
f"errors={node.info.error_count()}"
)
# Self-monitoring: stop if error rate is too high
total = node.info.tick_count()
if total > 100 and node.info.error_count() / total > 0.1:
node.log_error("Error rate exceeded 10% -- shutting down")
node.info.request_stop()
node.info is None outside of lifecycle callbacks. Do not cache it -- the scheduler replaces it each tick with updated metrics.
Scheduler Context Manager
The Scheduler (not Node) supports Python's with statement for automatic cleanup. When the with block exits -- whether normally or via exception -- stop() is called automatically:
import horus
with horus.Scheduler(tick_rate=100) as sched:
sched.add(horus.Node(name="sensor", tick=sensor_fn, rate=100, order=0, pubs=["data"]))
sched.add(horus.Node(name="ctrl", tick=ctrl_fn, rate=100, order=1, subs=["data"]))
sched.run(duration=10.0)
# stop() called automatically here -- all shutdown() callbacks run
This is equivalent to wrapping the scheduler in a try/finally block. Use it in scripts and tests to ensure clean shutdown even when exceptions occur.
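The guarantee -- stop() runs even when the body raises -- can be demonstrated with a stub. This `StubScheduler` is illustrative only, not the HORUS class; it just shows the context-manager contract:

```python
class StubScheduler:
    """Minimal stand-in demonstrating the context-manager contract."""
    def __init__(self):
        self.stopped = False
    def run(self):
        raise RuntimeError("crash mid-run")
    def stop(self):
        self.stopped = True
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc, tb):
        self.stop()    # stop() always runs, even on exception
        return False   # do not swallow the exception

sched = StubScheduler()
try:
    with sched:
        sched.run()    # raises inside the with block
except RuntimeError:
    pass

print(sched.stopped)   # True -- cleanup ran despite the exception
```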
For most applications, horus.run() is simpler:
# horus.run() handles scheduler creation and cleanup internally
horus.run(sensor, controller, logger, duration=10.0)
Complete Example: ML Object Detector
This example ties every lifecycle phase together -- a real-world pattern where an ML model processes camera frames and publishes detection results:
import horus
class ObjectDetector:
def __init__(self):
self.model = None
self.frame_count = 0
self.detection_count = 0
self.log_file = None
def init(self, node):
# Load model (slow -- do it here, not in tick)
import torch
self.model = torch.hub.load("ultralytics/yolov5", "yolov5s")
self.model.eval()
# Open log file
self.log_file = open("/tmp/detections.csv", "w")
self.log_file.write("frame,num_detections,latency_ms\n")
node.log_info("YOLOv5 model loaded, logging to /tmp/detections.csv")
def tick(self, node):
frame = node.recv("camera.rgb")
if frame is None:
return
self.frame_count += 1
# Run inference
import time
t0 = time.monotonic()
results = self.model(frame["data"])
latency = (time.monotonic() - t0) * 1000
detections = results.xyxy[0].tolist()
self.detection_count += len(detections)
# Publish results
node.send("detections", {
"frame": self.frame_count,
"objects": detections,
"latency_ms": latency,
})
# Log to file
self.log_file.write(f"{self.frame_count},{len(detections)},{latency:.1f}\n")
if latency > 50:
node.log_warning(f"Inference slow: {latency:.1f}ms")
def on_error(self, node, exception):
node.log_error(f"Detection failed on frame {self.frame_count}: {exception}")
# Continue -- skip this frame, try the next one
def shutdown(self, node):
if self.log_file:
self.log_file.flush()
self.log_file.close()
node.log_info(
f"Detector stopped: {self.frame_count} frames, "
f"{self.detection_count} detections"
)
detector = ObjectDetector()
node = horus.Node(
name="detector",
tick=detector.tick,
init=detector.init,
shutdown=detector.shutdown,
on_error=detector.on_error,
subs=["camera.rgb"],
pubs=["detections"],
rate=30,
order=50,
failure_policy="skip", # Skip frame on unhandled error
)
horus.run(node)
Design Decisions
Why callbacks instead of a base class?
A base class (class MyNode(horus.Node)) would require every node to be a class, even trivial ones. Callbacks let you use plain functions for simple nodes and classes for complex ones. The same Node constructor handles both -- pass a function or a bound method.
Why does init() run lazily instead of at construction?
Construction happens when your Python module loads. init() runs when the scheduler starts. This separation means you can construct all nodes, configure the scheduler, and defer hardware initialization until the system is actually ready. It also means init() can depend on the scheduler's configuration (clock source, RT settings) being finalized.
Why does on_error receive (node, exception) instead of just (exception)?
The node parameter gives your error handler access to node.log_error(), node.request_stop(), node.info, and topic operations. Without it, error handlers would need to capture the node via closure -- forcing every error handler to be a closure or bound method, even for simple "log and continue" cases.
Why is logging restricted to lifecycle callbacks?
HORUS logging integrates with the scheduler's runtime context -- timestamps, node identity, log routing, and the TUI monitor. Outside the scheduler, that context does not exist. Rather than silently producing broken log entries, HORUS warns you that the message was dropped. Use Python's standard print() or logging module for messages outside the scheduler.
Why reverse-order shutdown instead of parallel? Parallel shutdown is faster but introduces race conditions. If the sensor and motor controller shut down simultaneously, the motor controller might try to read one last sensor value from a topic that has already been deallocated. Reverse-order shutdown is deterministic: controllers always stop before the sensors they depend on.
Trade-offs
| Gain | Cost |
|---|---|
| Callback-based API -- functions and classes both work without boilerplate | No static type checking on callback signatures (Python limitation) |
| Lazy initialization -- hardware only opens when the system starts | Must handle the case where init() succeeds but tick() never runs |
| Structured error handling -- on_error + failure_policy compose | Two layers to understand (Python callback + Rust policy) |
| Guaranteed shutdown -- runs even on Ctrl+C and SIGTERM | Must implement shutdown() for every node with hardware |
| Reverse-order shutdown -- deterministic dependency cleanup | Cannot shut down independent nodes in parallel |
| Logging only in callbacks -- consistent timestamps and routing | Cannot use node.log_*() for setup messages before run() |
See Also
- Python API Reference -- Complete Node, Scheduler, and Topic API
- Python Bindings -- Full Python API with Scheduler, typed messages, and multiprocess
- Nodes -- Full Reference -- Rust Node trait, lifecycle, and safety mechanisms
- Scheduler -- Full Reference -- Execution model, RT enforcement, and deterministic mode
- Execution Classes -- How rate, compute, on, and async map to executors