Python Examples

Complete Python examples demonstrating HORUS capabilities.

Available Examples

| Example | Description | Key Features |
|---|---|---|
| Basic Node | Simple sensor-to-motor node | Node, get(), send() |
| Typed Topics | Direct pub/sub with typed messages | Topic, CmdVel, LaserScan |
| Async Node | Non-blocking I/O with async/await | AsyncNode, AsyncTopic |
| ML Inference | ONNX model inference pipeline | ONNXInferenceNode, PerformanceMonitor |
| Cross-Language | Python-Rust communication | Typed topics, shared memory |
| Multi-Node System | Scheduler with multiple nodes | Scheduler, run() |
| Sensor Processing | LaserScan obstacle avoidance | Topic, LaserScan, CmdVel |
| Camera Image Pipeline | Zero-copy camera image send/receive | Image, from_numpy(), to_numpy(), to_torch() |

Basic Node

A minimal node that reads sensor data and publishes motor commands:

from horus import Node, run

def controller(node):
    """Turn in place when the distance reading is below 0.5, else drive ahead."""
    # Nothing to do until a distance reading is waiting.
    if not node.has_msg("sensor.distance"):
        return

    reading = node.get("sensor.distance")
    if reading < 0.5:
        command = {"linear": 0.0, "angular": 0.5}  # stop and rotate
    else:
        command = {"linear": 1.0, "angular": 0.0}  # drive forward, no turn
    node.send("motor.cmd", command)

# Wrap the callback in a node: one subscription in, one publication out.
obstacle_avoider = Node(
    name="obstacle_avoider",
    rate=10,
    tick=controller,
    subs=["sensor.distance"],
    pubs=["motor.cmd"],
)

# Hand the node to the run() helper to start it.
run(obstacle_avoider)

Typed Pub/Sub

Using typed Topic for direct pub/sub with message classes:

from horus import Topic, CmdVel, LaserScan

# Create typed topics
scan_topic = Topic(LaserScan)
cmd_topic = Topic(CmdVel)

# Send a velocity command
cmd_topic.send(CmdVel(linear=1.0, angular=0.0))

# Receive a laser scan. recv() returns None when no message is waiting, so
# compare against None explicitly instead of relying on the message's
# truthiness: a received-but-empty scan must not be mistaken for "no message".
scan = scan_topic.recv()
if scan is not None:
    print(f"Got {len(scan)} range readings")
    # Fall back to infinity when the scan carries no ranges at all.
    min_dist = min(scan.ranges) if scan.ranges else float('inf')
    print(f"Closest obstacle: {min_dist:.2f}m")

With backend hints for performance tuning:

from horus import Topic, CmdVel

# backend="direct": callbacks on the same thread (~3ns)
direct_topic = Topic(CmdVel, backend="direct")

# backend="spsc": one producer and one consumer inside a single process (~18ns)
spsc_topic = Topic(CmdVel, backend="spsc")

# backend="mpmc_shm": shared memory across processes (~167ns)
shm_topic = Topic(CmdVel, backend="mpmc_shm")

Async Node

Non-blocking I/O with AsyncNode and AsyncTopic:

import horus
import asyncio

class WeatherFetcher(horus.AsyncNode):
    """Poll an HTTP weather endpoint and publish each successful response."""

    async def async_init(self):
        # Topic of plain dicts carrying the decoded JSON payload.
        self.weather_pub = horus.AsyncTopic(dict)

    async def async_tick(self):
        import aiohttp

        url = "https://api.example.com/weather"
        async with aiohttp.ClientSession() as http, http.get(url) as response:
            # Any status other than 200 is dropped without publishing.
            if response.status == 200:
                payload = await response.json()
                await self.weather_pub.send(payload)

        # Pause 60 seconds before the next fetch.
        await asyncio.sleep(60.0)

    async def async_shutdown(self):
        print("Weather fetcher stopping")


class WeatherLogger(horus.AsyncNode):
    """Print the temperature from each weather message received."""

    async def async_init(self):
        # Topic of plain dicts, matching what the fetcher publishes.
        self.weather_sub = horus.AsyncTopic(dict)

    async def async_tick(self):
        weather = await self.weather_sub.try_recv()
        # A falsy result (nothing received, or an empty dict) is skipped.
        if not weather:
            return
        print(f"Temperature: {weather.get('temp')}°C")


# Register both nodes (fetcher with order=0, logger with order=1), then run.
scheduler = horus.Scheduler()
for position, node_cls in enumerate((WeatherFetcher, WeatherLogger)):
    scheduler.add(node_cls(), order=position)
scheduler.run()

ML Inference

Using ONNX models with the ML utilities:

import numpy as np
from horus.ml_utils import ONNXInferenceNode, PerformanceMonitor

class PoseEstimationNode(ONNXInferenceNode):
    """Human pose estimation using an ONNX model.

    The base class is configured with input topic ``camera.raw`` and output
    topic ``poses``.
    """

    NUM_KEYPOINTS = 17  # body keypoints read from the model output
    MIN_CONFIDENCE = 0.3  # keypoints scoring at or below this are dropped

    def __init__(self, model_path="models/movenet_lightning.onnx"):
        """Configure topics on the base class and create a latency monitor.

        Args:
            model_path: Path to the ONNX model file.
        """
        super().__init__(
            model_path=model_path,
            input_topic="camera.raw",
            output_topic="poses"
        )
        # Not used by the methods below; available for recording latencies.
        self.monitor = PerformanceMonitor(window_size=30)

    def load_model(self):
        """Create the ONNX Runtime session (CPU only) and cache tensor names."""
        import onnxruntime as ort
        providers = ['CPUExecutionProvider']
        self.session = ort.InferenceSession(self.model_path, providers=providers)
        # Only the first input is fed; all outputs are listed by name.
        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]

    def preprocess(self, image_data):
        """Normalize an HWC image and reshape it to a 1xCxHxW float32 batch.

        No resizing is performed here; the image is assumed to already match
        the model's input resolution — TODO confirm against the camera source.
        """
        img = np.asarray(image_data, dtype=np.float32)
        img = (img / 127.5) - 1.0  # Maps 0..255 pixel values to [-1, 1]
        img = np.transpose(img, (2, 0, 1))  # HWC -> CHW
        return np.expand_dims(img, 0)  # Add batch dimension

    def infer(self, preprocessed):
        """Run the session on one preprocessed batch and return its first output."""
        return self.session.run(None, {self.input_name: preprocessed})[0]

    def postprocess(self, output):
        """Parse keypoints from model output.

        The last axis must hold ``(y, x, confidence)``. All leading axes are
        flattened, so both ``(1, 17, 3)`` and ``(1, 1, 17, 3)`` layouts work;
        only the first ``NUM_KEYPOINTS`` rows are read.

        Returns:
            A list of dicts with keys ``id``, ``x``, ``y`` and ``confidence``,
            one per keypoint whose confidence exceeds ``MIN_CONFIDENCE``.
        """
        scores = np.asarray(output)
        # Collapse batch/person axes into a flat list of keypoint rows.
        rows = scores.reshape(-1, scores.shape[-1])[: self.NUM_KEYPOINTS]
        keypoints = []
        for i, (y, x, conf) in enumerate(rows[:, :3]):
            if conf > self.MIN_CONFIDENCE:
                keypoints.append({'id': i, 'x': float(x), 'y': float(y), 'confidence': float(conf)})
        return keypoints

Performance monitoring:

from horus.ml_utils import PerformanceMonitor

# Monitor configured with a window size of 100.
monitor = PerformanceMonitor(window_size=100)

# Feed it a couple of inference latencies, in milliseconds.
for latency_ms in (12.5, 13.1):
    monitor.record(latency_ms)

# Pull the aggregate numbers and format them by hand...
stats = monitor.get_stats()
print(f"Avg: {stats['avg_latency_ms']:.1f}ms, P95: {stats['p95_latency_ms']:.1f}ms, FPS: {stats['fps']:.0f}")

# ...or let the monitor print its own summary.
monitor.print_stats()

Cross-Language Communication

Python and Rust nodes communicate via typed topics through shared memory (same machine):

from horus import Topic, CmdVel
import time

# Typed topic backed by shared memory, so a Rust process can read it.
# NOTE(review): the Rust subscriber opens "cmd_vel"; presumably that is the
# name Topic(CmdVel) resolves to — confirm.
topic = Topic(CmdVel)

# Publish a slowly varying velocity command, pausing 0.1 s between sends.
while True:
    now = time.time()
    command = CmdVel(
        linear=1.0 + 0.5 * (now % 10) / 10.0,  # ramps 1.0 -> 1.5 every 10 s
        angular=0.2 * ((now % 20) - 10) / 10.0,  # ramps -0.2 -> 0.2 every 20 s
    )
    topic.send(command)
    time.sleep(0.1)

Rust subscriber (in another process on the same machine):

// Glob-import the HORUS prelude (presumably where Topic and CmdVel come from).
use horus::prelude::*;

// Open the "cmd_vel" topic typed as CmdVel. The `?` propagates a construction
// error, so this fragment has to live inside a function returning a Result.
let topic: Topic<CmdVel> = Topic::new("cmd_vel")?;
// Poll forever and print each command that arrives.
// NOTE(review): nothing in the loop sleeps, so unless recv() blocks this
// spins while idle — confirm, or add a short pause.
loop {
    if let Some(cmd) = topic.recv() {
        println!("linear={}, angular={}", cmd.linear, cmd.angular);
    }
}

Multi-Node System

Running multiple nodes with the Scheduler:

from horus import Node, Scheduler

# Sensor node - stands in for real hardware by publishing a constant reading
def sensor_tick(node):
    """Publish a simulated distance of 1.5 on ``sensor.distance``."""
    simulated_distance = 1.5
    node.send("sensor.distance", simulated_distance)

# Controller node - turns each distance reading into a velocity command
def controller_tick(node):
    """Publish a turn command when the reading is below 0.5, else drive forward."""
    # Skip the tick when no distance reading is waiting.
    if not node.has_msg("sensor.distance"):
        return

    dist = node.get("sensor.distance")
    linear, angular = (0.0, 0.5) if dist < 0.5 else (1.0, 0.0)
    node.send("cmd_vel", {"linear": linear, "angular": angular})

# Logger node - prints every command it receives
def logger_tick(node):
    """Print the pending ``cmd_vel`` message, if there is one."""
    if not node.has_msg("cmd_vel"):
        return
    cmd = node.get("cmd_vel")
    print(f"Command: {cmd}")

# One Node per callback; the logger uses rate=10, the other two rate=30.
sensor = Node(name="sensor", tick=sensor_tick, rate=30, pubs=["sensor.distance"])
controller = Node(
    name="controller",
    tick=controller_tick,
    rate=30,
    subs=["sensor.distance"],
    pubs=["cmd_vel"],
)
logger = Node(name="logger", tick=logger_tick, rate=10, subs=["cmd_vel"])

# Register with execution order: sensor (0), controller (1), logger (2).
scheduler = Scheduler()
for order, member in enumerate((sensor, controller, logger)):
    scheduler.add(member, order=order)
scheduler.run()

Or use the run() helper for quick prototyping:

from horus import Node, run

def echo_tick(n):
    """Forward a pending ``input`` message to ``output`` unchanged."""
    if n.has_msg("input"):
        return n.send("output", n.get("input"))
    return None


node = Node(name="echo", subs=["input"], pubs=["output"], tick=echo_tick, rate=30)

run(node, duration=10)  # Run for 10 seconds

Sensor Processing Pipeline

Processing typed sensor data and publishing commands:

from horus import Node, Scheduler, Topic, LaserScan, CmdVel

# Module-level topic handles; both stay None until init() creates them.
scan_topic = cmd_topic = None


def init(node):
    """Create the typed scan and command topics and store them globally."""
    global scan_topic, cmd_topic
    scan_topic = Topic(LaserScan)  # incoming laser scans
    cmd_topic = Topic(CmdVel)  # outgoing velocity commands

def obstacle_avoidance(node):
    """Turn away when the nearest valid range is under 0.5, else drive forward.

    Readings at or below ``scan.range_min`` are ignored. When no scan is
    available, the scan has no ranges, or every reading is filtered out,
    nothing is published for this tick.
    """
    scan = scan_topic.recv()
    if scan and scan.ranges:
        valid_ranges = (r for r in scan.ranges if r > scan.range_min)
        # default=None avoids the ValueError min() raises on an empty
        # iterable when every reading was filtered out.
        min_dist = min(valid_ranges, default=None)
        if min_dist is None:
            return
        if min_dist < 0.5:
            # Too close — turn away
            cmd_topic.send(CmdVel(linear=0.0, angular=0.5))
        else:
            # Clear ahead — drive forward
            cmd_topic.send(CmdVel(linear=1.0, angular=0.0))

# Build the node: init creates the topics, tick runs the avoidance logic.
avoider = Node(
    name="obstacle_avoider",
    init=init,
    tick=obstacle_avoidance,
    rate=10,
)

# Register the single node with a scheduler and start it.
scheduler = Scheduler()
scheduler.add(avoider, order=0)
scheduler.run()

Camera Image Pipeline

Send and receive camera images using the Image domain type with zero-copy shared memory.

Camera Sender

import horus
import numpy as np

# Option 1: allocate a 480x640 RGB8 image backed by shared memory.
# Bound to its own name so it is not silently discarded by the next assignment.
blank = horus.Image(480, 640, "rgb8")

# Option 2: build an image from an existing NumPy array (zero-copy when possible)
pixels = np.zeros((480, 640, 3), dtype=np.uint8)
pixels[:, :, 2] = 255  # Blue channel
img = horus.Image.from_numpy(pixels)

# Send over a topic
topic = horus.Topic("camera.rgb")
topic.send(img)

Camera Receiver

import horus

camera_topic = horus.Topic("camera.rgb")

frame = camera_topic.recv()
if frame is not None:
    # View the frame as a NumPy array (zero-copy)
    arr = frame.to_numpy()
    print(f"Received {arr.shape[1]}x{arr.shape[0]} image")

    # Hand the same frame to PyTorch (zero-copy via DLPack)
    tensor = frame.to_torch()
    print(f"Torch tensor: {tensor.shape}, {tensor.dtype}")

Key Concepts:

  • horus.Image(height, width, encoding) — allocates from shared memory
  • Image.from_numpy(arr) — wrap a NumPy array as a HORUS Image
  • img.to_numpy() / img.to_torch() — zero-copy conversion to frameworks
  • Same Topic API as Rust — Python and Rust processes share topics automatically

See Also