Vision Messages

Vision messages carry image data, point clouds, depth maps, and camera calibration. The pool-backed types (Image, PointCloud, DepthImage) use zero-copy shared memory for maximum throughput — no serialization overhead when passing images between nodes.

from horus import (
    Image, PointCloud, DepthImage,
    CameraInfo, CompressedImage,
    RegionOfInterest, StereoInfo,
)

Image (Pool-Backed)

Zero-copy image with numpy and PyTorch interop. Memory lives in shared memory — passing an Image between nodes costs ~3us regardless of resolution.

Constructor

img = Image(height=480, width=640, encoding=0)  # RGB8

Fields

Field         Type  Description
height        int   Image height in pixels
width         int   Image width in pixels
encoding      int   Pixel format (0=RGB8, 1=BGR8, 2=RGBA8, 3=GRAY8, 4=GRAY16, 5=DEPTH32F)
timestamp_ns  int   Capture timestamp in nanoseconds

.from_numpy(array) — Create from Numpy Array

import numpy as np

pixels = np.zeros((480, 640, 3), dtype=np.uint8)
img = Image.from_numpy(pixels)

Creates an Image from a numpy array. The array shape determines height, width, and channel count. Accepted shapes: (H, W, 3) for RGB/BGR, (H, W, 4) for RGBA, (H, W) for grayscale.

.to_numpy() — Convert to Numpy (Zero-Copy)

arr = img.to_numpy()
arr[100, 200] = [255, 0, 0]  # Set a red pixel

Returns a numpy array view of the image data. This is zero-copy when possible — the numpy array points directly at the shared memory buffer.

.to_torch() — Convert to PyTorch Tensor

tensor = img.to_torch()  # CHW format for PyTorch models

Returns a PyTorch tensor in CHW (Channel, Height, Width) format, which is what most PyTorch vision models expect. Values are float32 in range [0.0, 1.0].
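As an illustration, the same conversion can be sketched in plain numpy. The CHW layout and [0.0, 1.0] scaling mirror the documented behavior of to_torch(); the helper name is hypothetical:

```python
import numpy as np

def hwc_uint8_to_chw_float(arr: np.ndarray) -> np.ndarray:
    """Sketch of the assumed to_torch() conversion: HWC uint8 -> CHW float32 in [0, 1]."""
    chw = np.transpose(arr, (2, 0, 1))      # (H, W, C) -> (C, H, W)
    return chw.astype(np.float32) / 255.0   # scale to [0.0, 1.0]

pixels = np.full((480, 640, 3), 255, dtype=np.uint8)
tensor_like = hwc_uint8_to_chw_float(pixels)  # shape (3, 480, 640)
```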

Common mistake: Holding references to Image across ticks. Pool-backed images are recycled — the same memory slot may be reused on the next tick. Process the image within the current tick, or copy the data with img.to_numpy().copy().
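A numpy-only sketch of why holding a view is unsafe. The buffer below simulates a pool slot being recycled; it is illustrative, not the actual horus pool API:

```python
import numpy as np

# Simulated pool slot: one reusable buffer, as with pool-backed Images.
slot = np.zeros((4, 4, 3), dtype=np.uint8)

view = slot[:]       # like img.to_numpy(): a view into the pool slot
kept = slot.copy()   # like img.to_numpy().copy(): safe to hold across ticks

slot[:] = 255        # "next tick": the pool recycles the slot with new data

# The view now shows the recycled contents; only the copy preserves the old frame.
```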

Example — Image Processing Pipeline:

from horus import Image, Detection, Topic
import numpy as np

cam_topic = Topic(Image)
det_topic = Topic(Detection)

def detect_objects(node):
    img = cam_topic.recv(node)
    if img is None:
        return
    arr = img.to_numpy()          # Zero-copy view
    # Run your detector (YOLO, SSD, etc.)
    results = my_model.predict(arr)
    for r in results:
        det = Detection(class_name=r.label, confidence=r.score,
                        x=r.x, y=r.y, width=r.w, height=r.h)
        det_topic.send(det, node)

PointCloud (Pool-Backed)

Zero-copy 3D point cloud with field descriptors. Used for LiDAR data, stereo reconstruction, and depth camera output.

Constructor

cloud = PointCloud(num_points=1000)

Fields

Field         Type  Description
num_points    int   Number of 3D points
timestamp_ns  int   Capture timestamp in nanoseconds

.from_xyz(array) — Create from Numpy XYZ Array

import numpy as np
xyz = np.random.randn(1000, 3).astype(np.float32)
cloud = PointCloud.from_xyz(xyz)

Creates a point cloud from an Nx3 float32 numpy array. Each row is (x, y, z) in meters.

.point_count() — Number of Points

print(cloud.point_count())  # 1000

Example — Obstacle Detection from Point Cloud:

from horus import PointCloud, Topic
import numpy as np

cloud_topic = Topic(PointCloud)

def check_obstacles(node):
    cloud = cloud_topic.recv(node)
    if cloud is None:
        return
    xyz = cloud.to_numpy()  # Nx3 array
    distances = np.linalg.norm(xyz[:, :2], axis=1)  # 2D distance from robot
    close_points = np.sum(distances < 1.0)
    if close_points > 50:
        print(f"WARNING: {close_points} points within 1m!")

DepthImage (Pool-Backed)

Depth map from a depth camera (RealSense, Kinect, stereo). Each pixel stores a distance in meters.

Constructor

depth = DepthImage(height=480, width=640)

Fields

Field         Type  Description
height        int   Image height in pixels
width         int   Image width in pixels
timestamp_ns  int   Capture timestamp in nanoseconds

.get_depth(u, v) — Depth at a Pixel

d = depth.get_depth(320, 240)  # Depth at image center (meters)

Returns the depth value at pixel (u, v) in meters, or None if the pixel is out of bounds or has no valid depth. Invalid depth is typically 0.0 or NaN.

.set_depth(u, v, value) — Set Depth at a Pixel

depth.set_depth(320, 240, 1.5)  # Set depth to 1.5m

.depth_statistics() — Summary Statistics

stats = depth.depth_statistics()  # min, max, mean, valid_count
print(f"Range: {stats[0]:.2f}m - {stats[1]:.2f}m, Mean: {stats[2]:.2f}m")
print(f"Valid pixels: {stats[3]} / {depth.width * depth.height}")

Returns (min, max, mean, valid_count) across all non-zero pixels. Use this for quick quality checks: if valid_count is low, the depth camera may be obstructed or miscalibrated.
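A plain-numpy sketch of the same statistics, treating zero and NaN pixels as invalid. The helper is hypothetical, shown only to pin down the semantics:

```python
import numpy as np

def depth_stats(depth_m: np.ndarray):
    """Assumed equivalent of depth_statistics(): (min, max, mean, valid_count)."""
    valid = depth_m[np.isfinite(depth_m) & (depth_m > 0)]
    if valid.size == 0:
        return (0.0, 0.0, 0.0, 0)
    return (float(valid.min()), float(valid.max()), float(valid.mean()), int(valid.size))

frame = np.array([[0.0, 1.0], [3.0, np.nan]], dtype=np.float32)
stats = depth_stats(frame)  # zero and NaN pixels are excluded
```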

Example — Depth-Based Floor Detection:

from horus import DepthImage, Topic

depth_topic = Topic(DepthImage)

def check_floor(node):
    depth = depth_topic.recv(node)
    if depth is None:
        return
    # Sample the bottom row of the image
    floor_depths = []
    for u in range(0, depth.width, 10):
        d = depth.get_depth(u, depth.height - 1)
        if d is not None and d > 0:
            floor_depths.append(d)
    if floor_depths:
        avg = sum(floor_depths) / len(floor_depths)
        print(f"Average floor distance: {avg:.2f}m")

CameraInfo

Camera intrinsic calibration parameters — the link between 2D pixel coordinates and 3D world coordinates.

Constructor

cam = CameraInfo(width=640, height=480, fx=525.0, fy=525.0, cx=320.0, cy=240.0)

.focal_lengths() — Camera Focal Length

fx, fy = cam.focal_lengths()
print(f"Focal length: ({fx:.0f}, {fy:.0f}) pixels")

Returns (fx, fy) in pixels. The focal length determines how "zoomed in" the camera is — higher values = narrower field of view. In the pinhole camera model, a 3D point (X, Y, Z) projects to pixel (fx*X/Z + cx, fy*Y/Z + cy).
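A worked example of that projection, using the calibration values from this page (plain Python, no horus API):

```python
# Pinhole projection with the example calibration used throughout this page.
fx, fy = 525.0, 525.0
cx, cy = 320.0, 240.0

def project(X, Y, Z):
    """Project a 3D point (meters, camera frame) to pixel coordinates."""
    return (fx * X / Z + cx, fy * Y / Z + cy)

u, v = project(0.5, 0.0, 2.0)  # a point 0.5 m to the right, 2 m ahead
# u = 525 * 0.5 / 2 + 320 = 451.25; v = 240.0 (on the optical axis vertically)
```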

.principal_point() — Optical Center

cx, cy = cam.principal_point()
print(f"Principal point: ({cx:.0f}, {cy:.0f})")

Returns (cx, cy) — the pixel where the optical axis hits the image sensor. Usually near the image center, but not exactly — calibration corrects for manufacturing imprecision.

.with_distortion_model(model) — Set Lens Distortion Type

cam = cam.with_distortion_model("plumb_bob")

Returns a new CameraInfo with the given distortion model set. Common models:

  • "plumb_bob": Standard radial + tangential (5 coefficients)
  • "equidistant": Fisheye cameras
  • "none": Synthetic cameras (no distortion)
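For reference, the five plumb_bob coefficients (k1, k2, p1, p2, k3) follow the standard OpenCV radial plus tangential model. A minimal sketch on normalized camera coordinates (this math is standard; it is not part of the horus API):

```python
def plumb_bob_distort(x, y, k1, k2, p1, p2, k3):
    """Apply the standard plumb_bob distortion to normalized coordinates (x, y)."""
    r2 = x * x + y * y
    radial = 1 + k1 * r2 + k2 * r2**2 + k3 * r2**3          # radial term
    x_d = x * radial + 2 * p1 * x * y + p2 * (r2 + 2 * x * x)  # + tangential
    y_d = y * radial + p1 * (r2 + 2 * y * y) + 2 * p2 * x * y
    return x_d, y_d
```

With all five coefficients at zero the mapping is the identity, which is a handy sanity check on calibration output.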

Example — 3D Projection from Depth:

from horus import CameraInfo, DepthImage, Point3

cam = CameraInfo(width=640, height=480, fx=525.0, fy=525.0, cx=320.0, cy=240.0)

def pixel_to_3d(cam, depth, u, v):
    """Convert pixel (u,v) + depth to 3D point."""
    fx, fy = cam.focal_lengths()
    cx, cy = cam.principal_point()
    z = depth.get_depth(u, v)
    if z is None or z <= 0:
        return None
    x = (u - cx) * z / fx
    y = (v - cy) * z / fy
    return Point3(x=x, y=y, z=z)

CompressedImage

JPEG/PNG compressed image for network transmission. Smaller than raw Image but requires decompression.

Constructor

img = CompressedImage(format="jpeg", data=jpeg_bytes, width=640, height=480)

Fields

Field         Type   Description
format        str    Compression format ("jpeg", "png")
data          bytes  Compressed image data
width         int    Image width in pixels
height        int    Image height in pixels
timestamp_ns  int    Capture timestamp in nanoseconds

.format_str() — Get Format Name

print(img.format_str())  # "jpeg"

Use CompressedImage for network topics or when bandwidth is limited. Use Image for local processing (zero-copy, no decompression needed).

Example — Compress and Send:

from horus import Image, CompressedImage, Topic
import cv2
import numpy as np

cam_topic = Topic(Image)
net_topic = Topic(CompressedImage)

def compress_and_send(node):
    img = cam_topic.recv(node)
    if img is None:
        return
    arr = img.to_numpy()
    # cv2 assumes BGR channel order; convert if the image encoding is RGB8
    bgr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)
    _, jpeg_data = cv2.imencode('.jpg', bgr, [cv2.IMWRITE_JPEG_QUALITY, 80])
    compressed = CompressedImage(
        format="jpeg", data=jpeg_data.tobytes(),
        width=img.width, height=img.height
    )
    net_topic.send(compressed, node)

RegionOfInterest

Rectangular region in an image — for cropping detections or specifying areas of interest.

Constructor

roi = RegionOfInterest(x=100, y=200, width=50, height=60)

Fields

Field   Type  Description
x       int   Left edge (pixels from left)
y       int   Top edge (pixels from top)
width   int   Region width in pixels
height  int   Region height in pixels

.contains(x, y) — Is a Pixel Inside?

if roi.contains(120, 220):
    print("Pixel is inside the ROI")

.area() — Region Size

print(f"ROI area: {roi.area()} pixels")  # 3000

Use case — Crop a detection from an image:

arr = img.to_numpy()
cropped = arr[roi.y:roi.y+roi.height, roi.x:roi.x+roi.width]

StereoInfo

Stereo camera pair calibration with depth-to-disparity conversion.

Constructor

stereo = StereoInfo(baseline=0.12, fx=525.0)

Fields

Field     Type   Description
baseline  float  Distance between camera centers in meters
fx        float  Horizontal focal length in pixels

.depth_from_disparity(disparity) — Convert Disparity to Depth

stereo = StereoInfo(baseline=0.12, fx=525.0)  # 12cm between cameras
depth = stereo.depth_from_disparity(disparity=30.0)
# depth = baseline * fx / disparity

The fundamental stereo vision equation: Z = B * f / d. Given disparity (pixel offset between left and right images), compute real-world depth. Larger baseline = better depth accuracy at long range.
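Plugging in the example calibration, as plain Python arithmetic:

```python
baseline = 0.12  # meters between camera centers
fx = 525.0       # horizontal focal length in pixels

def depth_from_disparity(d_px: float) -> float:
    """Z = B * f / d: the stereo triangulation equation."""
    return baseline * fx / d_px

z = depth_from_disparity(30.0)  # 0.12 * 525 / 30 = 2.1 m
```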

.disparity_from_depth(depth) — Convert Depth to Disparity

disp = stereo.disparity_from_depth(depth=2.0)
# Expected disparity for an object at 2 meters

The inverse — useful for setting depth range limits in stereo matching algorithms.

Example — Stereo Depth Range Check:

from horus import StereoInfo

stereo = StereoInfo(baseline=0.12, fx=525.0)

# What disparity range corresponds to 0.5m - 10m depth?
near_disp = stereo.disparity_from_depth(0.5)   # ~126 pixels
far_disp = stereo.disparity_from_depth(10.0)    # ~6.3 pixels
print(f"Search range: {far_disp:.0f} - {near_disp:.0f} pixels")

Design Decisions

Why pool-backed images instead of heap-allocated buffers? A 1080p RGB image is 6MB. Allocating and freeing 6MB at 30fps causes GC pressure and memory fragmentation. Pool-backed images recycle pre-allocated shared memory slots, eliminating allocation overhead. The tradeoff is a fixed pool size (configurable) and the requirement to process images within a single tick.

Why to_numpy() returns a view instead of a copy? Copying 6MB per frame at 30fps wastes 180MB/s of memory bandwidth. A view points directly at the shared memory buffer, so there is zero overhead. The tradeoff: the view is only valid during the current tick (the pool may recycle the slot on the next tick). If you need the data longer, call .copy() on the numpy array.
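The arithmetic behind these bandwidth figures, as a quick back-of-envelope check:

```python
# One 1080p RGB frame: width * height * channels, one byte per channel.
frame_bytes = 1920 * 1080 * 3       # 6,220,800 bytes, roughly 6 MB per frame

# Copying every frame at 30 fps costs roughly this much memory bandwidth.
copy_bandwidth = frame_bytes * 30   # about 187 MB/s
```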

Why separate Image and CompressedImage? They serve different transport needs. Image is raw pixels in shared memory (zero-copy, ~3us transfer, local only). CompressedImage is JPEG/PNG bytes (requires encoding/decoding, smaller size, suitable for network transport). Using the wrong one wastes either bandwidth (raw over network) or CPU (decompressing locally).

Why does CameraInfo store focal length in pixels, not millimeters? Pixel-based focal lengths are what you need for the pinhole camera model: pixel_x = fx * X/Z + cx. Millimeter focal lengths require an additional conversion using sensor pixel size. Since calibration tools (OpenCV, ROS camera_calibration) output pixel-based values, storing them directly avoids a conversion step and a potential source of error.

Why does StereoInfo take baseline in meters? The stereo depth equation is Z = baseline * fx / disparity. Baseline must be in the same units as the desired depth output. Since horus uses meters for all spatial measurements, baseline is in meters. If your stereo camera spec lists baseline in centimeters or millimeters, convert before construction.


See Also