Custom Messages (horus.msggen)
The horus.msggen module lets you define custom typed messages directly in Python. Two approaches are available:
| Approach | Build Step | Latency | Best For |
|---|---|---|---|
| Runtime Messages | None | ~20-40μs | Prototyping, quick iteration |
| Compiled Messages | maturin develop | ~3-5μs | Production, high-frequency |
Runtime Messages (No Build Step)
Create custom messages instantly without any compilation. Uses Python's struct module for fixed-layout binary serialization.
Basic Usage
from horus.msggen import define_message
# Define a custom message type
RobotStatus = define_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
    ('is_active', 'bool'),
    ('timestamp', 'u64'),
])
# Create instances
status = RobotStatus(battery_level=85.0, error_code=0, is_active=True, timestamp=0)
# Access fields
print(status.battery_level) # 85.0
status.error_code = 5
# Serialize for IPC
raw_bytes = status.to_bytes() # 17 bytes
# Reconstruct from bytes
status2 = RobotStatus.from_bytes(raw_bytes)
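The 17-byte size noted above follows from a packed layout: 4 (f32) + 4 (i32) + 1 (bool) + 8 (u64). The sketch below reproduces that arithmetic with Python's struct module directly. The little-endian, unpadded format string "<fi?Q" is an assumption made for illustration; the byte order that horus.msggen actually uses is not stated here.

```python
import struct

# Assumed layout: little-endian, no padding ("<"), fields in declaration order.
# f = f32, i = i32, ? = bool, Q = u64
ROBOT_STATUS_FORMAT = "<fi?Q"

raw = struct.pack(ROBOT_STATUS_FORMAT, 85.0, 0, True, 0)
print(len(raw))  # 17

battery_level, error_code, is_active, timestamp = struct.unpack(ROBOT_STATUS_FORMAT, raw)
print(battery_level, error_code, is_active, timestamp)  # 85.0 0 True 0
```

With native alignment (the "@" prefix) the platform's padding rules apply and the size can be larger, which is one reason a fixed layout is useful for IPC.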
Supported Types
| Type String | Size | Description |
|---|---|---|
| f32 / float32 | 4 bytes | 32-bit float |
| f64 / float64 | 8 bytes | 64-bit float |
| i8 | 1 byte | Signed 8-bit int |
| i16 | 2 bytes | Signed 16-bit int |
| i32 | 4 bytes | Signed 32-bit int |
| i64 | 8 bytes | Signed 64-bit int |
| u8 | 1 byte | Unsigned 8-bit int |
| u16 | 2 bytes | Unsigned 16-bit int |
| u32 | 4 bytes | Unsigned 32-bit int |
| u64 | 8 bytes | Unsigned 64-bit int |
| bool | 1 byte | Boolean |
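As a rough illustration of how the type strings above could map onto struct format codes, the sketch below computes the packed size of a field list. The STRUCT_CODES mapping and the packed_size helper are hypothetical names, not part of horus.msggen, and the little-endian unpadded layout is an assumption.

```python
import struct

# Hypothetical mapping from the documented type strings to struct format codes.
STRUCT_CODES = {
    "f32": "f", "float32": "f",
    "f64": "d", "float64": "d",
    "i8": "b", "i16": "h", "i32": "i", "i64": "q",
    "u8": "B", "u16": "H", "u32": "I", "u64": "Q",
    "bool": "?",
}

def packed_size(fields):
    """Return the unpadded size in bytes of a list of (name, type_string) fields."""
    fmt = "<" + "".join(STRUCT_CODES[type_string] for _, type_string in fields)
    return struct.calcsize(fmt)

print(packed_size([("x", "f64"), ("y", "f64"), ("z", "f64")]))  # 24
```

Summing sizes this way is a quick sanity check on how many bytes a message will occupy on the wire before it is defined.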
NumPy Messages (Better Performance)
If NumPy is available, use define_numpy_message for better performance:
from horus.msggen import define_numpy_message
import numpy as np
# NumPy-based message (uses structured arrays internally)
SensorData = define_numpy_message('SensorData', 'sensor.data', [
    ('x', np.float32),
    ('y', np.float32),
    ('z', np.float32),
    ('temperature', np.float32),
    ('timestamp', np.uint64),
])
# Create instance
data = SensorData(x=1.0, y=2.0, z=3.0, temperature=25.5, timestamp=0)
# Get underlying numpy structured array
arr = data.to_numpy()
# Zero-copy bytes access
raw = data.to_bytes()
With Topic (IPC)
Runtime messages work with Topic for inter-process communication. Use to_bytes() / from_bytes() for serialization over generic topics:
from horus import Topic
from horus.msggen import define_message
# Define message
RobotStatus = define_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
])
# Publisher — use generic topic with manual serialization
pub_topic = Topic("robot.status")
status = RobotStatus(battery_level=85.0, error_code=0)
pub_topic.send(status.to_bytes())
# Subscriber (different process)
sub_topic = Topic("robot.status")
raw = sub_topic.recv()
if raw:
    received = RobotStatus.from_bytes(raw)
    print(received.battery_level)  # 85.0
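Because a generic topic carries untyped bytes, a subscriber may want to check the payload size before decoding it. The sketch below uses struct directly as a stand-in for RobotStatus.from_bytes; decode_status is a hypothetical helper, the "<fi" layout is an assumption, and how from_bytes itself reacts to malformed input is not documented here.

```python
import struct

# Assumed wire layout for the two-field RobotStatus above: f32 + i32, little-endian.
STATUS_FORMAT = "<fi"
STATUS_SIZE = struct.calcsize(STATUS_FORMAT)  # 8 bytes

def decode_status(raw):
    """Return (battery_level, error_code), or None if the payload is missing or the wrong size."""
    if not raw or len(raw) != STATUS_SIZE:
        return None
    return struct.unpack(STATUS_FORMAT, raw)

good = struct.pack(STATUS_FORMAT, 85.0, 0)
print(decode_status(good))      # (85.0, 0)
print(decode_status(good[:5]))  # None
print(decode_status(None))      # None
```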
Or use the Node convenience API. Note that this example still serializes the payload explicitly with to_bytes() and from_bytes():
import horus
from horus.msggen import define_message
RobotStatus = define_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
])

def publisher_tick(node):
    status = RobotStatus(battery_level=85.0, error_code=0)
    node.send("robot.status", status.to_bytes())

def subscriber_tick(node):
    if node.has_msg("robot.status"):
        raw = node.get("robot.status")
        status = RobotStatus.from_bytes(raw)
        print(status.battery_level)  # 85.0

pub = horus.Node("publisher", pubs="robot.status", tick=publisher_tick)
sub = horus.Node("subscriber", subs="robot.status", tick=subscriber_tick)
horus.run(pub, sub, duration=3)
Compiled Messages (Production)
For maximum performance (~3-5μs), compile your messages to Rust. This generates PyO3 bindings with the same zero-copy performance as built-in types.
Step 1: Define Messages
from horus.msggen import register_message
# Register one or more messages
register_message('RobotStatus', 'robot.status', [
    ('battery_level', 'f32'),
    ('error_code', 'i32'),
    ('is_active', 'bool'),
    ('timestamp', 'u64'),
])
register_message('SensorReading', 'sensor.reading', [
    ('x', 'f64'),
    ('y', 'f64'),
    ('z', 'f64'),
])
Step 2: Build
from horus.msggen import build_messages
# Generate Rust code and rebuild
build_messages() # Runs: maturin develop --release
This generates Rust code in horus_py/src/custom_messages/ and rebuilds the module.
Step 3: Use
After building, your messages are available directly from horus:
from horus import RobotStatus, SensorReading, Topic
# Create typed topic
topic = Topic(RobotStatus)
# Send
status = RobotStatus(battery_level=85.0, error_code=0, is_active=True, timestamp=0)
topic.send(status)
# Receive (typed!)
received = topic.recv()
print(received.battery_level)
YAML Schema (Recommended for Teams)
For larger projects, define messages in YAML:
# messages.yaml
messages:
  - name: RobotStatus
    topic: robot.status
    fields:
      - name: battery_level
        type: f32
      - name: error_code
        type: i32
      - name: is_active
        type: bool
  - name: SensorReading
    topic: sensor.reading
    fields:
      - name: x
        type: f64
      - name: y
        type: f64
      - name: z
        type: f64
from horus.msggen import generate_messages_from_yaml, build_messages
generate_messages_from_yaml('messages.yaml')
build_messages()
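Since a build takes time, it can be worth checking a schema for typos before starting one. The sketch below validates an already-parsed schema dict against the type strings from the Supported Types table. find_schema_errors is a hypothetical helper, the YAML loading step is omitted, and whether generate_messages_from_yaml performs its own validation is not stated here.

```python
SUPPORTED_TYPES = {
    "f32", "float32", "f64", "float64",
    "i8", "i16", "i32", "i64",
    "u8", "u16", "u32", "u64", "bool",
}

def find_schema_errors(schema):
    """Return a list of human-readable problems found in a parsed schema dict."""
    errors = []
    for message in schema.get("messages", []):
        name = message.get("name", "<unnamed>")
        if "topic" not in message:
            errors.append(f"{name}: missing topic")
        for field in message.get("fields", []):
            if field.get("type") not in SUPPORTED_TYPES:
                errors.append(
                    f"{name}.{field.get('name')}: unsupported type {field.get('type')!r}"
                )
    return errors

# This dict mirrors what a YAML loader would return for a messages.yaml file.
schema = {
    "messages": [
        {"name": "RobotStatus", "topic": "robot.status",
         "fields": [{"name": "battery_level", "type": "f32"},
                    {"name": "error_code", "type": "int"}]},
    ]
}
print(find_schema_errors(schema))
# ["RobotStatus.error_code: unsupported type 'int'"]
```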
Rebuild Detection
The builder tracks message definitions via hash. It won't rebuild unless messages change:
from horus.msggen import check_needs_rebuild, build_messages
if check_needs_rebuild():
    build_messages()
else:
    print("Messages are up to date")
Force rebuild with:
build_messages(force=True)
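To make the idea of hash-based rebuild detection concrete, here is a minimal sketch that hashes a canonical form of the message definitions and compares it with a stored value. The function names, the JSON canonicalization, and the choice of SHA-256 are all assumptions for illustration; the hashing scheme horus.msggen uses internally is not described here.

```python
import hashlib
import json

def definitions_hash(definitions):
    """Hash a list of (name, topic, fields) definitions in a stable way."""
    # Sort by message name so registration order does not affect the hash;
    # field order is preserved because it changes the binary layout.
    canonical = json.dumps(sorted(definitions, key=lambda d: d[0]))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def needs_rebuild(definitions, last_hash):
    """Return True when the definitions no longer match the stored hash."""
    return definitions_hash(definitions) != last_hash

v1 = [("RobotStatus", "robot.status", [("battery_level", "f32"), ("error_code", "i32")])]
v2 = [("RobotStatus", "robot.status", [("battery_level", "f64"), ("error_code", "i32")])]

stored = definitions_hash(v1)
print(needs_rebuild(v1, stored))  # False
print(needs_rebuild(v2, stored))  # True
```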
Performance Comparison
| Approach | Latency | Throughput | Use Case |
|---|---|---|---|
| Built-in (Rust) | ~3μs | 300K msgs/sec | CmdVel, Pose2D, etc. |
| Compiled Custom | ~3-5μs | 200K msgs/sec | Production custom types |
| Runtime | ~20-40μs | 25K msgs/sec | Prototyping |
| Runtime (NumPy) | ~15-30μs | 35K msgs/sec | NumPy integration |
| Pickle | ~50-100μs | 10K msgs/sec | Legacy/dynamic types |
Recommendation: Start with runtime messages for fast iteration, then compile for production.
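The latency and throughput columns are related by simple arithmetic: a loop that spends a fixed time on each message cannot exceed one message per that interval. The sketch below computes this upper bound, assuming messages are handled strictly one at a time; the figures in the table are the source's own numbers, and max_rate_per_second is a hypothetical helper.

```python
def max_rate_per_second(latency_us):
    """Upper bound on messages/sec for a loop that spends latency_us per message."""
    return 1_000_000 / latency_us

cases = [("Built-in", 3), ("Runtime (worst case)", 40), ("Pickle (worst case)", 100)]
for label, latency_us in cases:
    print(f"{label}: {max_rate_per_second(latency_us):,.0f} msgs/sec")
# Built-in: 333,333 msgs/sec
# Runtime (worst case): 25,000 msgs/sec
# Pickle (worst case): 10,000 msgs/sec
```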
API Reference
define_message
def define_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, str]]
) -> Type[RuntimeMessage]
Create a runtime message class.
Parameters:
- name: Class name (e.g., "RobotStatus")
- topic: Topic name (e.g., "robot.status")
- fields: List of (field_name, type_string) tuples
Returns: New message class
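To show how a function with this shape could build a class at runtime, the sketch below creates one with type() and a struct.Struct layout. It is not the horus.msggen implementation: make_message_class, the _CODES subset, and the little-endian layout are assumptions for illustration only.

```python
import struct

# Hypothetical subset of type codes; the Supported Types table lists the full set.
_CODES = {"f32": "f", "i32": "i", "u64": "Q", "bool": "?"}

def make_message_class(name, topic, fields):
    """Build a class with a keyword constructor, to_bytes() and from_bytes()."""
    field_names = [field_name for field_name, _ in fields]
    layout = struct.Struct("<" + "".join(_CODES[t] for _, t in fields))

    def __init__(self, **values):
        for field_name in field_names:
            setattr(self, field_name, values[field_name])

    def to_bytes(self):
        return layout.pack(*(getattr(self, n) for n in field_names))

    @classmethod
    def from_bytes(cls, raw):
        return cls(**dict(zip(field_names, layout.unpack(raw))))

    return type(name, (), {
        "__init__": __init__, "to_bytes": to_bytes,
        "from_bytes": from_bytes, "topic": topic,
    })

Ping = make_message_class("Ping", "demo.ping", [("seq", "u64"), ("ok", "bool")])
decoded = Ping.from_bytes(Ping(seq=7, ok=True).to_bytes())
print(Ping.__name__, decoded.seq, decoded.ok)  # Ping 7 True
```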
define_numpy_message
def define_numpy_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, Any]]
) -> Type[NumpyMessage]
Create a NumPy-based message class.
Parameters:
- name: Class name
- topic: Topic name
- fields: List of (field_name, numpy_dtype) tuples
Returns: New NumPy message class
register_message
def register_message(
    name: str,
    topic: str,
    fields: List[Tuple[str, str]]
) -> None
Register a message for compiled generation.
build_messages
def build_messages(
    force: bool = False,
    verbose: bool = True
) -> bool
Build all registered messages.
Parameters:
- force: Rebuild even if unchanged
- verbose: Print progress
Returns: True if successful
check_needs_rebuild
def check_needs_rebuild() -> bool
Check if registered messages differ from last build.
Complete Example
#!/usr/bin/env python3
"""Custom message example with runtime messages."""
import horus
from horus.msggen import define_message
# Define custom sensor message
MySensor = define_message('MySensor', 'my.sensor', [
    ('distance', 'f32'),
    ('angle', 'f32'),
    ('confidence', 'f32'),
    ('object_id', 'u32'),
])

def sensor_tick(node):
    """Publish sensor readings."""
    reading = MySensor(
        distance=2.5,
        angle=0.785,
        confidence=0.95,
        object_id=42
    )
    node.send("my.sensor", reading.to_bytes())

def processor_tick(node):
    """Process sensor readings."""
    if node.has_msg("my.sensor"):
        raw = node.get("my.sensor")
        reading = MySensor.from_bytes(raw)
        print(f"Object {reading.object_id}: {reading.distance}m at {reading.angle}rad")

# Create nodes
sensor = horus.Node("sensor", pubs="my.sensor", tick=sensor_tick, rate=10)
processor = horus.Node("processor", subs="my.sensor", tick=processor_tick)
# Run
horus.run(sensor, processor, duration=3)
When to Use Each Approach
Use Runtime Messages When:
- Prototyping new message types
- Message schema changes frequently
- You don't want to wait for compilation
- Performance requirements are moderate (<50Hz)
Use Compiled Messages When:
- Deploying to production
- High-frequency data (>100Hz)
- Cross-language compatibility required
- Type safety is critical
Use Built-in Messages When:
- Standard robotics types (CmdVel, Pose2D, LaserScan)
- Maximum performance needed
- Compatibility with other HORUS systems