Python Message Library

HORUS provides 55+ typed message classes for robotics applications in Python. All types are binary-compatible with their Rust counterparts and communicate via zero-copy shared memory.

Availability

All message types are available via the Rust bindings:

from horus import (
    # Geometry
    CmdVel, Pose2D, Pose3D, Twist, Vector3, Point3, Quaternion,
    TransformStamped, PoseStamped, PoseWithCovariance, TwistWithCovariance,
    Accel, AccelStamped,
    # Control
    MotorCommand, ServoCommand, DifferentialDriveCommand, PidConfig,
    TrajectoryPoint, JointCommand,
    # Sensor
    Imu, Odometry, LaserScan, JointState, BatteryState, RangeSensor,
    NavSatFix, MagneticField, Temperature, FluidPressure, Illuminance,
    Clock, TimeReference,
    # Diagnostics
    Heartbeat, DiagnosticStatus, EmergencyStop, ResourceUsage,
    DiagnosticValue, DiagnosticReport, NodeHeartbeat, SafetyStatus,
    # Force/Haptics
    WrenchStamped, ForceCommand, ContactInfo,
    ImpedanceParameters, HapticFeedback,
    # Navigation
    NavGoal, GoalResult, PathPlan, Waypoint, NavPath,
    VelocityObstacle, VelocityObstacles, OccupancyGrid, CostMap,
    # Input
    JoystickInput, KeyboardInput,
    # Detection/Perception
    BoundingBox2D, BoundingBox3D, Detection, Detection3D,
    SegmentationMask, TrackedObject, TrackingHeader,
    Landmark, Landmark3D, LandmarkArray,
    PointField, PlaneDetection, PlaneArray,
    # Vision
    CompressedImage, CameraInfo, RegionOfInterest, StereoInfo,
    # Pool-backed domain types
    Image, PointCloud, DepthImage,
)

Overview

Key Features:

  • Full Rust parity — All 55+ Rust message types are available in Python
  • Zero-copy IPC — POD types transfer via shared memory with no serialization overhead
  • Cross-language compatible — Binary-compatible with Rust message types
  • Nanosecond timestamps — All messages include timestamp_ns field
  • Typed Topic support — Use Topic(CmdVel) for type-safe pub/sub

Geometry Messages

Pose2D

2D robot pose (position + orientation).

from horus import Pose2D

pose = Pose2D(x=1.0, y=2.0, theta=0.5)
pose.x = 1.5
pose.y = 2.5
pose.theta = 0.785

Fields: x (f64), y (f64), theta (f64), timestamp_ns (u64)


Pose3D

3D robot pose (position + quaternion orientation).

from horus import Pose3D

pose = Pose3D(x=1.0, y=2.0, z=3.0, qx=0.0, qy=0.0, qz=0.0, qw=1.0)

Fields: x, y, z (f64), qx, qy, qz, qw (f64), timestamp_ns (u64)


Twist

Full 6-DOF velocity (linear + angular) for 3D robots.

from horus import Twist

twist = Twist(linear_x=1.0, linear_y=0.0, linear_z=0.0,
              angular_x=0.0, angular_y=0.0, angular_z=0.5)

Fields: linear_x, linear_y, linear_z, angular_x, angular_y, angular_z (f64), timestamp_ns (u64)


Vector3, Point3, Quaternion

Basic 3D geometric types.

from horus import Vector3, Point3, Quaternion

vec = Vector3(x=1.0, y=0.0, z=0.0)
point = Point3(x=1.0, y=2.0, z=3.0)
quat = Quaternion(x=0.0, y=0.0, z=0.0, w=1.0)

TransformStamped

3D transformation with timestamp.

from horus import TransformStamped

tf = TransformStamped(tx=1.0, ty=2.0, tz=0.0, rx=0.0, ry=0.0, rz=0.0, rw=1.0)

Fields: tx, ty, tz (f64 translation), rx, ry, rz, rw (f64 rotation quaternion), timestamp_ns (u64)


PoseStamped, PoseWithCovariance, TwistWithCovariance

Extended pose and velocity types with covariance support.

from horus import PoseStamped, PoseWithCovariance, TwistWithCovariance

ps = PoseStamped(x=1.0, y=2.0, z=3.0, qx=0.0, qy=0.0, qz=0.0, qw=1.0)
pwc = PoseWithCovariance(x=1.0, y=2.0, z=3.0)
twc = TwistWithCovariance(linear_x=0.5, angular_z=1.0)

Accel, AccelStamped

Linear and angular acceleration.

from horus import Accel, AccelStamped

accel = Accel(linear_x=9.81, linear_y=0.0, linear_z=0.0,
              angular_x=0.1, angular_y=0.2, angular_z=0.3)

Control Messages

CmdVel

2D velocity command (linear + angular).

from horus import CmdVel

cmd = CmdVel(linear=1.0, angular=0.5)

Fields: linear (f32), angular (f32), timestamp_ns (u64)


MotorCommand

Individual motor control with mode selection.

from horus import MotorCommand

cmd = MotorCommand(motor_id=0, mode=0, target=1.0,
                   max_velocity=10.0, max_acceleration=5.0, enable=True)

Fields: motor_id (u8), mode (u8), target (f64), max_velocity (f64), max_acceleration (f64), feed_forward (f64), enable (bool), timestamp_ns (u64)


ServoCommand

Servo motor control.

from horus import ServoCommand

cmd = ServoCommand(servo_id=0, position=1.57, speed=0.5, enable=True)

Fields: servo_id (u8), position (f32), speed (f32), enable (bool), timestamp_ns (u64)


DifferentialDriveCommand, PidConfig

from horus import DifferentialDriveCommand, PidConfig

diff = DifferentialDriveCommand(left_velocity=1.0, right_velocity=-1.0)
pid = PidConfig(kp=1.0, ki=0.1, kd=0.01)

TrajectoryPoint, JointCommand

from horus import TrajectoryPoint, JointCommand

tp = TrajectoryPoint(position=[1.0, 2.0, 3.0], time_from_start=1.5)

Sensor Messages

Imu

Inertial Measurement Unit data (acceleration + gyroscope).

from horus import Imu

imu = Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81,
          gyro_x=0.0, gyro_y=0.0, gyro_z=0.1)

Fields: accel_x, accel_y, accel_z (f64), gyro_x, gyro_y, gyro_z (f64), timestamp_ns (u64)


Odometry

Robot odometry (2D pose + velocity).

from horus import Odometry

odom = Odometry(x=1.0, y=2.0, theta=0.5, linear_velocity=0.5, angular_velocity=0.1)

Fields: x, y, theta (f64), linear_velocity, angular_velocity (f64), timestamp_ns (u64)


LaserScan

2D LIDAR scan data.

from horus import LaserScan

scan = LaserScan(angle_min=-3.14, angle_max=3.14,
                 range_min=0.1, range_max=10.0,
                 ranges=[1.0, 1.1, 1.2])

Fields: ranges (list[f32]), angle_min, angle_max, range_min, range_max, angle_increment (f32), timestamp_ns (u64)


JointState

Multi-joint state (up to 16 joints).

from horus import JointState

js = JointState(names=["j1", "j2"], positions=[1.0, 2.0],
                velocities=[0.0, 0.0], efforts=[0.0, 0.0])
print(len(js))        # 2
print(js.names)       # ["j1", "j2"]
print(js.positions)   # [1.0, 2.0]

BatteryState, RangeSensor, NavSatFix

from horus import BatteryState, RangeSensor, NavSatFix

battery = BatteryState(voltage=12.6, percentage=85.0, current=2.5, temperature=25.0)
battery.is_low(20.0)       # Check if below threshold
battery.is_critical()      # Check if critical

range_s = RangeSensor(range=2.5, min_range=0.02, max_range=10.0, field_of_view=0.44)

gps = NavSatFix(latitude=37.7749, longitude=-122.4194, altitude=10.0)
gps.has_fix()              # Check GPS fix status

MagneticField, Temperature, FluidPressure, Illuminance

from horus import MagneticField, Temperature, FluidPressure, Illuminance

mag = MagneticField(x=0.25, y=-0.1, z=0.45)
temp = Temperature(temperature=72.5, variance=0.1)
pressure = FluidPressure(pressure=101325.0, variance=10.0)
lux = Illuminance(illuminance=500.0, variance=5.0)

Clock, TimeReference

Time synchronization messages.

from horus import Clock, TimeReference

clk = Clock(clock_ns=1000000000, sim_speed=2.0, source=1)
clk.is_paused()  # Check pause state

tref = TimeReference(time_ref_ns=1000000000, source="gps", offset_ns=-500)
corrected = tref.correct_timestamp(local_ns)  # Apply offset correction
print(tref.source)  # "gps"

Diagnostics Messages

from horus import (Heartbeat, EmergencyStop, DiagnosticStatus,
                    ResourceUsage, DiagnosticValue, DiagnosticReport,
                    NodeHeartbeat, SafetyStatus)

hb = Heartbeat(node_name="controller", node_id=1)
estop = EmergencyStop(engaged=True, reason="collision")
ds = DiagnosticStatus(level=2, code=101, message="overheating", component="motor")
ru = ResourceUsage(cpu_percent=45.5, memory_bytes=1024000)

# Structured diagnostic reports
dr = DiagnosticReport(component="sensor_hub", level=1)
dr.add_value(DiagnosticValue(key="temperature", value="42"))
dr.add_value(DiagnosticValue.int("count", 100))
dr.add_value(DiagnosticValue.float("voltage", 24.1))

ss = SafetyStatus()
ss.estop_engaged  # False (default)
ss.watchdog_ok    # True (default)

Force/Haptics Messages

from horus import (WrenchStamped, ForceCommand, ContactInfo,
                    ImpedanceParameters, HapticFeedback)

wrench = WrenchStamped(fx=10.0, fy=0.0, fz=-9.81, tx=0.0, ty=0.5, tz=0.0)
force_cmd = ForceCommand(fx=0.0, fy=0.0, fz=-10.0, timeout=1.0)
contact = ContactInfo(state=1, contact_force=15.5)

impedance = ImpedanceParameters.compliant()   # Low stiffness preset
impedance = ImpedanceParameters.stiff()       # High stiffness preset

haptic = HapticFeedback(vibration_intensity=0.8, vibration_frequency=200.0, duration_seconds=0.5)

from horus import (NavGoal, GoalResult, PathPlan, Waypoint, NavPath,
                    VelocityObstacle, VelocityObstacles, OccupancyGrid, CostMap)

goal = NavGoal(x=10.0, y=5.0, theta=0.0, position_tolerance=0.1)
result = GoalResult(goal_id=1, status=3, progress=1.0)

# Path planning
plan = PathPlan(goal_x=10.0, goal_y=5.0)
plan.add_waypoint(0.0, 0.0, 0.0)
plan.add_waypoint(5.0, 0.0, 0.0)

# NavPath with waypoints
path = NavPath()
path.add_waypoint(Waypoint(x=0.0, y=0.0))
path.add_waypoint(Waypoint(x=5.0, y=5.0, theta=1.57))
print(path.waypoint_count)  # 2

# Occupancy grid
grid = OccupancyGrid(width=100, height=100, resolution=0.05)
grid.set_occupancy(50, 50, 100)   # Mark as occupied
grid.is_free(2.5, 2.5)            # Check if cell is free

# Cost map
costmap = CostMap(grid=grid, inflation_radius=0.3)

Input Messages

from horus import JoystickInput, KeyboardInput

joy = JoystickInput(joystick_id=0, element_id=1, value=0.75, pressed=True)
key = KeyboardInput(key_name="A", code=65, pressed=True, modifiers=0)

Detection/Perception Messages

from horus import (BoundingBox2D, BoundingBox3D, Detection, Detection3D,
                    SegmentationMask, TrackedObject, TrackingHeader,
                    Landmark, Landmark3D, LandmarkArray,
                    PointField, PlaneDetection, PlaneArray)

# 2D detection
det = Detection(class_name="person", confidence=0.95,
                x=10.0, y=20.0, width=100.0, height=200.0)

# 3D detection
det3d = Detection3D(class_name="box", confidence=0.9,
                     cx=1.0, cy=2.0, cz=3.0, length=0.5, width=0.5, height=0.5)

# Object tracking
tracked = TrackedObject(track_id=42, class_id=1, confidence=0.9,
                         x=1.0, y=2.0, width=3.0, height=4.0)

# Landmarks (e.g., body pose keypoints)
lm = Landmark(x=100.0, y=200.0, visibility=0.95, index=5)
lm3d = Landmark3D(x=1.0, y=2.0, z=3.0, visibility=0.8, index=10)

Vision Messages

from horus import (CompressedImage, CameraInfo, RegionOfInterest, StereoInfo,
                    Image, PointCloud, DepthImage)

# Compressed image (serde-based)
img = CompressedImage(format="jpeg", data=jpeg_bytes, width=640, height=480)

# Camera calibration
cam = CameraInfo(width=640, height=480, fx=525.0, fy=525.0, cx=320.0, cy=240.0)
print(cam.focal_lengths())     # (525.0, 525.0)
print(cam.principal_point())   # (320.0, 240.0)

# Region of interest
roi = RegionOfInterest(x=100, y=200, width=50, height=60)
print(roi.area())              # 3000
print(roi.contains(120, 220))  # True

# Stereo camera
stereo = StereoInfo(left_camera=cam, right_camera=cam, baseline=0.12)

# Pool-backed types (zero-copy)
image = Image(height=480, width=640, encoding=0)  # RGB8
cloud = PointCloud(num_points=1000)
depth = DepthImage(height=480, width=640)

Cross-Language Compatibility

All 55+ Python message types are binary-compatible with Rust via zero-copy shared memory:

CategoryPython ClassesCount
GeometryPose2D, Pose3D, Twist, Vector3, Point3, Quaternion, TransformStamped, PoseStamped, PoseWithCovariance, TwistWithCovariance, Accel, AccelStamped12
ControlCmdVel, MotorCommand, ServoCommand, DifferentialDriveCommand, PidConfig, TrajectoryPoint, JointCommand7
SensorImu, Odometry, LaserScan, JointState, BatteryState, RangeSensor, NavSatFix, MagneticField, Temperature, FluidPressure, Illuminance, Clock, TimeReference13
DiagnosticsHeartbeat, DiagnosticStatus, EmergencyStop, ResourceUsage, DiagnosticValue, DiagnosticReport, NodeHeartbeat, SafetyStatus8
Force/HapticsWrenchStamped, ForceCommand, ContactInfo, ImpedanceParameters, HapticFeedback5
NavigationNavGoal, GoalResult, PathPlan, Waypoint, NavPath, VelocityObstacle, VelocityObstacles, OccupancyGrid, CostMap9
InputJoystickInput, KeyboardInput2
Detection/PerceptionBoundingBox2D, BoundingBox3D, Detection, Detection3D, SegmentationMask, TrackedObject, TrackingHeader, Landmark, Landmark3D, LandmarkArray, PointField, PlaneDetection, PlaneArray13
VisionCompressedImage, CameraInfo, RegionOfInterest, StereoInfo4
Domain (pool)Image, PointCloud, DepthImage3

Example — Python to Rust:

# Python sender
from horus import Topic, Twist
topic = Topic(Twist)
topic.send(Twist(linear_x=1.0, angular_z=0.5))
// Rust receiver
use horus::prelude::*;
let topic: Topic<Twist> = Topic::new("twist")?;
if let Some(twist) = topic.recv() {
    println!("linear_x={}, angular_z={}", twist.linear[0], twist.angular[2]);
}

Usage Patterns

Robot Controller with Multiple Sensors

from horus import Node, Topic, CmdVel, LaserScan, Imu

scan_topic = Topic(LaserScan)
imu_topic = Topic(Imu)
cmd_topic = Topic(CmdVel)

def controller_tick(node):
    scan = scan_topic.recv()
    imu = imu_topic.recv()
    if scan:
        min_dist = min(scan.ranges) if scan.ranges else None
        if min_dist and min_dist < 0.5:
            cmd_topic.send(CmdVel(0.0, 0.0))  # Stop
        else:
            cmd_topic.send(CmdVel(linear=0.5, angular=0.0))

node = Node(name="controller", tick=controller_tick, rate=10)

See Also