ML Integration

Use ML frameworks directly in horus nodes — no wrapper library needed. Import PyTorch, ONNX Runtime, TensorFlow, or OpenCV and use them in your tick function.

Zero-Copy Interop Matrix

horus data types integrate with the Python ML ecosystem via three protocols: __array_interface__ (NumPy), __dlpack__ (universal), and __cuda_array_interface__ (GPU).
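The first two protocols can be seen in action with plain NumPy, no horus required: an array advertises its buffer through __array_interface__, and __dlpack__ lets any consumer wrap that same buffer without copying.

```python
import numpy as np

arr = np.zeros((480, 640, 3), dtype=np.uint8)

# __array_interface__ describes the underlying buffer (shape, dtype, pointer)
iface = arr.__array_interface__
print(iface["shape"], iface["typestr"])   # (480, 640, 3) |u1

# __dlpack__ lets any consumer (NumPy, PyTorch, JAX) wrap the same buffer
view = np.from_dlpack(arr)
arr[0, 0, 0] = 255                        # mutate the original...
print(int(view[0, 0, 0]))                 # 255: the view shares the memory, no copy
```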

| horus type | NumPy | PyTorch | JAX | OpenCV | ONNX RT |
|---|---|---|---|---|---|
| Image | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | via to_numpy() | via to_numpy() |
| PointCloud | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | n/a | via to_numpy() |
| DepthImage | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | via to_numpy() | via to_numpy() |

All conversions are zero-copy (~3μs constant time, regardless of data size). The Python side gets a view into horus shared memory — no pixel data is copied.

# simplified
img = node.recv("camera")

# Any of these — all zero-copy, all ~3μs:
np_arr = img.to_numpy()          # NumPy ndarray
tensor = img.to_torch()          # PyTorch tensor
jax_arr = img.to_jax()           # JAX array
dlpack = np.from_dlpack(img)     # DLPack protocol (979ns)

Performance: A 1920×1080 RGB image (6MB) takes 3μs to access as NumPy vs 178μs to copy — 59x faster. See Benchmarks for full numbers.
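The view-vs-copy gap is easy to reproduce with plain NumPy alone (illustrative sketch; horus's to_numpy() returns the same kind of view, just backed by shared memory):

```python
import numpy as np

img = np.zeros((1080, 1920, 3), dtype=np.uint8)   # ~6 MB, like a 1080p RGB frame

view = img[:]          # constant time: a new array header over the same buffer
copy = img.copy()      # linear time: all ~6 MB of pixels duplicated

print(view.base is img)    # True: the view borrows img's memory
print(copy.base is None)   # True: the copy owns its own memory
```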

ONNX Runtime

# simplified
import horus
import onnxruntime as ort
import numpy as np

session = ort.InferenceSession("yolov8n.onnx", providers=["CUDAExecutionProvider"])

def detect(node):
    if node.has_msg("camera"):
        img = node.recv("camera").to_numpy()
        img = img.astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))[np.newaxis]  # HWC→NCHW (resize to the model's 640×640 input omitted for brevity)
        output = session.run(None, {"images": img})
        node.send("detections", output[0])

horus.run(
    horus.Node(tick=detect, rate=30, subs=["camera"], pubs=["detections"], order=0),
)

PyTorch

# simplified
import horus
import torch

model = torch.jit.load("resnet50.pt", map_location="cuda:0")
model.eval()

def classify(node):
    if node.has_msg("camera"):
        img = node.recv("camera").to_torch()  # Zero-copy to PyTorch tensor
        with torch.no_grad():
            output = model(img.unsqueeze(0).cuda())
        class_id = output.argmax(dim=1).item()
        node.send("class", {"id": class_id, "confidence": output.max().item()})

horus.run(
    horus.Node(tick=classify, rate=10, subs=["camera"], pubs=["class"]),
)

OpenCV

# simplified
import horus
import cv2
import numpy as np

def process_frame(node):
    if node.has_msg("camera"):
        img = node.recv("camera").to_numpy()
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        result = horus.Image.from_numpy(edges)
        node.send("edges", result)

horus.run(
    horus.Node(tick=process_frame, rate=30, subs=["camera"], pubs=["edges"]),
)

TensorFlow / TFLite

# simplified
import horus
import tensorflow as tf

model = tf.saved_model.load("saved_model")

def infer(node):
    if node.has_msg("input"):
        data = node.recv("input").to_numpy()
        tensor = tf.convert_to_tensor(data, dtype=tf.float32)
        output = model(tensor)
        node.send("output", output.numpy())

horus.run(horus.Node(tick=infer, rate=10, subs=["input"], pubs=["output"]))
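For the TFLite half of this section, the interpreter API follows the same shape. A sketch, where the model path, tensor shapes, and dtype are placeholders (tf.lite.Interpreter works identically if the full framework is installed):

```python
# simplified
import horus
import numpy as np
import tflite_runtime.interpreter as tflite

interpreter = tflite.Interpreter(model_path="model.tflite")
interpreter.allocate_tensors()
inp = interpreter.get_input_details()[0]
out = interpreter.get_output_details()[0]

def infer_lite(node):
    if node.has_msg("input"):
        data = node.recv("input").to_numpy().astype(np.float32)
        interpreter.set_tensor(inp["index"], data[np.newaxis])  # add batch dim
        interpreter.invoke()
        node.send("output", interpreter.get_tensor(out["index"]))

horus.run(horus.Node(tick=infer_lite, rate=10, subs=["input"], pubs=["output"]))
```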

Performance Tips

  • Use compute=True for CPU-bound inference — runs on thread pool, releases GIL during C extension calls (NumPy, ONNX, PyTorch):
# simplified
horus.Node(tick=detect, rate=30, compute=True, ...)
  • Set realistic budget to detect slow inference:
# simplified
horus.Node(tick=detect, rate=30, budget=50 * horus.ms, on_miss="skip")
  • Use horus.Image.to_torch() for zero-copy GPU transfer — no pixel data copied.

  • Batch with recv_all() if messages queue up:

# simplified
def batch_infer(node):
    frames = node.recv_all("camera")
    if frames:
        batch = np.stack([f.to_numpy() for f in frames])
        outputs = session.run(None, {"images": batch})
        for det in outputs[0]:
            node.send("detections", det)

GPU Memory Management

Critical for Jetson and other embedded devices with 4-8GB shared RAM between CPU and GPU.

# simplified
import torch

# Limit GPU memory on embedded devices
torch.cuda.set_per_process_memory_fraction(0.5)  # Use max 50% of VRAM

# Always use no_grad for inference
with torch.no_grad():
    output = model(input_tensor)

# Periodically clear cache
torch.cuda.empty_cache()

  • Prefer FP16 or INT8 quantized models on embedded devices
  • Monitor usage with torch.cuda.memory_allocated() / torch.cuda.max_memory_allocated()
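Halving precision is usually a one-line change in PyTorch. A sketch, assuming a CUDA device and a TorchScript model as in the classify example above:

```python
# simplified
import torch

model = torch.jit.load("resnet50.pt", map_location="cuda:0").half().eval()  # FP16 weights

def classify_fp16(img):
    with torch.no_grad():
        return model(img.half().unsqueeze(0).cuda())  # FP16 activations too

print(f"{torch.cuda.memory_allocated() / 1e6:.0f} MB")  # roughly half the FP32 footprint
```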

Error Handling

# simplified
import torch

model = None  # set by init; tick code should check for None

def my_init(node):
    global model
    try:
        model = torch.load("model.pt", map_location="cuda")
    except FileNotFoundError:
        node.log_error("Model file not found — running without ML")
        model = None
    except RuntimeError as e:
        if "CUDA out of memory" in str(e):
            node.log_error("GPU OOM — try a smaller model or reduce batch size")
            model = None
        else:
            raise
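A tick paired with this init can then degrade gracefully instead of crashing every frame. A sketch, assuming the loaded model is visible at module scope as in the earlier framework examples:

```python
# simplified
def detect(node):
    if model is None:
        return  # ML unavailable; skip inference but keep the node alive
    if node.has_msg("camera"):
        img = node.recv("camera").to_torch()
        with torch.no_grad():
            output = model(img.unsqueeze(0).cuda())
        node.send("detections", output)
```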

Model Warmup

First inference is 10-100x slower than steady-state due to CUDA kernel compilation and memory allocation. Run a dummy inference in init() before the tick loop:

# simplified
def my_init(node):
    model.eval()
    dummy = torch.zeros(1, 3, 640, 640).cuda()
    with torch.no_grad():
        model(dummy)  # warmup — first call compiles CUDA kernels
    node.log_info("Model warmed up")

Quick Reference

| Framework | Import | Zero-Copy From Image | Inference Pattern |
|---|---|---|---|
| ONNX Runtime | import onnxruntime as ort | img.to_numpy() | session.run(None, input_dict) |
| PyTorch | import torch | img.to_torch() | model(tensor.unsqueeze(0).cuda()) |
| OpenCV | import cv2 | img.to_numpy() | cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY) |
| TensorFlow | import tensorflow as tf | img.to_numpy() | model(tf.convert_to_tensor(arr)) |
| JAX | import jax | img.to_jax() | model.apply(params, arr) |
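JAX is the one framework in the table without a worked example above. A minimal sketch, where model and params are placeholders for a Flax-style module and its trained weights:

```python
# simplified
import horus
import jax.numpy as jnp

def infer_jax(node):
    if node.has_msg("camera"):
        arr = node.recv("camera").to_jax()              # zero-copy via DLPack
        logits = model.apply(params, arr[jnp.newaxis] / 255.0)
        node.send("class", int(logits.argmax()))

horus.run(horus.Node(tick=infer_jax, rate=10, subs=["camera"], pubs=["class"]))
```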

Design Decisions

Why no ML wrapper library? HORUS provides zero-copy data types (Image, PointCloud, DepthImage) with direct interop to ML frameworks via to_numpy(), to_torch(), to_jax(). Adding a wrapper would hide the framework API, limit flexibility, and add maintenance burden as frameworks evolve. Instead, import your framework directly and use HORUS types as the data bridge.

Why ONNX Runtime recommended for production? ONNX Runtime provides consistent cross-platform inference with hardware acceleration (CUDA, TensorRT, OpenVINO) and does not require a full training framework at runtime. PyTorch models export to ONNX via torch.onnx.export(), giving you PyTorch for training and ONNX RT for deployment.

Why compute=True for CPU inference instead of async? ML inference is CPU-bound (or GPU-bound), not I/O-bound. Async nodes are designed for I/O waits (HTTP, database). The compute=True flag runs the node on a compute thread pool and releases the GIL during C extension calls (NumPy, ONNX, PyTorch), giving better throughput than async for number-crunching workloads.

Why set budget for inference nodes? ML inference time varies with input complexity (more detections = slower NMS). Setting a budget (e.g., budget=50 * horus.ms) lets the scheduler detect when inference exceeds its time allocation and take action (on_miss="skip" drops the frame, on_miss="warn" logs it). This prevents a slow model from starving downstream control nodes.

See Also