Python Perception Types

Types for computer vision pipelines — object detection, tracking, pose estimation, and point cloud processing.

Quick example — publish YOLO detection results:

# simplified
import horus
from horus import Image, Detection, DetectionList, BoundingBox2D, Topic

model = load_yolo("model.pt")   # load_yolo: your model loader, not part of horus
sub_image = Topic(Image, "camera.rgb")
pub_detections = Topic(DetectionList, "detections")

def detector_tick(node):
    img = sub_image.recv()
    if img is not None:
        results = model.predict(img.to_numpy())

        detections = DetectionList()
        for r in results:
            bbox = BoundingBox2D(x=r.x, y=r.y, width=r.w, height=r.h)
            det = Detection.from_bbox(bbox, class_name=r.class_name,
                                      confidence=r.score)
            detections.append(det)

        pub_detections.send(detections)

detector = horus.Node(name="detector", tick=detector_tick, rate=30,
                      subs=["camera.rgb"], pubs=["detections"])
The full set of perception types:

# simplified
from horus import (
    BoundingBox2D, Detection, DetectionList,
    PointXYZ, PointXYZRGB, PointCloudBuffer,
    Landmark, Landmark3D, LandmarkArray,
    TrackedObject, COCOPose,
)

BoundingBox2D

2D bounding box in pixel coordinates.

# simplified
bbox = BoundingBox2D(x=10.0, y=20.0, width=100.0, height=200.0)                  # top-left origin
bbox = BoundingBox2D.from_center(cx=60.0, cy=120.0, width=100.0, height=200.0)  # same box, center-based

| Property / Method | Returns | Description |
|---|---|---|
| .x, .y, .width, .height | float | Box coordinates |
| .center_x(), .center_y() | float | Center point |
| .area() | float | Area in pixels² |
| .iou(other) | float | Intersection over Union |
| .as_tuple() | (x, y, w, h) | XYWH format |
| .as_xyxy() | (x1, y1, x2, y2) | Corner format |
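
A quick sketch of the geometry helpers (the numbers are illustrative):

# simplified
a = BoundingBox2D(x=0.0, y=0.0, width=100.0, height=100.0)
b = BoundingBox2D.from_center(cx=100.0, cy=100.0, width=100.0, height=100.0)

a.area()       # 10000.0
a.iou(b)       # ~0.14: the boxes overlap only in a 50x50 corner region
a.as_xyxy()    # (0.0, 0.0, 100.0, 100.0)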

Detection

2D object detection result.

# simplified
det = Detection(class_name="person", confidence=0.95,
                x=10.0, y=20.0, width=100.0, height=200.0)

det = Detection.from_bbox(bbox, class_name="car", confidence=0.87)

| Property / Method | Returns | Description |
|---|---|---|
| .bbox | BoundingBox2D | Bounding box |
| .confidence | float | Detection confidence (0-1) |
| .class_id | int | Numeric class identifier |
| .class_name | str | Class label string |
| .instance_id | int | Instance tracking ID |
| .is_confident(threshold) | bool | Check if above threshold |
| .to_bytes() / .from_bytes(data) | bytes / Detection | Serialization |
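
Thresholding and a serialization round trip, using only the methods above:

# simplified
if det.is_confident(0.9):
    print(f"{det.class_name} at {det.bbox.as_tuple()}")

data = det.to_bytes()                  # e.g. for IPC or logging
restored = Detection.from_bytes(data)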

DetectionList

Filterable collection of detections with iteration support.

# simplified
detections = DetectionList()
detections.append(Detection("person", 0.95, 10, 20, 100, 200))
detections.append(Detection("car", 0.72, 300, 150, 80, 60))
detections.append(Detection("person", 0.45, 500, 100, 90, 180))

# Filter by confidence
confident = detections.filter_confidence(0.7)  # 2 detections

# Filter by class
people = detections.filter_class("person")  # 2 detections

# Iterate
for det in detections:
    print(f"{det.class_name}: {det.confidence:.2f}")

# Index access
first = detections[0]
count = len(detections)

# Convert to dicts (for JSON/logging)
dicts = detections.to_dicts()

# Serialization
data = detections.to_bytes()
restored = DetectionList.from_bytes(data)

| Method | Returns | Description |
|---|---|---|
| .append(det) | — | Add a detection |
| .filter_confidence(threshold) | DetectionList | Keep detections above threshold |
| .filter_class(name) | DetectionList | Keep only matching class |
| .to_dicts() | list[dict] | Convert to list of Python dicts |
| .to_bytes() / .from_bytes(data) | bytes / DetectionList | Serialization |
| len(detections) | int | Number of detections |
| detections[i] | Detection | Index access |
| for det in detections | — | Iteration |
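
Because the filters return new DetectionList instances, calls chain naturally:

# simplified
confident_people = detections.filter_confidence(0.7).filter_class("person")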

PointXYZ / PointXYZRGB

Individual 3D point types.

# simplified
point = PointXYZ(x=1.0, y=2.0, z=3.0)
print(point.distance())                  # Distance from origin
print(point.distance_to(other_point))    # Distance between points
np_arr = point.to_numpy()                # [1.0, 2.0, 3.0]

colored = PointXYZRGB(x=1.0, y=2.0, z=3.0, r=255, g=0, b=0)
print(colored.rgb())    # (255, 0, 0)
print(colored.xyz())    # PointXYZ(1.0, 2.0, 3.0)
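
Combining the constructor with the distance helper; raw_xyz below is a placeholder for any iterable of (x, y, z) tuples:

# simplified
points = [PointXYZ(x=x, y=y, z=z) for x, y, z in raw_xyz]
nearby = [p for p in points if p.distance() < 5.0]   # keep points within 5 units of origin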

PointCloudBuffer

Mutable point cloud buffer for building point clouds incrementally.

# simplified
buffer = PointCloudBuffer(capacity=10000, frame_id="lidar_front")

# Add points one at a time
buffer.add_point(1.0, 2.0, 3.0)
buffer.add_point(4.0, 5.0, 6.0)

# From NumPy — shape (N, 3)
buffer = PointCloudBuffer.from_numpy(np_points, frame_id="lidar")

# Access
point = buffer[0]           # PointXYZ
count = len(buffer)
np_arr = buffer.to_numpy()  # Shape (N, 3)
data = buffer.to_bytes()    # Serialization
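
A hypothetical publisher sketch, assuming a PointCloudBuffer can ride on a Topic like the other types; read_lidar_frame stands in for your driver call:

# simplified
from horus import PointCloudBuffer, Topic

pub_cloud = Topic(PointCloudBuffer, "lidar.points")

def lidar_tick(node):
    pts = read_lidar_frame()   # hypothetical driver call returning an (N, 3) array
    pub_cloud.send(PointCloudBuffer.from_numpy(pts, frame_id="lidar_front"))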

TrackedObject

Object with tracking state (for multi-object tracking pipelines).

# simplified
tracked = TrackedObject(
    track_id=42,
    bbox=BoundingBox2D(10, 20, 100, 200),
    class_name="person",
    confidence=0.95,
)

| Property / Method | Returns | Description |
|---|---|---|
| .track_id | int | Unique track identifier |
| .bbox | BoundingBox2D | Current bounding box |
| .confidence | float | Detection confidence |
| .class_id / .class_name | int / str | Class info |
| .velocity | (float, float) | Estimated (vx, vy) in pixels/frame |
| .speed() | float | Speed magnitude |
| .age | int | Frames since creation |
| .hits | int | Successful detections |
| .is_tentative() | bool | Not yet confirmed |
| .is_confirmed() | bool | Track is confirmed |
| .is_deleted() | bool | Track marked for deletion |
| .update(bbox, confidence) | — | Update with new detection |
| .mark_missed() | — | No detection this frame |
| .confirm() / .delete() | — | State transitions |
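
The lifecycle in miniature, using only the documented methods; when to confirm or delete is application policy:

# simplified
track = TrackedObject(track_id=7, bbox=BoundingBox2D(0, 0, 50, 80),
                      class_name="person", confidence=0.6)
track.is_tentative()                              # True: new tracks start unconfirmed

track.update(BoundingBox2D(2, 1, 50, 80), 0.8)    # matched again this frame
track.confirm()                                   # promote once enough hits accumulate
track.is_confirmed()                              # True

track.mark_missed()                               # no detection this frame
track.delete()                                    # retire after repeated misses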

Tracking Pipeline Example

# simplified
from horus import DetectionList, TrackedObject, Topic

det_topic = Topic(DetectionList, "detections")
tracks: dict[int, TrackedObject] = {}

def tracker_tick(node):
    detections = det_topic.recv()
    if detections is None:
        return

    # Simple nearest-neighbor matching (see the match_detections sketch below)
    matched = match_detections(tracks, detections)

    for track_id, det in matched.items():
        tracks[track_id].update(det.bbox, det.confidence)

    # Tracks that received no detection this frame
    for track_id in set(tracks) - set(matched):
        tracks[track_id].mark_missed()
        if tracks[track_id].age > 30:
            tracks[track_id].delete()
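
The match_detections helper is left to the application. A minimal greedy matcher built on BoundingBox2D.iou might look like this; the 0.3 threshold and the greedy strategy are illustrative, not part of the API:

# simplified
def match_detections(tracks, detections, iou_threshold=0.3):
    """Each track greedily claims its best-overlapping unclaimed detection."""
    matched, claimed = {}, set()
    for track_id, track in tracks.items():
        best_iou, best_idx = iou_threshold, None
        for i, det in enumerate(detections):
            if i in claimed:
                continue
            iou = track.bbox.iou(det.bbox)
            if iou > best_iou:
                best_iou, best_idx = iou, i
        if best_idx is not None:
            matched[track_id] = detections[best_idx]
            claimed.add(best_idx)
    return matched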

COCOPose

Constants for COCO 17-keypoint pose estimation.

# simplified
from horus import COCOPose

# Keypoint indices
COCOPose.NOSE           # 0
COCOPose.LEFT_EYE       # 1
COCOPose.RIGHT_EYE      # 2
COCOPose.LEFT_EAR       # 3
COCOPose.RIGHT_EAR      # 4
COCOPose.LEFT_SHOULDER  # 5
COCOPose.RIGHT_SHOULDER # 6
COCOPose.LEFT_ELBOW     # 7
COCOPose.RIGHT_ELBOW    # 8
COCOPose.LEFT_WRIST     # 9
COCOPose.RIGHT_WRIST    # 10
COCOPose.LEFT_HIP       # 11
COCOPose.RIGHT_HIP      # 12
COCOPose.LEFT_KNEE      # 13
COCOPose.RIGHT_KNEE     # 14
COCOPose.LEFT_ANKLE     # 15
COCOPose.RIGHT_ANKLE    # 16
COCOPose.NUM_KEYPOINTS  # 17

Pose Estimation Example

# simplified
from horus import Landmark, LandmarkArray, COCOPose

# Container sized for a full COCO pose (17 keypoints, 2D)
landmarks = LandmarkArray(num_landmarks=17, dimension=2)
landmarks.confidence = 0.92

# Individual keypoints from the pose model output
nose = Landmark(x=320.0, y=200.0, visibility=0.99, index=COCOPose.NOSE)
left_wrist = Landmark(x=450.0, y=380.0, visibility=0.85, index=COCOPose.LEFT_WRIST)

if nose.is_visible(0.5) and left_wrist.is_visible(0.5):
    dist = nose.distance_to(left_wrist)
    print(f"Nose to wrist: {dist:.1f}px")

Error Handling

Perception types raise standard Python exceptions:

| Operation | Exception | When |
|---|---|---|
| Detection(confidence=-1) | ValueError | Confidence outside 0.0–1.0 |
| BoundingBox2D(width=-5) | ValueError | Negative dimensions |
| detections[99] on a 3-item list | IndexError | Index out of range |
| TrackedObject.update(None, 0.5) | TypeError | Wrong argument type |
| DetectionList.from_bytes(bad_data) | ValueError | Corrupt or incompatible bytes |
| PointCloudBuffer(capacity=0) | ValueError | Zero capacity |

# simplified
# Safe detection pipeline
try:
    detections = DetectionList.from_bytes(raw_data)
    confident = detections.filter_confidence(0.7)
except ValueError as e:
    print(f"Bad detection data: {e}")
    confident = DetectionList()  # Empty fallback
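
Model scores occasionally drift outside the valid range; clamping before construction avoids the ValueError entirely (raw_score, name, and the box values are placeholders):

# simplified
conf = max(0.0, min(1.0, raw_score))   # clamp the raw model score into 0.0-1.0
det = Detection(class_name=name, confidence=conf,
                x=x, y=y, width=w, height=h)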

Design Decisions

Why is Detection immutable but DetectionList is mutable? A detection is a single observation from a model — changing its confidence or bounding box after creation would make debugging impossible ("where did this value come from?"). The list, however, needs filtering, appending, and iteration as detections flow through the pipeline. Immutable atoms in a mutable container is a common pattern in data processing.

Why DetectionList instead of a plain Python list? DetectionList provides domain-specific operations (filter_confidence, filter_class, to_bytes) and efficient serialization for IPC. A plain list would require manual filtering and custom serialization code in every node. The wrapper keeps pipeline code concise: detections.filter_confidence(0.7).filter_class("person").

Why does TrackedObject have explicit state transitions (confirm, delete, mark_missed)? Object tracking requires lifecycle management — a detection must be seen multiple times before it's trusted ("confirmed"), and must be missing for several frames before it's removed ("deleted"). Explicit state methods make the lifecycle visible in code and prevent silent state corruption from ad-hoc flag setting. The states follow the standard SORT/DeepSORT tracking pattern.

Why separate PointXYZ and PointCloudBuffer? PointXYZ is a value type for individual point operations (distance, transform). PointCloudBuffer is a container optimized for bulk operations (NumPy conversion, serialization). Trying to make one type serve both roles would compromise the API — individual point access would be slow on bulk containers, and bulk operations would be awkward on individual points.


See Also