Node Lifecycle (Python)

A warehouse robot runs an ML detector at 30 Hz, a motor controller at 100 Hz, and a safety monitor that must stop the wheels within one cycle if the detector flags an obstacle. The motor controller needs a serial port opened before the first tick. The detector needs a model loaded into GPU memory. The safety monitor must send a zero-velocity command before the serial port closes. And when someone presses Ctrl+C, all of this must happen in the right order -- motors stop before sensors disconnect, the model releases GPU memory, and the log file flushes before the process exits.

In HORUS, every Python node follows the same lifecycle: construct, initialize, tick, shut down. The scheduler controls when each phase runs, guarantees the order, and catches exceptions at every stage. This page covers the complete lifecycle, the rules for each phase, and the patterns for managing state across them.

Lifecycle Overview

Every node transitions through well-defined states, managed by the scheduler:

Construction        Scheduler.run()           Ctrl+C / stop()
     |                    |                        |
     v                    v                        v
+-----------+    +---------------+    +---------+    +---------+
|  UNINIT   |--->| INITIALIZING  |--->| RUNNING |--->| STOPPING|---> STOPPED
+-----------+    +---------------+    +---------+    +---------+
                       |                   |
                       |  (init error)     |  (unrecoverable)
                       v                   v
                   +-------+          +---------+
                   | ERROR |<-------->| CRASHED |
                   +-------+          +---------+
                       |
                       | (recovery)
                       v
                   +---------+
                   | RUNNING |
                   +---------+

Phase summary:

Phase            Callback                    Called                           Purpose
Construction     Node(...)                   Once, by your code               Declare topics, set rate, wire callbacks
Initialization   init(node)                  Once, by the scheduler           Open hardware, load models, allocate buffers
Tick loop        tick(node)                  Repeatedly, at configured rate   Read sensors, compute, publish results
Error handling   on_error(node, exception)   On each exception in tick()      Log, recover, or escalate
Shutdown         shutdown(node)              Once, by the scheduler           Stop motors, close files, release resources

NodeState Enum

The NodeState enum tracks the current lifecycle phase. Import and compare it directly:

from horus import NodeState

# Values
NodeState.UNINITIALIZED   # Created but scheduler hasn't started
NodeState.INITIALIZING    # init() is executing
NodeState.RUNNING         # Actively ticking
NodeState.STOPPING        # shutdown() is executing
NodeState.STOPPED         # Clean shutdown complete
NodeState.ERROR           # Recoverable error -- on_error() was called
NodeState.CRASHED         # Unrecoverable -- node removed from tick loop

NodeState values are strings, so direct comparison works:

if node.info.state == "running":
    node.log_info("Node is active")
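
One plausible way this string comparison works is a str-backed Enum, where each member is also a plain string. The sketch below is illustrative only -- member names follow the table above, but the real HORUS implementation may differ:

```python
from enum import Enum

class NodeState(str, Enum):
    """Illustrative stand-in: each member is also a plain str."""
    UNINITIALIZED = "uninitialized"
    RUNNING = "running"
    STOPPED = "stopped"

# A str-backed Enum compares equal to its string value
assert NodeState.RUNNING == "running"
assert isinstance(NodeState.RUNNING, str)
```

This is why `node.info.state == "running"` works without importing NodeState at all.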

Construction

Construction declares what the node does. No hardware is opened, no connections are made, no resources are allocated. That happens in init().

Function-Based (Closures)

The simplest pattern. Use module-level or closure variables for state:

import horus

count = 0

def my_init(node):
    node.log_info("Sensor starting")

def my_tick(node):
    global count
    count += 1
    node.send("heartbeat", {"tick": count})

def my_shutdown(node):
    node.log_info(f"Sensor stopped after {count} ticks")

sensor = horus.Node(
    name="sensor",
    tick=my_tick,
    init=my_init,
    shutdown=my_shutdown,
    pubs=["heartbeat"],
    rate=10,
)

horus.run(sensor)

This works well for simple nodes. The count variable lives in module scope and persists across ticks.

Class-Based (Bound Methods)

For nodes with complex state, use a class and pass its methods as callbacks:

import horus

class MotorController:
    def __init__(self):
        self.serial = None
        self.velocity = 0.0
        self.error_count = 0

    def init(self, node):
        import serial
        self.serial = serial.Serial("/dev/ttyUSB0", 115200)
        node.log_info("Motor serial port opened")

    def tick(self, node):
        if node.has_msg("cmd_vel"):
            cmd = node.recv("cmd_vel")
            self.velocity = cmd["linear"]

        self.serial.write(f"V{self.velocity:.2f}\n".encode())

    def on_error(self, node, exception):
        self.error_count += 1
        node.log_error(f"Motor error #{self.error_count}: {exception}")
        if self.error_count > 5:
            self.velocity = 0.0
            node.log_warning("Too many errors -- zeroing velocity")

    def shutdown(self, node):
        self.velocity = 0.0
        if self.serial:
            self.serial.write(b"V0.00\n")
            self.serial.close()
        node.log_info("Motor stopped safely")

motor = MotorController()
node = horus.Node(
    name="motor",
    tick=motor.tick,
    init=motor.init,
    shutdown=motor.shutdown,
    on_error=motor.on_error,
    subs=["cmd_vel"],
    rate=100,
    order=10,
)

horus.run(node)

The class instance (motor) owns all state. Bound methods (motor.tick) automatically have access to self, so no globals are needed.

When to Use Which

Pattern                         Best for                                          Drawback
Functions + closures            Simple nodes, prototyping, stateless transforms   Global variables, harder to test in isolation
Functions + mutable container   Moderate state (a dict, a counter)                Manual discipline -- nothing enforces structure
Class with bound methods        Complex state, hardware drivers, ML models        More boilerplate

A practical rule: if your node needs more than two mutable variables across ticks, use a class.
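
The middle row of the table -- functions plus a mutable container -- is the only pattern not shown above, so here is a minimal sketch. A module-level dict replaces the global counter, avoiding the `global` statement; the commented wiring assumes the same Node constructor used elsewhere on this page:

```python
# A single dict holds all cross-tick state -- no `global` needed,
# because we mutate the dict rather than rebinding a name.
state = {"count": 0, "last_reading": None}

def my_tick(node):
    state["count"] += 1
    reading = node.recv("raw")
    if reading is not None:
        state["last_reading"] = reading
        node.send("filtered", reading * 0.5)

def my_shutdown(node):
    node.log_info(f"Stopped after {state['count']} ticks")

# Wiring is identical to the closure example:
#   horus.Node(name="filter", tick=my_tick, shutdown=my_shutdown,
#              subs=["raw"], pubs=["filtered"], rate=10)
```

The drawback from the table applies: nothing stops a callback from stuffing unrelated keys into `state`, so this pattern relies on discipline.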

Initialization: init(node)

Called once by the scheduler, before the first tick(). This is where you do work that is too slow or too risky for the tick loop.

def init(node):
    import serial  # imported here, not at module top, so construction stays cheap

    # Open hardware connections
    node.serial = serial.Serial("/dev/ttyUSB0", 115200)

    # Load ML models (may take seconds)
    node.model = load_yolo_model("yolov8n.pt")

    # Pre-allocate buffers (avoid allocation in tick)
    node.buffer = bytearray(4096)

    # Set actuators to known-safe state
    node.velocity = 0.0

    node.log_info("Controller initialized")

Rules for init():

  • Do open hardware here. Serial ports, cameras, GPIO pins -- all should be opened in init(), not during construction. Construction happens at import time; init() happens when the scheduler is ready to run.
  • Do pre-allocate. Create lists, arrays, buffers, and model inference sessions here. Allocation during tick() causes jitter.
  • Do set safe defaults. Motors at zero velocity, grippers open, heaters off. If the system shuts down before the first tick, shutdown() should find everything in a safe state.
  • Do not start background threads. The scheduler manages execution. Background threads fight the scheduler for CPU time and break deterministic ordering.
  • If init() raises an exception, the node enters the ERROR state and its failure_policy is applied. Other nodes continue to initialize and run.

Lazy Initialization

init() is called lazily -- when scheduler.run() or horus.run() starts, not when the node is constructed. This means:

# Construction happens here -- no hardware touched
motor = horus.Node(name="motor", tick=motor_tick, init=motor_init, ...)

# ... possibly minutes later ...

# init() runs here -- hardware is opened
horus.run(motor)

This is deliberate. You can construct and configure all nodes before any hardware is touched. The scheduler finalizes its own configuration (clock, RT settings, recording) before calling any node's init().

The Tick Loop: tick(node)

The scheduler calls tick() repeatedly at the configured rate. This is your main logic -- read sensors, compute, publish results.

def tick(node):
    # Read all subscribed topics
    if node.has_msg("scan"):
        scan = node.recv("scan")
        obstacles = detect_obstacles(scan)

        if obstacles:
            cmd = compute_avoidance(obstacles)
        else:
            cmd = {"linear": 1.0, "angular": 0.0}

        node.send("cmd_vel", cmd)

Rules for tick()

Keep it fast. The scheduler monitors how long tick() takes. If it exceeds the node's budget, that is a deadline miss -- which can trigger safety responses. For a 100 Hz node, each tick has a 10 ms budget. Stay well under it.
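
The budget arithmetic is worth making explicit: at `rate` Hz, each tick owns `1000 / rate` milliseconds. A self-timing pattern like the sketch below can warn before a deadline is actually missed -- the 80% threshold and the `do_work` placeholder are illustrative assumptions, not part of the HORUS API:

```python
import time

def tick_budget_ms(rate_hz):
    """Milliseconds available to each tick at the given rate."""
    return 1000.0 / rate_hz

def do_work(node):
    pass  # placeholder for your actual per-tick logic

def timed_tick(node, rate_hz=100):
    budget = tick_budget_ms(rate_hz)   # 10.0 ms at 100 Hz
    t0 = time.monotonic()

    do_work(node)

    elapsed_ms = (time.monotonic() - t0) * 1000
    if elapsed_ms > 0.8 * budget:      # warn before the deadline is missed
        node.log_warning(f"Tick used {elapsed_ms:.1f} of {budget:.1f} ms")
```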

Never block on I/O. Do not read files, make HTTP requests, or wait on sockets inside tick(). These operations can take milliseconds to seconds, stalling the entire tick cycle. For I/O-heavy work, use an async tick function -- the scheduler detects async def automatically and runs it on the async I/O executor:

import aiohttp

async def fetch_tick(node):
    async with aiohttp.ClientSession() as session:
        async with session.get("http://api.weather.com/data") as resp:
            data = await resp.json()
            node.send("weather", data)

# async def auto-detected -- runs on async executor, not the main tick loop
node = horus.Node(tick=fetch_tick, pubs=["weather"], rate=0.1)

Never call time.sleep(). The scheduler controls timing. Sleeping inside tick() wastes the node's entire time budget and delays every node that runs after it.

Always drain your topics. Call recv() on every subscribed topic every tick, even if you do not need the data. Unread messages pile up in the ring buffer, and when you finally read them, you get stale data from potentially seconds ago:

def tick(node):
    # CORRECT: always drain, even if you only act on the latest
    scan = node.recv("scan")
    odom = node.recv("odom")

    if scan is not None:
        process(scan)

Do not allocate in the hot path. Creating lists, dicts, or large objects in tick() triggers Python's allocator, which can add milliseconds of jitter. Pre-allocate in init() and reuse:

class Processor:
    def __init__(self):
        self.buffer = [0.0] * 360  # Pre-allocate in constructor

    def init(self, node):
        self.result = {"ranges": self.buffer, "count": 0}

    def tick(self, node):
        scan = node.recv("scan")
        if scan:
            # Reuse pre-allocated buffer
            for i, r in enumerate(scan["ranges"]):
                self.buffer[i] = r * 0.01
            self.result["count"] += 1
            node.send("processed", self.result)

Shutdown: shutdown(node)

Called once when the scheduler stops -- whether from Ctrl+C (SIGINT), SIGTERM, node.request_stop(), or scheduler.stop(). Nodes shut down in reverse order: the last-added node shuts down first.

def shutdown(node):
    # 1. Stop actuators FIRST
    node.send("cmd_vel", {"linear": 0.0, "angular": 0.0})

    # 2. Close hardware connections
    if hasattr(node, "serial") and node.serial:
        node.serial.close()

    # 3. Flush and close files
    if hasattr(node, "log_file") and node.log_file:
        node.log_file.flush()
        node.log_file.close()

    node.log_info("Motor controller shut down safely")

Rules for shutdown():

  • Stop actuators before closing connections. Send zero velocity before dropping the serial port. Otherwise, the motor holds its last commanded velocity.
  • Never raise exceptions. If one cleanup step fails, log and continue. Do not let one failure prevent other cleanup:
def shutdown(node):
    try:
        send_stop_command()
    except Exception as e:
        node.log_error(f"Failed to stop motors: {e}")
        # Continue cleanup -- don't return early

    try:
        close_serial_port()
    except Exception as e:
        node.log_warning(f"Failed to close serial: {e}")
  • Do not assume tick() ran. The system may shut down between init() and the first tick(). Your shutdown code must handle hardware that was opened but never used.

Reverse-Order Shutdown

Nodes are typically added in dependency order: sensors first, then controllers, then loggers. Reverse-order shutdown means controllers stop motors before sensors disconnect:

sensor = horus.Node(name="sensor", ...)      # Added first, shuts down last
controller = horus.Node(name="controller", ...)  # Shuts down second
logger = horus.Node(name="logger", ...)      # Added last, shuts down first

horus.run(sensor, controller, logger)
# Shutdown order: logger -> controller -> sensor

This prevents the scenario where a sensor disconnects while the motor controller is still running with its last received command.
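
The ordering rule itself can be sketched with a toy registry -- ToyNode and the list below are stand-ins for illustration, not part of HORUS:

```python
shutdown_order = []

class ToyNode:
    def __init__(self, name):
        self.name = name
    def shutdown(self):
        shutdown_order.append(self.name)

# Nodes registered in dependency order, as in the example above
nodes = [ToyNode("sensor"), ToyNode("controller"), ToyNode("logger")]

# Shutdown walks the registry in reverse: last added, first stopped
for n in reversed(nodes):
    n.shutdown()

# shutdown_order is now ["logger", "controller", "sensor"]
```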

Error Handling: on_error(node, exception)

When tick() raises an exception, the scheduler catches it and calls on_error() if provided. The callback receives both the node and the exception:

def on_error(node, exception):
    node.log_error(f"Tick failed: {exception}")

    # Track consecutive errors
    if not hasattr(node, "_error_count"):
        node._error_count = 0
    node._error_count += 1

    if node._error_count > 10:
        node.log_error("Too many consecutive errors -- requesting stop")
        node.request_stop()

How error escalation works:

  1. tick() raises an exception
  2. If on_error is set, it is called with (node, exception)
  3. If on_error handles the exception (does not re-raise), the node continues ticking
  4. If on_error itself raises, or if no on_error is set, the exception propagates to the Rust FailurePolicy
  5. The FailurePolicy decides what happens next:
Policy              Behavior
"fatal" (default)   Stop the entire scheduler
"restart"           Re-initialize and resume the node
"skip"              Skip this tick, continue with next
"ignore"            Silently continue

# Node with custom error handling + skip policy as fallback
node = horus.Node(
    name="detector",
    tick=detect_objects,
    on_error=handle_detection_error,
    failure_policy="skip",  # If on_error also fails, skip this tick
    rate=30,
)

When to use on_error vs failure_policy: Use on_error when you want to inspect the exception, track error counts, or take corrective action in Python. Use failure_policy as a safety net for unhandled exceptions. They compose -- on_error runs first, and failure_policy only fires if the exception is not handled.
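
The escalation steps above can be modeled in a few lines of plain Python. This is a rough sketch of the catch logic, not the actual scheduler implementation (which lives in Rust); the returned strings merely label the outcomes:

```python
def run_one_tick(node, tick, on_error=None, failure_policy="fatal"):
    """Toy model of error escalation: tick, then on_error, then the policy."""
    try:
        tick(node)
        return "ok"
    except Exception as exc:
        if on_error is not None:
            try:
                on_error(node, exc)      # handler swallowed it: keep ticking
                return "handled"
            except Exception:
                pass                     # handler re-raised: fall through
        # Unhandled: the FailurePolicy decides
        if failure_policy == "fatal":
            return "stop scheduler"
        if failure_policy == "restart":
            return "re-init node"
        if failure_policy == "skip":
            return "skip tick"
        return "ignore"
```

Note how the policy only fires when on_error is absent or re-raises -- that is the composition described above.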

Topic Operations

All topic communication happens through the node object passed to your callbacks. Topics must be declared at construction time via pubs and subs.

node.send(topic, data)

Publish a message. Non-blocking. If the ring buffer is full, the oldest unread message is overwritten.

# Dict (generic -- MessagePack serialization)
node.send("status", {"battery": 85.2, "state": "running"})

# Typed message (zero-copy POD transport)
from horus import CmdVel
node.send("cmd_vel", CmdVel(linear=1.0, angular=0.5))

# Primitive
node.send("temperature", 22.5)

node.recv(topic)

Receive one message (FIFO order). Returns None if no messages are available.

msg = node.recv("scan")
if msg is not None:
    process(msg)

node.recv_all(topic)

Drain all available messages as a list. Returns an empty list if none are available. Use this for batch processing when every message matters:

def tick(node):
    commands = node.recv_all("commands")
    for cmd in commands:
        execute(cmd)
    node.log_debug(f"Processed {len(commands)} commands this tick")

node.has_msg(topic)

Check if at least one message is available without consuming it. The message is buffered internally and returned by the next recv() call.

def tick(node):
    if node.has_msg("emergency_stop"):
        stop = node.recv("emergency_stop")
        node.log_warning("Emergency stop received")
        node.request_stop()

Multi-Topic Pattern

When your node subscribes to multiple topics, always drain all of them every tick. Cache the latest value from each and process when all are available:

class SensorFusion:
    def __init__(self):
        self.last_imu = None
        self.last_odom = None

    def tick(self, node):
        # ALWAYS drain both topics -- even if you only act when both are present
        imu = node.recv("imu")
        odom = node.recv("odom")

        if imu is not None:
            self.last_imu = imu
        if odom is not None:
            self.last_odom = odom

        if self.last_imu and self.last_odom:
            fused = self.fuse(self.last_imu, self.last_odom)
            node.send("pose", fused)

Logging

HORUS provides structured logging through the node object. Log messages are routed to the HORUS logging system, which integrates with the TUI monitor and CLI tools.

def tick(node):
    node.log_info("Processing frame")
    node.log_warning("Sensor reading is stale")
    node.log_error("Failed to connect to motor")
    node.log_debug(f"Raw value: {value}")

Method                  Level     Use for
node.log_debug(msg)     DEBUG     Verbose diagnostics, variable dumps
node.log_info(msg)      INFO      Normal operation milestones
node.log_warning(msg)   WARNING   Degraded operation, stale data
node.log_error(msg)     ERROR     Failures that affect behavior

Logging only works during lifecycle callbacks -- inside init(), tick(), shutdown(), and on_error(). Calling logging methods outside the scheduler produces a RuntimeWarning and drops the message:

node = horus.Node(name="test", tick=my_tick, pubs=["data"], rate=10)

# This is dropped with a RuntimeWarning -- scheduler is not running yet
node.log_info("This will not appear")

# This works -- called inside tick() by the scheduler
def my_tick(node):
    node.log_info("This appears in the log")

This restriction exists because the HORUS logging system requires the scheduler's runtime context (timestamps, node identity, routing) to function. Outside the scheduler, that context does not exist.

NodeInfo: Runtime Metrics

During init(), tick(), and shutdown(), the scheduler attaches a NodeInfo object to node.info. It provides scheduler-managed metrics:

Method/Property                    Returns   Description
node.info.name                     str       Node name
node.info.state                    str       Current NodeState value
node.info.tick_count()             int       Total ticks executed
node.info.error_count()            int       Total errors encountered
node.info.successful_ticks()       int       Ticks that completed without error
node.info.avg_tick_duration_ms()   float     Average tick execution time
node.info.get_uptime()             float     Seconds since init() completed
node.info.get_metrics()            dict      Full metrics snapshot
node.info.request_stop()           --        Signal the scheduler to stop

def tick(node):
    # Periodic health report
    if node.info.tick_count() % 1000 == 0:
        node.log_info(
            f"Health: {node.info.tick_count()} ticks, "
            f"avg={node.info.avg_tick_duration_ms():.2f}ms, "
            f"errors={node.info.error_count()}"
        )

    # Self-monitoring: stop if error rate is too high
    total = node.info.tick_count()
    if total > 100 and node.info.error_count() / total > 0.1:
        node.log_error("Error rate exceeded 10% -- shutting down")
        node.info.request_stop()

node.info is None outside of lifecycle callbacks. Do not cache it -- the scheduler replaces it each tick with updated metrics.

Scheduler Context Manager

The Scheduler (not Node) supports Python's with statement for automatic cleanup. When the with block exits -- whether normally or via exception -- stop() is called automatically:

import horus

with horus.Scheduler(tick_rate=100) as sched:
    sched.add(horus.Node(name="sensor", tick=sensor_fn, rate=100, order=0, pubs=["data"]))
    sched.add(horus.Node(name="ctrl", tick=ctrl_fn, rate=100, order=1, subs=["data"]))
    sched.run(duration=10.0)
# stop() called automatically here -- all shutdown() callbacks run

This is equivalent to wrapping the scheduler in a try/finally block. Use it in scripts and tests to ensure clean shutdown even when exceptions occur.
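
The guarantee can be modeled with a minimal context manager. ManagedScheduler below is a toy stand-in for the real Scheduler, showing why stop() runs even when the body raises:

```python
class ManagedScheduler:
    """Toy model of the context-manager behavior, not the real Scheduler."""
    def __init__(self):
        self.stopped = False
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc, tb):
        self.stop()          # always runs, even if the body raised
        return False         # do not swallow the exception
    def stop(self):
        self.stopped = True

with ManagedScheduler() as s:
    pass
# s.stopped is now True
```

Returning False from __exit__ matters: cleanup happens, but the original exception still propagates to the caller.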

For most applications, horus.run() is simpler:

# horus.run() handles scheduler creation and cleanup internally
horus.run(sensor, controller, logger, duration=10.0)

Complete Example: ML Object Detector

This example ties every lifecycle phase together -- a real-world pattern where an ML model processes camera frames and publishes detection results:

import horus

class ObjectDetector:
    def __init__(self):
        self.model = None
        self.frame_count = 0
        self.detection_count = 0
        self.log_file = None

    def init(self, node):
        # Load model (slow -- do it here, not in tick)
        import torch
        self.model = torch.hub.load("ultralytics/yolov5", "yolov5s")
        self.model.eval()

        # Open log file
        self.log_file = open("/tmp/detections.csv", "w")
        self.log_file.write("frame,num_detections,latency_ms\n")

        node.log_info("YOLOv5 model loaded, logging to /tmp/detections.csv")

    def tick(self, node):
        frame = node.recv("camera.rgb")
        if frame is None:
            return

        self.frame_count += 1

        # Run inference
        import time
        t0 = time.monotonic()
        results = self.model(frame["data"])
        latency = (time.monotonic() - t0) * 1000

        detections = results.xyxy[0].tolist()
        self.detection_count += len(detections)

        # Publish results
        node.send("detections", {
            "frame": self.frame_count,
            "objects": detections,
            "latency_ms": latency,
        })

        # Log to file
        self.log_file.write(f"{self.frame_count},{len(detections)},{latency:.1f}\n")

        if latency > 50:
            node.log_warning(f"Inference slow: {latency:.1f}ms")

    def on_error(self, node, exception):
        node.log_error(f"Detection failed on frame {self.frame_count}: {exception}")
        # Continue -- skip this frame, try the next one

    def shutdown(self, node):
        if self.log_file:
            self.log_file.flush()
            self.log_file.close()

        node.log_info(
            f"Detector stopped: {self.frame_count} frames, "
            f"{self.detection_count} detections"
        )

detector = ObjectDetector()
node = horus.Node(
    name="detector",
    tick=detector.tick,
    init=detector.init,
    shutdown=detector.shutdown,
    on_error=detector.on_error,
    subs=["camera.rgb"],
    pubs=["detections"],
    rate=30,
    order=50,
    failure_policy="skip",  # Skip frame on unhandled error
)

horus.run(node)

Design Decisions

Why callbacks instead of a base class? A base class (class MyNode(horus.Node)) would require every node to be a class, even trivial ones. Callbacks let you use plain functions for simple nodes and classes for complex ones. The same Node constructor handles both -- pass a function or a bound method.

Why does init() run lazily instead of at construction? Construction happens when your Python module loads. init() runs when the scheduler starts. This separation means you can construct all nodes, configure the scheduler, and defer hardware initialization until the system is actually ready. It also means init() can depend on the scheduler's configuration (clock source, RT settings) being finalized.

Why does on_error receive (node, exception) instead of just (exception)? The node parameter gives your error handler access to node.log_error(), node.request_stop(), node.info, and topic operations. Without it, error handlers would need to capture the node via closure -- forcing every error handler to be a closure or bound method, even for simple "log and continue" cases.

Why is logging restricted to lifecycle callbacks? HORUS logging integrates with the scheduler's runtime context -- timestamps, node identity, log routing, and the TUI monitor. Outside the scheduler, that context does not exist. Rather than silently producing broken log entries, HORUS warns you that the message was dropped. Use Python's standard print() or logging module for messages outside the scheduler.

Why reverse-order shutdown instead of parallel? Parallel shutdown is faster but introduces race conditions. If the sensor and motor controller shut down simultaneously, the motor controller might try to read one last sensor value from a topic that has already been deallocated. Reverse-order shutdown is deterministic: controllers always stop before the sensors they depend on.

Trade-offs

  • Gain: callback-based API -- functions and classes both work without boilerplate. Cost: no static type checking on callback signatures (Python limitation).
  • Gain: lazy initialization -- hardware only opens when the system starts. Cost: must handle the case where init() succeeds but tick() never runs.
  • Gain: structured error handling -- on_error + failure_policy compose. Cost: two layers to understand (Python callback + Rust policy).
  • Gain: guaranteed shutdown -- runs even on Ctrl+C and SIGTERM. Cost: must implement shutdown() for every node with hardware.
  • Gain: reverse-order shutdown -- deterministic dependency cleanup. Cost: cannot shut down independent nodes in parallel.
  • Gain: logging only in callbacks -- consistent timestamps and routing. Cost: cannot use node.log_*() for setup messages before run().

See Also