Python Memory Types

HORUS provides pool-backed memory types for high-performance image, point cloud, and tensor data. These types allocate from a global tensor pool and support zero-copy interop with NumPy, PyTorch, and JAX via the DLPack protocol.

Image

Pool-backed camera image with zero-copy framework conversions.

Creating Images

from horus import Image

# Create empty RGB image (height, width, encoding)
img = Image(height=480, width=640, encoding="rgb8")

# From NumPy array (zero-copy when possible)
import numpy as np
pixels = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
img = Image.from_numpy(pixels, encoding="rgb8")

# From PyTorch tensor
import torch
tensor = torch.zeros(480, 640, 3, dtype=torch.uint8)
img = Image.from_torch(tensor, encoding="rgb8")

# From raw bytes
img = Image.from_bytes(raw_data, height=480, width=640, encoding="rgb8")

Supported Encodings

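| Encoding | Channels | Bytes/Pixel | Description |
|----------|----------|-------------|-------------|
| "mono8" | 1 | 1 | 8-bit grayscale |
| "mono16" | 1 | 2 | 16-bit grayscale |
| "rgb8" | 3 | 3 | 8-bit RGB |
| "bgr8" | 3 | 3 | 8-bit BGR (OpenCV) |
| "rgba8" | 4 | 4 | 8-bit RGBA |
| "bgra8" | 4 | 4 | 8-bit BGRA |
| "yuv422" | 2 | 2 | YUV 4:2:2 |
| "mono32f" | 1 | 4 | 32-bit float mono |
| "rgb32f" | 3 | 12 | 32-bit float RGB |
| "bayer_rggb8" | 1 | 1 | Bayer raw |
| "depth16" | 1 | 2 | 16-bit depth (mm) |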

Properties

img.height          # Image height in pixels
img.width           # Image width in pixels
img.channels        # Number of channels (e.g., 3 for RGB)
img.encoding        # Encoding string (e.g., "rgb8")
img.dtype           # Data type string
img.nbytes          # Total data size in bytes
img.step            # Row stride in bytes
img.frame_id        # Sensor frame identifier
img.timestamp_ns    # Timestamp in nanoseconds

Framework Conversions (Zero-Copy)

# To NumPy — zero-copy, shared memory
np_array = img.to_numpy()  # Shape: (H, W, C) for color, (H, W) for mono

# To PyTorch — zero-copy via DLPack
torch_tensor = img.to_torch()

# To JAX — zero-copy via DLPack
jax_array = img.to_jax()

Pixel Access

# Read pixel at (x, y)
pixel = img.pixel(320, 240)  # Returns list, e.g., [128, 64, 255]

# Write pixel
img.set_pixel(320, 240, [255, 0, 0])  # Red pixel

# Fill entire image with a color
img.fill([0, 0, 0])  # Black

# Copy data from bytes
img.copy_from(raw_bytes)

# Extract region of interest (raw bytes)
roi_data = img.roi(x=100, y=100, w=200, h=200)

Metadata

img.set_frame_id("camera_front")
img.set_timestamp_ns(1234567890)

# Device info
img.is_cpu()   # True (always CPU-backed currently)

DLPack Protocol

Image implements both the NumPy array interface and the DLPack protocol for framework-agnostic zero-copy access:

# NumPy array protocol
np_array = np.asarray(img)  # Uses __array_interface__

# DLPack (PyTorch, JAX, CuPy, etc.)
capsule = img.__dlpack__()
device = img.__dlpack_device__()

PointCloud

Pool-backed 3D point cloud with zero-copy ML framework interop.

Creating Point Clouds

from horus import PointCloud

# Create XYZ point cloud (num_points, fields_per_point, dtype)
cloud = PointCloud(num_points=10000, fields=3, dtype="float32")

# From NumPy array — shape (N, F) where F = fields per point
import numpy as np
points = np.random.randn(10000, 3).astype(np.float32)
cloud = PointCloud.from_numpy(points)

# From PyTorch tensor
import torch
tensor = torch.randn(10000, 3)
cloud = PointCloud.from_torch(tensor)

Properties

cloud.point_count       # Number of points
cloud.fields_per_point  # Fields per point (3=XYZ, 4=XYZI, 6=XYZRGB)
cloud.dtype             # Data type string
cloud.nbytes            # Total data size in bytes
cloud.frame_id          # Sensor frame identifier
cloud.timestamp_ns      # Timestamp in nanoseconds

# Point format queries
cloud.is_xyz()          # True if 3 fields (XYZ)
cloud.has_intensity()   # True if 4+ fields (XYZI)
cloud.has_color()       # True if 6+ fields (XYZRGB)

Framework Conversions

# To NumPy — shape (N, F), zero-copy
np_points = cloud.to_numpy()

# To PyTorch — zero-copy via DLPack
torch_points = cloud.to_torch()

# To JAX — zero-copy via DLPack
jax_points = cloud.to_jax()

Point Access

# Get i-th point as list of floats (float32 clouds only)
point = cloud.point_at(0)  # e.g., [1.0, 2.0, 3.0]

Metadata and DLPack

cloud.set_frame_id("lidar_front")
cloud.set_timestamp_ns(1234567890)

cloud.is_cpu()   # True (always CPU-backed currently)

# DLPack protocol
capsule = cloud.__dlpack__()

DepthImage

Pool-backed depth image supporting F32 (meters) and U16 (millimeters) formats.

Creating Depth Images

from horus import DepthImage

# Create F32 depth image (meters)
depth = DepthImage(height=480, width=640, dtype="float32")

# Create U16 depth image (millimeters)
depth_u16 = DepthImage(height=480, width=640, dtype="uint16")

# From NumPy — shape (H, W)
import numpy as np
depth_data = np.random.uniform(0.5, 5.0, (480, 640)).astype(np.float32)
depth = DepthImage.from_numpy(depth_data)

# From PyTorch
import torch
depth = DepthImage.from_torch(torch.randn(480, 640))

Properties

depth.height        # Image height in pixels
depth.width         # Image width in pixels
depth.dtype         # "float32" or "uint16"
depth.nbytes        # Total data size in bytes
depth.frame_id      # Camera frame identifier
depth.timestamp_ns  # Timestamp in nanoseconds
depth.depth_scale   # Scale factor

depth.is_meters()       # True if F32 (meters)
depth.is_millimeters()  # True if U16 (millimeters)

Depth Access

# Get depth at pixel (always returns meters as float)
d = depth.get_depth(320, 240)
print(f"Depth at center: {d:.3f}m")

# Set depth at pixel (value in meters)
depth.set_depth(100, 100, 1.5)

# Get statistics (min, max, mean) — None if no valid data
stats = depth.depth_statistics()
if stats:
    min_d, max_d, mean_d = stats
    print(f"Range: {min_d:.2f}-{max_d:.2f}m, mean: {mean_d:.2f}m")

Framework Conversions

np_depth = depth.to_numpy()    # Shape: (H, W)
torch_depth = depth.to_torch()
jax_depth = depth.to_jax()

TensorHandle

Low-level tensor handle for advanced zero-copy scenarios. Most users should use Image, PointCloud, or DepthImage instead.

Creating Tensors

from horus import TensorPool, TensorHandle

# Allocate from pool
pool = TensorPool(pool_id=1, size_mb=1024, max_slots=1024)
tensor = pool.alloc(shape=[480, 640, 3], dtype="float32", device="cpu")

# Import from DLPack (any framework)
import torch
t = torch.randn(100, 100)
tensor = TensorHandle.from_dlpack(t)

Properties

tensor.shape      # List of dimensions, e.g., [480, 640, 3]
tensor.dtype      # Data type string, e.g., "float32"
tensor.device     # Device string, e.g., "cpu"
tensor.numel      # Total number of elements
tensor.nbytes     # Total size in bytes
tensor.refcount   # Reference count in the pool

Framework Conversions

np_array = tensor.numpy()   # Zero-copy to NumPy
torch_t = tensor.torch()    # Zero-copy to PyTorch

Tensor Operations

# Reshape (must be contiguous, same total elements)
reshaped = tensor.view([480 * 640, 3])

# Slice along first dimension
sliced = tensor.slice(0, 100)  # First 100 rows

# Check properties
tensor.is_cpu()
tensor.is_contiguous()

# Explicit cleanup (optional — automatic on drop)
tensor.release()

TensorPool

from horus import TensorPool

pool = TensorPool(pool_id=1, size_mb=1024, max_slots=1024)

# Allocate tensors
t1 = pool.alloc([480, 640, 3], dtype="float32")
t2 = pool.alloc([1000, 3], dtype="float32")

# Pool statistics
stats = pool.stats()
print(f"Allocated: {stats['allocated_slots']} slots")
print(f"Used: {stats['used_bytes']} bytes")
print(f"Free: {stats['free_bytes']} bytes")

Usage with Topics

All memory types work seamlessly with typed topics for zero-copy IPC:

from horus import Topic, Image, PointCloud, DepthImage
import numpy as np

# Publish an image
img_topic = Topic(Image)
img = Image.from_numpy(np.zeros((480, 640, 3), dtype=np.uint8), encoding="rgb8")
img_topic.send(img)

# Receive an image
received = img_topic.recv()
if received:
    np_img = received.to_numpy()  # Zero-copy access
    print(f"Received {received.width}x{received.height} image")

ML Pipeline Example

from horus import Node, Scheduler, Topic, Image
import numpy as np

def camera_tick(node):
    frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    img = Image.from_numpy(frame, encoding="rgb8")
    img.set_frame_id("camera_front")
    img_topic.send(img)

def inference_tick(node):
    img = img_topic.recv()
    if img:
        # Zero-copy to PyTorch for inference
        tensor = img.to_torch()  # No data copy!
        # ... run model ...

img_topic = Topic(Image)
camera = Node(name="camera", tick=camera_tick, rate=30)
model = Node(name="model", tick=inference_tick, rate=30)

scheduler = Scheduler()
scheduler.add(camera, order=0)
scheduler.add(model, order=1)
scheduler.run()

See Also