Python Perception Types
Types for computer vision pipelines — object detection, tracking, pose estimation, and point cloud processing.
Quick example — publish YOLO detection results:
# simplified
import horus
from horus import Image, Detection, DetectionList, BoundingBox2D, Topic
model = load_yolo("model.pt")  # placeholder for your model-loading code
sub_image = Topic(Image, "camera.rgb")
pub_detections = Topic(DetectionList, "detections")
def detector_tick(node):
    img = sub_image.recv()
    if img is not None:
        results = model.predict(img.to_numpy())
        detections = DetectionList()
        for r in results:
            det = Detection.from_bbox(
                BoundingBox2D(x=r.x, y=r.y, width=r.w, height=r.h),
                class_name=r.class_name,
                confidence=r.score,
            )
            detections.append(det)
        pub_detections.send(detections)

detector = horus.Node(name="detector", tick=detector_tick, rate=30,
                      subs=["camera.rgb"], pubs=["detections"])
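On the consuming side, another node can subscribe to the same topic. A minimal sketch reusing only the Topic, Node, and DetectionList APIs shown on this page (the node names are illustrative):
# simplified
# Consumer-side sketch
sub_detections = Topic(DetectionList, "detections")

def consumer_tick(node):
    detections = sub_detections.recv()
    if detections is not None:
        people = detections.filter_class("person").filter_confidence(0.7)
        print(f"{len(people)} confident person detections")

consumer = horus.Node(name="consumer", tick=consumer_tick, rate=30,
                      subs=["detections"], pubs=[])
All perception types are importable directly from the top-level horus package: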
# simplified
from horus import (
    BoundingBox2D, Detection, DetectionList,
    PointXYZ, PointXYZRGB, PointCloudBuffer,
    Landmark, Landmark3D, LandmarkArray,
    TrackedObject, COCOPose,
)
BoundingBox2D
2D bounding box in pixel coordinates.
# simplified
bbox = BoundingBox2D(x=10.0, y=20.0, width=100.0, height=200.0)
bbox = BoundingBox2D.from_center(cx=60.0, cy=120.0, width=100.0, height=200.0)
| Property / Method | Returns | Description |
|---|---|---|
| .x, .y, .width, .height | float | Box coordinates |
| .center_x(), .center_y() | float | Center point |
| .area() | float | Area in pixels² |
| .iou(other) | float | Intersection over Union |
| .as_tuple() | (x, y, w, h) | XYWH format |
| .as_xyxy() | (x1, y1, x2, y2) | Corner format |
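A short usage sketch of the accessors above (the boxes and values are arbitrary):
# simplified
# Exercising the accessors above
a = BoundingBox2D(x=0.0, y=0.0, width=100.0, height=100.0)
b = BoundingBox2D.from_center(cx=100.0, cy=100.0, width=100.0, height=100.0)

print(a.area())                      # 10000.0
print(a.center_x(), a.center_y())    # 50.0 50.0
print(a.as_xyxy())                   # (0.0, 0.0, 100.0, 100.0)
print(a.iou(b))                      # intersection area / union area, in [0, 1]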
Detection
2D object detection result.
# simplified
det = Detection(class_name="person", confidence=0.95,
                x=10.0, y=20.0, width=100.0, height=200.0)
det = Detection.from_bbox(bbox, class_name="car", confidence=0.87)
| Property / Method | Returns | Description |
|---|---|---|
| .bbox | BoundingBox2D | Bounding box |
| .confidence | float | Detection confidence (0-1) |
| .class_id | int | Numeric class identifier |
| .class_name | str | Class label string |
| .instance_id | int | Instance tracking ID |
| .is_confident(threshold) | bool | Check if above threshold |
| .to_bytes() / .from_bytes(data) | bytes / Detection | Serialization |
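A small sketch combining the threshold check with the serialization round trip:
# simplified
# Confidence gate plus byte round trip
bbox = BoundingBox2D(x=10.0, y=20.0, width=100.0, height=200.0)
det = Detection.from_bbox(bbox, class_name="car", confidence=0.87)

if det.is_confident(0.8):
    payload = det.to_bytes()               # bytes suitable for IPC or logging
    restored = Detection.from_bytes(payload)
    assert restored.class_name == "car"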
DetectionList
Filterable collection of detections with iteration support.
# simplified
detections = DetectionList()
detections.append(Detection("person", 0.95, 10, 20, 100, 200))
detections.append(Detection("car", 0.72, 300, 150, 80, 60))
detections.append(Detection("person", 0.45, 500, 100, 90, 180))
# Filter by confidence
confident = detections.filter_confidence(0.7) # 2 detections
# Filter by class
people = detections.filter_class("person") # 2 detections
# Iterate
for det in detections:
print(f"{det.class_name}: {det.confidence:.2f}")
# Index access
first = detections[0]
count = len(detections)
# Convert to dicts (for JSON/logging)
dicts = detections.to_dicts()
# Serialization
data = detections.to_bytes()
restored = DetectionList.from_bytes(data)
| Method | Returns | Description |
|---|---|---|
| .append(det) | — | Add a detection |
| .filter_confidence(threshold) | DetectionList | Keep detections above threshold |
| .filter_class(name) | DetectionList | Keep only matching class |
| .to_dicts() | list[dict] | Convert to list of Python dicts |
| .to_bytes() / .from_bytes(data) | bytes / DetectionList | Serialization |
| len(detections) | int | Number of detections |
| detections[i] | Detection | Index access |
| for det in detections | — | Iteration |
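Since to_dicts() yields plain Python dicts, the detections built above can be logged as JSON with the standard library:
# simplified
# JSON logging via to_dicts()
import json

record = {"frame": 1024, "detections": detections.to_dicts()}
print(json.dumps(record))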
PointXYZ / PointXYZRGB
Individual 3D point types.
# simplified
point = PointXYZ(x=1.0, y=2.0, z=3.0)
print(point.distance()) # Distance from origin
print(point.distance_to(other_point)) # Distance between points
np_arr = point.to_numpy() # [1.0, 2.0, 3.0]
colored = PointXYZRGB(x=1.0, y=2.0, z=3.0, r=255, g=0, b=0)
print(colored.rgb()) # (255, 0, 0)
print(colored.xyz()) # PointXYZ(1.0, 2.0, 3.0)
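The distance helpers compose with plain Python; for example, a nearest-point lookup over a small list:
# simplified
# Nearest point to a query
query = PointXYZ(x=0.0, y=0.0, z=0.0)
candidates = [PointXYZ(x=1.0, y=2.0, z=3.0), PointXYZ(x=0.5, y=0.5, z=0.0)]
nearest = min(candidates, key=query.distance_to)
print(nearest.to_numpy())  # [0.5, 0.5, 0.0]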
PointCloudBuffer
Mutable point cloud buffer for building point clouds incrementally.
# simplified
buffer = PointCloudBuffer(capacity=10000, frame_id="lidar_front")
# Add points one at a time
buffer.add_point(1.0, 2.0, 3.0)
buffer.add_point(4.0, 5.0, 6.0)
# From NumPy — shape (N, 3)
buffer = PointCloudBuffer.from_numpy(np_points, frame_id="lidar")
# Access
point = buffer[0] # PointXYZ
count = len(buffer)
np_arr = buffer.to_numpy() # Shape (N, 3)
data = buffer.to_bytes() # Serialization
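Because the buffer converts to and from NumPy, bulk operations are usually easiest as array math. A minimal sketch of a range crop on the buffer above (the 30 m threshold is arbitrary):
# simplified
# Keep only points within 30 m of the sensor
import numpy as np

pts = buffer.to_numpy()                      # shape (N, 3)
mask = np.linalg.norm(pts, axis=1) < 30.0    # per-point distance test
cropped = PointCloudBuffer.from_numpy(pts[mask], frame_id="lidar")
print(len(cropped), "points kept")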
TrackedObject
Object with tracking state (for multi-object tracking pipelines).
# simplified
tracked = TrackedObject(
    track_id=42,
    bbox=BoundingBox2D(10, 20, 100, 200),
    class_name="person",
    confidence=0.95,
)
| Property / Method | Returns | Description |
|---|---|---|
| .track_id | int | Unique track identifier |
| .bbox | BoundingBox2D | Current bounding box |
| .confidence | float | Detection confidence |
| .class_id / .class_name | int / str | Class info |
| .velocity | (float, float) | Estimated (vx, vy) in pixels/frame |
| .speed() | float | Speed magnitude |
| .age | int | Frames since creation |
| .hits | int | Successful detections |
| .is_tentative() | bool | Not yet confirmed |
| .is_confirmed() | bool | Track is confirmed |
| .is_deleted() | bool | Track marked for deletion |
| .update(bbox, confidence) | — | Update with new detection |
| .mark_missed() | — | No detection this frame |
| .confirm() / .delete() | — | State transitions |
Tracking Pipeline Example
# simplified
from horus import DetectionList, TrackedObject, Topic

det_topic = Topic(DetectionList, "detections")
tracks: dict[int, TrackedObject] = {}

def tracker_tick(node):
    detections = det_topic.recv()
    if not detections:
        return
    # Simple nearest-neighbor matching (match_detections is application code)
    matched = match_detections(tracks, detections)
    for track_id, det in matched.items():
        tracks[track_id].update(det.bbox, det.confidence)
    # Tracks that received no detection this frame
    for track_id in set(tracks) - set(matched):
        tracks[track_id].mark_missed()
        if tracks[track_id].age > 30:
            tracks[track_id].delete()
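match_detections is left to the application. A minimal greedy sketch using only the documented BoundingBox2D.iou() method (real trackers typically use Hungarian assignment); this helper is illustrative, not part of the library:
# simplified
# Greedy IoU matching (hypothetical helper)
def match_detections(tracks, detections, iou_threshold=0.3):
    matched, used = {}, set()
    for track_id, track in tracks.items():
        best_iou, best_idx = iou_threshold, None
        for i, det in enumerate(detections):
            if i in used:
                continue
            iou = track.bbox.iou(det.bbox)
            if iou > best_iou:
                best_iou, best_idx = iou, i
        if best_idx is not None:
            matched[track_id] = detections[best_idx]
            used.add(best_idx)
    return matched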
COCOPose
Constants for COCO 17-keypoint pose estimation.
# simplified
from horus import COCOPose
# Keypoint indices
COCOPose.NOSE # 0
COCOPose.LEFT_EYE # 1
COCOPose.RIGHT_EYE # 2
COCOPose.LEFT_EAR # 3
COCOPose.RIGHT_EAR # 4
COCOPose.LEFT_SHOULDER # 5
COCOPose.RIGHT_SHOULDER # 6
COCOPose.LEFT_ELBOW # 7
COCOPose.RIGHT_ELBOW # 8
COCOPose.LEFT_WRIST # 9
COCOPose.RIGHT_WRIST # 10
COCOPose.LEFT_HIP # 11
COCOPose.RIGHT_HIP # 12
COCOPose.LEFT_KNEE # 13
COCOPose.RIGHT_KNEE # 14
COCOPose.LEFT_ANKLE # 15
COCOPose.RIGHT_ANKLE # 16
COCOPose.NUM_KEYPOINTS # 17
Pose Estimation Example
# simplified
from horus import Landmark, LandmarkArray, COCOPose
# Create landmarks from pose model output
landmarks = LandmarkArray(num_landmarks=17, dimension=2)
landmarks.confidence = 0.92
# Access specific keypoints
nose = Landmark(x=320.0, y=200.0, visibility=0.99, index=COCOPose.NOSE)
left_wrist = Landmark(x=450.0, y=380.0, visibility=0.85, index=COCOPose.LEFT_WRIST)
if nose.is_visible(0.5) and left_wrist.is_visible(0.5):
    dist = nose.distance_to(left_wrist)
    print(f"Nose to wrist: {dist:.1f}px")
Error Handling
Perception types raise standard Python exceptions:
| Operation | Exception | When |
|---|---|---|
| Detection(confidence=-1) | ValueError | Confidence outside 0.0–1.0 |
| BoundingBox2D(width=-5) | ValueError | Negative dimensions |
| detections[99] on a 3-item list | IndexError | Index out of range |
| TrackedObject.update(None, 0.5) | TypeError | Wrong argument type |
| DetectionList.from_bytes(bad_data) | ValueError | Corrupt or incompatible bytes |
| PointCloudBuffer(capacity=0) | ValueError | Zero capacity |
# simplified
# Safe detection pipeline
try:
    detections = DetectionList.from_bytes(raw_data)
    confident = detections.filter_confidence(0.7)
except ValueError as e:
    print(f"Bad detection data: {e}")
    confident = DetectionList()  # Empty fallback
Design Decisions
Why is Detection immutable but DetectionList is mutable? A detection is a single observation from a model — changing its confidence or bounding box after creation would make debugging impossible ("where did this value come from?"). The list, however, needs filtering, appending, and iteration as detections flow through the pipeline. Immutable atoms in a mutable container is a common pattern in data processing.
Why DetectionList instead of a plain Python list? DetectionList provides domain-specific operations (filter_confidence, filter_class, to_bytes) and efficient serialization for IPC. A plain list would require manual filtering and custom serialization code in every node. The wrapper keeps pipeline code concise: detections.filter_confidence(0.7).filter_class("person").
Why does TrackedObject have explicit state transitions (confirm, delete, mark_missed)? Object tracking requires lifecycle management — a detection must be seen multiple times before it's trusted ("confirmed"), and must be missing for several frames before it's removed ("deleted"). Explicit state methods make the lifecycle visible in code and prevent silent state corruption from ad-hoc flag setting. The states follow the standard SORT/DeepSORT tracking pattern.
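A minimal sketch of that lifecycle, using only the state methods documented in the TrackedObject table above (the hit and miss thresholds are arbitrary):
# simplified
# Promote after 3 hits, drop after 30 missed frames
def update_track_state(track, detection=None):
    if detection is not None:
        track.update(detection.bbox, detection.confidence)
        if track.is_tentative() and track.hits >= 3:
            track.confirm()
    else:
        track.mark_missed()
        if track.age > 30:
            track.delete()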
Why separate PointXYZ and PointCloudBuffer? PointXYZ is a value type for individual point operations (distance, transform). PointCloudBuffer is a container optimized for bulk operations (NumPy conversion, serialization). Trying to make one type serve both roles would compromise the API — individual point access would be slow on bulk containers, and bulk operations would be awkward on individual points.
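For instance, given a populated PointCloudBuffer named buffer, a centroid is naturally a bulk operation while a distance check is naturally a per-point one:
# simplified
# Bulk vs. per-point access
centroid = buffer.to_numpy().mean(axis=0)   # bulk: one vectorized call over (N, 3)
first = buffer[0]                           # per-point: a PointXYZ value
print(first.distance())                     # scalar helper on the individual point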
See Also
- Python Message Library — All 55+ message types
- Image, PointCloud, DepthImage
- ML Utilities — ML framework integration