HORUS Python Bindings

Production-Ready Python API for the HORUS robotics framework - combines simplicity with advanced features for professional robotics applications.

Why HORUS Python?

  • Zero Boilerplate: Working node in 10 lines
  • Flexible API: Functional style or class inheritance - your choice
  • Production Performance: ~500ns latency (same shared memory as Rust)
  • Per-Node Rate Control: Different nodes at different frequencies (100Hz sensor, 10Hz logger)
  • Message Timestamps: Typed messages include timestamp_ns for timing
  • Typed Messages: Optional type-safe messages from Rust
  • Multiprocess Support: Process isolation and multi-language nodes
  • Pythonic: Feels like native Python, not a foreign function wrapper
  • Rich Ecosystem: Use NumPy, OpenCV, scikit-learn, etc.

Quick Start

Installation

Automatic (Recommended)

Python bindings are automatically installed when you run the HORUS installer:

# From HORUS root directory
./install.sh

The installer will detect Python 3.9+ and automatically build and install the bindings.

Manual Installation

If you prefer to install manually or need to rebuild:

# Install maturin (Python/Rust build tool)
# Option A: Via Cargo (recommended for Ubuntu 24.04+)
cargo install maturin

# Option B: Via pip (if not blocked by PEP 668)
# pip install maturin

# Build and install from source
cd horus_py
maturin develop --release

Requirements:

  • Python 3.9+
  • Rust 1.70+
  • Linux (for shared memory support)

Minimal Example

# simplified
import horus

def process(node):
    node.send("output", "Hello HORUS!")

node = horus.Node(pubs="output", tick=process, rate=1)
horus.run(node, duration=3)

This minimal example demonstrates functional-style node creation without class boilerplate.


Core API

Creating a Node

# simplified
def Node(
    name: str = "",              # Node name (auto-generated from tick function if empty)
    subs: str | list = "",       # Topics to subscribe (string, list of strings, or list of message types)
    pubs: str | list = "",       # Topics to publish (string, list of strings, or list of message types)
    tick: Callable = None,       # Function called every tick: tick(node) -> None
    init: Callable = None,       # Optional: called once on startup: init(node) -> None
    shutdown: Callable = None,   # Optional: called on graceful shutdown: shutdown(node) -> None
    rate: int = 30,              # Tick rate in Hz
    order: int = 100,            # Execution priority (lower = earlier)
    budget: float = None,        # Max tick duration in seconds (None = auto from rate)
    deadline: float = None,      # Hard deadline in seconds (None = auto from rate)
    on_miss: str = None,         # "warn", "skip", "safe_mode", "stop" (None = "warn")
) -> Node

Parameters:

  • name — Node identifier. If empty, derived from tick function name.
  • subs/pubs — Topic declarations. Accepts: "topic_name", ["topic1", "topic2"], or typed [CmdVel, LaserScan] (see formats below).
  • tick — Main loop function, called at rate Hz. Receives the node instance.
  • rate — Tick frequency in Hz. Default: 30. Setting rate auto-enables RT scheduling.
  • order — Priority within scheduler tick (0-9: critical, 10-49: sensors, 50-99: processing, 100+: background).
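
The order bands listed above can be written down as a small lookup. This is only an illustrative sketch: `order_band` is a hypothetical helper, not part of the HORUS API, and rejecting negative values is an assumption made here.

```python
def order_band(order: int) -> str:
    """Map an execution order to the band names used in the parameter list."""
    if order < 0:
        # Assumption for this sketch: orders are non-negative.
        raise ValueError("order is assumed to be non-negative in this sketch")
    if order <= 9:
        return "critical"
    if order <= 49:
        return "sensors"
    if order <= 99:
        return "processing"
    return "background"


for value in (0, 10, 75, 100):
    print(value, order_band(value))
```

A helper like this can be handy in a code review checklist, for example to flag a logging node that was accidentally given a critical order.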

Example:

# simplified
from horus import Node, CmdVel, LaserScan, Imu

node = Node(
    name="controller",
    pubs=[CmdVel],             # Typed — fast Pod zero-copy (~1.5μs)
    subs=[LaserScan, Imu],     # Auto-named: "scan", "imu"
    tick=control_fn,
    rate=100,
    order=0,
)

Topic declaration formats (determines performance path):

# simplified
# FAST — typed (Pod zero-copy, ~2.7μs send+recv)
pubs=[CmdVel]                 # auto-name from type: "cmd_vel"
pubs=[CmdVel, Pose2D]         # multiple typed topics
pubs={"motor": CmdVel}        # custom name + type

# GENERIC — string (MessagePack, ~10μs send+recv)
pubs=["data"]                 # GenericMessage — for dicts and custom data
pubs="single_topic"           # shorthand for single string topic

Parameters:

  • name (str, optional): Node name (auto-generated if omitted)
  • pubs: Topics to publish — [CmdVel] (typed, fast) or ["name"] (generic)
  • subs: Topics to subscribe — same formats as pubs
  • tick (callable): Function called each cycle, receives (node) as argument
  • rate (float): Execution rate in Hz (default: 30)
  • init (callable, optional): Setup function, called once at start
  • shutdown (callable, optional): Cleanup function, called once at end
  • on_error (callable, optional): Error handler, called if tick raises an exception
  • default_capacity (int, optional): Buffer capacity for auto-created topics (default: 1024)

Alternative: Class as State Container

For nodes with complex state, use a plain class and pass its method as the tick function:

# simplified
import horus

class SensorState:
    def __init__(self):
        self.reading = 0.0

    def tick(self, node):
        self.reading += 0.1
        node.send("temperature", self.reading)

    def init(self, node):
        print("Sensor initialized!")

    def shutdown(self, node):
        print("Sensor shutting down!")

# Use it
state = SensorState()
sensor = horus.Node(name="sensor", tick=state.tick, init=state.init,
                    shutdown=state.shutdown, pubs=["temperature"], rate=10)
horus.run(sensor)

Both patterns work! Use functional style for simplicity or class containers for complex nodes with state.

Node Functions

Your tick function receives the node as a parameter:

# simplified
def my_tick(node):
    # Check for messages
    if node.has_msg("input"):
        data = node.recv("input")  # Get one message

    # Get all messages
    all_msgs = node.recv_all("input")

    # Send messages
    node.send("output", {"value": 42})

Node Methods:

send

# simplified
def send(topic: str, data: Any) -> None

Publish a message to a topic. Non-blocking. Overwrites oldest if buffer is full.

Parameters:

  • topic (str): Topic name (must be in the node's pubs list)
  • data (Any): Message to send. Can be a dict, a typed message (CmdVel, Image, etc.), or any serializable object

# simplified
node.send("cmd_vel", {"linear": 1.0, "angular": 0.0})  # dict
node.send("cmd_vel", horus.CmdVel(1.0, 0.0))           # typed message

recv

# simplified
def recv(topic: str) -> Optional[Any]

Receive one message from a topic (FIFO order). Returns None if no messages available.

Parameters:

  • topic: str — Topic name (must be in the node's subs list)

Returns: The message, or None if buffer is empty.

# simplified
msg = node.recv("scan")
if msg is not None:
    print(f"Got {len(msg.ranges)} ranges")

node.recv_all(topic) -> list

Receive ALL available messages as a list. Drains the buffer completely. Returns an empty list if none available.

Use this for batch processing when you need to handle every message, not just the latest:

# simplified
def tick(node):
    # Process all queued commands (don't drop any)
    commands = node.recv_all("commands")
    for cmd in commands:
        execute_command(cmd)
    node.log_debug(f"Processed {len(commands)} commands this tick")

node.has_msg(topic) -> bool

Check if at least one unread message is available on the topic without consuming it. The message is buffered internally and returned by the next recv() call.

# simplified
def tick(node):
    if node.has_msg("emergency_stop"):
        stop = node.recv("emergency_stop")
        node.log_warning("Emergency stop received!")
        node.request_stop()
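
The buffer semantics described for send, recv, recv_all, and has_msg (overwrite oldest when full, FIFO reads, full drain) can be modelled in plain Python. The sketch below is a toy model under those stated semantics, not the HORUS shared-memory implementation; `ToyTopicBuffer` and its capacity of 3 are hypothetical.

```python
from collections import deque
from typing import Any, List, Optional


class ToyTopicBuffer:
    """Pure-Python model of a bounded topic buffer (not the HORUS implementation)."""

    def __init__(self, capacity: int) -> None:
        # deque(maxlen=...) discards the oldest item when a new one arrives at capacity.
        self._items: deque = deque(maxlen=capacity)

    def send(self, data: Any) -> None:
        self._items.append(data)

    def has_msg(self) -> bool:
        return len(self._items) > 0

    def recv(self) -> Optional[Any]:
        # FIFO: oldest surviving message first, None when empty.
        return self._items.popleft() if self._items else None

    def recv_all(self) -> List[Any]:
        drained = list(self._items)
        self._items.clear()
        return drained


buf = ToyTopicBuffer(capacity=3)
for i in range(5):
    buf.send(i)          # 0 and 1 are overwritten once 3 and 4 arrive

first = buf.recv()       # oldest surviving message
rest = buf.recv_all()    # drains whatever is left
print(first, rest, buf.has_msg(), buf.recv())
```

The practical consequence is that a slow subscriber loses the oldest messages first, so a node that must see every command should either tick fast enough or drain with recv_all each tick.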

node.request_stop()

Request the scheduler to shut down gracefully after the current tick completes. Use this to stop execution programmatically from within a node.

# simplified
def tick(node):
    if horus.tick() >= 1000:
        node.log_info("Reached 1000 ticks, stopping")
        node.request_stop()

    error = check_safety()
    if error:
        node.log_error(f"Safety violation: {error}")
        node.request_stop()

node.publishers() -> List[str]

Returns the list of topic names this node publishes to.

# simplified
def init(node):
    node.log_info(f"Publishing to: {node.publishers()}")
    node.log_info(f"Subscribing to: {node.subscribers()}")

node.subscribers() -> List[str]

Returns the list of topic names this node subscribes to.

Logging Methods

Method                 Description
node.log_info(msg)     Log an informational message
node.log_warning(msg)  Log a warning message
node.log_error(msg)    Log an error message
node.log_debug(msg)    Log a debug message

Important: Logging only works during init(), tick(), or shutdown() callbacks. Calling a log method outside the scheduler emits a RuntimeWarning and the message is dropped.

# simplified
def tick(node):
    node.log_info("Processing sensor data")
    node.log_warning("Sensor reading is stale")
    node.log_error("Failed to process data")
    node.log_debug(f"Raw value: {value}")

# Outside scheduler — message is dropped with RuntimeWarning:
node = horus.Node(name="test", tick=tick)
node.log_info("This will be dropped!")  # RuntimeWarning

Node scheduling kwargs (maps 1:1 to Rust NodeBuilder):

Kwarg           Default        Rust equivalent
rate            30             .rate()
order           100            .order()
budget          None           .budget()
deadline        None           .deadline()
on_miss         None (warn)    .on_miss()
failure_policy  None (fatal)   .failure_policy()
compute         False          .compute()
on              None           .on(topic)
priority        None           .priority()
core            None           .core()
watchdog        None           .watchdog()
async tick      auto-detected  .async_io()

:::tip Python Timing Guide
Budget and deadline settings in Python detect overruns; they do not guarantee timing. Python ticks take milliseconds, not microseconds, so use realistic values:

# simplified
# Rust node: microsecond budget
Node(tick=motor_ctrl, rate=1000, budget=300 * us)  # 300μs — Rust can do this

# Python ML node: millisecond budget — detects when inference is too slow
Node(tick=run_model, rate=30, budget=50 * ms)  # 50ms — triggers on_miss if exceeded

compute=True is useful when your tick calls C extensions that release the GIL (NumPy, PyTorch, OpenCV) — they run in parallel on the thread pool.

priority/core are for mixed Rust+Python systems — tell the OS to schedule Rust RT nodes before Python, and keep Python off RT cores. :::
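
To make the "detect, not guarantee" point concrete, here is a minimal sketch of overrun detection: time one tick and compare the elapsed time with a budget. It uses only the standard library; `run_tick_with_budget` and `slow_tick` are hypothetical names, and this is not how the HORUS scheduler is implemented.

```python
import time
from typing import Callable, Tuple


def run_tick_with_budget(tick: Callable[[], None], budget_s: float) -> Tuple[float, bool]:
    """Run one tick and report (elapsed_seconds, overran)."""
    start = time.perf_counter()
    tick()
    elapsed = time.perf_counter() - start
    # The tick has already finished; all we can do is report the overrun.
    return elapsed, elapsed > budget_s


def slow_tick() -> None:
    time.sleep(0.02)  # stand-in for a 20 ms inference call


elapsed, overran = run_tick_with_budget(slow_tick, budget_s=0.005)
print(f"tick took {elapsed * 1000:.1f} ms, overran={overran}")
```

Note that the check runs after the tick returns, which is why a budget can only trigger a miss policy and cannot cut a long Python call short.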

Running Nodes

# simplified
def run(*nodes: Node, duration: float = None) -> None

Convenience one-liner: creates a Scheduler, adds all nodes, and runs.

Parameters:

  • *nodes (Node): One or more Node instances to run
  • duration (float, optional): Run for this many seconds, then stop. None = run until Ctrl+C.

# simplified
# Single node — runs until Ctrl+C
horus.run(node)

# Multiple nodes for 10 seconds
horus.run(node1, node2, node3, duration=10)

Examples

1. Simple Publisher

# simplified
import horus

def publish_temperature(node):
    node.send("temperature", 25.5)

sensor = horus.Node(
    name="temp_sensor",
    pubs="temperature",
    tick=publish_temperature,
    rate=1  # 1 Hz
)

horus.run(sensor, duration=10)

2. Subscriber

# simplified
import horus

def display_temperature(node):
    if node.has_msg("temperature"):
        temp = node.recv("temperature")
        print(f"Temperature: {temp}°C")

display = horus.Node(
    name="display",
    subs="temperature",
    tick=display_temperature
)

horus.run(display)

3. Pub/Sub Pipeline

# simplified
import horus

def publish(node):
    node.send("raw", 42.0)

def process(node):
    if node.has_msg("raw"):
        data = node.recv("raw")
        result = data * 2.0
        node.send("processed", result)

def display(node):
    if node.has_msg("processed"):
        value = node.recv("processed")
        print(f"Result: {value}")

# Create pipeline
publisher = horus.Node("publisher", pubs="raw", tick=publish, rate=1)
processor = horus.Node("processor", subs="raw", pubs="processed", tick=process)
displayer = horus.Node("display", subs="processed", tick=display)

# Run all together
horus.run(publisher, processor, displayer, duration=5)

4. Using Lambda Functions

# simplified
import horus

# Producer (inline)
producer = horus.Node(
    pubs="numbers",
    tick=lambda n: n.send("numbers", 42),
    rate=1
)

# Transformer (inline)
doubler = horus.Node(
    subs="numbers",
    pubs="doubled",
    tick=lambda n: n.send("doubled", n.recv("numbers") * 2) if n.has_msg("numbers") else None
)

horus.run(producer, doubler, duration=5)

5. Multi-Topic Robot Controller

# simplified
import horus

def robot_controller(node):
    # Read from multiple sensors
    lidar_data = None
    camera_data = None

    if node.has_msg("lidar"):
        lidar_data = node.recv("lidar")

    if node.has_msg("camera"):
        camera_data = node.recv("camera")

    # Compute commands
    if lidar_data and camera_data:
        cmd = compute_navigation(lidar_data, camera_data)
        node.send("motors", cmd)
        node.send("status", "navigating")

robot = horus.Node(
    name="robot_controller",
    subs=["lidar", "camera"],
    pubs=["motors", "status"],
    tick=robot_controller,
    rate=50  # 50Hz control loop
)

6. Lifecycle Management

# simplified
import horus

class Context:
    def __init__(self):
        self.count = 0
        self.file = None

ctx = Context()

def init_handler(node):
    print("Starting up!")
    ctx.file = open("data.txt", "w")

def tick_handler(node):
    ctx.count += 1
    data = f"Tick {ctx.count}"
    node.send("data", data)
    ctx.file.write(data + "\n")

def shutdown_handler(node):
    print(f"Processed {ctx.count} messages")
    ctx.file.close()

node = horus.Node(
    pubs="data",
    init=init_handler,
    tick=tick_handler,
    shutdown=shutdown_handler,
    rate=10
)

horus.run(node, duration=5)

Advanced Features (Production-Ready)

HORUS Python includes advanced features that match or exceed ROS2 capabilities while maintaining simplicity.

NodeState

The NodeState enum tracks which lifecycle phase a node is in:

# simplified
from horus import NodeState

# Values:
NodeState.UNINITIALIZED  # Created but not yet running
NodeState.INITIALIZING   # init() is executing
NodeState.RUNNING        # Actively ticking
NodeState.STOPPING       # shutdown() is executing
NodeState.STOPPED        # Clean shutdown complete
NodeState.ERROR          # Recoverable error state
NodeState.CRASHED        # Unrecoverable crash

NodeState values are strings — you can compare directly:

# simplified
if node_state == "running":
    print("Node is active")
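
For readers unfamiliar with string-valued enums, the following sketch shows why such a comparison works in Python. `ToyNodeState` is a stand-in defined here, not the HORUS class, and apart from "running" the member values are assumed.

```python
from enum import Enum


class ToyNodeState(str, Enum):
    """String-valued enum; member values other than "running" are assumed."""
    UNINITIALIZED = "uninitialized"
    RUNNING = "running"
    STOPPED = "stopped"


state = ToyNodeState.RUNNING
print(state == "running")                 # True: the str mixin compares by value
print(state is ToyNodeState("running"))   # True: lookup by value returns the member
```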

Scheduler

# simplified
def Scheduler(
    tick_rate: float = 1000.0,   # Global tick rate in Hz
    rt: bool = False,            # Enable real-time scheduling (prefer_rt)
    watchdog_ms: int = 0,        # Watchdog timeout in ms (0 = disabled)
    deterministic: bool = False, # Deterministic execution mode
    verbose: bool = False,       # Enable verbose logging
) -> Scheduler

The Scheduler orchestrates node execution with priority ordering, per-node rate control, and real-time features.

Methods:

  • scheduler.add(node: Node) -> Scheduler — Register a node (returns self for chaining)
  • scheduler.run(duration: float = None) -> None — Start the main loop. Blocks until Ctrl+C, stop(), or duration expires
  • scheduler.stop() -> None — Request graceful shutdown

Creating a Scheduler:

# simplified
# All config on Node(), scheduler.add() takes only the node
scheduler = horus.Scheduler()
scheduler.add(horus.Node(tick=motor_fn, rate=1000, order=0, budget=200 * horus.us))
scheduler.add(horus.Node(tick=planner_fn, order=5, compute=True))
scheduler.add(horus.Node(tick=telemetry_fn, rate=1, order=10))

Node configuration (kwargs on Node()):

Method                      Description
.order(n)                   Execution priority (lower = runs first)
.rate(hz)                   Node tick rate in Hz; auto-derives budget/deadline and marks the node as RT
.budget(us)                 Tick budget in microseconds
.on_miss(policy)            "warn", "skip", "safe_mode", or "stop"
.on(topic)                  Event-driven: wakes only when topic has new data
.compute()                  Offload to worker thread pool (planning, ML)
.async_io()                 Run on async executor (network, disk)
.failure_policy(name, ...)  "fatal", "restart", "skip", or "ignore"; optional kwargs: max_retries, backoff_ms, max_failures, cooldown_ms
.build()                    Finalize and register; returns Scheduler

Adding Nodes:

All configuration (order, rate, budget, etc.) goes on the Node() constructor. scheduler.add() takes only the node:

# simplified
sensor = horus.Node(name="sensor", tick=sensor_fn, rate=100, order=0)
controller = horus.Node(name="ctrl", tick=ctrl_fn, rate=100, order=1)
logger = horus.Node(name="logger", tick=log_fn, rate=10, order=2)

scheduler.add(sensor)
scheduler.add(controller)
scheduler.add(logger)
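
One way to picture priority ordering combined with per-node rates is a toy model in which each node fires every `tick_rate // rate` scheduler ticks and due nodes run in ascending order. This is a simplified sketch, not the HORUS scheduling algorithm; `ToyNode` and `due_nodes` are hypothetical, and the model assumes each rate divides the global tick rate evenly.

```python
from typing import List, NamedTuple


class ToyNode(NamedTuple):
    name: str
    rate: int   # Hz
    order: int  # lower runs first


def due_nodes(nodes: List[ToyNode], tick: int, tick_rate: int) -> List[str]:
    """Names of nodes due on this scheduler tick, sorted by order."""
    due = [n for n in nodes if tick % (tick_rate // n.rate) == 0]
    return [n.name for n in sorted(due, key=lambda n: n.order)]


nodes = [
    ToyNode("logger", rate=10, order=2),
    ToyNode("sensor", rate=100, order=0),
    ToyNode("ctrl", rate=100, order=1),
]

# With a 1000 Hz global tick: sensor and ctrl fire every 10 ticks, logger every 100.
for tick in (0, 10, 15, 100):
    print(tick, due_nodes(nodes, tick, tick_rate=1000))
```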

Execution:

Method                        Description
scheduler.run()               Run until Ctrl+C or .stop()
scheduler.run(duration=10.0)  Run for a specific duration, then shut down
scheduler.stop()              Signal graceful shutdown
scheduler.current_tick()      Current tick count

Monitoring:

Method                               Description
scheduler.get_node_stats(name)       Stats dict: total_ticks, errors_count, avg_tick_duration_ms, etc.
scheduler.set_node_rate(name, rate)  Change a node's tick rate at runtime
scheduler.set_tick_budget(name, us)  Update per-node tick budget (microseconds)
scheduler.get_all_nodes()            List all nodes with their configuration
scheduler.get_node_count()           Number of registered nodes
scheduler.has_node(name)             Check if a node is registered
scheduler.get_node_names()           List of registered node names
scheduler.remove_node(name)          Remove a node (returns True if found)
scheduler.status()                   Formatted status string
scheduler.capabilities()             Dict of RT capabilities
scheduler.has_full_rt()              True if all RT features available
scheduler.safety_stats()             Dict of budget overruns, deadline misses, watchdog expirations

Recording & Replay:

Method                            Description
scheduler.is_recording()          Check if recording is active
scheduler.is_replaying()          Check if replaying
scheduler.stop_recording()        Stop recording, returns list of saved file paths
Scheduler.list_recordings()       List available recordings (static method)
Scheduler.delete_recording(name)  Delete a recording (static method)

Context Manager:

The Scheduler supports the with statement for automatic cleanup:

# simplified
with horus.Scheduler(tick_rate=100) as sched:
    sched.add(horus.Node(tick=sensor_fn, rate=100, order=0))
    sched.add(horus.Node(tick=ctrl_fn, rate=100, order=1))
    sched.run(duration=10.0)
# stop() called automatically on exit, even if an exception occurs
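
The cleanup guarantee follows from Python's context manager protocol. The sketch below shows that contract with a stand-in class; `ToyScheduler` is hypothetical and only mirrors the behaviour described above (stop on exit, exception still propagates), not the real Scheduler internals.

```python
class ToyScheduler:
    """Minimal stand-in showing the with-statement contract (not the HORUS class)."""

    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True

    def __enter__(self) -> "ToyScheduler":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.stop()
        return False  # do not swallow the exception


sched = ToyScheduler()
try:
    with sched:
        raise RuntimeError("tick failed")
except RuntimeError as err:
    print(f"caught: {err}; stopped={sched.stopped}")
```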

Expanded Method Details:

scheduler.get_node_stats(name) -> dict

Returns a dictionary with detailed statistics for the named node:

# simplified
stats = scheduler.get_node_stats("motor_ctrl")
print(f"Total ticks: {stats['total_ticks']}")
print(f"Avg tick: {stats.get('avg_tick_duration_ms', 0):.2f} ms")
print(f"Errors: {stats['errors_count']}")

Keys include: name, priority, total_ticks, successful_ticks, failed_ticks, avg_tick_duration_ms, max_tick_duration_ms, errors_count, uptime_seconds.
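
As a small illustration of working with these keys, the sketch below derives a success ratio from a stats-shaped dict. The sample values are made up for the example, and `success_ratio` is a hypothetical helper rather than a HORUS function.

```python
def success_ratio(stats: dict) -> float:
    """Fraction of ticks that succeeded; 0.0 when the node has not ticked."""
    total = stats.get("total_ticks", 0)
    if total == 0:
        return 0.0
    return stats.get("successful_ticks", 0) / total


# Hypothetical values shaped like the documented keys.
sample = {
    "name": "motor_ctrl",
    "total_ticks": 500,
    "successful_ticks": 495,
    "failed_ticks": 5,
}
print(f"{sample['name']}: {success_ratio(sample):.1%} of ticks succeeded")
```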

scheduler.status() -> str

Returns the current scheduler state: "idle", "running", or "stopped".

scheduler.current_tick() -> int

Returns the current tick count (0-indexed).

scheduler.set_node_rate(name, rate)

Change a node's tick rate at runtime. Useful for adaptive control:

# simplified
# Slow down logging when battery is low
if battery_low:
    scheduler.set_node_rate("logger", 1)  # 1 Hz
else:
    scheduler.set_node_rate("logger", 10)  # 10 Hz

scheduler.run(duration=None)

Start the scheduler tick loop. Blocks until Ctrl+C, stop(), or duration expires.

# simplified
scheduler.run()              # Run forever (until Ctrl+C)
scheduler.run(duration=30.0) # Run for 30 seconds, then stop

scheduler.stop()

Signal graceful shutdown. All nodes' shutdown() callbacks run before exit.

# simplified
# From another thread or a node's tick:
scheduler.stop()

scheduler.get_all_nodes() -> List[Dict]

Returns all registered nodes with their configuration.

# simplified
for node in scheduler.get_all_nodes():
    print(f"{node['name']}: order={node.get('order', '?')}")

scheduler.get_node_count() -> int

Number of registered nodes.

# simplified
print(f"Running {scheduler.get_node_count()} nodes")

scheduler.has_node(name) -> bool

Check if a node with the given name is registered.

# simplified
if scheduler.has_node("motor_ctrl"):
    stats = scheduler.get_node_stats("motor_ctrl")

scheduler.get_node_names() -> List[str]

List of all registered node names.

# simplified
print(f"Nodes: {scheduler.get_node_names()}")

scheduler.remove_node(name) -> bool

Remove a node by name. Returns True if found and removed.

# simplified
if scheduler.remove_node("debug_logger"):
    print("Debug logger removed")

scheduler.capabilities() -> Dict

Returns a dict of detected RT capabilities (SCHED_FIFO, memory locking, CPU affinity, etc.).

# simplified
caps = scheduler.capabilities()
print(f"RT priority: {caps.get('max_priority', 'N/A')}")
print(f"Memory lock: {caps.get('memory_locking', False)}")

scheduler.has_full_rt() -> bool

Returns True if all requested RT features are available (no degradations).

# simplified
if not scheduler.has_full_rt():
    print("Warning: running with degraded RT — check capabilities()")

scheduler.safety_stats() -> Dict

Returns safety monitor statistics: budget overruns, deadline misses, watchdog expirations.

# simplified
stats = scheduler.safety_stats()
if stats:
    print(f"Deadline misses: {stats.get('deadline_misses', 0)}")
    print(f"Budget overruns: {stats.get('budget_overruns', 0)}")

scheduler.is_recording() -> bool

Check if session recording is currently active.

scheduler.is_replaying() -> bool

Check if the scheduler is replaying a recorded session.

scheduler.stop_recording() -> List[str]

Stop recording and return the list of saved file paths.

# simplified
if scheduler.is_recording():
    paths = scheduler.stop_recording()
    print(f"Saved recordings: {paths}")

Scheduler.list_recordings() -> List[str]

List available recording sessions (static method).

# simplified
recordings = horus.Scheduler.list_recordings()
for r in recordings:
    print(f"  {r}")

Scheduler.delete_recording(name) -> bool

Delete a recording by name (static method).

scheduler.tick(node_names) -> None

Execute one tick cycle for the specified nodes only. Essential for deterministic testing — run exactly one tick and verify output.

# simplified
scheduler = horus.Scheduler(tick_rate=100)
scheduler.add(sensor)
scheduler.add(controller)

# Test: run one tick, check output
scheduler.tick(["Sensor", "Controller"])
stats = scheduler.get_node_stats("Controller")
assert stats["total_ticks"] == 1

scheduler.tick_for(node_names, duration_seconds) -> None

Execute ticks for the specified nodes over a duration. Useful for benchmarking and timed test runs.

# simplified
# Benchmark sensor processing for 5 seconds
scheduler.tick_for(["SensorProcessor"], 5.0)
stats = scheduler.get_node_stats("SensorProcessor")
print(f"Processed {stats['total_ticks']} ticks in 5s")

scheduler.is_running() -> bool

Check if the scheduler is currently executing its tick loop.

# simplified
import threading
import time

def monitor_thread(sched):
    while sched.is_running():
        print(f"Tick: {sched.current_tick()}")
        time.sleep(1.0)

t = threading.Thread(target=monitor_thread, args=(scheduler,), daemon=True)
t.start()
scheduler.run(duration=10.0)

scheduler.get_node_info(name) -> Optional[int]

Get the execution order (priority) for a named node. Returns None if the node is not registered.

# simplified
order = scheduler.get_node_info("motor_ctrl")
if order is not None:
    print(f"motor_ctrl runs at order {order}")

scheduler.degradations() -> List[Dict]

Returns a list of RT feature degradations — features that were requested but couldn't be applied (e.g., SCHED_FIFO unavailable without root).

# simplified
scheduler = horus.Scheduler(tick_rate=1000, rt=True)
# ... add nodes and run ...

for d in scheduler.degradations():
    print(f"Degraded: {d.get('feature')} — {d.get('reason')}")

Each dict contains feature (what was requested) and reason (why it couldn't be applied).

horus.run() — The ONE way to run nodes:

# simplified
from horus import Node, run, us

sensor = Node(tick=read_lidar, rate=10, order=0, pubs=["scan"])
ctrl = Node(tick=navigate, rate=30, order=1, subs=["scan"], pubs=["cmd"])
motor = Node(tick=drive, rate=1000, order=2, budget=300*us, subs=["cmd"])

# All scheduler config as kwargs
run(sensor, ctrl, motor, rt=True, watchdog_ms=500)
run(node, duration=10, deterministic=True)

All run() kwargs (maps to Rust Scheduler builder):

Kwarg                Default         Rust equivalent
duration             None (forever)  .run(duration=)
tick_rate            1000.0          .tick_rate()
rt                   False           .prefer_rt()
deterministic        False           .deterministic()
watchdog_ms          0               .watchdog()
blackbox_mb          0               .blackbox()
recording            False           .with_recording()
name                 None            .name()
cores                None            .cores()
max_deadline_misses  None            .max_deadline_misses()
verbose              False           .verbose()
telemetry            None            .telemetry()

Miss — Deadline Miss Policy

The Miss class defines what happens when a node exceeds its deadline:

# simplified
from horus import Miss

# Available policies
Miss.WARN        # Log warning and continue (default)
Miss.SKIP        # Skip the node for this tick
Miss.SAFE_MODE   # Call enter_safe_state() on the node
Miss.STOP        # Stop the entire scheduler

Use via the Node constructor:

# simplified
# Config on Node, then add
motor = horus.Node(name="motor", tick=motor_fn, rate=500, order=0, budget=200 * horus.us, on_miss="safe_mode")
scheduler.add(motor)
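
Because on_miss is passed as a plain string, a typo would be easy to miss. The following sketch validates a policy string against the four documented values and applies the documented default of "warn" for None. `resolve_on_miss` is a hypothetical helper; whether HORUS itself rejects unknown strings is not stated here.

```python
from typing import Optional

VALID_POLICIES = ("warn", "skip", "safe_mode", "stop")


def resolve_on_miss(policy: Optional[str]) -> str:
    """Return the effective deadline-miss policy, defaulting None to "warn"."""
    if policy is None:
        return "warn"
    if policy not in VALID_POLICIES:
        raise ValueError(f"unknown on_miss policy {policy!r}; expected one of {VALID_POLICIES}")
    return policy


print(resolve_on_miss(None))
print(resolve_on_miss("safe_mode"))
```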

Scheduler Configuration

All configuration via constructor kwargs:

# simplified
from horus import Scheduler, Node

# Development — simple
scheduler = Scheduler()

# Production — watchdog + RT
scheduler = Scheduler(tick_rate=1000, rt=True, watchdog_ms=500)

# With blackbox + telemetry
scheduler = Scheduler(
    tick_rate=1000,
    watchdog_ms=500,
    blackbox_mb=64,
    telemetry="http://localhost:9090",
    verbose=True,
)

# Deterministic mode for simulation/testing
scheduler = Scheduler(tick_rate=100, deterministic=True)

Testing with short runs:

# simplified
scheduler = Scheduler()
scheduler.add(Node(name="sensor", tick=sensor_fn, rate=100, order=0))
scheduler.add(Node(name="ctrl", tick=ctrl_fn, rate=100, order=1))

# Run for a short duration
scheduler.run(duration=0.1)

Message Timestamps

Timestamps are managed by the Rust Topic backend. Typed messages include a timestamp_ns field for nanosecond-precision timing:

# simplified
import horus
import time

def control_tick(node):
    if node.has_msg("sensor_data"):
        msg = node.recv("sensor_data")

        # Use message-level timestamps for latency checks
        if hasattr(msg, 'timestamp_ns') and msg.timestamp_ns:
            age_s = (time.time_ns() - msg.timestamp_ns) / 1e9
            if age_s > 0.1:  # More than 100ms old
                node.log_warning(f"Stale data: {age_s*1000:.1f}ms old")
                return

            latency = age_s
            print(f"Latency: {latency*1000:.1f}ms")

        # Process fresh data
        process(msg)

Timestamp access: Use msg.timestamp_ns on typed messages (CmdVel, Pose2D, Imu, etc.) for nanosecond timestamps set by the Rust backend.
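
The staleness check can be factored into a small helper that is easy to unit test. This sketch assumes, as the example above does, that timestamp_ns is comparable with time.time_ns(); `ToyMsg` and `age_ms` are hypothetical names, not HORUS types.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyMsg:
    """Stand-in for a typed message; only timestamp_ns matters here."""
    timestamp_ns: int


def age_ms(msg: object, now_ns: Optional[int] = None) -> Optional[float]:
    """Message age in milliseconds, or None if no timestamp is set."""
    stamp = getattr(msg, "timestamp_ns", None)
    if not stamp:
        return None
    if now_ns is None:
        now_ns = time.time_ns()
    return (now_ns - stamp) / 1e6


now = time.time_ns()
fresh = ToyMsg(timestamp_ns=now - 5_000_000)     # 5 ms old
stale = ToyMsg(timestamp_ns=now - 250_000_000)   # 250 ms old
for msg in (fresh, stale):
    print(f"{age_ms(msg, now):.1f} ms, stale={age_ms(msg, now) > 100}")
```

Passing now_ns explicitly keeps the helper deterministic in tests, while production code can omit it and use the current time.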

Multiprocess Execution

Run Python nodes in separate processes for isolation and multi-language support:

# Run multiple Python files as separate processes
horus run node1.py node2.py node3.py

# Mix Python and Rust nodes
horus run sensor.rs controller.py visualizer.py

# Mix Rust and Python
horus run lidar_driver.rs planner.py motor_control.rs

All nodes in the same horus run session automatically communicate via shared memory!

Example - Distributed System:

# simplified
# sensor_node.py
import horus

def sensor_tick(node):
    data = read_lidar()  # Your sensor code
    node.send("lidar_data", data)

sensor = horus.Node(name="lidar", pubs="lidar_data", tick=sensor_tick)
horus.run(sensor)

# simplified
# controller_node.py
import horus

def control_tick(node):
    if node.has_msg("lidar_data"):
        data = node.recv("lidar_data")
        cmd = compute_control(data)
        node.send("motor_cmd", cmd)

controller = horus.Node(
    name="controller",
    subs="lidar_data",
    pubs="motor_cmd",
    tick=control_tick
)
horus.run(controller)

# Run both in separate processes
horus run sensor_node.py controller_node.py

Benefits:

  • Process isolation: One crash doesn't kill everything
  • Multi-language: Mix Python and Rust nodes in the same application
  • Parallel execution: True multicore utilization
  • Zero configuration: Shared memory IPC automatically set up

Complete Example: All Features Together

# simplified
import horus
import time

def sensor_tick(node):
    """High-frequency sensor (100Hz)"""
    imu = {"accel_x": 1.0, "accel_y": 0.0, "accel_z": 9.8}
    node.send("imu_data", imu)
    node.log_info("Published IMU data")

def control_tick(node):
    """Medium-frequency control (50Hz)"""
    if node.has_msg("imu_data"):
        imu = node.recv("imu_data")
        cmd = {"linear": 1.0, "angular": 0.0}
        node.send("cmd_vel", cmd)

def logger_tick(node):
    """Low-frequency logging (10Hz)"""
    if node.has_msg("cmd_vel"):
        msg = node.recv("cmd_vel")
        node.log_info(f"Command received: {msg}")

# Create nodes with rate and order configured on the Node
sensor = horus.Node(name="imu", pubs="imu_data", tick=sensor_tick, rate=100, order=0)
controller = horus.Node(name="ctrl", subs="imu_data", pubs="cmd_vel", tick=control_tick, rate=50, order=1)
logger = horus.Node(name="log", subs="cmd_vel", tick=logger_tick, rate=10, order=2)

# Add nodes to scheduler
scheduler = horus.Scheduler()
scheduler.add(sensor)
scheduler.add(controller)
scheduler.add(logger)

scheduler.run(duration=5.0)

# Check statistics
stats = scheduler.get_node_stats("imu")
print(f"Sensor: {stats['total_ticks']} ticks in 5 seconds")

Network Communication

HORUS Python supports network communication for distributed multi-machine systems. Topic and Router both work transparently over the network.

Topic Network Endpoints

Add an endpoint parameter to communicate over the network:

# simplified
from horus import Topic, CmdVel

# Local (shared memory) - default
local_topic = Topic(CmdVel)

# Network (UDP direct)
network_topic = Topic(CmdVel, endpoint="cmdvel@192.168.1.100:8000")

# Router (TCP broker for WAN/NAT traversal)
router_topic = Topic(CmdVel, endpoint="cmdvel@router")

Endpoint Syntax:

  • "topic" - Local shared memory (~500ns latency)
  • "topic@host:port" - Direct UDP (<50μs latency)
  • "topic@router" - Router broker (auto-discovery on localhost:7777)
  • "topic@192.168.1.100:7777" - Router broker at specific address
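
To illustrate the endpoint syntax, here is a minimal parser sketch that splits a string into topic, host, and port. It is not the parser HORUS uses; `Endpoint` and `parse_endpoint` are hypothetical, and the sketch does not try to tell a direct UDP peer from a router at an explicit address, since both are written as host:port.

```python
from typing import NamedTuple, Optional


class Endpoint(NamedTuple):
    topic: str
    host: Optional[str]  # None for local shared memory, "router" for the keyword form
    port: Optional[int]


def parse_endpoint(spec: str) -> Endpoint:
    """Split "topic", "topic@router", or "topic@host:port" into parts."""
    topic, sep, target = spec.partition("@")
    if not topic:
        raise ValueError(f"missing topic name in {spec!r}")
    if not sep:
        return Endpoint(topic, None, None)
    if target == "router":
        return Endpoint(topic, "router", None)
    host, colon, port = target.rpartition(":")
    if not colon or not host or not port.isdigit():
        raise ValueError(f"expected host:port after '@' in {spec!r}")
    return Endpoint(topic, host, int(port))


for spec in ("cmdvel", "cmdvel@router", "cmdvel@192.168.1.100:8000"):
    print(parse_endpoint(spec))
```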

Topic Methods

Method                      Description
topic.send(msg, node=None)  Send a message. Pass optional node for automatic IPC logging. Returns True.
topic.recv(node=None)       Receive one message. Returns the message or None if empty.
topic.name                  Property: the topic name string
topic.backend_type          Property: the active backend name (e.g., "direct", "spsc_shm")
topic.is_network_topic      Property: True if this topic uses network transport
topic.endpoint              Property: the endpoint string, or None for local topics
topic.stats()               Returns a dict with messages_sent, messages_received, send_failures, recv_failures, is_network, backend
topic.is_generic()          Returns True if this is a generic (string-name) topic

Example:

# simplified
from horus import Topic, CmdVel

topic = Topic(CmdVel)

# Send and receive typed messages
topic.send(CmdVel(linear=1.0, angular=0.5))
msg = topic.recv()
if msg:
    print(f"linear={msg.linear}, angular={msg.angular}")

# Check topic properties
print(f"Name: {topic.name}")           # "cmd_vel"
print(f"Backend: {topic.backend_type}") # e.g. "mpmc_shm"
print(f"Stats: {topic.stats()}")

Generic Topics

When you create a Topic with a string name (instead of a typed class), you get a generic topic that accepts any JSON-serializable data:

# simplified
from horus import Topic, CmdVel

# Generic topic (string name = dynamic typing)
topic = Topic("my_topic")

# Typed topic (class = static typing, better performance)
typed_topic = Topic(CmdVel)

Generic topics use the same send() and recv() methods as typed topics, but accept any JSON-serializable Python object. Data is serialized via MessagePack internally.

# simplified
from horus import Topic

topic = Topic("sensor_data")

# Send dict, list, or any JSON-serializable data
topic.send({"temperature": 25.5, "humidity": 60.0})
topic.send([1.0, 2.0, 3.0, 4.0])
topic.send("status: OK")

# Receive (returns Python object)
msg = topic.recv()  # {"temperature": 25.5, "humidity": 60.0}

# Check if generic
print(topic.is_generic())  # True
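Because generic topics accept only JSON-serializable data, it can help to validate a payload before sending it. The following is a minimal sketch using only the standard library. `is_json_serializable` is a hypothetical helper, and it assumes that whatever `json.dumps` accepts is also acceptable to a generic topic.

```python
import json
from typing import Any


def is_json_serializable(payload: Any) -> bool:
    """Return True if the standard json module can encode the payload."""
    try:
        json.dumps(payload)
    except (TypeError, ValueError):
        # TypeError: unsupported type (set, bytes, complex, custom objects).
        # ValueError: circular references.
        return False
    return True
```

Values such as sets, bytes, and complex numbers fail this check and would need converting (for example to lists or floats) before being sent.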

Typed vs Generic Performance:

| Topic Type | Serialization | Use Case |
| --- | --- | --- |
| Typed (Topic(CmdVel)) | Direct field extraction (no serde) | Production, cross-language, high-frequency |
| Generic (Topic("name")) | Python → JSON → MessagePack | Dynamic schemas, prototyping, Python-only |

Automatic Transport Selection

HORUS automatically selects the fastest communication path based on where publishers and subscribers are located. You never need to configure this manually:

# simplified
from horus import Topic, CmdVel

# Just create a topic — HORUS picks the fastest path automatically:
# Same-thread:     ~3ns  (when pub+sub are in the same node)
# Same-process:    ~18-36ns  (when pub+sub are in different nodes, same process)
# Cross-process:   ~85-171ns  (when pub+sub are in different processes)
topic = Topic(CmdVel)

Automatic Transport Tiers:

| Scenario | Latency | When it applies |
| --- | --- | --- |
| Same thread | ~3ns | Publisher and subscriber are in the same node |
| Same process (1:1) | ~18ns | One publisher, one subscriber, same process |
| Same process (many:1) | ~26ns | Multiple publishers, one subscriber, same process |
| Same process (many:many) | ~36ns | Multiple publishers and subscribers, same process |
| Cross-process (1:1) | ~85ns | One publisher, one subscriber, different processes |
| Cross-process (many:many) | ~91ns | Multiple publishers and subscribers, different processes |

Router Client (WAN/NAT Traversal)

For communication across networks, through NAT, or for large-scale deployments, use the Router:

# simplified
from horus import RouterClient, Topic, CmdVel

# Create router client for explicit connection management
router = RouterClient("192.168.1.100", 7777)

# Build endpoints through the router
cmd_endpoint = router.endpoint("cmdvel")  # Returns "cmdvel@192.168.1.100:7777"
pose_endpoint = router.endpoint("pose")

# Use endpoints with Topic
topic = Topic(CmdVel, endpoint=cmd_endpoint)

# Router properties
print(f"Address: {router.address}")        # "192.168.1.100:7777"
print(f"Connected: {router.is_connected}") # True
print(f"Topics: {router.topics}")          # ["cmdvel", "pose"]
print(f"Uptime: {router.uptime_seconds}s")

Helper Functions:

# simplified
from horus import default_router_endpoint, router_endpoint

# Default router (localhost:7777)
ep1 = default_router_endpoint("cmdvel")  # "cmdvel@router"

# Custom router address
ep2 = router_endpoint("cmdvel", "192.168.1.100", 7777)  # "cmdvel@192.168.1.100:7777"

Router Server (for testing):

# simplified
from horus import RouterServer

# Start a local router (for development/testing)
server = RouterServer(port=7777)
server.start()

# For production, use CLI instead:
# $ horus router start --port 7777

When to Use What

| Transport | Latency | Use Case |
| --- | --- | --- |
| Same-process (Topic(CmdVel)) | ~18-36ns | In-process communication (automatic) |
| Cross-process, 1:1 (Topic(CmdVel)) | ~85ns | Same machine, one publisher and one subscriber |
| Cross-process, many:many (Topic(CmdVel)) | ~91ns | Same machine, multiple publishers and subscribers |
| Network (endpoint="topic@host:port") | <50μs | Multi-machine on LAN (direct UDP) |
| Router (endpoint="topic@router") | 10-50ms | WAN, NAT traversal, cloud deployments |

Multi-Machine Example

# simplified
# === ROBOT (192.168.1.50) ===
from horus import Topic, CmdVel, Imu, Odometry

# Local: Critical flight control (ultra-fast)
imu_topic = Topic(Imu)  # ~85ns local shared memory

# Network: Telemetry to ground station
telemetry = Topic(Odometry, endpoint="telem@192.168.1.100:8000")

# Network: Commands from ground station
commands = Topic(CmdVel, endpoint="cmd@0.0.0.0:8001")


# === GROUND STATION (192.168.1.100) ===
from horus import Topic, CmdVel, Odometry

# Receive telemetry from robot
telemetry_sub = Topic(Odometry, endpoint="telem@0.0.0.0:8000")

# Send commands to robot
command_pub = Topic(CmdVel, endpoint="cmd@192.168.1.50:8001")

Integration with Python Ecosystem

NumPy Integration

# simplified
import horus
import numpy as np

def process_array(node):
    if node.has_msg("raw_data"):
        data = node.recv("raw_data")
        # Convert to NumPy array
        arr = np.array(data)
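        # Process with NumPy. The FFT output is complex, and complex values
        # are not JSON-serializable, so publish the magnitudes instead.
        spectrum = np.abs(np.fft.fft(arr))
        node.send("fft_result", spectrum.tolist())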

processor = horus.Node(
    subs="raw_data",
    pubs="fft_result",
    tick=process_array
)

OpenCV Integration

# simplified
import horus
import cv2
import numpy as np

def process_image(node):
    if node.has_msg("camera"):
        img_data = node.recv("camera")
        # Convert to OpenCV format
        img = np.array(img_data, dtype=np.uint8).reshape((480, 640, 3))

        # Apply OpenCV processing
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)

        # Publish result
        node.send("edges", edges.flatten().tolist())

vision = horus.Node(
    subs="camera",
    pubs="edges",
    tick=process_image,
    rate=30
)

scikit-learn Integration

# simplified
import horus
from sklearn.linear_model import LinearRegression
import numpy as np

model = LinearRegression()

def train_model(node):
    if node.has_msg("training_data"):
        data = node.recv("training_data")
        X = np.array(data['features'])
        y = np.array(data['labels'])

        # Train model
        model.fit(X, y)
        score = model.score(X, y)

        node.send("model_score", score)

trainer = horus.Node(
    subs="training_data",
    pubs="model_score",
    tick=train_model
)

Advanced Patterns

State Management

# simplified
import horus

class RobotState:
    def __init__(self):
        self.position = {"x": 0.0, "y": 0.0}
        self.velocity = 0.0
        self.last_update = 0

state = RobotState()

def update_state(node):
    if node.has_msg("velocity"):
        state.velocity = node.recv("velocity")

    if node.has_msg("position"):
        state.position = node.recv("position")

    # Publish combined state
    node.send("robot_state", {
        "pos": state.position,
        "vel": state.velocity
    })

state_manager = horus.Node(
    subs=["velocity", "position"],
    pubs="robot_state",
    tick=update_state
)

Rate Limiting

# simplified
import horus
import time

class RateLimiter:
    def __init__(self, min_interval):
        self.min_interval = min_interval
        self.last_send = 0

limiter = RateLimiter(min_interval=0.1)  # 100ms minimum

def rate_limited_publish(node):
    current_time = time.time()

    if current_time - limiter.last_send >= limiter.min_interval:
        node.send("output", "data")
        limiter.last_send = current_time

node = horus.Node(
    pubs="output",
    tick=rate_limited_publish,
    rate=100  # Node runs at 100Hz, but publishes at max 10Hz
)
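The same throttling idea can be packaged as a small reusable object with an injectable clock, which makes it unit-testable without waiting in real time. This is a sketch in plain Python. `MinIntervalGate` is a hypothetical name, and it uses `time.monotonic` by default so that wall-clock adjustments do not affect the interval.

```python
import time
from typing import Callable, Optional


class MinIntervalGate:
    """Allows an action at most once per min_interval seconds."""

    def __init__(self, min_interval: float,
                 clock: Callable[[], float] = time.monotonic):
        self.min_interval = min_interval
        self._clock = clock
        # None means "never fired", so the first call always passes.
        self._last: Optional[float] = None

    def ready(self) -> bool:
        """Return True (and record the time) if enough time has passed."""
        now = self._clock()
        if self._last is None or now - self._last >= self.min_interval:
            self._last = now
            return True
        return False
```

Inside a tick function this would read `if gate.ready(): node.send("output", "data")`.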

Error Handling

# simplified
import horus

def safe_processing(node):
    try:
        if node.has_msg("input"):
            data = node.recv("input")
            result = risky_operation(data)
            node.send("output", result)
    except Exception as e:
        node.send("errors", str(e))
        print(f"Error: {e}")

processor = horus.Node(
    subs="input",
    pubs=["output", "errors"],
    tick=safe_processing
)
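When several nodes need the same try/except wrapper, the pattern above can be factored into a decorator. The sketch below assumes only that a node exposes `send(topic, payload)` as in the example. `report_errors` and `RecordingNode` are hypothetical names, and `RecordingNode` is a test double rather than a HORUS class.

```python
import functools
from typing import Any, Callable, List, Tuple


def report_errors(error_topic: str = "errors") -> Callable:
    """Wrap a tick function so exceptions are published instead of raised."""
    def decorator(tick: Callable[[Any], None]) -> Callable[[Any], None]:
        @functools.wraps(tick)
        def wrapper(node: Any) -> None:
            try:
                tick(node)
            except Exception as exc:
                # Publish a short description and keep the node running.
                node.send(error_topic, f"{type(exc).__name__}: {exc}")
        return wrapper
    return decorator


class RecordingNode:
    """Test double that records every send() call."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    def send(self, topic: str, payload: Any) -> None:
        self.sent.append((topic, payload))


@report_errors()
def flaky_tick(node: Any) -> None:
    raise ValueError("bad reading")
```

Calling `flaky_tick(RecordingNode())` records one message on the "errors" topic instead of propagating the exception.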

Performance Tips

1. Use Per-Node Rate Control

# simplified
# Configure rate and order on the Node, then add to scheduler
sensor = horus.Node(name="sensor", tick=sensor_fn, rate=100, order=0)
controller = horus.Node(name="ctrl", tick=ctrl_fn, rate=50, order=1)
logger = horus.Node(name="logger", tick=log_fn, rate=10, order=2)

scheduler = horus.Scheduler()
scheduler.add(sensor)
scheduler.add(controller)
scheduler.add(logger)

scheduler.run(duration=10)  # Bounded run so the stats query below is reached

# Monitor performance with get_node_stats()
stats = scheduler.get_node_stats("sensor")
print(f"Sensor executed {stats['total_ticks']} ticks")

2. Check Message Freshness

# simplified
import time

def control_tick(node):
    if node.has_msg("sensor_data"):
        data = node.recv("sensor_data")
        # Use message-level timestamps for staleness checks
        if hasattr(data, 'timestamp_ns') and data.timestamp_ns:
            age_s = (time.time_ns() - data.timestamp_ns) / 1e9
            if age_s > 0.1:
                node.log_warning("Skipping stale sensor data")
                return
        process(data)
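The staleness arithmetic is easy to isolate into pure functions that can be tested without a running scheduler. This is a sketch. `message_age_s` and `is_stale` are hypothetical helpers, and they follow the example above in treating a missing timestamp as "cannot judge" rather than stale.

```python
def message_age_s(timestamp_ns: int, now_ns: int) -> float:
    """Age of a message in seconds, from nanosecond timestamps."""
    return (now_ns - timestamp_ns) / 1e9


def is_stale(timestamp_ns: int, now_ns: int, max_age_s: float = 0.1) -> bool:
    """True if the message is older than max_age_s.

    A zero or missing timestamp is treated as not stale, because its
    age cannot be determined.
    """
    if not timestamp_ns:
        return False
    return message_age_s(timestamp_ns, now_ns) > max_age_s
```

In a tick function, `now_ns` would come from `time.time_ns()` as in the example above.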

3. Use Dicts for Messages

# simplified
# Send messages as Python dicts (serialized automatically; contents must be JSON-serializable)
cmd = {"linear": 1.5, "angular": 0.8}
node.send("cmd_vel", cmd)

# For staleness checks, use typed messages with timestamp_ns
# or track send time at the application level

4. Batch Processing

# simplified
# Use node.recv_all() to process all available messages at once
def batch_processor(node):
    messages = node.recv_all("input")
    if messages:
        results = [process(msg) for msg in messages]
        for result in results:
            node.send("output", result)

5. Keep tick() Fast

# simplified
# GOOD: Fast tick
def good_tick(node):
    if node.has_msg("input"):
        data = node.recv("input")
        result = quick_operation(data)
        node.send("output", result)

# BAD: Slow tick
def bad_tick(node):
    time.sleep(1)  # Don't block!
    data = requests.get("http://api.example.com")  # Don't do I/O!

6. Offload Heavy Processing

# simplified
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def heavy_processing_node(node):
    if node.has_msg("input"):
        data = node.recv("input")
        # Offload to thread pool
        future = executor.submit(expensive_operation, data)
        # Don't block - check result later or use callback
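One way to "check the result later" is to keep the pending futures and collect whichever have finished on each tick. The sketch below uses only `concurrent.futures`. `BackgroundJobs` is a hypothetical helper, not a HORUS class.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List


class BackgroundJobs:
    """Submit work to a thread pool and collect finished results later."""

    def __init__(self, max_workers: int = 4) -> None:
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._pending: List[Future] = []

    def submit(self, fn: Callable[..., Any], *args: Any) -> None:
        self._pending.append(self._executor.submit(fn, *args))

    def drain(self) -> List[Any]:
        """Return results of finished jobs without waiting for the rest."""
        done: List[Future] = []
        still_running: List[Future] = []
        # Single pass, so a job finishing mid-scan is never lost.
        for future in self._pending:
            (done if future.done() else still_running).append(future)
        self._pending = still_running
        # result() re-raises any exception the job raised.
        return [future.result() for future in done]

    def shutdown(self) -> None:
        """Wait for all submitted jobs to finish."""
        self._executor.shutdown(wait=True)
```

A tick function would call `jobs.submit(expensive_operation, data)` when input arrives and `jobs.drain()` on every tick to publish whatever has completed.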

7. Use Multiprocess for CPU-Intensive Tasks

# Isolate heavy processing in separate processes
horus run sensor.py heavy_vision.py light_controller.py

# Each script runs in its own process, so the OS can schedule them on separate CPU cores

Development

Building from Source

# Debug build (fast compile, slow runtime)
cd horus_py
maturin develop

# Release build (slow compile, fast runtime)
maturin develop --release

# Build wheel for distribution
maturin build --release

Running Tests

# Install test dependencies
pip install pytest

# Run all tests
pytest tests/

# Run specific feature tests
horus run tests/test_rate_control.py    # Phase 1: Per-node rates
horus run tests/test_timestamps.py      # Phase 2: Timestamps
horus run tests/test_typed_messages.py  # Phase 3: Typed messages

# With coverage
pytest --cov=horus tests/

# Test multiprocess execution (Phase 4)
horus run tests/multiprocess_publisher.py tests/multiprocess_subscriber.py

Mock Mode

HORUS Python includes a mock mode for testing without Rust bindings:

# simplified
# If Rust bindings aren't available, automatically falls back to mock
# You'll see: "Warning: Rust bindings not available. Running in mock mode."

# Use for unit testing Python logic without HORUS running

Debugging Tips

# simplified
import time

# Register the node with a scheduler
scheduler = horus.Scheduler()
scheduler.add(my_node)

# Check node statistics
stats = scheduler.get_node_stats("my_node")
print(f"Ticks: {stats['total_ticks']}, Errors: {stats['errors_count']}")

# Monitor message timestamps via message-level fields
msg = node.recv("topic")
if msg and hasattr(msg, 'timestamp_ns') and msg.timestamp_ns:
    age = (time.time_ns() - msg.timestamp_ns) / 1e9
    print(f"Message age: {age*1000:.1f}ms")

Interoperability

With Rust Nodes

Important: For cross-language communication, use typed topics by passing a message type to Topic().

Cross-Language with Typed Topics

# simplified
# Python node with typed topic
from horus import Topic, CmdVel

cmd_topic = Topic(CmdVel)  # Typed topic
cmd_topic.send(CmdVel(linear=1.0, angular=0.5))

// Rust node receives
use horus::prelude::*;

let topic: Topic<CmdVel> = Topic::new("cmd_vel")?;
if let Some(cmd) = topic.recv() {
    println!("Got: linear={}, angular={}", cmd.linear, cmd.angular);
}

Generic Topic (String Topics)

# simplified
# Generic Topic - for custom topics
from horus import Topic

topic = Topic("my_topic")  # Pass string for generic topic
topic.send({"linear": 1.0, "angular": 0.5})  # Uses JSON serialization

Typed topics: Use Topic(CmdVel), Topic(Pose2D) for cross-language communication. See Python Message Library for details.


Time API

Framework-aware time functions. Use these instead of time.time() — they integrate with deterministic mode and SimClock.

Quick reference:

| Function | Returns | Description |
| --- | --- | --- |
| horus.now() | float | Current time in seconds |
| horus.dt() | float | Timestep for this tick in seconds |
| horus.elapsed() | float | Time since scheduler start |
| horus.tick() | int | Current tick number |
| horus.budget_remaining() | float | Time left in tick budget |
| horus.rng_float() | float | Random float in [0, 1) |
| horus.timestamp_ns() | int | Nanosecond timestamp |

horus.now() -> float

Current framework time in seconds.

  • Normal mode: Wall clock (time.time() equivalent)
  • Deterministic mode: Virtual SimClock that advances by fixed dt each tick

# simplified
def tick(node):
    t = horus.now()
    node.send("timestamp", t)

horus.dt() -> float

Timestep for this tick in seconds. Use this for physics integration instead of measuring elapsed time manually.

  • Normal mode: Actual elapsed time since last tick
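  • Deterministic mode: Fixed 1.0 / rate, identical across runs

# simplified
integral = 0.0
prev_error = 0.0

def tick(node):
    global integral, prev_error  # Controller state persists across ticks
    # PID controller using dt() for correct integration
    dt = horus.dt()
    error = target - current
    integral += error * dt
    derivative = (error - prev_error) / dt
    prev_error = error  # Remember for the next tick's derivative
    output = kp * error + ki * integral + kd * derivative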
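To see the role of the timestep outside a running scheduler, the controller can be written as a small class that receives `dt` as an argument. This is a self-contained sketch. The `PID` class is hypothetical, and in a node the `dt` argument would be the value returned by `horus.dt()`.

```python
from typing import Optional


class PID:
    """Textbook PID controller that integrates using a caller-supplied dt."""

    def __init__(self, kp: float, ki: float, kd: float) -> None:
        self.kp, self.ki, self.kd = kp, ki, kd
        self.integral = 0.0
        self.prev_error: Optional[float] = None

    def step(self, error: float, dt: float) -> float:
        if dt <= 0.0:
            raise ValueError("dt must be positive")
        self.integral += error * dt
        # No derivative on the first step: there is no previous error yet.
        if self.prev_error is None:
            derivative = 0.0
        else:
            derivative = (error - self.prev_error) / dt
        self.prev_error = error
        return self.kp * error + self.ki * self.integral + self.kd * derivative
```

Keeping the state on the object avoids module-level globals and lets the same controller logic be tested with a fixed `dt`.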

horus.elapsed() -> float

Time elapsed since the scheduler started, in seconds.

# simplified
def tick(node):
    if horus.elapsed() > 30.0:
        node.log_info("Running for 30+ seconds, stabilized")

horus.tick() -> int

Current tick number (0-indexed, increments each scheduler cycle).

# simplified
def tick(node):
    if horus.tick() % 100 == 0:
        node.log_info(f"Tick {horus.tick()}: system healthy")

horus.budget_remaining() -> float

Time remaining in this tick's budget, in seconds. Returns float('inf') if no budget is configured.

Use this for adaptive quality — do more work when time permits, skip expensive operations when tight.

# simplified
def tick(node):
    # Always do critical work
    process_sensor_data()

    # Only do expensive work if budget allows
    if horus.budget_remaining() > 0.001:  # >1ms remaining
        run_expensive_optimization()

horus.rng_float() -> float

Random float in [0.0, 1.0).

  • Normal mode: System entropy (non-deterministic)
  • Deterministic mode: Tick-seeded RNG that produces identical sequences across runs

# simplified
def tick(node):
    # Simulated sensor noise (reproducible in deterministic mode)
    noise = (horus.rng_float() - 0.5) * 0.1
    reading = true_value + noise

horus.timestamp_ns() -> int

Current timestamp in nanoseconds. Use for TransformFrame queries and message timestamping.

# simplified
def tick(node):
    ts = horus.timestamp_ns()
    transform = tf.lookup("camera", "base_link", ts)

Deterministic Mode

When using horus.run(..., deterministic=True), the time functions switch from wall clock to SimClock:

| Function | Normal Mode | Deterministic Mode |
| --- | --- | --- |
| now() | Wall clock | SimClock (virtual) |
| dt() | Actual elapsed | Fixed 1/rate |
| elapsed() | Real elapsed | Virtual elapsed |
| rng_float() | System entropy | Tick-seeded (reproducible) |
| tick() | Same | Same |
| budget_remaining() | Same | Same |
| timestamp_ns() | Real nanoseconds | Virtual nanoseconds |

This ensures identical behavior across runs — critical for simulation, testing, and replay.
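The effect of a fixed timestep plus a seeded random source can be shown with the standard library alone. The sketch below is an analogy, not HORUS's implementation: `simulate` is a hypothetical function, and a single seeded `random.Random` stands in for the tick-seeded RNG.

```python
import random
from typing import List


def simulate(seed: int, rate_hz: float, ticks: int) -> List[float]:
    """Integrate a noisy velocity with a fixed dt and a seeded RNG."""
    rng = random.Random(seed)   # stand-in for a reproducible RNG
    dt = 1.0 / rate_hz          # fixed timestep, like dt() in deterministic mode
    position = 0.0
    trace = []
    for _ in range(ticks):
        velocity = 1.0 + (rng.random() - 0.5) * 0.1  # nominal speed plus noise
        position += velocity * dt
        trace.append(position)
    return trace
```

Two calls with the same seed return identical traces, which is the property that makes replay and regression testing possible.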


Runtime Parameters

horus.Params provides dict-like access to runtime configuration stored in .horus/config/params.yaml. Change PID gains, speed limits, and thresholds without recompiling.

Params(path=None)

Create a parameter store.

  • Params() — loads from .horus/config/params.yaml (default)
  • Params("path/to/file.yaml") — loads from explicit path

Methods

| Method | Returns | Description |
| --- | --- | --- |
| get(key, default=None) | Any | Get value, return default if missing |
| params[key] | Any | Get value, raise KeyError if missing |
| params[key] = value | - | Set value |
| has(key) | bool | Check if key exists |
| key in params | bool | Same as has(key) |
| keys() | List[str] | All parameter names |
| len(params) | int | Number of parameters |
| save() | - | Persist to disk |
| remove(key) | bool | Remove a key; returns True if it existed |
| reset() | - | Reset all parameters to defaults |

Example: Live PID Tuning

# simplified
import horus

params = horus.Params()

def controller_tick(node):
    # Read gains from params — change at runtime via CLI or monitor
    kp = params.get("pid_kp", 1.0)
    ki = params.get("pid_ki", 0.1)
    kd = params.get("pid_kd", 0.01)
    max_speed = params.get("max_speed", 1.5)

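    error = target - current
    # Proportional term only, for brevity; clamp symmetrically to [-max_speed, max_speed]
    output = max(-max_speed, min(kp * error, max_speed))
    node.send("cmd_vel", output)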

controller = horus.Node(name="PIDController", tick=controller_tick,
                        rate=100, pubs=["cmd_vel"])
horus.run(controller)

Set parameters at runtime:

horus param set pid_kp 2.5
horus param set max_speed 0.8
horus param list

Rate Limiter

horus.Rate provides drift-compensated rate limiting for background threads and standalone loops. For nodes, use the rate= constructor kwarg instead.

Rate(hz)

Create a rate limiter targeting hz iterations per second.

Methods

| Method | Returns | Description |
| --- | --- | --- |
| sleep() | - | Sleep until next cycle. Compensates for work time to maintain target rate |
| actual_hz() | float | Measured frequency (smoothed average) |
| target_hz() | float | Target frequency |
| period() | float | Target period in seconds (1/hz) |
| is_late() | bool | True if current cycle exceeded the target period |
| reset() | - | Reset timing (call after a pause to avoid burst catch-up) |

Example: Camera Capture Thread

# simplified
import threading
from horus import Rate, Topic, Image

def camera_loop():
    rate = Rate(30)  # 30 FPS target
    topic = Topic(Image)

    while running:
        frame = capture_camera()
        topic.send(frame)

        if rate.is_late():
            print(f"Camera behind: {rate.actual_hz():.1f} Hz (target {rate.target_hz():.0f})")

        rate.sleep()

thread = threading.Thread(target=camera_loop, daemon=True)
thread.start()
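"Drift-compensated" means the next wake-up is scheduled from the previous deadline rather than from the moment the work finished, so time spent working does not lengthen the period. The sketch below illustrates the idea in plain Python. `DriftCompensatedLoop` is a hypothetical class, not the implementation behind `horus.Rate`, and its re-anchoring after an overrun is an assumption about reasonable behavior.

```python
import time
from typing import Callable


class DriftCompensatedLoop:
    """Sleeps until fixed deadlines spaced one period apart."""

    def __init__(self, hz: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.period = 1.0 / hz
        self._clock = clock
        self._sleep = sleep
        self._next_deadline = clock() + self.period

    def sleep(self) -> None:
        remaining = self._next_deadline - self._clock()
        if remaining > 0:
            self._sleep(remaining)
            # Advance from the deadline, not from "now", to avoid drift.
            self._next_deadline += self.period
        else:
            # Overran the period: re-anchor instead of bursting to catch up.
            self._next_deadline = self._clock() + self.period
```

With a 10 Hz target and 30 ms of work per cycle, each call sleeps about 70 ms, so cycles still start 100 ms apart.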

Hardware Configuration

horus.hardware loads hardware node configs from horus.toml's [hardware] section.

Module Functions

| Function | Returns | Description |
| --- | --- | --- |
| hardware.load() | list[(str, obj)] | Load hardware entries from horus.toml |
| hardware.load_from(path) | list[(str, obj)] | Load from explicit config path |
| hardware.register_driver(name, cls) | - | Register a Python node class |

NodeParams

Dict-like access to a hardware entry's configuration values.

| Method | Returns | Description |
| --- | --- | --- |
| params[key] | Any | Get value (KeyError if missing) |
| params.get(key) | Any | Get value (KeyError if missing) |
| params.get_or(key, default) | Any | Get with default if missing |
| params.has(key) | bool | Check if key exists |
| params.keys() | List[str] | All parameter names |

Example

# horus.toml
[hardware.arm]
use = "ArmDriver"
port = "/dev/ttyUSB0"
baudrate = 1000000

[hardware.lidar]
use = "rplidar"
port = "/dev/ttyUSB1"
sim = true

# simplified
import horus

entries = horus.hardware.load()
for name, obj in entries:
    if isinstance(obj, horus.NodeParams):
        port = obj.get_or("port", "/dev/ttyUSB0")
        print(f"{name} on {port}")

Unit Constants

# simplified
import horus
from horus import us, ms

us  # 1e-6 — microseconds to seconds
ms  # 1e-3 — milliseconds to seconds

# Use with budget/deadline for readability
node = horus.Node(tick=fn, rate=1000, budget=300 * us, deadline=900 * us)
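The arithmetic behind that example is worth spelling out. The sketch below redefines `us` and `ms` locally with the values given above so that it runs without HORUS installed. Reading the budget and deadline as fractions of the tick period is one interpretation, offered here as an assumption.

```python
# Local stand-ins with the same values as horus.us and horus.ms.
us = 1e-6
ms = 1e-3

rate_hz = 1000
period = 1.0 / rate_hz   # 1 ms between ticks at 1000 Hz

budget = 300 * us        # 0.0003 s
deadline = 900 * us      # 0.0009 s

# Share of each tick period allotted to the budget.
budget_fraction = budget / period
```

At 1000 Hz the 300 µs budget is about 30% of the 1 ms period, and the 900 µs deadline still falls inside the period.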

Error Types

# simplified
from horus import HorusNotFoundError, HorusTransformError, HorusTimeoutError

try:
    tf.tf("missing_frame", "base")
except HorusTransformError as e:
    print(f"Transform failed: {e}")

try:
    tf.wait_for_transform("src", "dst", timeout_sec=1.0)
except HorusTimeoutError:
    print("Timed out waiting for transform")

| Exception | Rust source | Raised when |
| --- | --- | --- |
| HorusNotFoundError | NotFound(...) | Missing topic, frame, node |
| HorusTransformError | Transform(...) | TF extrapolation, stale data |
| HorusTimeoutError | Timeout(...) | Blocking operation timed out |

Other Rust errors map to stdlib exceptions: Config → ValueError, Io → IOError, Memory → MemoryError, etc.



Common Patterns

Producer-Consumer

# simplified
# Producer
producer = horus.Node(
    pubs="queue",
    tick=lambda n: n.send("queue", generate_work())
)

# Consumer
consumer = horus.Node(
    subs="queue",
    tick=lambda n: process_work(n.recv("queue")) if n.has_msg("queue") else None
)

horus.run(producer, consumer)

Request-Response

# simplified
def request_node(node):
    node.send("requests", {"id": 1, "query": "data"})

def response_node(node):
    if node.has_msg("requests"):
        req = node.recv("requests")
        response = handle_request(req)
        node.send("responses", response)

req = horus.Node(pubs="requests", tick=request_node)
res = horus.Node(subs="requests", pubs="responses", tick=response_node)

Periodic Tasks

# simplified
import time

class PeriodicTask:
    def __init__(self, interval):
        self.interval = interval
        self.last_run = 0

task = PeriodicTask(interval=5.0)  # Every 5 seconds

def periodic_tick(node):
    current = time.time()
    if current - task.last_run >= task.interval:
        node.send("periodic", "task_executed")
        task.last_run = current

node = horus.Node(pubs="periodic", tick=periodic_tick, rate=10)

Troubleshooting

Import Errors

# simplified
# If you see: ModuleNotFoundError: No module named 'horus'
# Rebuild and install:
cd horus_py
maturin develop --release

Slow Performance

# simplified
# Use a release build (not debug); run this in a shell:
#   maturin develop --release

# Check tick rate isn't too high
node = horus.Node(tick=fn, rate=30)  # 30Hz is reasonable

Memory Issues

# simplified
# Avoid accumulating data in long-lived (module-level or closure) state
# BAD:
all_data = []
def bad_tick(node):
    all_data.append(node.recv("input"))  # Grows without bound!

# GOOD:
def good_tick(node):
    data = node.recv("input")
    process_and_discard(data)  # Process immediately
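When some history really is needed (for example, a moving average), a bounded buffer keeps memory use constant. This is a minimal sketch using `collections.deque`. The names `recent` and `remember` are illustrative.

```python
from collections import deque

# Keeps only the most recent 100 samples; older ones are dropped automatically.
recent = deque(maxlen=100)


def remember(sample):
    """Store a sample, evicting the oldest one once the buffer is full."""
    recent.append(sample)
```

A tick function can call `remember(data)` on every message without the buffer ever exceeding 100 entries.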

Monitor Integration and Logging

Current Limitations

Python nodes currently do NOT appear in the HORUS monitor logs.

The Python bindings do not integrate with the Rust logging system:

# simplified
# Python nodes use standard print() for logging
print("Debug message")  # Visible in console, not in monitor

What this means:

  • Python nodes communicate via shared memory
  • All message passing functionality works
  • Python log messages don't appear in monitor logs
  • Use print() for Python-side debugging

Monitoring Python Nodes

Since Python nodes don't integrate with the monitor logging system, use these alternatives:

  1. Node-level logging methods:

# simplified
def tick(node):
    node.log_info("Processing sensor data")
    node.log_warning("Sensor reading is stale")
    node.log_error("Failed to process data")
    node.log_debug("Debug information")

# These print to console, not monitor

  2. Manual topic monitoring:

# simplified
def tick(node):
    if node.has_msg("input"):
        data = node.recv("input")
        print(f"[{node.name}] Received: {data}")
        result = process(data)
        node.send("output", result)
        print(f"[{node.name}] Published: {result}")

  3. Node statistics:

# simplified
scheduler = horus.Scheduler()
scheduler.add(node)
scheduler.run(duration=10)

# Get stats after running
stats = scheduler.get_node_stats("my_node")
print(f"Ticks: {stats['total_ticks']}")
print(f"Errors: {stats['errors_count']}")

Future Improvements

Monitor integration for Python nodes is planned for a future release. This will include:

  • Full NodeInfo context in Python callbacks
  • LogSummary for Python message types
  • Python node logs visible in the monitor TUI and web dashboard

Custom Exceptions

HORUS defines three custom exception types and maps its other internal errors to standard Python exceptions:

# simplified
from horus import HorusNotFoundError, HorusTransformError, HorusTimeoutError

try:
    result = some_horus_operation()
except HorusNotFoundError:
    print("Resource not found")
except HorusTransformError:
    print("Transform computation failed")
except HorusTimeoutError:
    print("Operation timed out")

Custom exceptions (inherit from Exception):

| Exception | When Raised | Rust Source |
| --- | --- | --- |
| HorusNotFoundError | Topic, frame, node, or parent frame not found | HorusError::NotFound |
| HorusTransformError | Transform extrapolation or stale data | HorusError::Transform |
| HorusTimeoutError | Blocking operation exceeded time limit | HorusError::Timeout |

Standard Python exceptions raised by HORUS operations:

| Python Exception | When Raised | Rust Source |
| --- | --- | --- |
| IOError | File or IPC I/O failures | HorusError::Io |
| MemoryError | Shared memory or pool allocation failures | HorusError::Memory |
| ValueError | Invalid parameters, bad config, parse errors | HorusError::InvalidInput, InvalidDescriptor, Parse, Config |
| TypeError | Serialization/deserialization failures | HorusError::Serialization |
| RuntimeError | Internal or unmapped errors | All other variants |

All exceptions preserve the original Rust error message, so you get full context:

# simplified
try:
    tf = tf_tree.tf("nonexistent", "world")
except HorusNotFoundError as e:
    print(e)  # "Frame not found: nonexistent"

try:
    img = Image(height=-1, width=640, encoding="rgb8")
except ValueError as e:
    print(e)  # "Invalid input: height must be positive"

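Recommended catch order: handle the HORUS-specific exceptions first, then the standard Python ones, ending with RuntimeError as the catch-all for internal errors. None of these classes is a subclass of another, so the order is a readability convention rather than a correctness requirement: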

# simplified
try:
    result = horus_operation()
except HorusNotFoundError:
    pass  # Specific: missing resource
except HorusTransformError:
    pass  # Specific: TF failure
except HorusTimeoutError:
    pass  # Specific: deadline exceeded
except (ValueError, TypeError):
    pass  # Bad input or serialization
except (IOError, MemoryError):
    pass  # System-level failures
except RuntimeError:
    pass  # Catch-all for internal errors
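The grouping used in that handler chain can be checked without HORUS installed by substituting local stand-in classes. In the sketch below the three `Horus*` classes are stand-ins defined here (assumed, as stated above, to inherit directly from `Exception`), and `classify` is a hypothetical helper. In Python 3, `IOError` is an alias of `OSError`, so testing for `OSError` covers both.

```python
# Stand-ins for the real HORUS exception classes, for illustration only.
class HorusNotFoundError(Exception):
    pass


class HorusTransformError(Exception):
    pass


class HorusTimeoutError(Exception):
    pass


def classify(exc: BaseException) -> str:
    """Map an exception to the handler group it would land in."""
    if isinstance(exc, (HorusNotFoundError, HorusTransformError, HorusTimeoutError)):
        return "horus"
    if isinstance(exc, (ValueError, TypeError)):
        return "input"
    if isinstance(exc, (OSError, MemoryError)):  # IOError is OSError in Python 3
        return "system"
    if isinstance(exc, RuntimeError):
        return "internal"
    return "unknown"
```

A function like this can be handy for turning exceptions into log categories or metrics labels.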


Remember: With HORUS Python, you focus on what your robot does, not how the framework works!