Python Examples
Complete Python examples demonstrating HORUS capabilities.
Available Examples
| Example | Description | Key Features |
|---|---|---|
| Basic Node | Simple sensor-to-motor node | Node, get(), send() |
| Typed Topics | Direct pub/sub with typed messages | Topic, CmdVel, LaserScan |
| Async Node | Non-blocking I/O with async/await | AsyncNode, AsyncTopic |
| ML Inference | ONNX model inference pipeline | ONNXInferenceNode, PerformanceMonitor |
| Cross-Language | Python-Rust communication | Typed topics, shared memory |
| Multi-Node System | Scheduler with multiple nodes | Scheduler, run() |
| Sensor Processing | LaserScan obstacle avoidance | Topic, LaserScan, CmdVel |
Basic Node
A minimal node that reads sensor data and publishes motor commands:
from horus import Node, run
def controller(node):
    """Simple obstacle avoidance"""
    if not node.has_msg("sensor.distance"):
        return
    distance = node.get("sensor.distance")
    # Spin in place when something is close; otherwise drive straight ahead.
    if distance < 0.5:
        command = {"linear": 0.0, "angular": 0.5}
    else:
        command = {"linear": 1.0, "angular": 0.0}
    node.send("motor.cmd", command)
# Wire the callback into a node: what it reads, what it writes, how often it runs.
node = Node(
    name="obstacle_avoider",
    subs=["sensor.distance"],   # topics available via node.get()/has_msg()
    pubs=["motor.cmd"],         # topics this node may send() on
    tick=controller,            # invoked once per cycle
    rate=10                     # cycles per second — presumably Hz; confirm HORUS docs
)
run(node)
Typed Pub/Sub
Using typed Topic for direct pub/sub with message classes:
from horus import Topic, CmdVel, LaserScan
# Create typed topics
scan_topic = Topic(LaserScan)
cmd_topic = Topic(CmdVel)
# Send a velocity command
cmd_topic.send(CmdVel(linear=1.0, angular=0.0))
# Receive laser scan (returns None if no message)
scan = scan_topic.recv()
if scan:
    # Count the actual range readings: ranges is the list of distances
    # (len(scan) on the message object was at best accidental).
    print(f"Got {len(scan.ranges)} range readings")
    min_dist = min(scan.ranges) if scan.ranges else float('inf')
    print(f"Closest obstacle: {min_dist:.2f}m")
With backend hints for performance tuning:
from horus import Topic, CmdVel
# Pick the narrowest backend that matches the topology; latencies are the
# figures quoted by the HORUS docs — confirm on target hardware.
# Same-thread callbacks (~3ns)
fast_topic = Topic(CmdVel, backend="direct")
# Single producer/consumer intra-process (~18ns)
spsc_topic = Topic(CmdVel, backend="spsc")
# Cross-process shared memory (~167ns)
shm_topic = Topic(CmdVel, backend="mpmc_shm")
Async Node
Non-blocking I/O with AsyncNode and AsyncTopic:
import horus
import asyncio
class WeatherFetcher(horus.AsyncNode):
    """Fetch data from HTTP API without blocking"""

    async def async_init(self):
        # Publisher for raw weather dicts; consumers open their own AsyncTopic(dict).
        self.weather_pub = horus.AsyncTopic(dict)

    async def async_tick(self):
        # Imported lazily so aiohttp is only required once this node actually ticks.
        import aiohttp
        async with aiohttp.ClientSession() as session:
            async with session.get("https://api.example.com/weather") as resp:
                if resp.status == 200:
                    data = await resp.json()
                    await self.weather_pub.send(data)
        # Throttle the poll loop; the event loop stays free for other nodes meanwhile.
        await asyncio.sleep(60.0)  # Fetch every 60 seconds

    async def async_shutdown(self):
        print("Weather fetcher stopping")
class WeatherLogger(horus.AsyncNode):
    """Log weather data to database"""

    async def async_init(self):
        # Subscriber side of the weather stream published by WeatherFetcher.
        self.weather_sub = horus.AsyncTopic(dict)

    async def async_tick(self):
        # Non-blocking poll: try_recv yields a falsy value when nothing is queued.
        latest = await self.weather_sub.try_recv()
        if not latest:
            return
        print(f"Temperature: {latest.get('temp')}°C")
# Run both nodes together
# Lower order values are added first — presumably the fetcher runs before the
# logger each cycle; confirm ordering semantics against the Scheduler docs.
scheduler = horus.Scheduler()
scheduler.add(WeatherFetcher(), order=0)
scheduler.add(WeatherLogger(), order=1)
scheduler.run()
ML Inference
Using ONNX models with the ML utilities:
import numpy as np
from horus.ml_utils import ONNXInferenceNode, PerformanceMonitor
class PoseEstimationNode(ONNXInferenceNode):
    """Human pose estimation using ONNX model"""

    def __init__(self, model_path="models/movenet_lightning.onnx"):
        # Base class wires camera.raw -> preprocess/infer/postprocess -> poses.
        super().__init__(
            model_path=model_path,
            input_topic="camera.raw",
            output_topic="poses"
        )
        # Rolling latency statistics over the last 30 inferences.
        self.monitor = PerformanceMonitor(window_size=30)

    def load_model(self):
        # Imported here so onnxruntime is only needed when a model is loaded.
        import onnxruntime as ort
        providers = ['CPUExecutionProvider']
        self.session = ort.InferenceSession(self.model_path, providers=providers)
        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]

    def preprocess(self, image_data):
        """Resize and normalize for pose model"""
        # NOTE(review): despite the docstring, no resize happens here — this
        # assumes image_data is already at the model's input resolution and is
        # HWC with 3 channels; confirm upstream.
        img = np.array(image_data).astype(np.float32)
        img = (img / 127.5) - 1.0  # Normalize to [-1, 1]
        img = np.transpose(img, (2, 0, 1))  # HWC -> CHW
        return np.expand_dims(img, 0)  # Add batch dimension

    def infer(self, preprocessed):
        # run(None, ...) fetches every output; only the first is consumed.
        return self.session.run(None, {self.input_name: preprocessed})[0]

    def postprocess(self, output):
        """Parse keypoints from model output"""
        # Assumes output indexed as [batch, keypoint, (y, x, confidence)] —
        # MoveNet-style layout; verify against the exported model.
        keypoints = []
        for i in range(17):  # 17 body keypoints
            y, x, conf = output[0, i, 0], output[0, i, 1], output[0, i, 2]
            if conf > 0.3:  # drop low-confidence detections
                keypoints.append({'id': i, 'x': float(x), 'y': float(y), 'confidence': float(conf)})
        return keypoints
Performance monitoring:
from horus.ml_utils import PerformanceMonitor
# Statistics are computed over a sliding window of the last 100 samples.
monitor = PerformanceMonitor(window_size=100)
# Record inference latencies
monitor.record(12.5)  # ms
monitor.record(13.1)
# Get statistics
stats = monitor.get_stats()
print(f"Avg: {stats['avg_latency_ms']:.1f}ms, P95: {stats['p95_latency_ms']:.1f}ms, FPS: {stats['fps']:.0f}")
# Pretty print
monitor.print_stats()
Cross-Language Communication
Python and Rust nodes communicate via typed topics through shared memory (same machine):
from horus import Topic, CmdVel
import time
# Create typed topic — shared memory, readable by Rust
topic = Topic(CmdVel)
# Publish velocity commands
while True:
    t = time.time()
    # Sweep linear speed 1.0..1.5 over a 10-second sawtooth cycle.
    linear = 1.0 + 0.5 * (t % 10) / 10.0
    # Sweep angular rate -0.2..+0.2 over a 20-second sawtooth cycle.
    angular = 0.2 * ((t % 20) - 10) / 10.0
    topic.send(CmdVel(linear=linear, angular=angular))
    time.sleep(0.1)  # ~10 Hz publish rate
Rust subscriber (in another process on the same machine):
use horus::prelude::*;

// Subscribe to the same shared-memory topic the Python publisher writes to.
let topic: Topic<CmdVel> = Topic::new("cmd_vel")?;
loop {
    // recv() returns Option — None when no message is waiting. This loop
    // busy-spins; a real node would tick at a fixed rate instead.
    if let Some(cmd) = topic.recv() {
        println!("linear={}, angular={}", cmd.linear, cmd.angular);
    }
}
Multi-Node System
Running multiple nodes with the Scheduler:
from horus import Node, Scheduler
# Sensor node - reads and publishes data
def sensor_tick(node):
    """Publish one simulated distance reading per tick."""
    reading = 1.5  # fixed value stands in for real sensor hardware
    node.send("sensor.distance", reading)
# Controller node - processes sensor data, outputs commands
def controller_tick(node):
    """Map the latest distance reading to a velocity command."""
    if not node.has_msg("sensor.distance"):
        return
    dist = node.get("sensor.distance")
    stop_and_turn = {"linear": 0.0, "angular": 0.5}
    go_forward = {"linear": 1.0, "angular": 0.0}
    node.send("cmd_vel", stop_and_turn if dist < 0.5 else go_forward)
# Logger node - records data
def logger_tick(node):
    """Print every velocity command seen on "cmd_vel"."""
    if not node.has_msg("cmd_vel"):
        return
    cmd = node.get("cmd_vel")
    print(f"Command: {cmd}")
# Sensor and controller tick at 30 Hz; the logger samples at 10 Hz.
sensor = Node(name="sensor", pubs=["sensor.distance"], tick=sensor_tick, rate=30)
controller = Node(name="controller", subs=["sensor.distance"], pubs=["cmd_vel"], tick=controller_tick, rate=30)
logger = Node(name="logger", subs=["cmd_vel"], tick=logger_tick, rate=10)
# Create scheduler with execution order
# Data flows down the order chain: sensor -> controller -> logger.
scheduler = Scheduler()
scheduler.add(sensor, order=0)
scheduler.add(controller, order=1)
scheduler.add(logger, order=2)
scheduler.run()
Or use the run() helper for quick prototyping:
from horus import Node, run
# Echo node: forwards each "input" message to "output" unchanged.
node = Node(
    name="echo",
    subs=["input"],
    pubs=["output"],
    # Guard with has_msg so get() is only called when a message is waiting.
    tick=lambda n: n.send("output", n.get("input")) if n.has_msg("input") else None,
    rate=30
)
run(node, duration=10)  # Run for 10 seconds
Sensor Processing Pipeline
Processing typed sensor data and publishing commands:
from horus import Node, Scheduler, Topic, LaserScan, CmdVel
# Module-level topic handles filled in by init() — presumably deferred so
# Topic construction happens inside the node lifecycle; confirm HORUS docs.
scan_topic = None
cmd_topic = None

def init(node):
    """One-time setup: open the typed topics this node reads and writes."""
    global scan_topic, cmd_topic
    scan_topic = Topic(LaserScan)
    cmd_topic = Topic(CmdVel)
def obstacle_avoidance(node):
    """Steer away from the closest valid obstacle in the latest scan."""
    scan = scan_topic.recv()
    if scan and scan.ranges:
        # Ignore readings at or below range_min (sensor dead zone). default=inf
        # covers the case where *every* reading is filtered out, which would
        # otherwise make min() raise ValueError on an empty sequence.
        min_dist = min((r for r in scan.ranges if r > scan.range_min),
                       default=float('inf'))
        if min_dist < 0.5:
            # Too close — turn away
            cmd_topic.send(CmdVel(linear=0.0, angular=0.5))
        else:
            # Clear ahead — drive forward
            cmd_topic.send(CmdVel(linear=1.0, angular=0.0))
# Single-node system: init runs once, then obstacle_avoidance ticks at 10 Hz.
node = Node(
    name="obstacle_avoider",
    tick=obstacle_avoidance,
    init=init,
    rate=10
)
scheduler = Scheduler()
scheduler.add(node, order=0)
scheduler.run()
Camera Image Pipeline
Send and receive camera images using the Image domain type with zero-copy shared memory.
Camera Sender
import horus
import numpy as np
# Create a 480x640 RGB8 image backed by shared memory
img = horus.Image(480, 640, "rgb8")
# Fill from a NumPy array (zero-copy when possible)
pixels = np.zeros((480, 640, 3), dtype=np.uint8)
pixels[:, :, 2] = 255  # Blue channel
# NOTE(review): from_numpy replaces the Image allocated above — the example
# shows both construction paths side by side; only the second one is sent.
img = horus.Image.from_numpy(pixels)
# Send over a topic
topic = horus.Topic("camera.rgb")
topic.send(img)
Camera Receiver
import horus
topic = horus.Topic("camera.rgb")
img = topic.recv()  # None when no frame is waiting
if img is not None:
    # Convert to NumPy for processing (zero-copy)
    arr = img.to_numpy()
    # arr is (height, width, channels), so print width x height.
    print(f"Received {arr.shape[1]}x{arr.shape[0]} image")
    # Convert to PyTorch tensor (zero-copy via DLPack)
    tensor = img.to_torch()
    print(f"Torch tensor: {tensor.shape}, {tensor.dtype}")
Key Concepts:
- horus.Image(height, width, encoding) — allocates the pixel buffer from shared memory
- Image.from_numpy(arr) — wraps a NumPy array as a HORUS Image
- img.to_numpy() / img.to_torch() — zero-copy conversion to NumPy / PyTorch
- Same Topic API as Rust — Python and Rust processes share topics automatically
See Also
- Python Bindings - Core Python API
- Message Library - Available message types
- Multi-Language Support - Python-Rust cross-language communication