Custom Messages (horus.msggen)

The horus.msggen module lets you define custom typed messages directly in Python. Two approaches are available:

| Approach | Build Step | Latency | Best For |
|---|---|---|---|
| Runtime Messages | None | ~20-40μs | Prototyping, quick iteration |
| Compiled Messages | maturin develop | ~3-5μs | Production, high-frequency |

Runtime Messages (No Build Step)

Create custom messages instantly, without any compilation. Runtime messages use Python's struct module for fixed-layout binary serialization.

Basic Usage

# simplified
from horus.msggen import define_message

# Define a custom message type
RobotStatus = define_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
    ('is_active', 'bool'),
    ('timestamp', 'u64'),
])

# Create instances
status = RobotStatus(battery_level=85.0, error_code=0, is_active=True, timestamp=0)

# Access fields
print(status.battery_level)  # 85.0
status.error_code = 5

# Serialize for IPC
raw_bytes = status.to_bytes()  # 17 bytes

# Reconstruct from bytes
status2 = RobotStatus.from_bytes(raw_bytes)
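
The 17-byte size noted above is what a packed layout with no padding between fields would give. The sketch below reproduces that size with Python's struct module directly. The little-endian `<` prefix and the field-to-format mapping are assumptions made for illustration; the source does not confirm the byte order or exact format string that horus.msggen uses.

```python
import struct

# Assumed layout: little-endian, standard sizes, no padding between fields.
# f = f32, i = i32, ? = bool, Q = u64
PACKED = struct.Struct("<fi?Q")

raw = PACKED.pack(85.0, 5, True, 0)
print(PACKED.size)  # 17

# Unpacking returns the fields in declaration order.
battery_level, error_code, is_active, timestamp = PACKED.unpack(raw)

# With native alignment (no prefix), the platform may insert padding
# before the u64, so the size can be larger than 17.
NATIVE_SIZE = struct.calcsize("fi?Q")
```

A packed layout keeps messages small, but it also means every process reading the bytes must agree on field order and types.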

Supported Types

| Type String | Size | Description |
|---|---|---|
| f32 / float32 | 4 bytes | 32-bit float |
| f64 / float64 | 8 bytes | 64-bit float |
| i8 | 1 byte | Signed 8-bit int |
| i16 | 2 bytes | Signed 16-bit int |
| i32 | 4 bytes | Signed 32-bit int |
| i64 | 8 bytes | Signed 64-bit int |
| u8 | 1 byte | Unsigned 8-bit int |
| u16 | 2 bytes | Unsigned 16-bit int |
| u32 | 4 bytes | Unsigned 32-bit int |
| u64 | 8 bytes | Unsigned 64-bit int |
| bool | 1 byte | Boolean |
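
The sizes in this table line up with the standard-size format codes of Python's struct module. As a rough illustration, the mapping below translates each type string into a struct code and computes the packed size of a field list. `STRUCT_CODES` and `packed_size` are hypothetical names, and the no-padding, little-endian layout is an assumption rather than documented horus.msggen behavior.

```python
import struct

# Hypothetical mapping from the documented type strings to struct codes.
STRUCT_CODES = {
    "f32": "f", "float32": "f",
    "f64": "d", "float64": "d",
    "i8": "b", "i16": "h", "i32": "i", "i64": "q",
    "u8": "B", "u16": "H", "u32": "I", "u64": "Q",
    "bool": "?",
}

def packed_size(fields):
    """Return the byte size of a field list, assuming no padding."""
    fmt = "<" + "".join(STRUCT_CODES[type_name] for _, type_name in fields)
    return struct.calcsize(fmt)

robot_status_fields = [
    ("battery_level", "f32"),
    ("error_code", "i32"),
    ("is_active", "bool"),
    ("timestamp", "u64"),
]
print(packed_size(robot_status_fields))  # 17
```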

NumPy Messages (Better Performance)

If NumPy is available, use define_numpy_message for better performance:

# simplified
from horus.msggen import define_numpy_message
import numpy as np

# NumPy-based message (uses structured arrays internally)
SensorData = define_numpy_message('SensorData', 'sensor.data', [
    ('x', np.float32),
    ('y', np.float32),
    ('z', np.float32),
    ('temperature', np.float32),
    ('timestamp', np.uint64),
])

# Create instance
data = SensorData(x=1.0, y=2.0, z=3.0, temperature=25.5, timestamp=0)

# Get underlying numpy structured array
arr = data.to_numpy()

# Zero-copy bytes access
raw = data.to_bytes()

With Topic (IPC)

Runtime messages work with Topic for inter-process communication. Use to_bytes() / from_bytes() for serialization over generic topics:

# simplified
from horus import Topic
from horus.msggen import define_message

# Define message
RobotStatus = define_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
])

# Publisher — use generic topic with manual serialization
pub_topic = Topic("robot.status")
status = RobotStatus(battery_level=85.0, error_code=0)
pub_topic.send(status.to_bytes())

# Subscriber (different process)
sub_topic = Topic("robot.status")
raw = sub_topic.recv()
if raw:
    received = RobotStatus.from_bytes(raw)
    print(received.battery_level)  # 85.0
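
Because the layout is fixed, a subscriber can cheaply reject malformed payloads before decoding them. The helper below is a hypothetical sketch: `decode_status` and the `<fi` format are stand-ins for RobotStatus.from_bytes, since the source does not say how from_bytes reacts to short or oversized input.

```python
import struct
from typing import Optional, Tuple

# Assumed layout for the two-field RobotStatus: f32 + i32, no padding.
STATUS_LAYOUT = struct.Struct("<fi")

def decode_status(raw: Optional[bytes]) -> Optional[Tuple[float, int]]:
    """Decode a payload, or return None if it is missing or the wrong size."""
    if not raw or len(raw) != STATUS_LAYOUT.size:
        return None
    return STATUS_LAYOUT.unpack(raw)

payload = STATUS_LAYOUT.pack(85.0, 0)
print(decode_status(payload))      # (85.0, 0)
print(decode_status(payload[:3]))  # None
```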

Or use the Node convenience API, which manages the topics for you. In this example the payload is still serialized explicitly with to_bytes() / from_bytes():

# simplified
import horus
from horus.msggen import define_message

RobotStatus = define_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
])

def publisher_tick(node):
    status = RobotStatus(battery_level=85.0, error_code=0)
    node.send("robot.status", status.to_bytes())

def subscriber_tick(node):
    if node.has_msg("robot.status"):
        raw = node.recv("robot.status")
        status = RobotStatus.from_bytes(raw)
        print(status.battery_level)  # 85.0

pub = horus.Node("publisher", pubs="robot.status", tick=publisher_tick)
sub = horus.Node("subscriber", subs="robot.status", tick=subscriber_tick)
horus.run(pub, sub, duration=3)

Compiled Messages (Production)

For maximum performance (~3-5μs), compile your messages to Rust. This generates PyO3 bindings with the same zero-copy performance as built-in types.

Step 1: Define Messages

# simplified
from horus.msggen import register_message

# Register one or more messages
register_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
    ('is_active', 'bool'),
    ('timestamp', 'u64'),
])

register_message('SensorReading', 'sensor.reading', [
    ('x', 'f64'),
    ('y', 'f64'),
    ('z', 'f64'),
])

Step 2: Build

# simplified
from horus.msggen import build_messages

# Generate Rust code and rebuild
build_messages()  # Runs: maturin develop --release

This generates Rust code in horus_py/src/custom_messages/ and rebuilds the module.
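
To make the code-generation step more concrete, here is a rough sketch of rendering a field list into a Rust struct definition. The function `render_rust_struct`, the `#[repr(C)]` attribute, and the derive list are all assumptions made for illustration. The real generator's output in horus_py/src/custom_messages/ may look quite different and also includes the PyO3 bindings, which are not shown here.

```python
# Hypothetical mapping from message type strings to Rust primitive types.
RUST_TYPES = {
    "f32": "f32", "float32": "f32",
    "f64": "f64", "float64": "f64",
    "i8": "i8", "i16": "i16", "i32": "i32", "i64": "i64",
    "u8": "u8", "u16": "u16", "u32": "u32", "u64": "u64",
    "bool": "bool",
}

def render_rust_struct(name, fields):
    """Render a fixed-layout Rust struct as source text (illustrative only)."""
    lines = ["#[repr(C)]", "#[derive(Clone, Copy, Debug)]", f"pub struct {name} {{"]
    for field_name, type_name in fields:
        lines.append(f"    pub {field_name}: {RUST_TYPES[type_name]},")
    lines.append("}")
    return "\n".join(lines)

rust_source = render_rust_struct(
    "SensorReading", [("x", "f64"), ("y", "f64"), ("z", "f64")]
)
print(rust_source)
```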

Step 3: Use

After building, your messages are available directly from horus:

# simplified
from horus import RobotStatus, SensorReading, Topic

# Create typed topic
topic = Topic(RobotStatus)

# Send
status = RobotStatus(battery_level=85.0, error_code=0, is_active=True, timestamp=0)
topic.send(status)

# Receive (typed!)
received = topic.recv()
print(received.battery_level)

For larger projects, define messages in YAML:

# messages.yaml
messages:
  - name: RobotStatus
    topic: robot.status
    fields:
      - name: battery_level
        type: f32
      - name: error_code
        type: i32
      - name: is_active
        type: bool

  - name: SensorReading
    topic: sensor.reading
    fields:
      - name: x
        type: f64
      - name: y
        type: f64
      - name: z
        type: f64

Then load the schema and build:

# simplified
from horus.msggen import generate_messages_from_yaml, build_messages

generate_messages_from_yaml('messages.yaml')
build_messages()
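
Once parsed, a schema like the one above is just nested dicts and lists. The sketch below shows how such a structure could map onto the (name, topic, fields) arguments that register_message takes. The dict literal stands in for the output of a YAML parser, and `schema_to_definitions` is a hypothetical helper, not part of horus.msggen.

```python
# Stand-in for the parsed contents of messages.yaml (abbreviated).
schema = {
    "messages": [
        {
            "name": "RobotStatus",
            "topic": "robot.status",
            "fields": [
                {"name": "battery_level", "type": "f32"},
                {"name": "error_code", "type": "i32"},
                {"name": "is_active", "type": "bool"},
            ],
        },
        {
            "name": "SensorReading",
            "topic": "sensor.reading",
            "fields": [
                {"name": "x", "type": "f64"},
                {"name": "y", "type": "f64"},
                {"name": "z", "type": "f64"},
            ],
        },
    ]
}

def schema_to_definitions(schema):
    """Convert a parsed schema into (name, topic, fields) tuples."""
    definitions = []
    for message in schema["messages"]:
        fields = [(f["name"], f["type"]) for f in message["fields"]]
        definitions.append((message["name"], message["topic"], fields))
    return definitions

definitions = schema_to_definitions(schema)
for name, topic, fields in definitions:
    print(name, topic, fields)
```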

Rebuild Detection

The builder stores a hash of the registered message definitions and skips the rebuild unless they have changed:

# simplified
from horus.msggen import check_needs_rebuild, build_messages

if check_needs_rebuild():
    build_messages()
else:
    print("Messages are up to date")

Force rebuild with:

# simplified
build_messages(force=True)
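
Hash-based change detection of this kind can be sketched in a few lines. The version below is an illustration under stated assumptions: `definitions_hash` and `needs_rebuild` are hypothetical names, and SHA-256 over canonical JSON is one plausible choice, not necessarily what the builder does.

```python
import hashlib
import json

def definitions_hash(definitions):
    """Hash message definitions; message order is ignored, field order is kept."""
    canonical = json.dumps(sorted(definitions), separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def needs_rebuild(definitions, last_build_hash):
    """Return True when the definitions differ from the last recorded build."""
    return definitions_hash(definitions) != last_build_hash

v1 = [("RobotStatus", "robot.status", [("battery_level", "f32"), ("error_code", "i32")])]
v2 = [("RobotStatus", "robot.status", [("battery_level", "f64"), ("error_code", "i32")])]

stored = definitions_hash(v1)
print(needs_rebuild(v1, stored))  # False
print(needs_rebuild(v2, stored))  # True
```

Field order is deliberately part of the hash here, because reordering fields changes the binary layout even when names and types stay the same.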

Performance Comparison

| Approach | Latency | Throughput | Use Case |
|---|---|---|---|
| Built-in (Rust) | ~3μs | 300K msgs/sec | CmdVel, Pose2D, etc. |
| Compiled Custom | ~3-5μs | 200K msgs/sec | Production custom types |
| Runtime | ~20-40μs | 25K msgs/sec | Prototyping |
| Runtime (NumPy) | ~15-30μs | 35K msgs/sec | NumPy integration |
| Pickle | ~50-100μs | 10K msgs/sec | Legacy/dynamic types |

Recommendation: Start with runtime messages for fast iteration, then compile for production.
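
The throughput column is roughly the reciprocal of the upper end of each latency range, which is easy to check with simple arithmetic. The sketch below does that check. It assumes messages are handled one at a time with no batching or pipelining, which the source does not state.

```python
# Upper-bound latency per message in microseconds, taken from the table above.
WORST_CASE_LATENCY_US = {
    "Built-in (Rust)": 3,
    "Compiled Custom": 5,
    "Runtime": 40,
    "Runtime (NumPy)": 30,
    "Pickle": 100,
}

def max_throughput(latency_us):
    """Messages per second if each message takes latency_us microseconds."""
    return 1_000_000 / latency_us

for approach, latency_us in WORST_CASE_LATENCY_US.items():
    print(f"{approach}: ~{max_throughput(latency_us):,.0f} msgs/sec")
```

The computed values (about 333K, 200K, 25K, 33K, and 10K msgs/sec) land close to the table's figures.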


API Reference

define_message

# simplified
def define_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, str]]
) -> Type[RuntimeMessage]

Create a runtime message class.

Parameters:

  • name: Class name (e.g., "RobotStatus")
  • topic: Topic name (e.g., "robot.status")
  • fields: List of (field_name, type_string) tuples

Returns: New message class

define_numpy_message

# simplified
def define_numpy_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, Any]]
) -> Type[NumpyMessage]

Create a NumPy-based message class.

Parameters:

  • name: Class name
  • topic: Topic name
  • fields: List of (field_name, numpy_dtype) tuples

Returns: New NumPy message class

register_message

# simplified
def register_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, str]]
) -> None

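Register a message for compiled generation. The parameters have the same meaning as in define_message. Nothing is compiled until build_messages() is called.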

build_messages

# simplified
def build_messages(
    force: bool = False,
    verbose: bool = True
) -> bool

Build all registered messages.

Parameters:

  • force: Rebuild even if unchanged
  • verbose: Print progress

Returns: True if successful

check_needs_rebuild

# simplified
def check_needs_rebuild() -> bool

Check whether the registered messages differ from the last build.

Returns: True if a rebuild is needed


Complete Example

#!/usr/bin/env python3
# simplified
"""Custom message example with runtime messages."""

import horus
from horus.msggen import define_message

# Define custom sensor message
MySensor = define_message('MySensor', 'my.sensor', [
    ('distance', 'f32'),
    ('angle', 'f32'),
    ('confidence', 'f32'),
    ('object_id', 'u32'),
])

def sensor_tick(node):
    """Publish sensor readings."""
    reading = MySensor(
        distance=2.5,
        angle=0.785,
        confidence=0.95,
        object_id=42
    )
    node.send("my.sensor", reading.to_bytes())

def processor_tick(node):
    """Process sensor readings."""
    if node.has_msg("my.sensor"):
        raw = node.recv("my.sensor")
        reading = MySensor.from_bytes(raw)
        print(f"Object {reading.object_id}: {reading.distance}m at {reading.angle}rad")

# Create nodes
sensor = horus.Node("sensor", pubs="my.sensor", tick=sensor_tick, rate=10)
processor = horus.Node("processor", subs="my.sensor", tick=processor_tick)

# Run
horus.run(sensor, processor, duration=3)

When to Use Each Approach

Use Runtime Messages When:

  • Prototyping new message types
  • Message schema changes frequently
  • You don't want to wait for compilation
  • Performance requirements are moderate (<50Hz)

Use Compiled Messages When:

  • Deploying to production
  • High-frequency data (>100Hz)
  • Cross-language compatibility required
  • Type safety is critical

Use Built-in Messages When:

  • Standard robotics types (CmdVel, Pose2D, LaserScan)
  • Maximum performance needed
  • Compatibility with other HORUS systems

Design Decisions

Why two approaches (runtime vs compiled) instead of one? Development speed and runtime speed trade off against each other. Runtime messages need no build step and let you iterate on message schemas in seconds, which is ideal for prototyping. Compiled messages take longer to build but achieve near-native performance, which is essential for production. The two-path design matches the typical robotics workflow: prototype fast, then optimize for deployment.

Why struct module for runtime messages instead of pickle? struct produces a fixed-size binary layout that is deterministic and cross-process compatible. Pickle produces variable-length output, is version-sensitive, and has security implications (arbitrary code execution on deserialization). The tradeoff is that struct only supports primitive types (no nested objects), but this matches the fixed-size Pod constraint of zero-copy IPC.
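
The fixed-size versus variable-size difference is easy to observe directly. In the sketch below, the `<fi` layout (f32 + i32, no padding) is an assumed stand-in for a two-field runtime message; the pickle behavior shown is standard Python.

```python
import pickle
import struct

LAYOUT = struct.Struct("<fi")  # assumed layout: f32 + i32, no padding

small = {"battery_level": 85.0, "error_code": 0}
large = {"battery_level": 85.0, "error_code": 100_000}

# struct output has the same length no matter what values are stored.
struct_sizes = [
    len(LAYOUT.pack(m["battery_level"], m["error_code"])) for m in (small, large)
]

# pickle encodes small and large integers differently, so the length varies.
pickle_sizes = [len(pickle.dumps(m)) for m in (small, large)]

print(struct_sizes)                        # [8, 8]
print(pickle_sizes[0] != pickle_sizes[1])  # True
```

A constant payload size is what lets a receiver reserve a fixed slot per message, which variable-length pickle output cannot guarantee.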

Why YAML schema support? Teams need a shared source of truth for message definitions that is language-agnostic and version-controllable. YAML schemas serve as the canonical definition that generates both Python runtime classes and Rust compiled types, ensuring cross-language compatibility.

Why generate Rust code for compiled messages instead of a Python-only binary format? Generated Rust code produces the same Pod types used by the standard library, so compiled custom messages get identical zero-copy IPC performance and are binary-compatible with Rust nodes. A Python-only approach would sacrifice cross-language support.


See Also