Tutorial 4: Custom Messages (Python)
Learn three ways to send data between Python nodes: typed messages (fast), dicts (flexible), and dataclasses (structured).
What You'll Learn
- Using built-in typed messages (
CmdVel,Imu,Odometry) - Sending Python dicts as generic messages
- Using dataclasses for structured Python-only data
- Compiling custom messages with
maturinfor production performance - Performance trade-offs between approaches
- When to use which approach
Prerequisites: Tutorial 1 (Python) completed. A working horus installation.
Time: 25 minutes
Approach 1: Built-in Typed Messages (Fastest)
Use HORUS's built-in message types for maximum performance (~1.7us latency, zero-copy with Rust nodes). These types are defined in Rust and exposed to Python via PyO3 bindings, so they work seamlessly across both languages.
When to use typed messages
- Your data matches a standard robotics message (IMU, velocity, pose, etc.)
- You need cross-language compatibility (Python publisher, Rust subscriber, or vice versa)
- You want the fastest possible latency without a custom build step
- You are building control loops, sensor pipelines, or motor drivers
Complete example
Create typed_demo.py:
import horus
# --- Node tick functions ---
def imu_sensor_tick(node):
"""Simulate an IMU sensor publishing orientation and acceleration."""
imu = horus.Imu()
# Imu fields: orientation[4], angular_velocity[3], linear_acceleration[3]
# All fields default to 0.0 — set what you need
node.send("imu", imu)
def motor_driver_tick(node):
"""Receive velocity commands and 'drive' motors."""
cmd = node.recv("cmd_vel")
if cmd is not None:
print(f"[Motor] linear={cmd.linear:.2f} angular={cmd.angular:.2f}")
def controller_tick(node):
"""Read IMU data and publish velocity commands."""
imu = node.recv("imu")
if imu is not None:
# Simple proportional controller: slow down if tilting
tilt = abs(imu.linear_acceleration[0])
speed = max(0.1, 1.0 - tilt * 0.5)
cmd = horus.CmdVel(linear=speed, angular=0.0)
node.send("cmd_vel", cmd)
print(f"[Ctrl] tilt={tilt:.2f} -> speed={speed:.2f}")
# --- Node definitions ---
sensor = horus.Node(
name="ImuSensor",
tick=imu_sensor_tick,
rate=100,
order=0,
pubs=[horus.Imu], # typed topic — HORUS knows the type at registration
)
controller = horus.Node(
name="Controller",
tick=controller_tick,
rate=100,
order=1,
subs=[horus.Imu],
pubs=[horus.CmdVel],
)
motor = horus.Node(
name="MotorDriver",
tick=motor_driver_tick,
rate=100,
order=2,
subs=[horus.CmdVel],
)
# --- Run ---
horus.run(sensor, controller, motor, duration=5.0)
Create horus.toml in the same directory:
[project]
name = "typed-demo"
version = "0.1.0"
language = "python"
entry = "typed_demo.py"
Run it:
horus run
Expected output:
[Ctrl] tilt=0.00 -> speed=1.00
[Motor] linear=1.00 angular=0.00
[Ctrl] tilt=0.00 -> speed=1.00
[Motor] linear=1.00 angular=0.00
...
Key details
pubs=[horus.Imu]registers a typed topic. HORUS uses the type's built-in topic name (e.g.,horus.Imumaps to"imu").node.send("imu", imu)sends to the topic by name. The name must match what subscribers use.node.recv("imu")returnsNoneif no message has arrived since the last read.- All 70+ built-in types are zero-copy across the Rust/Python boundary: the Python object wraps the same shared memory that Rust nodes read.
Available types: CmdVel, Imu, Odometry, LaserScan, Pose2D, Pose3D, Twist, BatteryState, JointState, MotorCommand, ServoCommand, EmergencyStop, and 60+ more. See Message Types for the full list.
Approach 2: Python Dicts (Most Flexible)
Send any Python dict as a message. HORUS serializes it with MessagePack under the hood (~6-50us latency depending on dict size). No type registration, no build step, no schema definition.
When to use dicts
- You are prototyping and the message schema is still evolving
- Your data does not fit any built-in type (configuration blobs, experiment metadata, etc.)
- You only need Python-to-Python communication
- The topic runs at low frequency (1-10Hz) where microsecond overhead does not matter
Complete example
Create dict_demo.py:
import horus
import time
# --- Node tick functions ---
def environment_sensor_tick(node):
"""Simulate an environment sensor with multiple readings."""
reading = {
"temperature": 23.5 + (horus.rng_float() * 2.0 - 1.0),
"humidity": 0.65 + (horus.rng_float() * 0.1 - 0.05),
"pressure": 1013.25,
"location": "lab_room_3",
"tags": ["indoor", "calibrated"],
"nested": {
"sensor_id": 42,
"firmware": "v2.1.0",
},
}
node.send("environment", reading)
def logger_tick(node):
"""Log environment data to the console."""
data = node.recv("environment")
if data is not None:
temp = data["temperature"]
humidity = data["humidity"] * 100
sensor_id = data["nested"]["sensor_id"]
print(f"[Logger] sensor={sensor_id} temp={temp:.1f}C humidity={humidity:.0f}%")
def alert_tick(node):
"""Check for out-of-range readings and publish alerts."""
data = node.recv("environment")
if data is not None:
if data["temperature"] > 24.0:
alert = {
"type": "temperature_high",
"value": data["temperature"],
"threshold": 24.0,
"location": data["location"],
}
node.send("alerts", alert)
print(f"[Alert] Temperature {data['temperature']:.1f}C exceeds 24.0C")
# --- Node definitions ---
sensor = horus.Node(
name="EnvSensor",
tick=environment_sensor_tick,
rate=1, # 1 Hz — environment data changes slowly
order=0,
pubs=["environment"], # string topic name = dict-based message
)
logger = horus.Node(
name="Logger",
tick=logger_tick,
rate=1,
order=1,
subs=["environment"],
)
alerter = horus.Node(
name="Alerter",
tick=alert_tick,
rate=1,
order=2,
subs=["environment"],
pubs=["alerts"],
)
# --- Run ---
horus.run(sensor, logger, alerter, duration=10.0)
Create horus.toml:
[project]
name = "dict-demo"
version = "0.1.0"
language = "python"
entry = "dict_demo.py"
Run it:
horus run
Expected output:
[Logger] sensor=42 temp=23.8C humidity=63%
[Logger] sensor=42 temp=22.9C humidity=67%
[Alert] Temperature 24.3C exceeds 24.0C
[Logger] sensor=42 temp=24.3C humidity=62%
...
What dicts support
Dicts can contain any MessagePack-compatible value:
| Python type | Supported | Notes |
|---|---|---|
str | Yes | UTF-8 strings |
int | Yes | Arbitrary precision |
float | Yes | 64-bit float |
bool | Yes | True / False |
list | Yes | Heterogeneous allowed |
dict | Yes | Nested dicts work |
None | Yes | Serialized as nil |
bytes | Yes | Raw binary data |
| Custom objects | No | Use asdict() or __dict__ first |
Limitations
- No cross-language support. Rust nodes cannot subscribe to dict-based topics (they do not know the schema). If you need Rust interop, use typed messages (Approach 1) or compiled messages (Approach 4).
- No type checking. A typo in a key name (
data["temperture"]) is a runtimeKeyError, not a compile-time error. - Size overhead. MessagePack includes key names in every message. A dict with long key names is larger than an equivalent struct.
Approach 3: Dataclasses (Structured Python)
For structured Python-only data, define @dataclass classes and serialize them to dicts with asdict(). This gives you IDE autocompletion, type hints, and constructor validation while still using the dict transport under the hood.
When to use dataclasses
- You want Python type checking and IDE support
- Multiple team members work on the same message schema
- You need default values, validation, or computed fields
- The data stays within Python (no Rust subscribers)
Complete example
Create dataclass_demo.py:
import horus
from dataclasses import dataclass, asdict, field
from typing import List
# --- Message definitions ---
@dataclass
class BatteryReading:
voltage: float
current: float
temperature: float
charge_percent: float
cell_count: int = 3
def is_critical(self) -> bool:
"""Business logic directly on the message."""
return self.voltage < 10.5 or self.temperature > 60.0
@dataclass
class BatteryAlert:
level: str # "info", "warning", "critical"
message: str
voltage: float
charge_percent: float
@dataclass
class SystemStatus:
battery: BatteryReading
alerts: List[str] = field(default_factory=list)
uptime_secs: float = 0.0
# --- Node tick functions ---
tick_counter = {"value": 0}
def battery_sensor_tick(node):
"""Simulate a draining battery."""
tick_counter["value"] += 1
t = tick_counter["value"]
reading = BatteryReading(
voltage=12.6 - (t * 0.15),
current=2.5,
temperature=35.0 + (t * 0.3),
charge_percent=max(0.0, 100.0 - t * 5.0),
)
# asdict() converts the dataclass to a dict for transport
node.send("battery.raw", asdict(reading))
print(f"[Battery] {reading.charge_percent:.0f}% ({reading.voltage:.1f}V)")
def monitor_tick(node):
"""Watch battery readings and publish alerts."""
data = node.recv("battery.raw")
if data is not None:
# Reconstruct the dataclass from the dict
reading = BatteryReading(**data)
if reading.is_critical():
alert = BatteryAlert(
level="critical",
message=f"Battery critical: {reading.voltage:.1f}V",
voltage=reading.voltage,
charge_percent=reading.charge_percent,
)
node.send("battery.alert", asdict(alert))
print(f"[Monitor] CRITICAL: {alert.message}")
elif reading.charge_percent < 30.0:
alert = BatteryAlert(
level="warning",
message=f"Battery low: {reading.charge_percent:.0f}%",
voltage=reading.voltage,
charge_percent=reading.charge_percent,
)
node.send("battery.alert", asdict(alert))
print(f"[Monitor] WARNING: {alert.message}")
def dashboard_tick(node):
"""Aggregate battery data and alerts into a system status."""
battery_data = node.recv("battery.raw")
alert_data = node.recv("battery.alert")
if battery_data is not None:
battery = BatteryReading(**battery_data)
alerts = []
if alert_data is not None:
alerts.append(alert_data["message"])
status = SystemStatus(
battery=battery,
alerts=alerts,
uptime_secs=tick_counter["value"],
)
# Nested dataclasses serialize correctly with asdict()
node.send("system.status", asdict(status))
# --- Node definitions ---
battery = horus.Node(
name="Battery",
tick=battery_sensor_tick,
rate=1,
order=0,
pubs=["battery.raw"],
)
monitor = horus.Node(
name="Monitor",
tick=monitor_tick,
rate=1,
order=1,
subs=["battery.raw"],
pubs=["battery.alert"],
)
dashboard = horus.Node(
name="Dashboard",
tick=dashboard_tick,
rate=1,
order=2,
subs=["battery.raw", "battery.alert"],
pubs=["system.status"],
)
# --- Run ---
horus.run(battery, monitor, dashboard, duration=20.0)
Create horus.toml:
[project]
name = "dataclass-demo"
version = "0.1.0"
language = "python"
entry = "dataclass_demo.py"
Run it:
horus run
Expected output:
[Battery] 95% (12.5V)
[Battery] 90% (12.3V)
[Battery] 85% (12.2V)
...
[Battery] 25% (10.9V)
[Monitor] WARNING: Battery low: 25%
[Battery] 20% (10.7V)
[Monitor] WARNING: Battery low: 20%
...
[Battery] 5% (10.4V)
[Monitor] CRITICAL: Battery critical: 10.4V
Dataclass tips
asdict()handles nesting. IfSystemStatuscontains aBatteryReading,asdict()recursively converts both.- Reconstruction with
**data.BatteryReading(**data)works becauseasdict()preserves field names. Nested dataclasses come back as plain dicts though, so you need to reconstruct them manually:reading = BatteryReading(**data["battery"]). - Validation in
__post_init__. Add a__post_init__method to validate on construction:
@dataclass
class BatteryReading:
voltage: float
current: float
temperature: float
charge_percent: float
def __post_init__(self):
if not 0.0 <= self.charge_percent <= 100.0:
raise ValueError(f"charge_percent must be 0-100, got {self.charge_percent}")
- Methods survive transport. After
BatteryReading(**data), you can callreading.is_critical()-- the methods are on the class, not on the dict.
Approach 4: Compiled Messages with maturin (Production)
For Python-only custom messages at high frequency, use horus.msggen to define a message schema, then compile it with maturin for near-native performance (~3-5us). This approach requires a build step but gives you a typed, binary-serialized message without writing any Rust yourself.
When to use compiled messages
- You have a custom message type that does not match any built-in type
- The topic runs at high frequency (50Hz+) and dict overhead is too high
- You want binary serialization without writing Rust
- You are deploying to production and want predictable performance
Complete example
Create compiled_demo.py:
from horus.msggen import define_message
import horus
# --- Define a custom message type ---
# This creates a Python class backed by fixed-layout binary serialization.
# At runtime (no maturin), latency is ~20-40us.
# After `maturin develop`, latency drops to ~3-5us.
RobotStatus = define_message("RobotStatus", "robot.status", [
("battery_level", "f32"),
("motor_rpm", "f32"),
("error_code", "i32"),
("is_active", "bool"),
("tick_count", "u64"),
])
WheelSpeed = define_message("WheelSpeed", "wheel.speed", [
("left_rpm", "f32"),
("right_rpm", "f32"),
("timestamp_ns", "u64"),
])
# --- Node tick functions ---
counter = {"value": 0}
def status_publisher_tick(node):
"""Publish robot status at 50 Hz."""
counter["value"] += 1
status = RobotStatus(
battery_level=85.0,
motor_rpm=3200.0 + (counter["value"] % 100),
error_code=0,
is_active=True,
tick_count=counter["value"],
)
node.send("robot.status", status)
def wheel_publisher_tick(node):
"""Publish wheel speeds at 50 Hz."""
speed = WheelSpeed(
left_rpm=150.0,
right_rpm=148.0,
timestamp_ns=counter["value"] * 20_000_000, # 20ms per tick
)
node.send("wheel.speed", speed)
def dashboard_tick(node):
"""Read both topics and display."""
status = node.recv("robot.status")
wheels = node.recv("wheel.speed")
if status is not None and wheels is not None:
print(
f"[Dash] battery={status.battery_level:.0f}% "
f"rpm={status.motor_rpm:.0f} "
f"wheels=({wheels.left_rpm:.0f}, {wheels.right_rpm:.0f})"
)
# --- Node definitions ---
status_pub = horus.Node(
name="StatusPub",
tick=status_publisher_tick,
rate=50,
order=0,
pubs=["robot.status"],
)
wheel_pub = horus.Node(
name="WheelPub",
tick=wheel_publisher_tick,
rate=50,
order=1,
pubs=["wheel.speed"],
)
dash = horus.Node(
name="Dashboard",
tick=dashboard_tick,
rate=50,
order=2,
subs=["robot.status", "wheel.speed"],
)
# --- Run ---
horus.run(status_pub, wheel_pub, dash, duration=5.0)
Create horus.toml:
[project]
name = "compiled-demo"
version = "0.1.0"
language = "python"
entry = "compiled_demo.py"
This example runs without any build step (runtime mode, ~20-40us). To get compiled performance (~3-5us), run:
# One-time setup: install maturin
pip install maturin
# Compile the message definitions
maturin develop
After compilation, the same code runs with binary serialization instead of Python's struct module.
Expected output (both runtime and compiled):
[Dash] battery=85% rpm=3201 wheels=(150, 148)
[Dash] battery=85% rpm=3202 wheels=(150, 148)
[Dash] battery=85% rpm=3203 wheels=(150, 148)
...
Performance Comparison
| Approach | Latency | Cross-language? | Build step? | Best for |
|---|---|---|---|---|
Typed (horus.CmdVel) | ~1.7us | Yes (Rust + Python) | No | Control loops, typed sensor data |
| Dict (GenericMessage) | ~6-50us | No (Python only) | No | Prototyping, flexible schemas |
| Dataclass (runtime) | ~20-40us | No (Python only) | No | Structured Python-only data |
| Compiled (maturin) | ~3-5us | No (Python only) | Yes (maturin develop) | Production, high-frequency custom types |
Latency is measured end-to-end: serialize on the publisher, write to shared memory, read on the subscriber, deserialize. Dict latency varies with payload size (6us for a small dict, 50us for a dict with large nested structures).
Choosing the right approach
Follow this decision process:
Does your data match a built-in type? Use typed messages (Approach 1). They are the fastest option with no build step, and they work across Rust and Python. If you are sending CmdVel, Imu, Odometry, or any of the 70+ standard types, there is no reason to use anything else.
Are you prototyping? Use dicts (Approach 2). You can change the schema by editing a Python dict literal. No class definitions, no registration, no build step. When the schema stabilizes, migrate to typed or compiled messages.
Do you need structure but stay Python-only? Use dataclasses (Approach 3). You get IDE autocompletion, type hints, and constructor validation. The transport cost is the same as dicts (they serialize as dicts), but your code is cleaner and more maintainable.
Do you need a custom type at high frequency? Use compiled messages (Approach 4). Define the schema once with define_message(), compile with maturin, and get ~3-5us latency with binary serialization. This is the right choice for production Python nodes running at 50Hz or above with custom data.
Do you need cross-language custom types? Define the message in Rust with message! and expose it to Python via PyO3. See Tutorial 4 (Rust) for the Rust side and Python API: Custom Messages for binding patterns.
Common Mistakes
Mixing typed and string topic names. If you register pubs=[horus.Imu] but send with node.send("imu", some_dict), the subscriber gets a dict, not an Imu object. Always match the registration type with the send type.
Forgetting asdict(). Sending a raw dataclass instance without asdict() will fail at serialization. Always wrap: node.send("topic", asdict(my_dataclass)).
Nested dataclass reconstruction. BatteryReading(**data) works for flat dataclasses, but if your dataclass contains another dataclass, the nested field comes back as a plain dict. Reconstruct it manually:
@dataclass
class Outer:
inner: Inner
value: float
# After receiving:
data = node.recv("topic")
if data is not None:
inner = Inner(**data["inner"])
outer = Outer(inner=inner, value=data["value"])
Dict key typos. data["temperture"] (typo) raises KeyError at runtime. Dataclasses catch this at construction time: BatteryReading(temperture=23.5) raises TypeError. This is why Approach 3 is better for team projects.
Next Steps
- Tutorial 5: Hardware Drivers (Python) -- connect to real hardware
- Tutorial 4 (Rust) -- same tutorial in Rust with
message!macro - Message Types -- full list of 70+ built-in types
- Python API: Custom Messages --
define_message(),define_numpy_message(), and compiled workflows
See Also
- Custom Messages (Rust) -- Rust version
- Python Custom Messages -- Python API
- Performance Optimization -- benchmarks and tuning