Custom Messages (horus.msggen)
The horus.msggen module lets you define custom typed messages directly in Python. Two approaches are available:
| Approach | Build Step | Latency | Best For |
|---|---|---|---|
| Runtime Messages | None | ~20-40μs | Prototyping, quick iteration |
| Compiled Messages | maturin develop | ~3-5μs | Production, high-frequency |
Runtime Messages (No Build Step)
Runtime messages need no compilation step. They use Python's struct module for fixed-layout binary serialization.
Basic Usage
# simplified
from horus.msggen import define_message
# Define a custom message type
RobotStatus = define_message('RobotStatus', 'robot.status', [
('battery_level', 'f32'),
('error_code', 'i32'),
('is_active', 'bool'),
('timestamp', 'u64'),
])
# Create instances
status = RobotStatus(battery_level=85.0, error_code=0, is_active=True, timestamp=0)
# Access fields
print(status.battery_level) # 85.0
status.error_code = 5
# Serialize for IPC
raw_bytes = status.to_bytes() # 17 bytes
# Reconstruct from bytes
status2 = RobotStatus.from_bytes(raw_bytes)
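The 17-byte figure matches a packed layout: 4 (f32) + 4 (i32) + 1 (bool) + 8 (u64). The sketch below reproduces that size with the standard struct module alone. The little-endian, unpadded format string `<fi?Q` and the helper names `pack_status` / `unpack_status` are assumptions for illustration, not the confirmed internals of horus.msggen.

```python
import struct

# Assumed layout: little-endian, no padding ('<'), fields in declaration order.
# f = f32, i = i32, ? = bool, Q = u64
ROBOT_STATUS_FORMAT = "<fi?Q"

def pack_status(battery_level, error_code, is_active, timestamp):
    """Serialize the four fields into a fixed-size byte string."""
    return struct.pack(ROBOT_STATUS_FORMAT, battery_level, error_code, is_active, timestamp)

def unpack_status(raw):
    """Inverse of pack_status; returns a tuple of the four field values."""
    return struct.unpack(ROBOT_STATUS_FORMAT, raw)

raw = pack_status(85.0, 0, True, 0)
print(len(raw))            # 17
print(unpack_status(raw))  # (85.0, 0, True, 0)
```

Without the `<` prefix, struct uses native alignment and would insert padding before the u64 field on most platforms, so the size would no longer be 17 bytes.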
Supported Types
| Type String | Size | Description |
|---|---|---|
| f32 / float32 | 4 bytes | 32-bit float |
| f64 / float64 | 8 bytes | 64-bit float |
| i8 | 1 byte | Signed 8-bit int |
| i16 | 2 bytes | Signed 16-bit int |
| i32 | 4 bytes | Signed 32-bit int |
| i64 | 8 bytes | Signed 64-bit int |
| u8 | 1 byte | Unsigned 8-bit int |
| u16 | 2 bytes | Unsigned 16-bit int |
| u32 | 4 bytes | Unsigned 32-bit int |
| u64 | 8 bytes | Unsigned 64-bit int |
| bool | 1 byte | Boolean |
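Because every supported type has a fixed size, the size of a message can be computed from its field list before any instance exists. The sketch below assumes each type string maps to one struct format code and that fields are packed without padding. The `STRUCT_CODES` table and `message_size` helper are hypothetical and not part of horus.msggen.

```python
import struct

# Hypothetical mapping from the type strings above to struct format codes.
STRUCT_CODES = {
    "f32": "f", "float32": "f",
    "f64": "d", "float64": "d",
    "i8": "b", "i16": "h", "i32": "i", "i64": "q",
    "u8": "B", "u16": "H", "u32": "I", "u64": "Q",
    "bool": "?",
}

def message_size(fields):
    """Return the packed size in bytes for a list of (name, type_string) tuples."""
    fmt = "<" + "".join(STRUCT_CODES[type_string] for _, type_string in fields)
    return struct.calcsize(fmt)

fields = [("distance", "f32"), ("angle", "f32"), ("confidence", "f32"), ("object_id", "u32")]
print(message_size(fields))  # 16
```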
NumPy Messages (Better Performance)
If NumPy is available, use define_numpy_message for better performance:
# simplified
from horus.msggen import define_numpy_message
import numpy as np
# NumPy-based message (uses structured arrays internally)
SensorData = define_numpy_message('SensorData', 'sensor.data', [
('x', np.float32),
('y', np.float32),
('z', np.float32),
('temperature', np.float32),
('timestamp', np.uint64),
])
# Create instance
data = SensorData(x=1.0, y=2.0, z=3.0, temperature=25.5, timestamp=0)
# Get underlying numpy structured array
arr = data.to_numpy()
# Zero-copy bytes access
raw = data.to_bytes()
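For readers unfamiliar with structured arrays, the following sketch builds the same five-field layout with plain NumPy, without horus. It illustrates the kind of structure the comment above refers to; how define_numpy_message actually stores its data is not confirmed here.

```python
import numpy as np

# Packed structured dtype mirroring the SensorData fields (align=False is the default).
sensor_dtype = np.dtype([
    ("x", np.float32),
    ("y", np.float32),
    ("z", np.float32),
    ("temperature", np.float32),
    ("timestamp", np.uint64),
])

record = np.zeros(1, dtype=sensor_dtype)
record["x"], record["y"], record["z"] = 1.0, 2.0, 3.0
record["temperature"] = 25.5

raw = record.tobytes()                         # copies the record into a bytes object
view = np.frombuffer(raw, dtype=sensor_dtype)  # read-only view over raw, no copy

print(sensor_dtype.itemsize)          # 24
print(float(view["temperature"][0]))  # 25.5
```

In plain NumPy, `tobytes()` copies while `np.frombuffer` creates a view, so the receive side is where a structured dtype avoids an extra copy.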
With Topic (IPC)
Runtime messages work with Topic for inter-process communication. Use to_bytes() / from_bytes() for serialization over generic topics:
# simplified
from horus import Topic
from horus.msggen import define_message
# Define message
RobotStatus = define_message('RobotStatus', 'robot.status', [
('battery_level', 'f32'),
('error_code', 'i32'),
])
# Publisher — use generic topic with manual serialization
pub_topic = Topic("robot.status")
status = RobotStatus(battery_level=85.0, error_code=0)
pub_topic.send(status.to_bytes())
# Subscriber (different process)
sub_topic = Topic("robot.status")
raw = sub_topic.recv()
if raw:
    received = RobotStatus.from_bytes(raw)
    print(received.battery_level) # 85.0
Or use the Node convenience API (handles serialization automatically):
# simplified
import horus
from horus.msggen import define_message
RobotStatus = define_message('RobotStatus', 'robot.status', [
('battery_level', 'f32'),
('error_code', 'i32'),
])
def publisher_tick(node):
    status = RobotStatus(battery_level=85.0, error_code=0)
    node.send("robot.status", status.to_bytes())
def subscriber_tick(node):
    if node.has_msg("robot.status"):
        raw = node.recv("robot.status")
        status = RobotStatus.from_bytes(raw)
        print(status.battery_level) # 85.0
pub = horus.Node("publisher", pubs="robot.status", tick=publisher_tick)
sub = horus.Node("subscriber", subs="robot.status", tick=subscriber_tick)
horus.run(pub, sub, duration=3)
Compiled Messages (Production)
For maximum performance (~3-5μs), compile your messages to Rust. This generates PyO3 bindings with the same zero-copy performance as built-in types.
Step 1: Define Messages
# simplified
from horus.msggen import register_message
# Register one or more messages
register_message('RobotStatus', 'robot.status', [
('battery_level', 'f32'),
('error_code', 'i32'),
('is_active', 'bool'),
('timestamp', 'u64'),
])
register_message('SensorReading', 'sensor.reading', [
('x', 'f64'),
('y', 'f64'),
('z', 'f64'),
])
Step 2: Build
# simplified
from horus.msggen import build_messages
# Generate Rust code and rebuild
build_messages() # Runs: maturin develop --release
This generates Rust code in horus_py/src/custom_messages/ and rebuilds the module.
Step 3: Use
After building, your messages are available directly from horus:
# simplified
from horus import RobotStatus, SensorReading, Topic
# Create typed topic
topic = Topic(RobotStatus)
# Send
status = RobotStatus(battery_level=85.0, error_code=0, is_active=True, timestamp=0)
topic.send(status)
# Receive (typed!)
received = topic.recv()
print(received.battery_level)
YAML Schema (Recommended for Teams)
For larger projects, define messages in YAML:
# messages.yaml
messages:
  - name: RobotStatus
    topic: robot.status
    fields:
      - name: battery_level
        type: f32
      - name: error_code
        type: i32
      - name: is_active
        type: bool
  - name: SensorReading
    topic: sensor.reading
    fields:
      - name: x
        type: f64
      - name: y
        type: f64
      - name: z
        type: f64
# simplified
from horus.msggen import generate_messages_from_yaml, build_messages
generate_messages_from_yaml('messages.yaml')
build_messages()
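To make the schema shape concrete, the sketch below converts an already-parsed schema into the (name, topic, fields) form that register_message accepts. The dict literal stands in for what a YAML loader would return for a file shaped like messages.yaml. `schema_to_definitions` and `KNOWN_TYPES` are hypothetical helpers, and the real generate_messages_from_yaml may validate differently.

```python
# Parsed form of a schema shaped like messages.yaml (what a YAML loader would return).
schema = {
    "messages": [
        {
            "name": "RobotStatus",
            "topic": "robot.status",
            "fields": [
                {"name": "battery_level", "type": "f32"},
                {"name": "error_code", "type": "i32"},
                {"name": "is_active", "type": "bool"},
            ],
        },
    ]
}

KNOWN_TYPES = {"f32", "float32", "f64", "float64", "i8", "i16", "i32", "i64",
               "u8", "u16", "u32", "u64", "bool"}

def schema_to_definitions(schema):
    """Turn the parsed schema into (name, topic, fields) tuples, validating types."""
    definitions = []
    for message in schema["messages"]:
        fields = [(f["name"], f["type"]) for f in message["fields"]]
        for field_name, type_string in fields:
            if type_string not in KNOWN_TYPES:
                raise ValueError(f"{message['name']}.{field_name}: unknown type {type_string!r}")
        definitions.append((message["name"], message["topic"], fields))
    return definitions

for name, topic, fields in schema_to_definitions(schema):
    print(name, topic, fields)
```

Rejecting unknown type strings at load time keeps a typo in the shared schema from surfacing later as a build failure.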
Rebuild Detection
The builder tracks message definitions via hash. It won't rebuild unless messages change:
# simplified
from horus.msggen import check_needs_rebuild, build_messages
if check_needs_rebuild():
    build_messages()
else:
    print("Messages are up to date")
Force rebuild with:
# simplified
build_messages(force=True)
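Hash-based change detection can be sketched in a few lines. The version below serializes the definitions to canonical JSON and compares SHA-256 digests. The function names, the choice of SHA-256, and the JSON encoding are all assumptions for illustration, since the source does not say which hash or encoding the builder uses.

```python
import hashlib
import json

def definitions_hash(definitions):
    """Hash message definitions in a way that is stable across runs."""
    # Sort by message name so registration order does not change the hash;
    # field order is kept because it determines the binary layout.
    canonical = json.dumps(sorted(definitions, key=lambda d: d[0]))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def needs_rebuild(definitions, last_build_hash):
    """True when the definitions differ from those used in the last build."""
    return definitions_hash(definitions) != last_build_hash

defs = [("RobotStatus", "robot.status", [("battery_level", "f32"), ("error_code", "i32")])]
stored = definitions_hash(defs)            # would be saved after a successful build
print(needs_rebuild(defs, stored))         # False

defs[0][2].append(("is_active", "bool"))   # schema change
print(needs_rebuild(defs, stored))         # True
```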
Performance Comparison
| Approach | Latency | Throughput | Use Case |
|---|---|---|---|
| Built-in (Rust) | ~3μs | 300K msgs/sec | CmdVel, Pose2D, etc. |
| Compiled Custom | ~3-5μs | 200K msgs/sec | Production custom types |
| Runtime | ~20-40μs | 25K msgs/sec | Prototyping |
| Runtime (NumPy) | ~15-30μs | 35K msgs/sec | NumPy integration |
| Pickle | ~50-100μs | 10K msgs/sec | Legacy/dynamic types |
Recommendation: Start with runtime messages for fast iteration, then compile for production.
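As a sanity check on the table, the throughput column is close to the reciprocal of the upper latency bound in each row. The sketch below does that arithmetic, assuming a single sender that publishes messages back to back. This relationship is inferred from the numbers and is not a stated measurement methodology.

```python
def max_throughput(latency_us):
    """Upper bound on messages per second for one sender that sends back to back."""
    return 1_000_000 / latency_us

# Upper latency bound from each row of the table, in microseconds.
worst_case_latency_us = {
    "Built-in (Rust)": 3,
    "Compiled Custom": 5,
    "Runtime": 40,
    "Runtime (NumPy)": 30,
    "Pickle": 100,
}

for approach, latency in worst_case_latency_us.items():
    print(f"{approach}: ~{max_throughput(latency) / 1000:.0f}K msgs/sec")
```

This yields roughly 333K, 200K, 25K, 33K, and 10K messages per second, which is in line with the rounded figures in the table.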
API Reference
define_message
# simplified
def define_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, str]]
) -> Type[RuntimeMessage]
Create a runtime message class.
Parameters:
- name: Class name (e.g., "RobotStatus")
- topic: Topic name (e.g., "robot.status")
- fields: List of (field_name, type_string) tuples
Returns: New message class
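To show what a function with this signature might do internally, here is a minimal class factory built on `type()` and `struct.Struct`. It is a sketch under stated assumptions, not the horus implementation: `make_message` and `_CODES` are hypothetical names, only a subset of types is mapped, and a packed little-endian layout is assumed.

```python
import struct

# Hypothetical subset of the type-string table.
_CODES = {"f32": "f", "f64": "d", "i32": "i", "u32": "I", "u64": "Q", "bool": "?"}

def make_message(name, topic, fields):
    """Build a message class with keyword construction, to_bytes and from_bytes."""
    names = [field_name for field_name, _ in fields]
    layout = struct.Struct("<" + "".join(_CODES[t] for _, t in fields))

    def __init__(self, **values):
        # Every declared field must be supplied as a keyword argument.
        for field_name in names:
            setattr(self, field_name, values[field_name])

    def to_bytes(self):
        return layout.pack(*(getattr(self, n) for n in names))

    @classmethod
    def from_bytes(cls, raw):
        return cls(**dict(zip(names, layout.unpack(raw))))

    return type(name, (), {
        "__init__": __init__,
        "to_bytes": to_bytes,
        "from_bytes": from_bytes,
        "topic": topic,
    })

Reading = make_message("Reading", "my.reading", [("distance", "f32"), ("object_id", "u32")])
raw = Reading(distance=2.5, object_id=42).to_bytes()
copy = Reading.from_bytes(raw)
print(len(raw), copy.distance, copy.object_id)  # 8 2.5 42
```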
define_numpy_message
# simplified
def define_numpy_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, Any]]
) -> Type[NumpyMessage]
Create a NumPy-based message class.
Parameters:
- name: Class name
- topic: Topic name
- fields: List of (field_name, numpy_dtype) tuples
Returns: New NumPy message class
register_message
# simplified
def register_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, str]]
) -> None
Register a message for compiled generation.
build_messages
# simplified
def build_messages(
    force: bool = False,
    verbose: bool = True
) -> bool
Build all registered messages.
Parameters:
- force: Rebuild even if unchanged
- verbose: Print progress
Returns: True if successful
check_needs_rebuild
# simplified
def check_needs_rebuild() -> bool
Check if registered messages differ from last build.
Complete Example
# simplified
#!/usr/bin/env python3
"""Custom message example with runtime messages."""
import horus
from horus.msggen import define_message
# Define custom sensor message
MySensor = define_message('MySensor', 'my.sensor', [
('distance', 'f32'),
('angle', 'f32'),
('confidence', 'f32'),
('object_id', 'u32'),
])
def sensor_tick(node):
    """Publish sensor readings."""
    reading = MySensor(
        distance=2.5,
        angle=0.785,
        confidence=0.95,
        object_id=42
    )
    node.send("my.sensor", reading.to_bytes())
def processor_tick(node):
    """Process sensor readings."""
    if node.has_msg("my.sensor"):
        raw = node.recv("my.sensor")
        reading = MySensor.from_bytes(raw)
        print(f"Object {reading.object_id}: {reading.distance}m at {reading.angle}rad")
# Create nodes
sensor = horus.Node("sensor", pubs="my.sensor", tick=sensor_tick, rate=10)
processor = horus.Node("processor", subs="my.sensor", tick=processor_tick)
# Run
horus.run(sensor, processor, duration=3)
When to Use Each Approach
Use Runtime Messages When:
- Prototyping new message types
- Message schema changes frequently
- You don't want to wait for compilation
- Performance requirements are moderate (<50Hz)
Use Compiled Messages When:
- Deploying to production
- High-frequency data (>100Hz)
- Cross-language compatibility required
- Type safety is critical
Use Built-in Messages When:
- Standard robotics types (CmdVel, Pose2D, LaserScan)
- Maximum performance needed
- Compatibility with other HORUS systems
Design Decisions
Why two approaches (runtime vs compiled) instead of one? Development speed and runtime speed trade off against each other. Runtime messages need no build step and let you iterate on message schemas in seconds -- ideal for prototyping. Compiled messages take longer to build but achieve near-native performance -- essential for production. The two-path design matches the typical robotics workflow: prototype fast, then optimize for deployment.
Why struct module for runtime messages instead of pickle? struct produces a fixed-size binary layout that is deterministic and cross-process compatible. Pickle produces variable-length output, is version-sensitive, and has security implications (arbitrary code execution on deserialization). The tradeoff is that struct only supports primitive types (no nested objects), but this matches the fixed-size Pod constraint of zero-copy IPC.
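The fixed-size versus variable-length distinction is easy to observe with the standard library. The sketch below serializes two value pairs both ways. The `<fi` format is an assumed packed layout chosen for illustration.

```python
import pickle
import struct

FORMAT = "<fi"  # assumed packed layout: f32 followed by i32

small = (85.0, 0)
large = (85.0, 100_000)

# struct: the size depends only on the format, never on the values.
print(len(struct.pack(FORMAT, *small)), len(struct.pack(FORMAT, *large)))

# pickle: the size depends on the values and on the pickle protocol in use.
print(len(pickle.dumps(small)), len(pickle.dumps(large)))
```

The two struct sizes are identical, while the two pickle sizes differ because pickle encodes small and large integers with different opcodes.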
Why YAML schema support? Teams need a shared source of truth for message definitions that is language-agnostic and version-controllable. YAML schemas serve as the canonical definition that generates both Python runtime classes and Rust compiled types, ensuring cross-language compatibility.
Why generate Rust code for compiled messages instead of a Python-only binary format? Generated Rust code produces the same Pod types used by the standard library, so compiled custom messages get identical zero-copy IPC performance and are binary-compatible with Rust nodes. A Python-only approach would sacrifice cross-language support.
See Also
- Python Bindings — Core Python API
- Custom Messages Tutorial — Step-by-step guide
- Message Types — How messages work in HORUS
- Python Message Library — Standard message types