Python Message Library
HORUS provides 55+ typed message classes for robotics applications in Python. All types are binary-compatible with their Rust counterparts and communicate via zero-copy shared memory.
Availability
All message types are available via the Rust bindings:
from horus import (
# Geometry
CmdVel, Pose2D, Pose3D, Twist, Vector3, Point3, Quaternion,
TransformStamped, PoseStamped, PoseWithCovariance, TwistWithCovariance,
Accel, AccelStamped,
# Control
MotorCommand, ServoCommand, DifferentialDriveCommand, PidConfig,
TrajectoryPoint, JointCommand,
# Sensor
Imu, Odometry, LaserScan, JointState, BatteryState, RangeSensor,
NavSatFix, MagneticField, Temperature, FluidPressure, Illuminance,
Clock, TimeReference,
# Diagnostics
Heartbeat, DiagnosticStatus, EmergencyStop, ResourceUsage,
DiagnosticValue, DiagnosticReport, NodeHeartbeat, SafetyStatus,
# Force/Haptics
WrenchStamped, ForceCommand, ContactInfo,
ImpedanceParameters, HapticFeedback,
# Navigation
NavGoal, GoalResult, PathPlan, Waypoint, NavPath,
VelocityObstacle, VelocityObstacles, OccupancyGrid, CostMap,
# Input
JoystickInput, KeyboardInput,
# Detection/Perception
BoundingBox2D, BoundingBox3D, Detection, Detection3D,
SegmentationMask, TrackedObject, TrackingHeader,
Landmark, Landmark3D, LandmarkArray,
PointField, PlaneDetection, PlaneArray,
# Vision
CompressedImage, CameraInfo, RegionOfInterest, StereoInfo,
# Pool-backed domain types
Image, PointCloud, DepthImage,
)
Overview
Key Features:
- Full Rust parity — All 55+ Rust message types are available in Python
- Zero-copy IPC — POD types transfer via shared memory with no serialization overhead
- Cross-language compatible — Binary-compatible with Rust message types
- Nanosecond timestamps — All messages include
timestamp_nsfield - Typed Topic support — Use
Topic(CmdVel)for type-safe pub/sub
Geometry Messages
Pose2D
2D robot pose (position + orientation).
from horus import Pose2D
pose = Pose2D(x=1.0, y=2.0, theta=0.5)
pose.x = 1.5
pose.y = 2.5
pose.theta = 0.785
Fields: x (f64), y (f64), theta (f64), timestamp_ns (u64)
Pose3D
3D robot pose (position + quaternion orientation).
from horus import Pose3D
pose = Pose3D(x=1.0, y=2.0, z=3.0, qx=0.0, qy=0.0, qz=0.0, qw=1.0)
Fields: x, y, z (f64), qx, qy, qz, qw (f64), timestamp_ns (u64)
Twist
Full 6-DOF velocity (linear + angular) for 3D robots.
from horus import Twist
twist = Twist(linear_x=1.0, linear_y=0.0, linear_z=0.0,
angular_x=0.0, angular_y=0.0, angular_z=0.5)
Fields: linear_x, linear_y, linear_z, angular_x, angular_y, angular_z (f64), timestamp_ns (u64)
Vector3, Point3, Quaternion
Basic 3D geometric types.
from horus import Vector3, Point3, Quaternion
vec = Vector3(x=1.0, y=0.0, z=0.0)
point = Point3(x=1.0, y=2.0, z=3.0)
quat = Quaternion(x=0.0, y=0.0, z=0.0, w=1.0)
TransformStamped
3D transformation with timestamp.
from horus import TransformStamped
tf = TransformStamped(tx=1.0, ty=2.0, tz=0.0, rx=0.0, ry=0.0, rz=0.0, rw=1.0)
Fields: tx, ty, tz (f64 translation), rx, ry, rz, rw (f64 rotation quaternion), timestamp_ns (u64)
PoseStamped, PoseWithCovariance, TwistWithCovariance
Extended pose and velocity types with covariance support.
from horus import PoseStamped, PoseWithCovariance, TwistWithCovariance
ps = PoseStamped(x=1.0, y=2.0, z=3.0, qx=0.0, qy=0.0, qz=0.0, qw=1.0)
pwc = PoseWithCovariance(x=1.0, y=2.0, z=3.0)
twc = TwistWithCovariance(linear_x=0.5, angular_z=1.0)
Accel, AccelStamped
Linear and angular acceleration.
from horus import Accel, AccelStamped
accel = Accel(linear_x=9.81, linear_y=0.0, linear_z=0.0,
angular_x=0.1, angular_y=0.2, angular_z=0.3)
Control Messages
CmdVel
2D velocity command (linear + angular).
from horus import CmdVel
cmd = CmdVel(linear=1.0, angular=0.5)
Fields: linear (f32), angular (f32), timestamp_ns (u64)
MotorCommand
Individual motor control with mode selection.
from horus import MotorCommand
cmd = MotorCommand(motor_id=0, mode=0, target=1.0,
max_velocity=10.0, max_acceleration=5.0, enable=True)
Fields: motor_id (u8), mode (u8), target (f64), max_velocity (f64), max_acceleration (f64), feed_forward (f64), enable (bool), timestamp_ns (u64)
ServoCommand
Servo motor control.
from horus import ServoCommand
cmd = ServoCommand(servo_id=0, position=1.57, speed=0.5, enable=True)
Fields: servo_id (u8), position (f32), speed (f32), enable (bool), timestamp_ns (u64)
DifferentialDriveCommand, PidConfig
from horus import DifferentialDriveCommand, PidConfig
diff = DifferentialDriveCommand(left_velocity=1.0, right_velocity=-1.0)
pid = PidConfig(kp=1.0, ki=0.1, kd=0.01)
TrajectoryPoint, JointCommand
from horus import TrajectoryPoint, JointCommand
tp = TrajectoryPoint(position=[1.0, 2.0, 3.0], time_from_start=1.5)
Sensor Messages
Imu
Inertial Measurement Unit data (acceleration + gyroscope).
from horus import Imu
imu = Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81,
gyro_x=0.0, gyro_y=0.0, gyro_z=0.1)
Fields: accel_x, accel_y, accel_z (f64), gyro_x, gyro_y, gyro_z (f64), timestamp_ns (u64)
Odometry
Robot odometry (2D pose + velocity).
from horus import Odometry
odom = Odometry(x=1.0, y=2.0, theta=0.5, linear_velocity=0.5, angular_velocity=0.1)
Fields: x, y, theta (f64), linear_velocity, angular_velocity (f64), timestamp_ns (u64)
LaserScan
2D LIDAR scan data.
from horus import LaserScan
scan = LaserScan(angle_min=-3.14, angle_max=3.14,
range_min=0.1, range_max=10.0,
ranges=[1.0, 1.1, 1.2])
Fields: ranges (list[f32]), angle_min, angle_max, range_min, range_max, angle_increment (f32), timestamp_ns (u64)
JointState
Multi-joint state (up to 16 joints).
from horus import JointState
js = JointState(names=["j1", "j2"], positions=[1.0, 2.0],
velocities=[0.0, 0.0], efforts=[0.0, 0.0])
print(len(js)) # 2
print(js.names) # ["j1", "j2"]
print(js.positions) # [1.0, 2.0]
BatteryState, RangeSensor, NavSatFix
from horus import BatteryState, RangeSensor, NavSatFix
battery = BatteryState(voltage=12.6, percentage=85.0, current=2.5, temperature=25.0)
battery.is_low(20.0) # Check if below threshold
battery.is_critical() # Check if critical
range_s = RangeSensor(range=2.5, min_range=0.02, max_range=10.0, field_of_view=0.44)
gps = NavSatFix(latitude=37.7749, longitude=-122.4194, altitude=10.0)
gps.has_fix() # Check GPS fix status
MagneticField, Temperature, FluidPressure, Illuminance
from horus import MagneticField, Temperature, FluidPressure, Illuminance
mag = MagneticField(x=0.25, y=-0.1, z=0.45)
temp = Temperature(temperature=72.5, variance=0.1)
pressure = FluidPressure(pressure=101325.0, variance=10.0)
lux = Illuminance(illuminance=500.0, variance=5.0)
Clock, TimeReference
Time synchronization messages.
from horus import Clock, TimeReference
clk = Clock(clock_ns=1000000000, sim_speed=2.0, source=1)
clk.is_paused() # Check pause state
tref = TimeReference(time_ref_ns=1000000000, source="gps", offset_ns=-500)
corrected = tref.correct_timestamp(local_ns) # Apply offset correction
print(tref.source) # "gps"
Diagnostics Messages
from horus import (Heartbeat, EmergencyStop, DiagnosticStatus,
ResourceUsage, DiagnosticValue, DiagnosticReport,
NodeHeartbeat, SafetyStatus)
hb = Heartbeat(node_name="controller", node_id=1)
estop = EmergencyStop(engaged=True, reason="collision")
ds = DiagnosticStatus(level=2, code=101, message="overheating", component="motor")
ru = ResourceUsage(cpu_percent=45.5, memory_bytes=1024000)
# Structured diagnostic reports
dr = DiagnosticReport(component="sensor_hub", level=1)
dr.add_value(DiagnosticValue(key="temperature", value="42"))
dr.add_value(DiagnosticValue.int("count", 100))
dr.add_value(DiagnosticValue.float("voltage", 24.1))
ss = SafetyStatus()
ss.estop_engaged # False (default)
ss.watchdog_ok # True (default)
Force/Haptics Messages
from horus import (WrenchStamped, ForceCommand, ContactInfo,
ImpedanceParameters, HapticFeedback)
wrench = WrenchStamped(fx=10.0, fy=0.0, fz=-9.81, tx=0.0, ty=0.5, tz=0.0)
force_cmd = ForceCommand(fx=0.0, fy=0.0, fz=-10.0, timeout=1.0)
contact = ContactInfo(state=1, contact_force=15.5)
impedance = ImpedanceParameters.compliant() # Low stiffness preset
impedance = ImpedanceParameters.stiff() # High stiffness preset
haptic = HapticFeedback(vibration_intensity=0.8, vibration_frequency=200.0, duration_seconds=0.5)
Navigation Messages
from horus import (NavGoal, GoalResult, PathPlan, Waypoint, NavPath,
VelocityObstacle, VelocityObstacles, OccupancyGrid, CostMap)
goal = NavGoal(x=10.0, y=5.0, theta=0.0, position_tolerance=0.1)
result = GoalResult(goal_id=1, status=3, progress=1.0)
# Path planning
plan = PathPlan(goal_x=10.0, goal_y=5.0)
plan.add_waypoint(0.0, 0.0, 0.0)
plan.add_waypoint(5.0, 0.0, 0.0)
# NavPath with waypoints
path = NavPath()
path.add_waypoint(Waypoint(x=0.0, y=0.0))
path.add_waypoint(Waypoint(x=5.0, y=5.0, theta=1.57))
print(path.waypoint_count) # 2
# Occupancy grid
grid = OccupancyGrid(width=100, height=100, resolution=0.05)
grid.set_occupancy(50, 50, 100) # Mark as occupied
grid.is_free(2.5, 2.5) # Check if cell is free
# Cost map
costmap = CostMap(grid=grid, inflation_radius=0.3)
Input Messages
from horus import JoystickInput, KeyboardInput
joy = JoystickInput(joystick_id=0, element_id=1, value=0.75, pressed=True)
key = KeyboardInput(key_name="A", code=65, pressed=True, modifiers=0)
Detection/Perception Messages
from horus import (BoundingBox2D, BoundingBox3D, Detection, Detection3D,
SegmentationMask, TrackedObject, TrackingHeader,
Landmark, Landmark3D, LandmarkArray,
PointField, PlaneDetection, PlaneArray)
# 2D detection
det = Detection(class_name="person", confidence=0.95,
x=10.0, y=20.0, width=100.0, height=200.0)
# 3D detection
det3d = Detection3D(class_name="box", confidence=0.9,
cx=1.0, cy=2.0, cz=3.0, length=0.5, width=0.5, height=0.5)
# Object tracking
tracked = TrackedObject(track_id=42, class_id=1, confidence=0.9,
x=1.0, y=2.0, width=3.0, height=4.0)
# Landmarks (e.g., body pose keypoints)
lm = Landmark(x=100.0, y=200.0, visibility=0.95, index=5)
lm3d = Landmark3D(x=1.0, y=2.0, z=3.0, visibility=0.8, index=10)
Vision Messages
from horus import (CompressedImage, CameraInfo, RegionOfInterest, StereoInfo,
Image, PointCloud, DepthImage)
# Compressed image (serde-based)
img = CompressedImage(format="jpeg", data=jpeg_bytes, width=640, height=480)
# Camera calibration
cam = CameraInfo(width=640, height=480, fx=525.0, fy=525.0, cx=320.0, cy=240.0)
print(cam.focal_lengths()) # (525.0, 525.0)
print(cam.principal_point()) # (320.0, 240.0)
# Region of interest
roi = RegionOfInterest(x=100, y=200, width=50, height=60)
print(roi.area()) # 3000
print(roi.contains(120, 220)) # True
# Stereo camera
stereo = StereoInfo(left_camera=cam, right_camera=cam, baseline=0.12)
# Pool-backed types (zero-copy)
image = Image(height=480, width=640, encoding=0) # RGB8
cloud = PointCloud(num_points=1000)
depth = DepthImage(height=480, width=640)
Cross-Language Compatibility
All 55+ Python message types are binary-compatible with Rust via zero-copy shared memory:
| Category | Python Classes | Count |
|---|---|---|
| Geometry | Pose2D, Pose3D, Twist, Vector3, Point3, Quaternion, TransformStamped, PoseStamped, PoseWithCovariance, TwistWithCovariance, Accel, AccelStamped | 12 |
| Control | CmdVel, MotorCommand, ServoCommand, DifferentialDriveCommand, PidConfig, TrajectoryPoint, JointCommand | 7 |
| Sensor | Imu, Odometry, LaserScan, JointState, BatteryState, RangeSensor, NavSatFix, MagneticField, Temperature, FluidPressure, Illuminance, Clock, TimeReference | 13 |
| Diagnostics | Heartbeat, DiagnosticStatus, EmergencyStop, ResourceUsage, DiagnosticValue, DiagnosticReport, NodeHeartbeat, SafetyStatus | 8 |
| Force/Haptics | WrenchStamped, ForceCommand, ContactInfo, ImpedanceParameters, HapticFeedback | 5 |
| Navigation | NavGoal, GoalResult, PathPlan, Waypoint, NavPath, VelocityObstacle, VelocityObstacles, OccupancyGrid, CostMap | 9 |
| Input | JoystickInput, KeyboardInput | 2 |
| Detection/Perception | BoundingBox2D, BoundingBox3D, Detection, Detection3D, SegmentationMask, TrackedObject, TrackingHeader, Landmark, Landmark3D, LandmarkArray, PointField, PlaneDetection, PlaneArray | 13 |
| Vision | CompressedImage, CameraInfo, RegionOfInterest, StereoInfo | 4 |
| Domain (pool) | Image, PointCloud, DepthImage | 3 |
Example — Python to Rust:
# Python sender
from horus import Topic, Twist
topic = Topic(Twist)
topic.send(Twist(linear_x=1.0, angular_z=0.5))
// Rust receiver
use horus::prelude::*;
let topic: Topic<Twist> = Topic::new("twist")?;
if let Some(twist) = topic.recv() {
println!("linear_x={}, angular_z={}", twist.linear[0], twist.angular[2]);
}
Usage Patterns
Robot Controller with Multiple Sensors
from horus import Node, Topic, CmdVel, LaserScan, Imu
scan_topic = Topic(LaserScan)
imu_topic = Topic(Imu)
cmd_topic = Topic(CmdVel)
def controller_tick(node):
scan = scan_topic.recv()
imu = imu_topic.recv()
if scan:
min_dist = min(scan.ranges) if scan.ranges else None
if min_dist and min_dist < 0.5:
cmd_topic.send(CmdVel(0.0, 0.0)) # Stop
else:
cmd_topic.send(CmdVel(linear=0.5, angular=0.0))
node = Node(name="controller", tick=controller_tick, rate=10)
See Also
- Python Bindings — Full Python API guide
- Python Perception Types — DetectionList, TrackedObject, COCOPose
- Custom Messages — Runtime and compiled message generation
- Rust Standard Messages — Rust API reference for all message types
- Message Types Overview — Conceptual message type documentation