# NumPy & Zero-Copy

HORUS transfers images, point clouds, and tensors between Rust and Python with zero memory copies. This page shows how.
## The Three Paths

| Method | Latency | Copy? | Use When |
|---|---|---|---|
| `np.from_dlpack(img)` | ~1.1μs | No | ML inference, GPU pipelines |
| `img.to_numpy()` | ~3.0μs | No (SHM view) | General numpy processing |
| `np.array(img)` / `np.copy()` | ~14μs | Yes | Need to modify data or hold past next `recv()` |
## Image to NumPy

```python
import horus
import numpy as np

def detect_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return

    # Zero-copy — returns a numpy view backed by shared memory
    frame = img.to_numpy()  # shape: (480, 640, 3), dtype: uint8
    # ~3μs — no data movement

    # Or use DLPack for maximum performance
    frame = np.from_dlpack(img)  # ~1.1μs — true zero-copy

    # Process with numpy/OpenCV
    mean_brightness = frame.mean()
    node.send("brightness", {"value": float(mean_brightness)})
```
## When Copies Happen

The zero-copy view is backed by the HORUS shared memory pool. It becomes invalid when:

- **The next `recv()` overwrites the slot** — the ring buffer reuses memory. If you need to hold the frame across ticks, copy it: `frame = img.to_numpy().copy()`
- **You need to modify the array** — `to_numpy()` returns a read-only view. To modify, copy first: `frame = img.to_numpy().copy(); frame[0, 0] = 255`
- **You pass it to a function that requires contiguous/owned memory** — some libraries need owned arrays
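The view-versus-copy distinction can be illustrated with plain numpy, no HORUS required: a slice shares the underlying buffer, while `.copy()` produces an owned array that survives later writes to the original. The `slot` array below is a stand-in for a shared-memory ring-buffer slot, not part of the HORUS API.

```python
import numpy as np

# Simulate a reusable shared-memory slot with an ordinary numpy buffer.
slot = np.full((4, 4), 7, dtype=np.uint8)

view = slot[:]       # zero-copy view, shares slot's memory
owned = slot.copy()  # owned copy, independent memory

# Overwriting the slot (as the ring buffer does on the next recv())
# changes what the view sees, but the copy is unaffected.
slot[:] = 0

print(view[0, 0])   # 0, the view tracks the reused buffer
print(owned[0, 0])  # 7, the copy kept the old frame

# A read-only view refuses in-place writes, mirroring to_numpy():
ro = slot.view()
ro.flags.writeable = False
```

Attempting `ro[0, 0] = 255` raises `ValueError`, which is the same failure mode you would see writing into a read-only `to_numpy()` view.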
```python
# SAFE: process immediately, don't hold across ticks
def tick(node):
    img = node.recv("camera")
    if img:
        result = model.predict(img.to_numpy())  # Used immediately, no copy needed

# UNSAFE: holding a reference across ticks
stored_frame = None

def tick(node):
    global stored_frame
    img = node.recv("camera")
    if img:
        stored_frame = img.to_numpy()  # BAD — will be overwritten on next recv()

# SAFE: copy if you need to hold it
def tick(node):
    global stored_frame
    img = node.recv("camera")
    if img:
        stored_frame = img.to_numpy().copy()  # OK — owned copy
```
## Image to PyTorch

```python
import torch

def inference_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return

    # Zero-copy to PyTorch tensor via DLPack
    tensor = torch.from_dlpack(img)  # (H, W, C) uint8 on CPU

    # Move to GPU for inference
    tensor = tensor.permute(2, 0, 1).unsqueeze(0).float() / 255.0
    tensor = tensor.to("cuda")

    with torch.no_grad():
        output = model(tensor)

    node.send("predictions", process_output(output))
```
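The permute/unsqueeze/scale step is pure shape arithmetic, so it can be checked with numpy alone. The sketch below mirrors each torch call with its numpy equivalent; the zero-filled `frame` is a synthetic stand-in for a camera image.

```python
import numpy as np

# A stand-in for the (H, W, C) uint8 frame a camera would deliver.
frame = np.zeros((480, 640, 3), dtype=np.uint8)

# (H, W, C) -> (C, H, W): same reordering as tensor.permute(2, 0, 1)
chw = np.transpose(frame, (2, 0, 1))

# Add a leading batch dimension: same as .unsqueeze(0)
batched = chw[None, ...]

# Scale to float32 in [0, 1]: same as .float() / 255.0
normalized = batched.astype(np.float32) / 255.0

print(normalized.shape, normalized.dtype)  # (1, 3, 480, 640) float32
```

Most vision models expect exactly this `(N, C, H, W)` float layout, which is why the conversion happens before `.to("cuda")`.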
## PointCloud to NumPy

```python
import numpy as np

def lidar_tick(node):
    cloud = node.recv("lidar.points")
    if cloud is None:
        return

    # Zero-copy to numpy — shape depends on point type
    points = cloud.to_numpy()
    # XYZ:    shape (N, 3), dtype float32
    # XYZI:   shape (N, 4), dtype float32
    # XYZRGB: shape (N, 6), dtype float32

    # Filter points within 5m range
    distances = np.linalg.norm(points[:, :3], axis=1)
    nearby = points[distances < 5.0]
    node.send("nearby_points", {"count": len(nearby)})
```
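The range filter can be exercised without a live lidar by fabricating a tiny synthetic cloud; the coordinates below are made up for illustration.

```python
import numpy as np

# Synthetic XYZ cloud: two points inside 5 m, one outside.
points = np.array([
    [1.0, 2.0, 0.5],   # ~2.29 m from origin
    [3.0, 3.0, 1.0],   # ~4.36 m
    [10.0, 0.0, 0.0],  # 10.0 m
], dtype=np.float32)

# Same filter as the tick above: Euclidean distance on xyz columns.
distances = np.linalg.norm(points[:, :3], axis=1)
nearby = points[distances < 5.0]
print(len(nearby))  # 2
```

Slicing `[:, :3]` before the norm is what makes the same code work for XYZI and XYZRGB layouts, where extra columns follow the coordinates.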
## DepthImage to NumPy

```python
def depth_tick(node):
    depth = node.recv("camera.depth")
    if depth is None:
        return

    # Zero-copy — shape (H, W), dtype float32 (meters)
    depth_map = depth.to_numpy()

    # Find closest obstacle
    valid = depth_map[depth_map > 0]
    if len(valid) > 0:
        min_dist = valid.min()
        node.send("closest", {"distance_m": float(min_dist)})
```
## Performance Summary

Data from the Benchmarks page, measured on an i9-14900K:

| Operation | Latency | Throughput |
|---|---|---|
| `np.from_dlpack()` (640x480 RGB) | 1.1μs | 3.5M/s |
| `img.to_numpy()` (640x480 RGB) | 3.0μs | 1.5M/s |
| `np.copy()` (640x480 RGB) | 14.0μs | 334K/s |
| Typed message send+recv (CmdVel) | 1.7μs | 2.7M/s |
| Dict send+recv (small) | 6.2μs | 714K/s |
DLPack is 13x faster than copying — it returns a numpy/torch array backed directly by the shared memory pool.
## Complete ML Pipeline

```python
import horus
import numpy as np

def detect_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return

    # Zero-copy to numpy (1.1μs via DLPack)
    frame = np.from_dlpack(img)

    # Run YOLO inference (~20-100ms)
    results = model.predict(frame)

    # Publish detections
    for r in results:
        node.send("detections", horus.Detection(
            class_id=r.class_id,
            class_name=r.class_name,
            confidence=float(r.confidence),
            bbox=horus.BoundingBox2D(
                x_min=r.x1, y_min=r.y1, x_max=r.x2, y_max=r.y2,
            ),
        ))

detector = horus.Node(
    name="yolo",
    subs=[horus.Image],
    pubs=[horus.Detection],
    tick=detect_tick,
    rate=30,
    compute=True,
    budget=50 * horus.ms,
    on_miss="skip",
)

horus.run(detector, tick_rate=100)
```
## See Also
- Image API — Image constructor, encoding, pool allocation
- PointCloud API — Point cloud types and formats
- DepthImage API — Depth map types
- GIL & Performance — Tick rate ceilings, optimization patterns
- Benchmarks — Full measured performance data