ML Integration
Use ML frameworks directly in horus nodes — no wrapper library needed. Import PyTorch, ONNX Runtime, TensorFlow, or OpenCV and use them in your tick function.
Zero-Copy Interop Matrix
horus data types integrate with the Python ML ecosystem via three protocols: __array_interface__ (NumPy), __dlpack__ (universal), and __cuda_array_interface__ (GPU).
| horus type | NumPy | PyTorch | JAX | OpenCV | ONNX RT |
|---|---|---|---|---|---|
| Image | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | via to_numpy() | via to_numpy() |
| PointCloud | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | — | via to_numpy() |
| DepthImage | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | via to_numpy() | via to_numpy() |
All conversions are zero-copy (~3μs constant time, regardless of data size). The Python side gets a view into horus shared memory — no pixel data is copied.
```python
# simplified
img = node.recv("camera")

# Any of these — all zero-copy, all ~3μs:
np_arr = img.to_numpy()       # NumPy ndarray
tensor = img.to_torch()      # PyTorch tensor
jax_arr = img.to_jax()       # JAX array
dlpack = np.from_dlpack(img) # DLPack protocol (979ns)
```
Performance: A 1920×1080 RGB image (6MB) takes 3μs to access as NumPy vs 178μs to copy — 59x faster. See Benchmarks for full numbers.
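To sanity-check the view-vs-copy gap on your own hardware, you can time both paths directly. A minimal sketch, assuming `img` is a received Image; the numbers will vary by machine:

```python
# Illustrative micro-benchmark: zero-copy view vs. full pixel copy.
import time
import numpy as np

t0 = time.perf_counter_ns()
view = img.to_numpy()             # zero-copy view into shared memory
t1 = time.perf_counter_ns()
full = np.array(view, copy=True)  # forces a full copy of the pixel data
t2 = time.perf_counter_ns()
print(f"view: {(t1 - t0) / 1e3:.1f} μs, copy: {(t2 - t1) / 1e3:.1f} μs")
```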
ONNX Runtime (Recommended for Production)
```python
# simplified
import horus
import onnxruntime as ort
import numpy as np

session = ort.InferenceSession("yolov8n.onnx", providers=["CUDAExecutionProvider"])

def detect(node):
    if node.has_msg("camera"):
        img = node.recv("camera").to_numpy()
        img = img.astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))[np.newaxis]  # HWC→NCHW
        output = session.run(None, {"images": img})
        node.send("detections", output[0])

horus.run(
    horus.Node(tick=detect, rate=30, subs=["camera"], pubs=["detections"], order=0),
)
```
PyTorch
```python
# simplified
import horus
import torch

model = torch.jit.load("resnet50.pt", map_location="cuda:0")
model.eval()

def classify(node):
    if node.has_msg("camera"):
        img = node.recv("camera").to_torch()  # zero-copy to PyTorch tensor (HWC)
        x = img.permute(2, 0, 1).float().div(255).unsqueeze(0).cuda()  # NCHW, normalized
        with torch.no_grad():
            output = model(x)
        probs = output.softmax(dim=1)
        class_id = probs.argmax(dim=1).item()
        node.send("class", {"id": class_id, "confidence": probs.max().item()})

horus.run(
    horus.Node(tick=classify, rate=10, subs=["camera"], pubs=["class"]),
)
```
OpenCV
```python
# simplified
import horus
import cv2

def process_frame(node):
    if node.has_msg("camera"):
        img = node.recv("camera").to_numpy()
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        result = horus.Image.from_numpy(edges)
        node.send("edges", result)

horus.run(
    horus.Node(tick=process_frame, rate=30, subs=["camera"], pubs=["edges"]),
)
```
TensorFlow / TFLite
```python
# simplified
import horus
import tensorflow as tf

model = tf.saved_model.load("saved_model")

def infer(node):
    if node.has_msg("input"):
        arr = node.recv("input").to_numpy()  # zero-copy view
        tensor = tf.convert_to_tensor(arr, dtype=tf.float32)
        output = model(tensor)
        node.send("output", output.numpy())

horus.run(horus.Node(tick=infer, rate=10, subs=["input"], pubs=["output"]))
```
Performance Tips
- Use `compute=True` for CPU-bound inference — runs on a thread pool and releases the GIL during C extension calls (NumPy, ONNX, PyTorch):

```python
# simplified
horus.Node(tick=detect, rate=30, compute=True, ...)
```

- Set a realistic `budget` to detect slow inference:

```python
# simplified
horus.Node(tick=detect, rate=30, budget=50 * horus.ms, on_miss="skip")
```

- Use `horus.Image.to_torch()` for zero-copy GPU transfer — no pixel data is copied.
- Batch with `recv_all()` if messages queue up:

```python
# simplified
def batch_infer(node):
    frames = node.recv_all("camera")
    if frames:
        batch = np.stack([f.to_numpy() for f in frames])  # frames must share a shape
        outputs = session.run(None, {"images": batch})    # `session` from the ONNX example
        for det in outputs[0]:
            node.send("detections", det)
```
GPU Memory Management
Critical for Jetson and other embedded devices with 4-8GB shared RAM between CPU and GPU.
```python
# simplified
import torch

# Limit GPU memory on embedded devices
torch.cuda.set_per_process_memory_fraction(0.5)  # use at most 50% of VRAM

# Always use no_grad for inference
with torch.no_grad():
    output = model(input_tensor)

# Periodically clear cached allocations
torch.cuda.empty_cache()
```
- Prefer FP16 or INT8 quantized models on embedded (see the sketch below)
- Monitor with `torch.cuda.memory_allocated()` / `torch.cuda.max_memory_allocated()`
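A minimal FP16 sketch, assuming a CUDA device and a model that tolerates half precision; the node wiring mirrors the PyTorch example above:

```python
# Sketch: FP16 inference halves model memory on embedded GPUs.
# Assumes `model` is an ordinary float32 PyTorch model.
import torch

model = model.half().cuda().eval()

def classify_fp16(node):
    if node.has_msg("camera"):
        img = node.recv("camera").to_torch()                          # zero-copy, HWC
        x = img.permute(2, 0, 1).half().div(255).unsqueeze(0).cuda()  # NCHW, fp16
        with torch.no_grad():
            output = model(x)
        node.send("class", {"id": output.argmax(dim=1).item()})
```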
Error Handling
```python
# simplified
def my_init(node):
    global model  # store for use in the tick
    try:
        model = torch.load("model.pt", map_location="cuda")
    except FileNotFoundError:
        node.log_error("Model file not found — running without ML")
        model = None
    except RuntimeError as e:
        if "CUDA out of memory" in str(e):
            node.log_error("GPU OOM — try a smaller model or reduce batch size")
            model = None
        else:
            raise
```
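The matching tick should check for the degraded state before running inference. A sketch, assuming `my_init` stores the result in a module-level `model` (via `global model`, as above):

```python
# Sketch: degrade gracefully instead of crashing the tick loop.
def classify(node):
    if model is None:
        return  # no model loaded; skip inference but keep the node alive
    if node.has_msg("camera"):
        ...  # normal inference path
```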
Model Warmup
First inference is 10-100x slower than steady-state due to CUDA kernel compilation and memory allocation. Run a dummy inference in init() before the tick loop:
```python
# simplified
def my_init(node):
    model.eval()
    dummy = torch.zeros(1, 3, 640, 640).cuda()
    with torch.no_grad():
        model(dummy)  # warmup: first call compiles CUDA kernels
    node.log_info("Model warmed up")
```
Quick Reference
| Framework | Import | Zero-Copy From Image | Inference Pattern |
|---|---|---|---|
| ONNX Runtime | import onnxruntime as ort | img.to_numpy() | session.run(None, input_dict) |
| PyTorch | import torch | img.to_torch() | model(tensor.unsqueeze(0).cuda()) |
| OpenCV | import cv2 | img.to_numpy() | cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY) |
| TensorFlow | import tensorflow as tf | img.to_numpy() | model(tf.convert_to_tensor(arr)) |
| JAX | import jax | img.to_jax() | model.apply(params, arr) |
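The JAX row assumes a Flax-style `model.apply`; the zero-copy handoff itself is just `to_jax()`. A minimal sketch with an illustrative jitted function (`normalize` is not part of horus):

```python
# Sketch: zero-copy handoff into a jitted JAX function.
import jax
import jax.numpy as jnp
import numpy as np

@jax.jit
def normalize(x):
    return (x - x.mean()) / (x.std() + 1e-6)

def jax_tick(node):
    if node.has_msg("camera"):
        arr = node.recv("camera").to_jax()  # zero-copy view
        out = normalize(arr.astype(jnp.float32))
        node.send("normalized", np.asarray(out))  # back to host memory
```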
Design Decisions
Why no ML wrapper library? HORUS provides zero-copy data types (Image, PointCloud, DepthImage) with direct interop to ML frameworks via to_numpy(), to_torch(), to_jax(). Adding a wrapper would hide the framework API, limit flexibility, and add maintenance burden as frameworks evolve. Instead, import your framework directly and use HORUS types as the data bridge.
Why ONNX Runtime recommended for production? ONNX Runtime provides consistent cross-platform inference with hardware acceleration (CUDA, TensorRT, OpenVINO) and does not require a full training framework at runtime. PyTorch models export to ONNX via torch.onnx.export(), giving you PyTorch for training and ONNX RT for deployment.
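A minimal export sketch; the model file and tensor names are illustrative:

```python
# Sketch: train in PyTorch, export once, deploy with ONNX Runtime.
import torch

model = torch.jit.load("resnet50.pt").eval()
dummy = torch.zeros(1, 3, 224, 224)  # example input fixes the traced shapes
torch.onnx.export(
    model,
    dummy,
    "resnet50.onnx",
    input_names=["images"],
    output_names=["logits"],
    dynamic_axes={"images": {0: "batch"}},  # allow variable batch size
)
```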
Why compute=True for CPU inference instead of async? ML inference is CPU-bound (or GPU-bound), not I/O-bound. Async nodes are designed for I/O waits (HTTP, database). The compute=True flag runs the node on a compute thread pool and releases the GIL during C extension calls (NumPy, ONNX, PyTorch), giving better throughput than async for number-crunching workloads.
Why set budget for inference nodes? ML inference time varies with input complexity (more detections = slower NMS). Setting a budget (e.g., budget=50 * horus.ms) lets the scheduler detect when inference exceeds its time allocation and take action (on_miss="skip" drops the frame, on_miss="warn" logs it). This prevents a slow model from starving downstream control nodes.
See Also
- Image, PointCloud, DepthImage — zero-copy types
- Async Nodes — non-blocking inference with `async def`
- Perception Types — Detection, BoundingBox, Landmark types
- Image — Camera image type for ML pipelines
- Detection — Object detection output types