Topics Deep-Dive (Python)
A warehouse robot has a Python ML node running YOLO at 30 FPS, a Rust motor controller ticking at 1 kHz, and a Python dashboard logging everything. The ML node detects an obstacle and needs to tell the motor controller to stop -- in under 2 microseconds, across process boundaries, without the GIL blocking the Rust side.
HORUS topics make this work. Every node.send() writes into a shared-memory ring buffer. Every node.recv() reads from it. No sockets, no serialization for typed messages, no kernel involvement. The Python node and the Rust node share the same memory region -- the ML node writes a CmdVel, and the motor controller reads it directly.
This page covers the Python topic API in depth: how to declare topics, what determines your latency, how to receive messages correctly, and when to use the standalone Topic class outside the scheduler.
How Topics Work
A topic is a named ring buffer in shared memory. When you call node.send("cmd_vel", data), HORUS writes data into the ring buffer for the topic named "cmd_vel". When another node calls node.recv("cmd_vel"), it reads the oldest unread message from that same buffer.
Python Node A Shared Memory Rust/Python Node B
┌─────────────┐ ┌─────────────────────────┐ ┌─────────────────┐
│ node.send() │ ──write──│ Ring Buffer (1024 slots)│──read── │ node.recv() │
│ │ │ topic: "cmd_vel" │ │ │
└─────────────┘ └─────────────────────────┘ └─────────────────┘
Key properties:
- Lock-free: Writers and readers never block each other. send() always returns immediately.
- Bounded: The ring buffer has a fixed capacity (default: 1024 messages). When full, the oldest unread message is overwritten.
- Cross-language: A Python publisher and a Rust subscriber share the exact same SHM region. No bridge process, no translation layer.
- Auto-discovered: Two nodes that use the same topic name are connected automatically. No configuration, no broker.
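The bounded, overwrite-oldest behavior can be modeled in a few lines of plain Python. This is an illustrative sketch only (`RingTopic` is a made-up name, not part of the HORUS API); the real buffer lives in shared memory and is lock-free, but the eviction policy is the same:

```python
from collections import deque

class RingTopic:
    """Toy model of a bounded topic buffer: when full, the oldest
    unread message is overwritten (dropped) to make room for the new one."""
    def __init__(self, capacity=4):
        # deque with maxlen drops from the left (oldest) when full,
        # mirroring the overwrite-oldest policy described above
        self.buf = deque(maxlen=capacity)

    def send(self, msg):
        self.buf.append(msg)  # never blocks; may evict the oldest unread msg

    def recv(self):
        return self.buf.popleft() if self.buf else None  # None when empty

topic = RingTopic(capacity=4)
for i in range(6):          # publish 6 messages into a 4-slot buffer
    topic.send(i)
# The two oldest messages (0 and 1) were overwritten
print([topic.recv() for _ in range(5)])  # [2, 3, 4, 5, None]
```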
Three Ways to Declare Topics
Topic declarations go in the pubs and subs parameters of horus.Node(). The declaration style determines the performance path -- string topics use generic serialization, typed topics use zero-copy Pod transfer.
String Topics (Generic)
import horus
node = horus.Node(
name="logger",
subs=["sensor.data", "motor.status"],
pubs=["log.output"],
tick=my_tick,
rate=10,
)
String topics use GenericMessage under the hood -- your data is serialized with MessagePack before writing to SHM. This handles any Python-serializable value: dicts, lists, strings, numbers, nested structures.
def my_tick(node):
# Dicts, lists, nested structures -- all work
node.send("log.output", {
"level": "info",
"message": "Motor started",
"details": {"voltage": 12.4, "current": 1.2},
})
Latency: ~5--50 us per send+recv round-trip (depends on message size). The cost is serialization and deserialization -- MessagePack must convert your Python dict to bytes and back.
When to use: Prototyping, logging, configuration updates, any data that changes shape frequently, or when you need to send arbitrary Python objects.
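To get a feel for where the generic-path cost comes from, you can time an encode+decode round trip yourself. The sketch below uses the standard-library `json` module as a stand-in for MessagePack (which may not be installed); absolute numbers are machine-dependent and will not match the figures quoted above exactly, but the shape of the cost is the same:

```python
import json
import timeit

payload = {"level": "info", "message": "Motor started",
           "details": {"voltage": 12.4, "current": 1.2}}

# One generic-path round trip: encode the dict to bytes-ish form, decode it back
def round_trip():
    return json.loads(json.dumps(payload))

assert round_trip() == payload  # lossless for JSON-compatible data
per_call_us = timeit.timeit(round_trip, number=10_000) / 10_000 * 1e6
print(f"~{per_call_us:.1f} us per encode+decode round trip")
```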
Typed Topics (Pod Zero-Copy)
from horus import Node, CmdVel, Imu, LaserScan
node = Node(
name="controller",
subs=[Imu, LaserScan],
pubs=[CmdVel],
tick=control_tick,
rate=100,
)
When you pass a message class instead of a string, HORUS uses the Pod (plain-old-data) path. The message type has a fixed binary layout known at compile time. The Python wrapper writes fields directly into SHM -- no serialization step.
Topic names are derived automatically from the type:
| Type | Auto-generated name |
|---|---|
| CmdVel | "cmd_vel" |
| Imu | "imu" |
| LaserScan | "scan" |
| Pose2D | "pose2d" |
| Odometry | "odometry" |
from horus import CmdVel
def control_tick(node):
cmd = CmdVel(linear=1.0, angular=0.0)
node.send("cmd_vel", cmd)
Latency: ~1.5 us per send+recv round-trip. This is 3--30x faster than the generic path because there is no serialization -- the Pod struct is memory-mapped directly.
When to use: Control loops, sensor data, any hot path where latency matters. Use typed topics for anything running above 10 Hz.
Named Typed Topics
node = horus.Node(
name="dual_arm",
pubs={"left.cmd": CmdVel, "right.cmd": CmdVel},
subs={"left.odom": Odometry, "right.odom": Odometry},
tick=dual_arm_tick,
rate=100,
)
A dict maps custom topic names to types. This gives you the Pod zero-copy performance path with explicit control over the topic name -- essential when you have multiple topics of the same type (two arms, two cameras, four wheels).
def dual_arm_tick(node):
left_odom = node.recv("left.odom")
right_odom = node.recv("right.odom")
if left_odom and right_odom:
node.send("left.cmd", CmdVel(linear=1.0, angular=0.0))
node.send("right.cmd", CmdVel(linear=1.0, angular=0.0))
Latency: Same as typed topics (~1.5 us). The dict syntax only changes the name, not the transport mechanism.
Declaration Summary
| Style | Syntax | Transport | Latency | Best for |
|---|---|---|---|---|
| String | pubs=["data"] | GenericMessage (MessagePack) | ~5--50 us | Dicts, prototyping, flexible data |
| Typed | pubs=[CmdVel] | Pod zero-copy | ~1.5 us | Control loops, sensor data |
| Named typed | pubs={"cmd": CmdVel} | Pod zero-copy | ~1.5 us | Multiple topics of the same type |
You can mix styles in the same node. Use typed topics for your hot-path control data and string topics for debug/logging output:
node = horus.Node(
name="controller",
subs=[Imu], # Typed: fast sensor input
pubs={"cmd": CmdVel, "debug": None}, # Typed CmdVel + string debug
tick=control_tick,
rate=100,
)
Performance: Generic vs Typed
The difference between generic and typed topics is not academic. At high frequencies, serialization overhead dominates:
| Scenario | Generic (string) | Typed (Pod) | Speedup |
|---|---|---|---|
| CmdVel (16 bytes) at 100 Hz | ~10 us/msg | ~1.5 us/msg | 6.7x |
| Imu (~300 bytes) at 200 Hz | ~25 us/msg | ~2.5 us/msg | 10x |
| dict with 5 keys at 30 Hz | ~15 us/msg | N/A | -- |
At 1 kHz, a 25 us generic send+recv consumes 2.5% of every tick cycle. A 1.5 us typed send+recv consumes 0.15%. For a motor controller with a 1 ms budget, that difference is the margin between meeting deadlines and missing them.
Rule of thumb: If the topic runs above 10 Hz and carries structured data with a known schema, use a typed topic. If the data is ad-hoc (debug dicts, configuration blobs, log messages), use a string topic.
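The budget arithmetic above is easy to reproduce. A quick check of what fraction of a tick cycle one send+recv consumes at a given rate (using the per-message figures quoted in this section):

```python
def budget_fraction(per_msg_us, rate_hz):
    """Fraction of each tick's time budget consumed by one send+recv."""
    tick_budget_us = 1_000_000 / rate_hz   # e.g. 1 kHz -> 1000 us per tick
    return per_msg_us / tick_budget_us

# Figures from the table above, at a 1 kHz control rate
print(f"{budget_fraction(25, 1000):.1%}")   # generic Imu-sized message -> 2.5%
print(f"{budget_fraction(1.5, 1000):.2%}")  # typed Pod message -> 0.15%
```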
Why the Difference?
Generic topics must serialize your Python dict into bytes (MessagePack encoding), copy those bytes into SHM, then deserialize on the receiver side. Typed topics have a fixed binary layout -- the Python wrapper writes each field at its known offset in the SHM slot, and the receiver reads from those same offsets. No encoding, no decoding, no intermediate byte buffer.
Receiving Messages
recv() -- Take One Message
def tick(node):
msg = node.recv("sensor.data")
if msg is not None:
process(msg)
recv() returns the oldest unread message and removes it from the buffer. If nothing is available, it returns None immediately -- it never blocks. Each call consumes exactly one message.
Important: If a publisher sends 5 messages between your ticks, a single recv() returns only the first one. The other 4 remain in the buffer. If you only ever call recv() once per tick but messages arrive faster than you process them, the buffer fills and older messages are dropped.
has_msg() -- Peek Without Consuming
def tick(node):
if node.has_msg("emergency"):
stop_cmd = node.recv("emergency")
handle_emergency(stop_cmd)
else:
do_normal_work(node)
has_msg() checks whether at least one message is waiting on the topic, without consuming it. Internally, it performs a recv() and buffers the result -- the next recv() call returns that same message. This means has_msg() followed by recv() does not skip a message.
Use has_msg() when you need to branch on whether data is available before committing to process it.
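The peek-then-consume guarantee can be illustrated with a small pure-Python model. This is a sketch of the described buffering behavior, not the actual HORUS implementation (`PeekableTopic` is a made-up name):

```python
class PeekableTopic:
    """Toy model of has_msg(): peek by receiving early and buffering the
    result, so the following recv() still returns the same message."""
    def __init__(self, msgs):
        self.msgs = list(msgs)
        self._peeked = None

    def recv(self):
        # Return the buffered peek first, if any
        if self._peeked is not None:
            msg, self._peeked = self._peeked, None
            return msg
        return self.msgs.pop(0) if self.msgs else None

    def has_msg(self):
        # Internally performs a recv() and buffers the result
        if self._peeked is None:
            self._peeked = self.recv()
        return self._peeked is not None

t = PeekableTopic(["stop"])
print(t.has_msg())  # True
print(t.recv())     # "stop" -- has_msg() did not skip the message
print(t.recv())     # None
```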
recv_all() -- Drain the Buffer
def tick(node):
commands = node.recv_all("commands")
for cmd in commands:
execute(cmd)
node.log_debug(f"Processed {len(commands)} commands")
recv_all() returns a list of all available messages, draining the buffer completely. Returns an empty list if nothing is available.
This is the correct pattern when you must process every message (logging, event recording, command queues) rather than just the latest.
Pattern: Keep-Latest
The most common pattern in control loops -- drain the buffer and act on only the newest message:
def control_tick(node):
# Drain all buffered IMU readings, keep only the latest
latest_imu = None
for msg in node.recv_all("imu"):
latest_imu = msg
if latest_imu is not None:
cmd = compute_control(latest_imu)
node.send("cmd_vel", cmd)
This ensures you always act on the freshest data, even if several messages accumulated between ticks. Stale readings are discarded.
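Since recv_all() returns messages in arrival order, the drain loop can be condensed: the newest message is simply the last element of the returned list. `keep_latest` below is a hypothetical helper, not part of the HORUS API:

```python
def keep_latest(msgs):
    """Return the newest message from a drained buffer, or None if empty."""
    return msgs[-1] if msgs else None

# Equivalent control tick using the list returned by recv_all():
def control_tick(node):
    latest_imu = keep_latest(node.recv_all("imu"))
    if latest_imu is not None:
        node.send("cmd_vel", compute_control(latest_imu))

# The helper itself is plain Python and easy to verify:
print(keep_latest([3, 7, 9]))  # 9
print(keep_latest([]))         # None
```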
Pattern: Conditional Multi-Topic
def fusion_tick(node):
lidar = node.recv("lidar") if node.has_msg("lidar") else None
camera = node.recv("camera") if node.has_msg("camera") else None
if lidar and camera:
fused = fuse_sensors(lidar, camera)
node.send("fused", fused)
elif lidar:
node.send("fused", lidar_only_estimate(lidar))
Check multiple topics and degrade gracefully when some sensors are unavailable.
Pattern: Batch Processing
def logger_tick(node):
events = node.recv_all("events")
if events:
# Batch write is more efficient than one-at-a-time
with open("log.jsonl", "a") as f:
for event in events:
f.write(json.dumps(event) + "\n")
Collect all messages and process them in a batch. More efficient for I/O-heavy consumers that benefit from batching.
Dropped Messages
Messages are dropped when the ring buffer is full and a new send() overwrites the oldest unread slot. This happens when:
- A publisher sends faster than the subscriber reads (subscriber too slow)
- A subscriber is temporarily blocked (GIL contention, I/O wait, garbage collection pause)
- The buffer capacity is too small for the burst rate
How to Detect Drops
HORUS does not expose a Python-level dropped_count() method on nodes. Instead, use these strategies:
Sequence numbers: Add a counter to your messages and check for gaps on the receiver side:
send_seq = 0
def publisher_tick(node):
global send_seq
send_seq += 1
node.send("data", {"seq": send_seq, "value": read_sensor()})
last_seq = 0
def subscriber_tick(node):
global last_seq
for msg in node.recv_all("data"):
if msg["seq"] != last_seq + 1:
node.log_warning(f"Dropped {msg['seq'] - last_seq - 1} messages")
last_seq = msg["seq"]
process(msg)
Rate monitoring: Use horus monitor --tui to watch topic publish and subscribe rates in real time. If the publish rate consistently exceeds the subscribe rate, drops are happening.
How to Prevent Drops
| Strategy | How | When |
|---|---|---|
| Drain every tick | Use recv_all() instead of single recv() | Always, unless you only need latest |
| Increase capacity | default_capacity=4096 on Node | Bursty publishers |
| Keep-latest pattern | Drain + use only the newest | Control loops |
| Reduce publisher rate | Lower the publisher's rate | Publisher is too fast for the subscriber |
| Use Rust for the subscriber | Rust nodes process messages faster | GIL is the bottleneck |
The default buffer capacity is 1024 messages. For most applications, this is large enough that drops only occur under sustained overload, not brief bursts.
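The "how long before drops" relationship is just capacity divided by publish rate. A quick check of the claim, for a completely stalled subscriber:

```python
def seconds_before_drop(capacity, publish_hz):
    """How long a completely stalled subscriber can lag before the
    ring buffer wraps and starts overwriting unread messages."""
    return capacity / publish_hz

print(seconds_before_drop(1024, 1000))       # 1.024 -> just over a second at 1 kHz
print(round(seconds_before_drop(1024, 30), 1))  # 34.1 -> ample headroom at 30 Hz
```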
Topic Naming Rules
HORUS topic names use dots for hierarchy. Never use slashes.
# CORRECT
pubs=["sensor.temperature"]
pubs=["robot.arm.left.position"]
pubs=["camera.front.rgb"]
# WRONG -- fails on macOS, may cause subtle bugs on Linux
pubs=["sensor/temperature"]
pubs=["robot/arm/left/position"]
Slashes fail because shm_open (the POSIX system call that creates shared memory regions) interprets / as a directory separator on macOS. A topic named "sensor/temperature" tries to create /dev/shm/sensor/temperature as a nested path, which fails if the intermediate directory does not exist. Dots work identically on Linux and macOS.
| Framework | Separator | Example |
|---|---|---|
| ROS / ROS2 | / | /sensor/lidar |
| HORUS | . | sensor.lidar |
Naming Conventions
| Pattern | Example | Use for |
|---|---|---|
| subsystem.data | sensor.temperature | Most topics |
| subsystem.device.data | camera.front.rgb | Multi-device systems |
| robot.subsystem.data | robot1.motor.cmd_vel | Multi-robot fleets |
Avoid: Names starting with _ (reserved for internal use), names containing special characters, names containing /.
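The rules above are easy to encode in a pre-flight check. `is_valid_topic_name` is a hypothetical validation helper illustrating the stated rules, not a HORUS API; it accepts dot-separated alphanumeric segments and rejects slashes, leading underscores, and other special characters:

```python
import re

# Stated rules: dots for hierarchy, no slashes (shm_open treats them as
# path separators on macOS), no leading underscore (reserved for internal
# use), no other special characters.
_TOPIC_NAME = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_]*(\.[A-Za-z0-9][A-Za-z0-9_]*)*$")

def is_valid_topic_name(name: str) -> bool:
    return bool(_TOPIC_NAME.match(name))

print(is_valid_topic_name("sensor.temperature"))    # True
print(is_valid_topic_name("robot1.motor.cmd_vel"))  # True
print(is_valid_topic_name("sensor/temperature"))    # False: slash breaks shm_open
print(is_valid_topic_name("_internal.state"))       # False: leading _ is reserved
```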
Cross-Process Topics
Topics work across process boundaries with zero configuration. A Python node in one process and a Rust node in another process can share the same topic -- HORUS handles everything through shared memory.
How Auto-Discovery Works
When a node creates a topic, HORUS:
- Creates (or opens) a shared-memory file named after the topic (e.g., horus_cmd_vel for topic "cmd_vel")
- Writes a .meta discovery file containing the topic's type, capacity, and creator PID
- Maps the SHM region into the process's address space
When a second node (even in a different process, even in a different language) creates a topic with the same name, HORUS:
- Finds the existing SHM file
- Maps it into the new process's address space
- Both processes now read and write to the same ring buffer
No broker. No discovery service. No network. Just filesystem-level coordination through SHM metadata.
Python + Rust Interop
A Python node and a Rust node share topics seamlessly when they use the same typed messages:
Python process:
from horus import Node, CmdVel, run
def control_tick(node):
node.send("cmd_vel", CmdVel(linear=1.0, angular=0.5))
controller = Node(name="py_controller", pubs=[CmdVel], tick=control_tick, rate=50)
run(controller)
Rust process (running separately):
use horus::prelude::*;
struct Motor { cmd_sub: Topic<CmdVel> }
impl Node for Motor {
fn name(&self) -> &str { "Motor" }
fn tick(&mut self) {
if let Some(cmd) = self.cmd_sub.recv() {
drive(cmd.linear, cmd.angular);
}
}
}
Both use the CmdVel Pod type. The Python side writes fields at the same memory offsets the Rust side reads from. No serialization, no translation. The Rust node sees the exact bytes the Python node wrote.
Cross-process topics use the SHM backend (~50--167 ns for Rust-to-Rust). The Python overhead (~1.5 us for typed, ~5--50 us for generic) is from the Python/Rust FFI boundary and MessagePack serialization, not from the SHM transport itself.
Namespace Isolation
By default, each terminal session gets its own SHM namespace (based on session ID and user ID). Two horus run commands in different terminals do not share topics.
To explicitly share topics across terminals or processes:
# Both processes must use the same namespace
HORUS_NAMESPACE=shared horus run my_publisher.py
HORUS_NAMESPACE=shared horus run my_subscriber.py
Standalone Topic Class
The node.send() and node.recv() methods work inside the scheduler tick loop. For code that runs outside the scheduler -- scripts, tests, debugging tools, one-shot publishers -- use the standalone Topic class directly.
from horus import Topic, CmdVel
# Create a typed topic (Pod zero-copy)
cmd_topic = Topic(CmdVel)
# Send a message
cmd_topic.send(CmdVel(linear=1.0, angular=0.0))
# Receive a message
msg = cmd_topic.recv()
if msg is not None:
print(f"linear={msg.linear}, angular={msg.angular}")
Constructor
Topic(msg_type, capacity=None, endpoint=None)
| Parameter | Type | Description |
|---|---|---|
| msg_type | type or str | Message class (e.g., CmdVel) for typed topics, or a string name for generic topics |
| capacity | int or None | Ring buffer capacity. None uses the default (1024) |
| endpoint | str or None | Custom topic name. None auto-derives from the type |
Methods
| Method | Signature | Description |
|---|---|---|
| send | send(message, node=None) -> bool | Write a message to the ring buffer. Returns True on success |
| recv | recv(node=None) -> Optional[Any] | Read the next message. Returns None if empty |
Properties
| Property | Type | Description |
|---|---|---|
| name | str | Topic name |
| msg_type | type | Message type class |
| endpoint | str or None | Custom endpoint if set |
| backend_type | str | Current backend (e.g., "shm", "intra") |
When to Use Standalone Topics
Testing: Send test data to a node and verify its output without running the full scheduler.
from horus import Topic, CmdVel
# Inject test data
cmd_topic = Topic(CmdVel)
cmd_topic.send(CmdVel(linear=1.0, angular=0.0))
# Verify the node processed it
output_topic = Topic("output")
result = output_topic.recv()
assert result is not None
One-shot commands: Send a single command from a script and exit.
from horus import Topic, CmdVel
# Send emergency stop
Topic(CmdVel).send(CmdVel(linear=0.0, angular=0.0))
Monitoring tools: Read topic data from a separate process for visualization or logging.
from horus import Topic, Imu
import time
imu_topic = Topic(Imu)
while True:
msg = imu_topic.recv()
if msg:
print(f"accel=({msg.ax:.2f}, {msg.ay:.2f}, {msg.az:.2f})")
time.sleep(0.01)
The standalone Topic class does not participate in the scheduler's tick loop. It has no rate control, no budget enforcement, and no lifecycle management. For sustained publishing or subscribing, use horus.Node() with the scheduler.
Auto-Created Topics
Topics used in send() or recv() that were not declared in pubs/subs are created automatically on first use:
def tick(node):
# "debug.info" was not in pubs -- auto-created as a GenericMessage topic
node.send("debug.info", {"tick": 42, "status": "ok"})
Auto-created topics always use the GenericMessage path (string-style, ~5--50 us). If you want the typed fast path, you must declare the topic in pubs/subs with a message type.
Auto-creation is convenient for debugging and prototyping but should not be relied on in production -- undeclared topics make the data flow harder to trace and always take the slower generic path.
Design Decisions
Why three declaration styles instead of one? String topics are the simplest possible API -- you just name your data channel and send anything through it. Typed topics exist because serialization overhead matters at high frequencies. Named typed topics exist because real robots have multiple instances of the same sensor type (two cameras, four wheels) and need distinct names with the same zero-copy performance. Each style exists to serve a different need; none is universally better.
Why is the default capacity 1024 messages? A control loop at 1 kHz produces 1000 messages per second. At the default capacity, a subscriber can fall a full second behind before messages are dropped. This is generous enough for most applications while keeping memory usage bounded. Increase it for bursty workloads; decrease it if memory is constrained.
Why does recv() return None instead of blocking?
Blocking in a tick function stalls the entire scheduler -- every other node waits until the blocked node returns. Returning None lets the node continue to its next tick, and the scheduler maintains its timing guarantees. This is the same design as Rust's Option-based recv() for the same reason.
Why auto-create topics?
Strict declaration-only topics are safer (every data flow is explicit), but they add friction during prototyping. Auto-creation lets you add a send("debug.foo", ...) call without modifying the node constructor. The trade-off is that auto-created topics are invisible in the node's pubs/subs lists and always use the slower generic path -- a deliberate incentive to declare your topics properly for production.
Why the standalone Topic class?
Nodes require a scheduler. But testing, scripting, and one-shot commands should not need a full scheduler setup. The standalone Topic class provides direct SHM access for these use cases, at the cost of no lifecycle management.
Trade-offs
| Gain | Cost |
|---|---|
| Three declaration styles -- pick the right abstraction level | Must understand the performance difference between generic and typed |
| Auto-created topics -- fast prototyping, no boilerplate | Undeclared topics take the slow generic path; hidden data flow |
| 1024-slot default buffer -- generous for most workloads | Uses more memory than Rust's default 4-slot buffer |
| recv() returns None -- never blocks the scheduler | Must check for None on every call (no exceptions on empty) |
| Pod zero-copy for typed -- 1.5 us cross-process | Only works for fixed-layout message types, not arbitrary dicts |
| Cross-process transparency -- same API for in-process and cross-process | All cross-process topics incur SHM overhead; cannot force in-process-only |
| Standalone Topic class -- works outside the scheduler | No rate control, no lifecycle, no budget enforcement |
See Also
- Topics: How Nodes Talk -- Beginner introduction to topics with first-principles explanations
- Topics -- Full Reference -- Rust-focused deep dive: 10 backends, live migration, capacity tuning
- Python Bindings -- Complete Python API reference for Node, Scheduler, and Topic
- Custom Messages -- Define your own typed messages in Python
- Message Library -- 55+ typed message classes (CmdVel, Imu, LaserScan, etc.)
- Shared Memory -- How SHM works under the hood: ring buffers, directory structure, namespace isolation