Python Memory Types
HORUS provides pool-backed memory types for high-performance image, point cloud, and tensor data. These types allocate from a global tensor pool and support zero-copy interop with NumPy, PyTorch, and JAX via the DLPack protocol.
Image
Pool-backed camera image with zero-copy framework conversions.
Creating Images
from horus import Image
# Create empty RGB image (height, width, encoding)
img = Image(height=480, width=640, encoding="rgb8")
# From NumPy array (zero-copy when possible)
import numpy as np
pixels = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
img = Image.from_numpy(pixels, encoding="rgb8")
# From PyTorch tensor
import torch
tensor = torch.zeros(480, 640, 3, dtype=torch.uint8)
img = Image.from_torch(tensor, encoding="rgb8")
# From raw bytes
img = Image.from_bytes(raw_data, height=480, width=640, encoding="rgb8")
Supported Encodings
| Encoding | Channels | Bytes/Pixel | Description |
|---|---|---|---|
| "mono8" | 1 | 1 | 8-bit grayscale |
| "mono16" | 1 | 2 | 16-bit grayscale |
| "rgb8" | 3 | 3 | 8-bit RGB |
| "bgr8" | 3 | 3 | 8-bit BGR (OpenCV) |
| "rgba8" | 4 | 4 | 8-bit RGBA |
| "bgra8" | 4 | 4 | 8-bit BGRA |
| "yuv422" | 2 | 2 | YUV 4:2:2 |
| "mono32f" | 1 | 4 | 32-bit float mono |
| "rgb32f" | 3 | 12 | 32-bit float RGB |
| "bayer_rggb8" | 1 | 1 | Bayer raw |
| "depth16" | 1 | 2 | 16-bit depth (mm) |
Properties
img.height # Image height in pixels
img.width # Image width in pixels
img.channels # Number of channels (e.g., 3 for RGB)
img.encoding # Encoding string (e.g., "rgb8")
img.dtype # Data type string
img.nbytes # Total data size in bytes
img.step # Row stride in bytes
img.frame_id # Sensor frame identifier
img.timestamp_ns # Timestamp in nanoseconds
Framework Conversions (Zero-Copy)
# To NumPy — zero-copy, shared memory
np_array = img.to_numpy() # Shape: (H, W, C) for color, (H, W) for mono
# To PyTorch — zero-copy via DLPack
torch_tensor = img.to_torch()
# To JAX — zero-copy via DLPack
jax_array = img.to_jax()
Pixel Access
# Read pixel at (x, y)
pixel = img.pixel(320, 240) # Returns list, e.g., [128, 64, 255]
# Write pixel
img.set_pixel(320, 240, [255, 0, 0]) # Red pixel
# Fill entire image with a color
img.fill([0, 0, 0]) # Black
# Copy data from bytes
img.copy_from(raw_bytes)
# Extract region of interest (raw bytes)
roi_data = img.roi(x=100, y=100, w=200, h=200)
Metadata
img.set_frame_id("camera_front")
img.set_timestamp_ns(1234567890)
# Device info
img.is_cpu() # True (always CPU-backed currently)
DLPack Protocol
Image implements the DLPack protocol for framework-agnostic zero-copy:
# NumPy array protocol
np_array = np.asarray(img) # Uses __array_interface__
# DLPack (PyTorch, JAX, CuPy, etc.)
capsule = img.__dlpack__()
device = img.__dlpack_device__()
PointCloud
Pool-backed 3D point cloud with zero-copy ML framework interop.
Creating Point Clouds
from horus import PointCloud
# Create XYZ point cloud (num_points, fields_per_point, dtype)
cloud = PointCloud(num_points=10000, fields=3, dtype="float32")
# From NumPy array — shape (N, F) where F = fields per point
import numpy as np
points = np.random.randn(10000, 3).astype(np.float32)
cloud = PointCloud.from_numpy(points)
# From PyTorch tensor
import torch
tensor = torch.randn(10000, 3)
cloud = PointCloud.from_torch(tensor)
Properties
cloud.point_count # Number of points
cloud.fields_per_point # Floats per point (3=XYZ, 4=XYZI, 6=XYZRGB)
cloud.dtype # Data type string
cloud.nbytes # Total data size in bytes
cloud.frame_id # Sensor frame identifier
cloud.timestamp_ns # Timestamp in nanoseconds
# Point format queries
cloud.is_xyz() # True if 3 fields (XYZ)
cloud.has_intensity() # True if 4+ fields (XYZI)
cloud.has_color() # True if 6+ fields (XYZRGB)
Framework Conversions
# To NumPy — shape (N, F), zero-copy
np_points = cloud.to_numpy()
# To PyTorch — zero-copy via DLPack
torch_points = cloud.to_torch()
# To JAX — zero-copy via DLPack
jax_points = cloud.to_jax()
Point Access
# Get i-th point as list of floats (float32 clouds only)
point = cloud.point_at(0) # e.g., [1.0, 2.0, 3.0]
Metadata and DLPack
cloud.set_frame_id("lidar_front")
cloud.set_timestamp_ns(1234567890)
cloud.is_cpu() # True (always CPU-backed currently)
# DLPack protocol
capsule = cloud.__dlpack__()
DepthImage
Pool-backed depth image supporting F32 (meters) and U16 (millimeters) formats.
Creating Depth Images
from horus import DepthImage
# Create F32 depth image (meters)
depth = DepthImage(height=480, width=640, dtype="float32")
# Create U16 depth image (millimeters)
depth_u16 = DepthImage(height=480, width=640, dtype="uint16")
# From NumPy — shape (H, W)
import numpy as np
depth_data = np.random.uniform(0.5, 5.0, (480, 640)).astype(np.float32)
depth = DepthImage.from_numpy(depth_data)
# From PyTorch
import torch
depth = DepthImage.from_torch(torch.randn(480, 640))
Properties
depth.height # Image height
depth.width # Image width
depth.dtype # "float32" or "uint16"
depth.nbytes # Total data size
depth.frame_id # Camera frame identifier
depth.timestamp_ns # Timestamp
depth.depth_scale # Scale factor
depth.is_meters() # True if F32 (meters)
depth.is_millimeters() # True if U16 (millimeters)
Depth Access
# Get depth at pixel (always returns meters as float)
d = depth.get_depth(320, 240)
print(f"Depth at center: {d:.3f}m")
# Set depth at pixel (value in meters)
depth.set_depth(100, 100, 1.5)
# Get statistics (min, max, mean) — None if no valid data
stats = depth.depth_statistics()
if stats:
    min_d, max_d, mean_d = stats
    print(f"Range: {min_d:.2f}-{max_d:.2f}m, mean: {mean_d:.2f}m")
Framework Conversions
np_depth = depth.to_numpy() # Shape: (H, W)
torch_depth = depth.to_torch()
jax_depth = depth.to_jax()
TensorHandle
Low-level tensor handle for advanced zero-copy scenarios. Most users should use Image, PointCloud, or DepthImage instead.
Creating Tensors
from horus import TensorPool, TensorHandle
# Allocate from pool
pool = TensorPool(pool_id=1, size_mb=1024, max_slots=1024)
tensor = pool.alloc(shape=[480, 640, 3], dtype="float32", device="cpu")
# Import from DLPack (any framework)
import torch
t = torch.randn(100, 100)
tensor = TensorHandle.from_dlpack(t)
Properties
tensor.shape # List of dimensions, e.g., [480, 640, 3]
tensor.dtype # Data type string, e.g., "float32"
tensor.device # Device string, e.g., "cpu"
tensor.numel # Total number of elements
tensor.nbytes # Total size in bytes
tensor.refcount # Reference count in the pool
Framework Conversions
np_array = tensor.numpy() # Zero-copy to NumPy
torch_t = tensor.torch() # Zero-copy to PyTorch
Tensor Operations
# Reshape (must be contiguous, same total elements)
reshaped = tensor.view([480 * 640, 3])
# Slice along first dimension
sliced = tensor.slice(0, 100) # First 100 rows
# Check properties
tensor.is_cpu()
tensor.is_contiguous()
# Explicit cleanup (optional — automatic on drop)
tensor.release()
TensorPool
from horus import TensorPool
pool = TensorPool(pool_id=1, size_mb=1024, max_slots=1024)
# Allocate tensors
t1 = pool.alloc([480, 640, 3], dtype="float32")
t2 = pool.alloc([1000, 3], dtype="float32")
# Pool statistics
stats = pool.stats()
print(f"Allocated: {stats['allocated_slots']} slots")
print(f"Used: {stats['used_bytes']} bytes")
print(f"Free: {stats['free_bytes']} bytes")
Usage with Topics
All memory types work seamlessly with typed topics for zero-copy IPC:
from horus import Topic, Image, PointCloud, DepthImage
import numpy as np
# Publish an image
img_topic = Topic(Image)
img = Image.from_numpy(np.zeros((480, 640, 3), dtype=np.uint8), encoding="rgb8")
img_topic.send(img)
# Receive an image
received = img_topic.recv()
if received:
    np_img = received.to_numpy()  # Zero-copy access
    print(f"Received {received.width}x{received.height} image")
ML Pipeline Example
from horus import Node, Scheduler, Topic, Image
import numpy as np
def camera_tick(node):
    frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    img = Image.from_numpy(frame, encoding="rgb8")
    img.set_frame_id("camera_front")
    img_topic.send(img)
def inference_tick(node):
    img = img_topic.recv()
    if img:
        # Zero-copy to PyTorch for inference
        tensor = img.to_torch()  # No data copy!
        # ... run model ...
img_topic = Topic(Image)
camera = Node(name="camera", tick=camera_tick, rate=30)
model = Node(name="model", tick=inference_tick, rate=30)
scheduler = Scheduler()
scheduler.add(camera, order=0)
scheduler.add(model, order=1)
scheduler.run()
See Also
- Python Bindings — Core Python API
- ML Utilities — ML framework integration
- Image API (Rust) — Rust Image reference
- PointCloud API (Rust) — Rust PointCloud reference
- DepthImage API (Rust) — Rust DepthImage reference