Python API
You have a Python ML model running inference at 30Hz. You need it to receive camera images from a Rust sensor driver and publish detections to a Rust planner — all through shared memory at microsecond latency. This page is the complete reference for doing that.
Two layers:
_horus(Rust PyO3 internals) andhorus(Python wrapper you import). This page documents thehoruslayer — the user-facing API.
# simplified
import horus
Quick Reference
| API | Description | Page |
|---|---|---|
horus.Node(...) | Computation unit — tick, init, shutdown via kwargs | Node API |
horus.Scheduler(...) | Node orchestrator — tick rate, RT, watchdog, recording | Scheduler API |
horus.run(*nodes) | One-liner: create scheduler, add nodes, run | Scheduler API |
horus.Topic(type) | Standalone pub/sub (outside node lifecycle) | Topic API |
horus.now() / horus.dt() | Framework clock (wall clock or SimClock) | Clock API |
horus.CmdVel, horus.Imu, ... | 75+ standard robotics message types | Messages |
horus.Image, horus.PointCloud | Zero-copy domain types (NumPy/DLPack) | Image, PointCloud |
horus.drivers.load() | Load hardware drivers from horus.toml | Drivers |
horus.TransformFrame | Coordinate frame tree | TransformFrame |
horus.Params() | Runtime parameter store | Rate & Params |
horus.Rate(hz) | Drift-compensated rate limiter | Rate & Params |
Node
The primary building block — configure with kwargs, no inheritance needed. Constructor, methods, topic specs, lifecycle callbacks, and examples.
Scheduler
Orchestrates node execution — tick-rate control, RT scheduling, watchdogs, recording, deterministic mode. 25+ methods across lifecycle, introspection, safety, and recording. Includes run() one-liner.
Clock
Framework clock — now(), dt(), elapsed(), tick(), budget_remaining(), rng_float(). Wall clock vs SimClock behavior. Unit constants horus.us, horus.ms.
Topic
Standalone pub/sub for scripts, tests, and tools. Typed topics (~1.7μs, cross-language) vs GenericMessage (~6-50μs, Python-only). Performance comparison and cross-language compatibility rules.
Drivers
Load hardware node configs from horus.toml:
# simplified
entries = horus.hardware.load() # From horus.toml [hardware]
entries = horus.hardware.load_from("path") # From custom path
horus.hardware.register_driver("name", MyClass) # Register Python node class
Returns list[(name, obj)] — registered classes are instantiated, others returned as NodeParams.
NodeParams: get(key), get_or(key, default), has(key), keys(), params[key]
Error Types
| Exception | When |
|---|---|
horus.HorusNotFoundError | Topic, node, or resource not found |
horus.HorusTransformError | Transform lookup fails (frame not in tree) |
horus.HorusTimeoutError | Operation timed out |
Supporting Types
See Node API for NodeInfo, NodeState, and Miss.
Mixed Rust + Python Projects
The most common production pattern: Rust for control loops, Python for ML inference. Both communicate via shared memory topics.
Project Structure
my-robot/
├── horus.toml # Shared config
├── src/
│ ├── sensor.rs # Rust: hardware driver at 1kHz
│ └── detector.py # Python: YOLO inference at 30Hz
└── launch.yaml # Launch both together
Which Types Cross the Boundary?
| Type | Rust → Python | Python → Rust | Transport |
|---|---|---|---|
Typed messages (CmdVel, Imu, LaserScan, etc.) | Yes | Yes | Zero-copy Pod (~1.7μs) |
Image, PointCloud, DepthImage | Yes | Yes | Pool-backed descriptor |
| Python dicts (GenericMessage) | No | No | Python-only (MessagePack) |
Custom message! types | Yes | Yes | If same #[repr(C)] layout |
Rule: Use typed message classes for cross-language topics. Python dicts only work between Python nodes.
Launch File
# launch.yaml
nodes:
- name: sensor
command: "horus run src/sensor.rs"
rate_hz: 1000
- name: detector
command: "horus run src/detector.py"
rate_hz: 30
depends_on: [sensor]
horus launch launch.yaml
Debugging Cross-Language Issues
# Verify both processes see the same topics
horus topic list --verbose
# Check message rate from Rust publisher
horus topic hz camera.rgb
# Watch messages flowing between languages
horus topic echo camera.rgb
If a Python node can't receive from a Rust node: verify both use the same typed message class (e.g., horus.Imu in Python, horus_library::messages::Imu in Rust). String topics (GenericMessage) do not cross the boundary.
Differences from Rust
| Aspect | Python | Rust |
|---|---|---|
| Node definition | horus.Node(tick=fn, ...) kwargs | impl Node for MyStruct trait |
| Topic creation | Auto-created from pubs/subs | Manual Topic::new("name") |
| Async support | Auto-detected from async def | Explicit .async_io() builder |
| Safety methods | Not available (is_safe_state, enter_safe_state) | Available on Node trait |
| GIL | Acquired per tick, released during run() | N/A |
| Performance | ~3μs overhead per tick (GIL acquire) | Zero overhead |
| Error routing | Python exceptions → Rust FailurePolicy | Panics → catch_unwind → FailurePolicy |
| Unit syntax | 300 * horus.us | 300_u64.us() (DurationExt) |
Production Example: ML Inference Node
# simplified
import horus
import numpy as np
# Assume a pre-loaded model
model = load_model("yolov8n.pt")
def detect_tick(node):
img = node.recv("camera.rgb")
if img is None:
return
# Convert to numpy (zero-copy via DLPack)
frame = img.to_numpy()
# Run inference
detections = model.predict(frame)
# Publish results
for det in detections:
node.send("detections", {
"class": det.class_name,
"confidence": float(det.confidence),
"bbox": [det.x1, det.y1, det.x2, det.y2],
"timestamp_ns": horus.timestamp_ns(),
})
detector = horus.Node(
name="yolo_detector",
subs=[horus.Image],
pubs=["detections"],
tick=detect_tick,
rate=30,
compute=True, # Run on thread pool (CPU-bound inference)
budget=30 * horus.ms,
on_miss="skip", # Skip frame if inference takes too long
)
horus.run(detector, tick_rate=100)
Design Decisions
Why a Python wrapper layer over PyO3? The raw PyO3 bindings (_horus) expose Rust types directly, which have Rust-idiomatic APIs (builders, traits, enums). The Python wrapper (horus) translates these into Pythonic patterns: kwargs instead of builders, plain functions instead of trait methods, None instead of Option. This means the Python API can evolve independently of the Rust API.
Why kwargs, not class inheritance? Class inheritance (class MyNode(horus.Node): def tick(self):...) requires boilerplate and doesn't work with plain functions or lambdas. The kwargs API (horus.Node(tick=my_fn, rate=30)) is more Pythonic, matches FastAPI/Click patterns, and all config happens in one call.
Why recv() returns data, not a callback? Pull-based reception keeps timing deterministic — your tick controls when data is consumed. Push-based callbacks fire at unpredictable times, making budget compliance harder. This matches the Rust try_recv() pattern.
Why release the GIL during run()? The scheduler's tick loop is Rust code. Releasing the GIL via py.detach() during run() lets other Python threads (e.g., a Flask telemetry server) run concurrently. The GIL is re-acquired only when calling Python tick/init/shutdown callbacks.
Why has_msg() uses peek buffering? has_msg() internally calls recv() and buffers the result. The next recv() returns the buffered value. This avoids a separate "peek" API while keeping the common if node.has_msg("x"): data = node.recv("x") pattern zero-overhead.
Trade-offs
| Choice | Benefit | Cost |
|---|---|---|
| Kwargs over inheritance | Concise, works with lambdas | No IDE auto-complete for tick signature |
| Auto-create topics from specs | Zero boilerplate for pubs/subs | Topics created even if never used |
| GenericMessage for string topics | Any Python dict/object works | ~5-50μs vs ~1.5μs for typed |
| GIL release during run() | Other threads run freely | ~3μs GIL re-acquire per tick |
| Peek buffering for has_msg() | Clean API, no separate peek | Consumes message on has_msg() check |
No is_safe_state/enter_safe_state | Simpler Python API | Safety-critical nodes must use Rust |
See Also
Core API:
- Node API — Constructor, methods, topic specs, lifecycle
- Scheduler API — Orchestration, RT, recording,
run() - Topic API — Standalone pub/sub, typed vs generic
- Clock API — Time functions, deterministic mode
Data Types:
- Standard Messages — 75+ typed robotics messages
- Image API — Zero-copy camera frames with NumPy/PyTorch
- PointCloud API — Zero-copy 3D point clouds
- TransformFrame — Coordinate frame management
Advanced:
- Python Bindings Deep Dive — Full PyO3 binding reference
- Async Nodes — Async I/O patterns
- Custom Messages — Runtime and compiled messages
- Getting Started (Python) — First Python application