Custom Messages (horus.msggen)

The horus.msggen module lets you define custom typed messages directly in Python. Two approaches are available:

| Approach | Build Step | Latency | Best For |
|---|---|---|---|
| Runtime Messages | None | ~20-40μs | Prototyping, quick iteration |
| Compiled Messages | maturin develop | ~3-5μs | Production, high-frequency |

Runtime Messages (No Build Step)

Create custom messages without any compilation step. Runtime messages use Python's struct module for fixed-layout binary serialization.

Basic Usage

from horus.msggen import define_message

# Define a custom message type
RobotStatus = define_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
    ('is_active', 'bool'),
    ('timestamp', 'u64'),
])

# Create instances
status = RobotStatus(battery_level=85.0, error_code=0, is_active=True, timestamp=0)

# Access fields
print(status.battery_level)  # 85.0
status.error_code = 5

# Serialize for IPC
raw_bytes = status.to_bytes()  # 17 bytes

# Reconstruct from bytes
status2 = RobotStatus.from_bytes(raw_bytes)
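
To illustrate what fixed-layout serialization means for this message, here is a minimal sketch that uses only Python's struct module. The little-endian, unpadded format string "<fi?Q" is an assumption chosen to match the 17-byte size noted in the example; the layout horus.msggen uses internally may differ.

```python
import struct

# Assumed layout: little-endian, no padding between fields.
# f = f32, i = i32, ? = bool, Q = u64
ROBOT_STATUS_FORMAT = "<fi?Q"

# 4 + 4 + 1 + 8 bytes
size = struct.calcsize(ROBOT_STATUS_FORMAT)

# Pack field values in declaration order
raw = struct.pack(ROBOT_STATUS_FORMAT, 85.0, 5, True, 0)

# Unpack returns the fields in the same order
battery_level, error_code, is_active, timestamp = struct.unpack(
    ROBOT_STATUS_FORMAT, raw
)
print(size, battery_level, error_code, is_active, timestamp)
```

With native alignment ("@" instead of "<"), the same fields would typically occupy 24 bytes on 64-bit platforms, because the u64 is padded to an 8-byte boundary.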

Supported Types

| Type String | Size | Description |
|---|---|---|
| f32 / float32 | 4 bytes | 32-bit float |
| f64 / float64 | 8 bytes | 64-bit float |
| i8 | 1 byte | Signed 8-bit int |
| i16 | 2 bytes | Signed 16-bit int |
| i32 | 4 bytes | Signed 32-bit int |
| i64 | 8 bytes | Signed 64-bit int |
| u8 | 1 byte | Unsigned 8-bit int |
| u16 | 2 bytes | Unsigned 16-bit int |
| u32 | 4 bytes | Unsigned 32-bit int |
| u64 | 8 bytes | Unsigned 64-bit int |
| bool | 1 byte | Boolean |
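
The sizes in this table line up with the standard sizes of Python's struct format characters. The sketch below shows one way the type strings could map onto struct codes and how a message's total size follows from its field list. The TYPE_CODES mapping and the message_size helper are hypothetical names for illustration, not part of horus.msggen.

```python
import struct

# Assumed mapping from type strings to struct format characters
TYPE_CODES = {
    "f32": "f", "float32": "f",
    "f64": "d", "float64": "d",
    "i8": "b", "i16": "h", "i32": "i", "i64": "q",
    "u8": "B", "u16": "H", "u32": "I", "u64": "Q",
    "bool": "?",
}


def message_size(fields):
    """Return the packed size in bytes for a list of (name, type_string) fields."""
    # "<" selects little-endian byte order with standard sizes and no padding
    fmt = "<" + "".join(TYPE_CODES[type_string] for _, type_string in fields)
    return struct.calcsize(fmt)


robot_status_fields = [
    ("battery_level", "f32"),
    ("error_code", "i32"),
    ("is_active", "bool"),
    ("timestamp", "u64"),
]
print(message_size(robot_status_fields))
```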

NumPy Messages (Better Performance)

If NumPy is available, use define_numpy_message for better performance:

from horus.msggen import define_numpy_message
import numpy as np

# NumPy-based message (uses structured arrays internally)
SensorData = define_numpy_message('SensorData', 'sensor.data', [
    ('x', np.float32),
    ('y', np.float32),
    ('z', np.float32),
    ('temperature', np.float32),
    ('timestamp', np.uint64),
])

# Create instance
data = SensorData(x=1.0, y=2.0, z=3.0, temperature=25.5, timestamp=0)

# Get underlying numpy structured array
arr = data.to_numpy()

# Zero-copy bytes access
raw = data.to_bytes()
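
For readers unfamiliar with structured arrays, the following sketch builds the equivalent record with plain NumPy. It shows only what a structured dtype provides on its own; how define_numpy_message wraps it is not shown in this document, so treat the names here as illustrative.

```python
import numpy as np

# Structured dtype with the same fields as SensorData
sensor_dtype = np.dtype([
    ("x", np.float32),
    ("y", np.float32),
    ("z", np.float32),
    ("temperature", np.float32),
    ("timestamp", np.uint64),
])

# One zero-initialized record, filled field by field
record = np.zeros(1, dtype=sensor_dtype)
record["x"] = 1.0
record["y"] = 2.0
record["z"] = 3.0
record["temperature"] = 25.5

# tobytes() copies the record into a new bytes object
raw = record.tobytes()

# frombuffer() creates a read-only view over the bytes without copying them
view = np.frombuffer(raw, dtype=sensor_dtype)
print(sensor_dtype.itemsize, view["temperature"][0])
```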

With Topic (IPC)

Runtime messages work with Topic for inter-process communication. Use to_bytes() / from_bytes() for serialization over generic topics:

from horus import Topic
from horus.msggen import define_message

# Define message
RobotStatus = define_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
])

# Publisher — use generic topic with manual serialization
pub_topic = Topic("robot.status")
status = RobotStatus(battery_level=85.0, error_code=0)
pub_topic.send(status.to_bytes())

# Subscriber (different process)
sub_topic = Topic("robot.status")
raw = sub_topic.recv()
if raw:
    received = RobotStatus.from_bytes(raw)
    print(received.battery_level)  # 85.0
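
Because a generic topic carries raw bytes, a subscriber may receive nothing, or a payload produced from a different message definition. This document does not say whether from_bytes() validates its input, so the sketch below shows a defensive length check using only the struct module. The "<fi" layout and the decode_status helper are assumptions for illustration.

```python
import struct

# Assumed layout: little-endian f32 battery_level followed by i32 error_code
STATUS_FORMAT = "<fi"
STATUS_SIZE = struct.calcsize(STATUS_FORMAT)


def decode_status(raw):
    """Return (battery_level, error_code), or None if the payload is unusable."""
    # Reject missing payloads and payloads of the wrong length
    if not raw or len(raw) != STATUS_SIZE:
        return None
    return struct.unpack(STATUS_FORMAT, raw)


good = struct.pack(STATUS_FORMAT, 85.0, 0)
print(decode_status(good))
print(decode_status(b"\x00\x01"))
print(decode_status(None))
```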

Or use the Node convenience API, which sets up the topics for you. Note that this example still serializes explicitly with to_bytes() and from_bytes():

import horus
from horus.msggen import define_message

RobotStatus = define_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
])

def publisher_tick(node):
    status = RobotStatus(battery_level=85.0, error_code=0)
    node.send("robot.status", status.to_bytes())

def subscriber_tick(node):
    if node.has_msg("robot.status"):
        raw = node.get("robot.status")
        status = RobotStatus.from_bytes(raw)
        print(status.battery_level)  # 85.0

pub = horus.Node("publisher", pubs="robot.status", tick=publisher_tick)
sub = horus.Node("subscriber", subs="robot.status", tick=subscriber_tick)
horus.run(pub, sub, duration=3)

Compiled Messages (Production)

For maximum performance (~3-5μs), compile your messages to Rust. This generates PyO3 bindings with the same zero-copy performance as built-in types.

Step 1: Define Messages

from horus.msggen import register_message

# Register one or more messages
register_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
    ('is_active', 'bool'),
    ('timestamp', 'u64'),
])

register_message('SensorReading', 'sensor.reading', [
    ('x', 'f64'),
    ('y', 'f64'),
    ('z', 'f64'),
])

Step 2: Build

from horus.msggen import build_messages

# Generate Rust code and rebuild
build_messages()  # Runs: maturin develop --release

This generates Rust code in horus_py/src/custom_messages/ and rebuilds the module.

Step 3: Use

After building, your messages are available directly from horus:

from horus import RobotStatus, SensorReading, Topic

# Create typed topic
topic = Topic(RobotStatus)

# Send
status = RobotStatus(battery_level=85.0, error_code=0, is_active=True, timestamp=0)
topic.send(status)

# Receive (typed!)
received = topic.recv()
print(received.battery_level)

For larger projects, define messages in YAML:

# messages.yaml
messages:
  - name: RobotStatus
    topic: robot.status
    fields:
      - name: battery_level
        type: f32
      - name: error_code
        type: i32
      - name: is_active
        type: bool

  - name: SensorReading
    topic: sensor.reading
    fields:
      - name: x
        type: f64
      - name: y
        type: f64
      - name: z
        type: f64

Then generate the message definitions from the file and build:

from horus.msggen import generate_messages_from_yaml, build_messages

generate_messages_from_yaml('messages.yaml')
build_messages()
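
The YAML schema mirrors the arguments of register_message: each entry has a name, a topic, and a list of fields. The sketch below makes that correspondence explicit. The parsed dictionary stands in for what a YAML parser would return for the file, and to_register_args is a hypothetical helper; how generate_messages_from_yaml works internally is not shown in this document.

```python
# Stand-in for the parsed contents of messages.yaml (shortened)
parsed = {
    "messages": [
        {
            "name": "RobotStatus",
            "topic": "robot.status",
            "fields": [
                {"name": "battery_level", "type": "f32"},
                {"name": "error_code", "type": "i32"},
                {"name": "is_active", "type": "bool"},
            ],
        },
        {
            "name": "SensorReading",
            "topic": "sensor.reading",
            "fields": [
                {"name": "x", "type": "f64"},
                {"name": "y", "type": "f64"},
                {"name": "z", "type": "f64"},
            ],
        },
    ]
}


def to_register_args(document):
    """Convert a parsed YAML document into (name, topic, fields) tuples."""
    return [
        (
            message["name"],
            message["topic"],
            [(field["name"], field["type"]) for field in message["fields"]],
        )
        for message in document["messages"]
    ]


for name, topic, fields in to_register_args(parsed):
    print(name, topic, fields)
```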

Rebuild Detection

The builder hashes the registered message definitions and skips the rebuild unless they have changed since the last build:

from horus.msggen import check_needs_rebuild, build_messages

if check_needs_rebuild():
    build_messages()
else:
    print("Messages are up to date")

Force rebuild with:

build_messages(force=True)
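
Hash-based change detection of this kind can be sketched in a few lines. The example below is an illustration of the general technique, not the builder's actual code: the function names, the choice of SHA-256, and the JSON canonical form are all assumptions.

```python
import hashlib
import json


def definitions_hash(messages):
    """Return a stable SHA-256 digest for a list of (name, topic, fields) definitions."""
    # Sort by message name so registration order does not affect the digest
    ordered = sorted(messages, key=lambda message: message[0])
    canonical = json.dumps(ordered)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def needs_rebuild(messages, last_hash):
    """Return True when the definitions no longer match the stored digest."""
    return definitions_hash(messages) != last_hash


v1 = [("RobotStatus", "robot.status", [("battery_level", "f32"), ("error_code", "i32")])]
v2 = [("RobotStatus", "robot.status", [("battery_level", "f64"), ("error_code", "i32")])]

stored = definitions_hash(v1)
print(needs_rebuild(v1, stored))  # False: nothing changed
print(needs_rebuild(v2, stored))  # True: a field type changed
```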

Performance Comparison

| Approach | Latency | Throughput | Use Case |
|---|---|---|---|
| Built-in (Rust) | ~3μs | 300K msgs/sec | CmdVel, Pose2D, etc. |
| Compiled Custom | ~3-5μs | 200K msgs/sec | Production custom types |
| Runtime | ~20-40μs | 25K msgs/sec | Prototyping |
| Runtime (NumPy) | ~15-30μs | 35K msgs/sec | NumPy integration |
| Pickle | ~50-100μs | 10K msgs/sec | Legacy/dynamic types |

Recommendation: Start with runtime messages for fast iteration, then compile for production.
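
To get a rough feel for serialization cost on your own machine, a micro-benchmark along these lines can help. It times only a pack/unpack round trip with the standard library, not the IPC path, so it will not reproduce the figures in the table. The "<fi?Q" layout is an assumption.

```python
import pickle
import struct
import timeit

# Assumed fixed layout for a RobotStatus-like message
FORMAT = "<fi?Q"
values = (85.0, 0, True, 0)
as_dict = {"battery_level": 85.0, "error_code": 0, "is_active": True, "timestamp": 0}


def struct_roundtrip():
    """Serialize and deserialize with a fixed binary layout."""
    return struct.unpack(FORMAT, struct.pack(FORMAT, *values))


def pickle_roundtrip():
    """Serialize and deserialize the same data with pickle."""
    return pickle.loads(pickle.dumps(as_dict))


N = 20_000
# Average time per round trip, in microseconds
struct_us = timeit.timeit(struct_roundtrip, number=N) / N * 1e6
pickle_us = timeit.timeit(pickle_roundtrip, number=N) / N * 1e6
print(f"struct: {struct_us:.2f} us per round trip")
print(f"pickle: {pickle_us:.2f} us per round trip")
```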


API Reference

define_message

def define_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, str]]
) -> Type[RuntimeMessage]

Create a runtime message class.

Parameters:

  • name: Class name (e.g., "RobotStatus")
  • topic: Topic name (e.g., "robot.status")
  • fields: List of (field_name, type_string) tuples

Returns: New message class
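
As a mental model for what a factory with this signature does, the sketch below builds a class dynamically with type() and gives it to_bytes() and from_bytes() methods backed by struct. The name sketch_define_message, the reduced type table, and the little-endian packed layout are assumptions for illustration; the real implementation may differ.

```python
import struct

# Assumed subset of the type-string table
TYPE_CODES = {"f32": "f", "f64": "d", "i32": "i", "u32": "I", "u64": "Q", "bool": "?"}


def sketch_define_message(name, topic, fields):
    """Build a message class from (field_name, type_string) tuples."""
    names = [field_name for field_name, _ in fields]
    fmt = "<" + "".join(TYPE_CODES[type_string] for _, type_string in fields)

    def __init__(self, **kwargs):
        # Every declared field must be supplied as a keyword argument
        for field_name in names:
            setattr(self, field_name, kwargs[field_name])

    def to_bytes(self):
        # Pack current field values in declaration order
        return struct.pack(fmt, *(getattr(self, n) for n in names))

    @classmethod
    def from_bytes(cls, raw):
        # Unpack and rebuild an instance from the field values
        return cls(**dict(zip(names, struct.unpack(fmt, raw))))

    namespace = {
        "__init__": __init__,
        "to_bytes": to_bytes,
        "from_bytes": from_bytes,
        "topic": topic,
    }
    return type(name, (), namespace)


Sample = sketch_define_message("Sample", "sample.topic", [
    ("distance", "f32"),
    ("object_id", "u32"),
])
original = Sample(distance=2.5, object_id=42)
restored = Sample.from_bytes(original.to_bytes())
print(restored.distance, restored.object_id)
```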

define_numpy_message

def define_numpy_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, Any]]
) -> Type[NumpyMessage]

Create a NumPy-based message class.

Parameters:

  • name: Class name
  • topic: Topic name
  • fields: List of (field_name, numpy_dtype) tuples

Returns: New NumPy message class

register_message

def register_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, str]]
) -> None

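Register a message for compiled generation. Takes the same name, topic, and fields arguments as define_message. The message class becomes importable from horus after build_messages() runs.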

build_messages

def build_messages(
    force: bool = False,
    verbose: bool = True
) -> bool

Build all registered messages.

Parameters:

  • force: Rebuild even if unchanged
  • verbose: Print progress

Returns: True if successful

check_needs_rebuild

def check_needs_rebuild() -> bool

Check whether the registered messages differ from the last build.

Returns: True if a rebuild is needed


Complete Example

#!/usr/bin/env python3
"""Custom message example with runtime messages."""

import horus
from horus.msggen import define_message

# Define custom sensor message
MySensor = define_message('MySensor', 'my.sensor', [
    ('distance', 'f32'),
    ('angle', 'f32'),
    ('confidence', 'f32'),
    ('object_id', 'u32'),
])

def sensor_tick(node):
    """Publish sensor readings."""
    reading = MySensor(
        distance=2.5,
        angle=0.785,
        confidence=0.95,
        object_id=42
    )
    node.send("my.sensor", reading.to_bytes())

def processor_tick(node):
    """Process sensor readings."""
    if node.has_msg("my.sensor"):
        raw = node.get("my.sensor")
        reading = MySensor.from_bytes(raw)
        print(f"Object {reading.object_id}: {reading.distance}m at {reading.angle}rad")

# Create nodes
sensor = horus.Node("sensor", pubs="my.sensor", tick=sensor_tick, rate=10)
processor = horus.Node("processor", subs="my.sensor", tick=processor_tick)

# Run
horus.run(sensor, processor, duration=3)

When to Use Each Approach

Use Runtime Messages When:

  • Prototyping new message types
  • Message schema changes frequently
  • You don't want to wait for compilation
  • Performance requirements are moderate (<50Hz)

Use Compiled Messages When:

  • Deploying to production
  • High-frequency data (>100Hz)
  • Cross-language compatibility required
  • Type safety is critical

Use Built-in Messages When:

  • Standard robotics types (CmdVel, Pose2D, LaserScan)
  • Maximum performance needed
  • Compatibility with other HORUS systems