Vision Messages
Vision messages carry image data, point clouds, depth maps, and camera calibration. The pool-backed types (Image, PointCloud, DepthImage) use zero-copy shared memory for maximum throughput — no serialization overhead when passing images between nodes.
from horus import (
    Image, PointCloud, DepthImage,
    CameraInfo, CompressedImage,
    RegionOfInterest, StereoInfo,
)
Image (Pool-Backed)
Zero-copy image with numpy and PyTorch interop. Memory lives in shared memory — passing an Image between nodes costs ~3us regardless of resolution.
Constructor
img = Image(height=480, width=640, encoding=0) # RGB8
Fields
| Field | Type | Description |
|---|---|---|
| height | int | Image height in pixels |
| width | int | Image width in pixels |
| encoding | int | Pixel format (0=RGB8, 1=BGR8, 2=RGBA8, 3=GRAY8, 4=GRAY16, 5=DEPTH32F) |
| timestamp_ns | int | Capture timestamp in nanoseconds |
.from_numpy(array) — Create from Numpy Array
import numpy as np
pixels = np.zeros((480, 640, 3), dtype=np.uint8)
img = Image.from_numpy(pixels)
Creates an Image from a numpy array. The array shape determines height, width, and channel count. Accepted shapes: (H, W, 3) for RGB/BGR, (H, W, 4) for RGBA, (H, W) for grayscale.
.to_numpy() — Convert to Numpy (Zero-Copy)
arr = img.to_numpy()
arr[100, 200] = [255, 0, 0] # Set a red pixel
Returns a numpy array view of the image data. This is zero-copy when possible — the numpy array points directly at the shared memory buffer.
.to_torch() — Convert to PyTorch Tensor
tensor = img.to_torch() # CHW format for PyTorch models
Returns a PyTorch tensor in CHW (Channel, Height, Width) format, which is what most PyTorch vision models expect. Values are float32 in range [0.0, 1.0].
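The layout conversion can be illustrated in plain numpy. This is a conceptual sketch of what to_torch() produces, not the library's implementation:

```python
import numpy as np

# Simulate what to_numpy() returns for an RGB8 frame: HWC layout, uint8
hwc = np.zeros((480, 640, 3), dtype=np.uint8)
hwc[:, :, 0] = 255  # saturate the red channel

# Conceptually what to_torch() does: HWC -> CHW, uint8 -> float32 in [0, 1]
chw = hwc.transpose(2, 0, 1).astype(np.float32) / 255.0

print(chw.shape)     # (3, 480, 640)
print(chw[0].max())  # 1.0
```

CHW with normalized float32 values is what `torch.nn.Conv2d` and most pretrained vision models expect as input.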
Common mistake: holding a reference to an Image across ticks. Pool-backed images are recycled — the same memory slot may be reused on the next tick. Process the image within the current tick, or copy the data with img.to_numpy().copy().
Example — Image Processing Pipeline:
from horus import Image, Detection, Topic
import numpy as np
cam_topic = Topic(Image)
det_topic = Topic(Detection)
def detect_objects(node):
    img = cam_topic.recv(node)
    if img is None:
        return
    arr = img.to_numpy()  # Zero-copy view
    # Run your detector (YOLO, SSD, etc.)
    results = my_model.predict(arr)
    for r in results:
        det = Detection(class_name=r.label, confidence=r.score,
                        x=r.x, y=r.y, width=r.w, height=r.h)
        det_topic.send(det, node)
PointCloud (Pool-Backed)
Zero-copy 3D point cloud with field descriptors. Used for LiDAR data, stereo reconstruction, and depth camera output.
Constructor
cloud = PointCloud(num_points=1000)
Fields
| Field | Type | Description |
|---|---|---|
| num_points | int | Number of 3D points |
| timestamp_ns | int | Capture timestamp in nanoseconds |
.from_xyz(array) — Create from Numpy XYZ Array
import numpy as np
xyz = np.random.randn(1000, 3).astype(np.float32)
cloud = PointCloud.from_xyz(xyz)
Creates a point cloud from an Nx3 float32 numpy array. Each row is (x, y, z) in meters.
.point_count() — Number of Points
print(cloud.point_count()) # 1000
Example — Obstacle Detection from Point Cloud:
from horus import PointCloud, Topic
import numpy as np
cloud_topic = Topic(PointCloud)
def check_obstacles(node):
    cloud = cloud_topic.recv(node)
    if cloud is None:
        return
    xyz = cloud.to_numpy()  # Nx3 array
    distances = np.linalg.norm(xyz[:, :2], axis=1)  # 2D distance from robot
    close_points = np.sum(distances < 1.0)
    if close_points > 50:
        print(f"WARNING: {close_points} points within 1m!")
DepthImage (Pool-Backed)
Depth map from a depth camera (RealSense, Kinect, stereo). Each pixel stores a distance in meters.
Constructor
depth = DepthImage(height=480, width=640)
Fields
| Field | Type | Description |
|---|---|---|
| height | int | Image height in pixels |
| width | int | Image width in pixels |
| timestamp_ns | int | Capture timestamp in nanoseconds |
.get_depth(u, v) — Depth at a Pixel
d = depth.get_depth(320, 240) # Depth at image center (meters)
Returns the depth value at pixel (u, v) in meters, or None if the pixel is out of bounds or has no valid depth. Invalid depth is typically 0.0 or NaN.
.set_depth(u, v, value) — Set Depth at a Pixel
depth.set_depth(320, 240, 1.5) # Set depth to 1.5m
.depth_statistics() — Summary Statistics
stats = depth.depth_statistics() # min, max, mean, valid_count
print(f"Range: {stats[0]:.2f}m - {stats[1]:.2f}m, Mean: {stats[2]:.2f}m")
print(f"Valid pixels: {stats[3]} / {depth.width * depth.height}")
Returns (min, max, mean, valid_count) across all non-zero pixels. Use this for quick quality checks: if valid_count is low, the depth camera may be obstructed or miscalibrated.
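The same statistics can be reproduced in numpy. A sketch of the computation, assuming invalid pixels are stored as 0.0:

```python
import numpy as np

depth_m = np.zeros((480, 640), dtype=np.float32)
depth_m[100:200, 100:200] = 2.5  # a 100x100 patch of valid depth readings

valid = depth_m > 0.0  # treat 0.0 as "no measurement"
valid_count = int(valid.sum())
d_min = float(depth_m[valid].min())
d_max = float(depth_m[valid].max())
d_mean = float(depth_m[valid].mean())

print(valid_count)           # 10000
print(d_min, d_max, d_mean)  # 2.5 2.5 2.5
```

Masking before reducing is the key step: including the zero pixels would drag min and mean toward 0 and hide the real depth range.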
Example — Depth-Based Floor Detection:
from horus import DepthImage, Topic
depth_topic = Topic(DepthImage)
def check_floor(node):
    depth = depth_topic.recv(node)
    if depth is None:
        return
    # Sample the bottom row of the image
    floor_depths = []
    for u in range(0, depth.width, 10):
        d = depth.get_depth(u, depth.height - 1)
        if d is not None and d > 0:
            floor_depths.append(d)
    if floor_depths:
        avg = sum(floor_depths) / len(floor_depths)
        print(f"Average floor distance: {avg:.2f}m")
CameraInfo
Camera intrinsic calibration parameters — the link between 2D pixel coordinates and 3D world coordinates.
Constructor
cam = CameraInfo(width=640, height=480, fx=525.0, fy=525.0, cx=320.0, cy=240.0)
.focal_lengths() — Camera Focal Length
fx, fy = cam.focal_lengths()
print(f"Focal length: ({fx:.0f}, {fy:.0f}) pixels")
Returns (fx, fy) in pixels. The focal length determines how "zoomed in" the camera is — higher values mean a narrower field of view. In the pinhole camera model, a 3D point (X, Y, Z) projects to pixel (fx*X/Z + cx, fy*Y/Z + cy).
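The forward projection is easy to verify by hand. A pure-Python sketch with illustrative calibration values, not from any real camera:

```python
# Illustrative pinhole calibration values
fx, fy = 525.0, 525.0
cx, cy = 320.0, 240.0

# A 3D point in the camera frame, 2 m in front of the lens
X, Y, Z = 0.5, -0.5, 2.0

# Pinhole model: divide by depth, scale by focal length, shift by the center
u = fx * X / Z + cx
v = fy * Y / Z + cy
print(u, v)  # 451.25 108.75
```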
.principal_point() — Optical Center
cx, cy = cam.principal_point()
print(f"Principal point: ({cx:.0f}, {cy:.0f})")
Returns (cx, cy) — the pixel where the optical axis hits the image sensor. Usually near the image center, but not exactly — calibration corrects for manufacturing imprecision.
.with_distortion_model(model) — Set Lens Distortion Type
cam = cam.with_distortion_model("plumb_bob")
Returns a new CameraInfo with the distortion model name. Common models:
"plumb_bob": Standard radial + tangential (5 coefficients)"equidistant": Fisheye cameras"none": Synthetic cameras (no distortion)
Example — 3D Projection from Depth:
from horus import CameraInfo, DepthImage, Point3
cam = CameraInfo(width=640, height=480, fx=525.0, fy=525.0, cx=320.0, cy=240.0)
def pixel_to_3d(cam, depth, u, v):
    """Convert pixel (u, v) + depth to a 3D point."""
    fx, fy = cam.focal_lengths()
    cx, cy = cam.principal_point()
    z = depth.get_depth(u, v)
    if z is None or z <= 0:
        return None
    x = (u - cx) * z / fx
    y = (v - cy) * z / fy
    return Point3(x=x, y=y, z=z)
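The same equations vectorize over a whole depth frame with numpy. A sketch independent of the horus types, using illustrative intrinsics:

```python
import numpy as np

fx, fy, cx, cy = 525.0, 525.0, 320.0, 240.0        # illustrative intrinsics
depth_m = np.full((480, 640), 2.0, dtype=np.float32)  # a flat wall 2 m away

# Pixel index grids: v varies along rows, u along columns
v, u = np.mgrid[0:480, 0:640].astype(np.float32)

# Back-project every pixel at once: x = (u - cx) * z / fx, y = (v - cy) * z / fy
z = depth_m
x = (u - cx) * z / fx
y = (v - cy) * z / fy
xyz = np.stack([x, y, z], axis=-1).reshape(-1, 3)  # Nx3 point cloud

print(xyz.shape)  # (307200, 3)
```

An Nx3 float32 array in this shape is exactly what PointCloud.from_xyz accepts.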
CompressedImage
JPEG/PNG compressed image for network transmission. Smaller than raw Image but requires decompression.
Constructor
img = CompressedImage(format="jpeg", data=jpeg_bytes, width=640, height=480)
Fields
| Field | Type | Description |
|---|---|---|
| format | str | Compression format ("jpeg", "png") |
| data | bytes | Compressed image data |
| width | int | Image width in pixels |
| height | int | Image height in pixels |
| timestamp_ns | int | Capture timestamp in nanoseconds |
.format_str() — Get Format Name
print(img.format_str()) # "jpeg"
Use CompressedImage for network topics or when bandwidth is limited. Use Image for local processing (zero-copy, no decompression needed).
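The tradeoff is easy to quantify with back-of-envelope arithmetic. The 10:1 JPEG ratio below is a typical figure for quality ~80, not a guarantee; real ratios depend on image content:

```python
# Raw RGB8 frames vs JPEG for a VGA stream at 30 fps
width, height, channels, fps = 640, 480, 3, 30

raw_bytes_per_frame = width * height * channels  # 921600 bytes per frame
raw_mb_per_s = raw_bytes_per_frame * fps / 1e6   # uncompressed stream rate

jpeg_ratio = 10                                  # assumed compression ratio
jpeg_mb_per_s = raw_mb_per_s / jpeg_ratio

print(f"raw: {raw_mb_per_s:.1f} MB/s, jpeg: {jpeg_mb_per_s:.1f} MB/s")
# raw: 27.6 MB/s, jpeg: 2.8 MB/s
```

At 1080p the raw figure grows to ~186 MB/s, which is why raw frames stay in shared memory and only compressed bytes go over the network.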
Example — Compress and Send:
from horus import Image, CompressedImage, Topic
import cv2
import numpy as np
cam_topic = Topic(Image)
net_topic = Topic(CompressedImage)
def compress_and_send(node):
    img = cam_topic.recv(node)
    if img is None:
        return
    arr = img.to_numpy()
    # Note: cv2.imencode assumes BGR channel order; convert with
    # cv2.cvtColor first if the image encoding is RGB8
    _, jpeg_data = cv2.imencode('.jpg', arr, [cv2.IMWRITE_JPEG_QUALITY, 80])
    compressed = CompressedImage(
        format="jpeg", data=jpeg_data.tobytes(),
        width=img.width, height=img.height
    )
    net_topic.send(compressed, node)
RegionOfInterest
Rectangular region in an image — for cropping detections or specifying areas of interest.
Constructor
roi = RegionOfInterest(x=100, y=200, width=50, height=60)
Fields
| Field | Type | Description |
|---|---|---|
| x | int | Left edge (pixels from left) |
| y | int | Top edge (pixels from top) |
| width | int | Region width in pixels |
| height | int | Region height in pixels |
.contains(x, y) — Is a Pixel Inside?
if roi.contains(120, 220):
    print("Pixel is inside the ROI")
.area() — Region Size
print(f"ROI area: {roi.area()} pixels") # 3000
Use case — Crop a detection from an image:
arr = img.to_numpy()
cropped = arr[roi.y:roi.y+roi.height, roi.x:roi.x+roi.width]
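Detections near the border can produce ROIs that extend past the image. A clamped crop in plain numpy avoids out-of-range slices; the clamping logic here is an illustrative pattern, not a RegionOfInterest method:

```python
import numpy as np

arr = np.zeros((480, 640, 3), dtype=np.uint8)  # stand-in for img.to_numpy()
x, y, w, h = 600, 450, 100, 100  # ROI spills past the right and bottom edges

# Clamp the ROI to the image bounds before slicing
x0, y0 = max(x, 0), max(y, 0)
x1, y1 = min(x + w, arr.shape[1]), min(y + h, arr.shape[0])
cropped = arr[y0:y1, x0:x1]

print(cropped.shape)  # (30, 40, 3)
```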
StereoInfo
Stereo camera pair calibration with depth-to-disparity conversion.
Constructor
stereo = StereoInfo(baseline=0.12, fx=525.0)
Fields
| Field | Type | Description |
|---|---|---|
| baseline | float | Distance between camera centers in meters |
| fx | float | Horizontal focal length in pixels |
.depth_from_disparity(disparity) — Convert Disparity to Depth
stereo = StereoInfo(baseline=0.12, fx=525.0) # 12cm between cameras
depth = stereo.depth_from_disparity(disparity=30.0)
# depth = baseline * fx / disparity
The fundamental stereo vision equation: Z = B * f / d. Given disparity (pixel offset between left and right images), compute real-world depth. Larger baseline = better depth accuracy at long range.
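The equation is simple enough to check by hand. A pure-Python sketch, independent of the StereoInfo type:

```python
baseline, fx = 0.12, 525.0  # 12 cm baseline, focal length in pixels

def depth_from_disparity(d_px: float) -> float:
    # Z = B * f / d, the stereo triangulation equation
    return baseline * fx / d_px

depth = depth_from_disparity(30.0)
print(f"{depth:.2f} m")  # 2.10 m

# The relation is its own inverse: d = B * f / Z
print(f"{baseline * fx / depth:.1f} px")  # 30.0 px
```

Note the reciprocal shape of the curve: depth resolution is fine at close range (large disparities) and coarse at long range, where a single pixel of disparity error moves the estimate by meters.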
.disparity_from_depth(depth) — Convert Depth to Disparity
disp = stereo.disparity_from_depth(depth=2.0)
# Expected disparity for an object at 2 meters
The inverse — useful for setting depth range limits in stereo matching algorithms.
Example — Stereo Depth Range Check:
from horus import StereoInfo
stereo = StereoInfo(baseline=0.12, fx=525.0)
# What disparity range corresponds to 0.5m - 10m depth?
near_disp = stereo.disparity_from_depth(0.5) # ~126 pixels
far_disp = stereo.disparity_from_depth(10.0) # ~6.3 pixels
print(f"Search range: {far_disp:.0f} - {near_disp:.0f} pixels")
Design Decisions
Why pool-backed images instead of heap-allocated buffers? A 1080p RGB image is 6MB. Allocating and freeing 6MB at 30fps causes GC pressure and memory fragmentation. Pool-backed images recycle pre-allocated shared memory slots, eliminating allocation overhead. The tradeoff is a fixed pool size (configurable) and the requirement to process images within a single tick.
Why to_numpy() returns a view instead of a copy? Copying 6MB per frame at 30fps wastes 180MB/s of memory bandwidth. A view points directly at the shared memory buffer, so there is zero overhead. The tradeoff: the view is only valid during the current tick (the pool may recycle the slot on the next tick). If you need the data longer, call .copy() on the numpy array.
Why separate Image and CompressedImage? They serve different transport needs. Image is raw pixels in shared memory (zero-copy, ~3us transfer, local only). CompressedImage is JPEG/PNG bytes (requires encoding/decoding, smaller size, suitable for network transport). Using the wrong one wastes either bandwidth (raw over network) or CPU (decompressing locally).
Why does CameraInfo store focal length in pixels, not millimeters? Pixel-based focal lengths are what you need for the pinhole camera model: pixel_x = fx * X/Z + cx. Millimeter focal lengths require an additional conversion using sensor pixel size. Since calibration tools (OpenCV, ROS camera_calibration) output pixel-based values, storing them directly avoids a conversion step and a potential source of error.
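For reference, this is the conversion that storing pixel values avoids, with illustrative sensor numbers (not from any real datasheet):

```python
# Illustrative sensor spec
focal_mm = 3.6       # lens focal length in millimeters
pixel_size_um = 6.0  # sensor pixel pitch in micrometers

# fx (pixels) = focal length / pixel pitch, with units made to match
fx_px = focal_mm / (pixel_size_um * 1e-3)
print(f"{fx_px:.0f} px")  # 600 px
```

Every term here (lens spec, pixel pitch, unit conversion) is a place for error to creep in, which is why calibration tools skip it and report fx, fy in pixels directly.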
Why does StereoInfo take baseline in meters? The stereo depth equation is Z = baseline * fx / disparity. Baseline must be in the same units as the desired depth output. Since horus uses meters for all spatial measurements, baseline is in meters. If your stereo camera spec lists baseline in centimeters or millimeters, convert before construction.
See Also
- Perception Messages — Detection, SegmentationMask
- Geometry Messages — Point3 for 3D positions
- Sensor Messages — LaserScan, Imu for non-vision sensors
- Python Message Library — All 55+ message types overview