NumPy & Zero-Copy

HORUS transfers images, point clouds, and tensors between Rust and Python with zero memory copies. This page shows how.


The Three Paths

| Method | Latency | Copy? | Use When |
|---|---|---|---|
| `np.from_dlpack(img)` | ~1.1μs | No | ML inference, GPU pipelines |
| `img.to_numpy()` | ~3.0μs | No (SHM view) | General numpy processing |
| `np.array(img)` / `np.copy()` | ~14μs | Yes | Need to modify data or hold past next `recv()` |

Image to NumPy

import horus
import numpy as np

def detect_tick(node):
    """Read the latest RGB frame and publish its mean brightness."""
    img = node.recv("camera.rgb")
    if img is None:
        return

    # Option 1: numpy view backed by shared memory (~3μs, no data movement)
    frame = img.to_numpy()  # shape: (480, 640, 3), dtype: uint8

    # Option 2: DLPack handoff (~1.1μs — true zero-copy)
    frame = np.from_dlpack(img)

    # The view works directly with numpy/OpenCV processing
    node.send("brightness", {"value": float(frame.mean())})

When Copies Happen

The zero-copy view is backed by the HORUS shared memory pool. It becomes invalid when:

  • Next recv() overwrites the slot — the ring buffer reuses memory. If you need to hold the frame across ticks, copy it: frame = img.to_numpy().copy()
  • You modify the array — to_numpy() returns a read-only view. To modify, copy first: frame = img.to_numpy().copy(); frame[0,0] = 255
  • You pass to a function that requires contiguous/owned memory — some libraries need owned arrays
# SAFE: process immediately, don't hold across ticks
def tick(node):
    frame = node.recv("camera")
    if frame:
        # Consumed within the same tick, so the view never outlives its slot
        result = model.predict(frame.to_numpy())

# UNSAFE: holding reference across ticks
stored_frame = None
def tick(node):
    global stored_frame
    msg = node.recv("camera")
    if msg:
        # BAD — the shared-memory slot is recycled on the next recv()
        stored_frame = msg.to_numpy()

# SAFE: copy if you need to hold it
def tick(node):
    global stored_frame
    msg = node.recv("camera")
    if msg:
        # Owned copy — safe to keep past the next recv()
        stored_frame = msg.to_numpy().copy()

Image to PyTorch

import torch

def inference_tick(node):
    """Run GPU inference on the newest camera frame, handed over via DLPack."""
    img = node.recv("camera.rgb")
    if img is None:
        return

    # DLPack handoff — no copy between HORUS and PyTorch
    tensor = torch.from_dlpack(img)  # (H, W, C) uint8 on CPU

    # HWC uint8 -> NCHW float scaled to [0, 1]
    tensor = tensor.permute(2, 0, 1).unsqueeze(0).float() / 255.0
    # Ship the batch to the GPU before running the model
    tensor = tensor.to("cuda")

    with torch.no_grad():
        output = model(tensor)

    node.send("predictions", process_output(output))

PointCloud to NumPy

def lidar_tick(node):
    """Publish how many lidar points fall within a 5 m radius."""
    cloud = node.recv("lidar.points")
    if cloud is None:
        return

    # Zero-copy view; column count depends on the point type:
    #   XYZ -> (N, 3), XYZI -> (N, 4), XYZRGB -> (N, 6), all float32
    pts = cloud.to_numpy()

    # Distance is computed on the XYZ columns only; keep points under 5m
    close = pts[np.linalg.norm(pts[:, :3], axis=1) < 5.0]

    node.send("nearby_points", {"count": len(close)})

DepthImage to NumPy

def depth_tick(node):
    """Report the nearest valid depth reading, if any."""
    depth = node.recv("camera.depth")
    if depth is None:
        return

    # Zero-copy view — shape (H, W), dtype float32, values in meters
    depth_map = depth.to_numpy()

    # Zero marks an invalid pixel; search only the positive readings
    valid = depth_map[depth_map > 0]
    if len(valid) > 0:
        node.send("closest", {"distance_m": float(valid.min())})

Performance Summary

Data from Benchmarks page, measured on i9-14900K:

| Operation | Latency | Throughput |
|---|---|---|
| `np.from_dlpack()` (640x480 RGB) | 1.1μs | 3.5M/s |
| `img.to_numpy()` (640x480 RGB) | 3.0μs | 1.5M/s |
| `np.copy()` (640x480 RGB) | 14.0μs | 334K/s |
| Typed message send+recv (CmdVel) | 1.7μs | 2.7M/s |
| Dict send+recv (small) | 6.2μs | 714K/s |

DLPack is 13x faster than copying — it returns a numpy/torch array backed directly by the shared memory pool.


Complete ML Pipeline

import horus
import numpy as np

def detect_tick(node):
    """YOLO detection loop: zero-copy frame in, Detection messages out."""
    img = node.recv("camera.rgb")
    if img is None:
        return

    # DLPack view of the shared-memory frame (~1.1μs, no copy)
    frame = np.from_dlpack(img)

    # Inference dominates the tick (~20-100ms); publish one message per hit
    for r in model.predict(frame):
        node.send("detections", horus.Detection(
            class_id=r.class_id,
            class_name=r.class_name,
            confidence=float(r.confidence),
            bbox=horus.BoundingBox2D(
                x_min=r.x1, y_min=r.y1, x_max=r.x2, y_max=r.y2,
            ),
        ))

# Wire the tick function into a node and start the runtime.
detector = horus.Node(
    name="yolo",
    subs=[horus.Image],        # message type(s) this node consumes
    pubs=[horus.Detection],    # message type(s) this node produces
    tick=detect_tick,
    rate=30,                   # presumably the node's tick rate in Hz — confirm
    compute=True,              # NOTE(review): likely marks this as a compute-heavy node — verify
    budget=50 * horus.ms,      # per-tick time budget
    on_miss="skip",            # presumably skip the tick when the budget is missed — verify
)

horus.run(detector, tick_rate=100)

See Also