Topic and Pub/Sub
Key Takeaways
After reading this guide, you will understand:
- How Topic provides ultra-fast pub/sub communication (~3ns to ~167ns latency) with automatic optimization
- The send() and recv() methods for publishing and subscribing to typed topics
- Communication patterns (one-to-one, one-to-many, many-to-one, many-to-many) for different architectures
- When to use Topic for real-time, single-machine communication vs network-based messaging
The Topic is HORUS's ultra-low latency publish-subscribe (pub/sub) communication system. It enables nodes to exchange messages through shared memory IPC with automatic optimization — from ~3ns (same-thread) to ~167ns (cross-process, many-to-many) depending on topology.
What is a Topic?
A Topic<T> is a typed communication channel that connects publishers and subscribers. The system automatically detects the optimal backend based on:
- Where participants are (same thread, same process, or different processes)
- How many publishers and subscribers exist
You just call send() and recv() — HORUS handles the rest.
Key Features
Automatic Optimization: The fastest communication path is auto-detected at runtime
Zero-Copy Communication: Simple fixed-size types are shared directly in memory without serialization
Type Safety: Compile-time guarantees for message types
Live Migration: Communication paths upgrade/downgrade transparently as participants join or leave
Automatic Optimization
HORUS auto-selects the fastest communication path based on your topology. You never need to configure this — it happens automatically when participants call send() or recv().
| Scenario | Latency |
|---|---|
| Same thread | ~3ns |
| Same process, 1:1 | ~18ns |
| Same process, many-to-many | ~36ns |
| Cross-process | ~50-167ns |
Note: Latencies are for small messages (16B). Larger messages scale linearly with size.
If topology changes (e.g., a second subscriber joins), HORUS automatically migrates to the optimal path without dropping messages.
// All of these use the same API — optimization is automatic
let topic: Topic<f32> = Topic::new("velocity")?;
topic.send(1.5);
let msg = topic.recv();
Tip: Simple fixed-size structs (no String, Vec, or Box fields) automatically get a faster zero-copy path. Heap-backed types still work with the same API; HORUS picks the fastest path available for each type.
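The distinction can be illustrated in plain Rust, with no HORUS APIs involved: a Copy type cannot own heap data (String, Vec, and Box are not Copy), so any message struct that compiles through a Copy bound is a candidate for the fixed-size path. CmdVel below is a hypothetical example type, not part of the HORUS message set.

```rust
use std::mem::size_of;

// A type that satisfies `Copy` cannot own heap allocations, so its
// size is fully known at compile time: a candidate for zero-copy.
fn is_fixed_size_candidate<T: Copy>() -> usize {
    size_of::<T>()
}

#[derive(Clone, Copy)]
#[allow(dead_code)]
struct CmdVel {
    linear: f32,
    angular: f32,
}

fn main() {
    // These compile: primitives, fixed arrays, and plain structs are Copy.
    println!("f32: {} bytes", is_fixed_size_candidate::<f32>());
    println!("[f32; 3]: {} bytes", is_fixed_size_candidate::<[f32; 3]>());
    println!("CmdVel: {} bytes", is_fixed_size_candidate::<CmdVel>());

    // This line would NOT compile, because Vec<f32> is not Copy:
    // is_fixed_size_candidate::<Vec<f32>>();
}
```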
Basic Usage
Creating a Topic
use horus::prelude::*;
// Create a Topic for f32 values on topic "velocity"
let topic: Topic<f32> = Topic::new("velocity")?;
Publishing Messages
use horus::prelude::*;
struct Publisher {
velocity_pub: Topic<f32>,
}
impl Publisher {
fn new() -> Result<Self> {
Ok(Self {
velocity_pub: Topic::new("velocity")?,
})
}
}
impl Node for Publisher {
fn name(&self) -> &str {
"Publisher"
}
fn tick(&mut self) {
let velocity = 1.5;
// Send message — infallible, non-blocking
self.velocity_pub.send(velocity);
}
}
Subscribing to Messages
use horus::prelude::*;
struct Subscriber {
velocity_sub: Topic<f32>,
}
impl Subscriber {
fn new() -> Result<Self> {
Ok(Self {
velocity_sub: Topic::new("velocity")?,
})
}
}
impl Node for Subscriber {
fn name(&self) -> &str {
"Subscriber"
}
fn tick(&mut self) {
if let Some(velocity) = self.velocity_sub.recv() {
println!("Received velocity: {}", velocity);
}
}
}
Transport
Topics use local shared memory for ultra-fast communication between processes on the same machine.
// Automatically uses local shared memory
let topic: Topic<SensorData> = Topic::new("sensors")?;
Performance: ~3ns to ~167ns depending on topology
Use case: All nodes on the same machine
Pros: Ultra-fast, deterministic, zero-copy
For multi-machine communication, HORUS supports zero-configuration peer discovery via mDNS (requires the mdns feature flag).
API Reference
Note: HORUS separates introspection from the hot path. The send() and recv() methods have zero logging overhead by default.
send()
pub fn send(&self, msg: T)
Publishes a message to the topic. Infallible — the oldest unread message is overwritten when the buffer is full.
// send() always succeeds — no error handling needed
topic.send(data);
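The "keep last" overwrite semantics can be sketched in plain Rust; this is a mental model of the observable behavior, not the HORUS shared-memory implementation:

```rust
use std::collections::VecDeque;

// Minimal sketch of "keep last" semantics: when the buffer is full,
// a new message evicts the oldest one instead of failing, which is
// why send() never needs to return an error.
struct KeepLastBuffer<T> {
    buf: VecDeque<T>,
    capacity: usize,
}

impl<T> KeepLastBuffer<T> {
    fn new(capacity: usize) -> Self {
        Self { buf: VecDeque::with_capacity(capacity), capacity }
    }

    // Infallible, like send(): drop the oldest message when full.
    fn send(&mut self, msg: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front(); // oldest unread message is discarded
        }
        self.buf.push_back(msg);
    }

    // Non-blocking, like recv(): None when nothing is pending.
    fn recv(&mut self) -> Option<T> {
        self.buf.pop_front()
    }
}

fn main() {
    let mut topic = KeepLastBuffer::new(2);
    topic.send(1);
    topic.send(2);
    topic.send(3); // buffer full: 1 is overwritten
    assert_eq!(topic.recv(), Some(2));
    assert_eq!(topic.recv(), Some(3));
    assert_eq!(topic.recv(), None);
}
```

This is also why dropped_count() can be nonzero on a slow subscriber: each overwrite counts as a dropped message.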
recv()
pub fn recv(&self) -> Option<T>
Receives the next message from the topic. Returns None if no message is available. Non-blocking.
if let Some(data) = topic.recv() {
// Process the received message
}
try_send()
pub fn try_send(&self, msg: T) -> Result<(), T>
Attempts to send a message, returning it on failure. Unlike send(), which silently overwrites the oldest message when the buffer is full, try_send() gives you explicit control over the full-buffer case.
match topic.try_send(data) {
Ok(()) => { /* sent successfully */ }
Err(returned_data) => { /* buffer full, message returned */ }
}
try_recv()
pub fn try_recv(&self) -> Option<T>
Attempts to receive a message. Functionally identical to recv() for most use cases — both are non-blocking and return Option<T>.
send_blocking()
pub fn send_blocking(&self, msg: T, timeout: Duration) -> Result<(), SendBlockingError>
Sends a message, blocking until buffer space is available or the timeout expires. Unlike send() which overwrites the oldest message when full, send_blocking() guarantees delivery or returns an explicit error.
Use this for critical command topics (emergency stop, motor setpoints) where message loss is unacceptable.
use std::time::Duration;
// Block up to 1ms waiting for buffer space
match topic.send_blocking(emergency_stop_cmd, Duration::from_millis(1)) {
Ok(()) => { /* delivered */ }
Err(_) => { hlog!(error, "Failed to deliver emergency stop!"); }
}
read_latest()
pub fn read_latest(&self) -> Option<T>
where
T: Copy,
Returns the most recent message without advancing the consumer position. Calling it multiple times returns the same message until a new one is published. Useful for reading infrequently-updated data like static transforms or configuration.
T: Copy requirement: read_latest() requires T: Copy to guarantee safe concurrent reads. Types with heap allocations (String, Vec, etc.) should use recv() instead.
// Read latest transform (doesn't consume it)
if let Some(transform) = tf_topic.read_latest() {
self.apply_transform(transform);
}
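The difference between recv() (consumes) and read_latest() (peeks) can be sketched with a plain Rust cell; the real Topic does this through shared memory, but the observable behavior is the same idea:

```rust
// Sketch of peek-vs-consume semantics. T: Copy lets read_latest hand
// out the value repeatedly without moving it out of the cell.
struct LatestCell<T: Copy> {
    latest: Option<T>,
    consumed: bool,
}

impl<T: Copy> LatestCell<T> {
    fn new() -> Self {
        Self { latest: None, consumed: true }
    }

    fn publish(&mut self, msg: T) {
        self.latest = Some(msg);
        self.consumed = false;
    }

    // Like recv(): returns each message at most once.
    fn recv(&mut self) -> Option<T> {
        if self.consumed {
            return None;
        }
        self.consumed = true;
        self.latest
    }

    // Like read_latest(): same message every call until a new publish.
    fn read_latest(&self) -> Option<T> {
        self.latest
    }
}

fn main() {
    let mut cell = LatestCell::new();
    cell.publish(1.5f32);
    assert_eq!(cell.read_latest(), Some(1.5)); // peek
    assert_eq!(cell.read_latest(), Some(1.5)); // same message again
    assert_eq!(cell.recv(), Some(1.5));        // consume once
    assert_eq!(cell.recv(), None);             // nothing new
}
```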
has_message()
pub fn has_message(&self) -> bool
Checks if at least one message is available without consuming it.
pending_count()
pub fn pending_count(&self) -> u64
Returns the number of messages waiting to be consumed.
Introspection Methods
Topic provides several methods for monitoring and debugging at runtime:
// Topic name
let name = topic.name(); // "velocity"
// Check message availability
if topic.has_message() {
println!("{} messages pending", topic.pending_count());
}
// Message loss tracking
let dropped = topic.dropped_count();
if dropped > 0 {
hlog!(warn, "Dropped {} messages on '{}'", dropped, topic.name());
}
// Connection monitoring
println!("Publishers: {}, Subscribers: {}", topic.pub_count(), topic.sub_count());
// Backend info (auto-selected by HORUS)
println!("Using backend: {}", topic.backend_name());
// Detailed metrics
let metrics = topic.metrics();
println!("Sent: {}, Received: {}", metrics.messages_sent(), metrics.messages_received());
println!("Send failures: {}, Recv failures: {}", metrics.send_failures(), metrics.recv_failures());
| Method | Returns | Description |
|---|---|---|
| name() | &str | Topic name |
| metrics() | TopicMetrics | Message counts and failure stats |
| dropped_count() | u64 | Messages dropped (buffer overflows) |
| has_message() | bool | At least one message available |
| pending_count() | u64 | Messages waiting to be consumed |
| backend_name() | &'static str | Auto-selected IPC backend name |
| pub_count() | u32 | Number of active publishers |
| sub_count() | u32 | Number of active subscribers |
TopicMetrics provides aggregated statistics:
| Method | Returns | Description |
|---|---|---|
| messages_sent() | u64 | Total messages published |
| messages_received() | u64 | Total messages consumed |
| send_failures() | u64 | Failed send attempts |
| recv_failures() | u64 | Failed receive attempts |
Runtime Debug Logging
Debug logging is toggled at runtime from the TUI monitor — no code changes or recompilation needed. Select a topic in the Topics tab and press Enter to start logging; press Esc to stop.
When debug logging is active, every send() and recv() records timing and message summaries to the log buffer. If the message type implements LogSummary, rich summaries are logged; otherwise, metadata-only entries (topic name, direction, IPC latency) are produced.
horus monitor --tui # Open TUI, navigate to Topics tab, press Enter on a topic
Debug logging adds no overhead when disabled — introspection is fully separated from the hot path.
with_capacity()
pub fn with_capacity(name: &str, capacity: u32, slot_size: Option<usize>) -> Result<Self>
Creates a topic with a custom ring buffer capacity. By default, HORUS auto-sizes capacity based on the message type size. Use this when you need more buffering (e.g., bursty producers).
// 256-slot ring buffer (rounded up to next power of 2)
let topic: Topic<f32> = Topic::with_capacity("velocity", 256, None)?;
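As the comment above notes, capacity is rounded up to the next power of two. Assuming the rounding follows the standard next-power-of-two rule, the effective capacity for a requested size can be computed in plain Rust:

```rust
// Power-of-two capacities allow the ring buffer to replace modulo
// with a cheap bitmask when computing slot indices.
fn effective_capacity(requested: u32) -> u32 {
    requested.next_power_of_two()
}

fn main() {
    assert_eq!(effective_capacity(256), 256); // already a power of 2
    assert_eq!(effective_capacity(100), 128); // rounded up
    assert_eq!(effective_capacity(73), 128);  // rounded up
}
```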
Type-Safe Topic Descriptors
The topics! macro defines compile-time topic descriptors that prevent topic name typos and type mismatches across your codebase.
Defining Topics
use horus::prelude::*;
// Define topics with compile-time type checking
topics! {
pub CMD_VEL: CmdVel = "cmd_vel",
pub SENSOR_DATA: SensorReading = "sensor.data",
pub MOTOR_STATUS: MotorState = "motor.status",
}
Using Descriptors
// Publisher — type is enforced at compile time
let pub_topic: Topic<CmdVel> = Topic::new(CMD_VEL.name())?;
pub_topic.send(CmdVel::new(1.0, 0.5));
// Subscriber — same type enforced
let sub_topic: Topic<CmdVel> = Topic::new(CMD_VEL.name())?;
if let Some(cmd) = sub_topic.recv() {
// cmd is guaranteed to be CmdVel
}
This prevents common errors like:
- Typos in topic names (caught at compile time)
- Type mismatches between publishers and subscribers
- Inconsistent topic names across modules
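As a rough mental model (not the actual HORUS expansion), a macro like topics! can be sketched in plain Rust: each descriptor pairs a topic name with a message type at compile time, so any code that goes through the descriptor cannot disagree on either. CmdVel here is a stand-in example type.

```rust
use std::marker::PhantomData;

// A descriptor carries the topic name plus a phantom message type,
// so the type travels with the name at compile time.
struct TopicDescriptor<T> {
    name: &'static str,
    _msg: PhantomData<T>,
}

impl<T> TopicDescriptor<T> {
    const fn new(name: &'static str) -> Self {
        Self { name, _msg: PhantomData }
    }
    fn name(&self) -> &'static str {
        self.name
    }
}

// Hypothetical expansion: one static descriptor per declared topic.
macro_rules! topics {
    ($($vis:vis $ident:ident : $ty:ty = $name:literal),* $(,)?) => {
        $($vis static $ident: TopicDescriptor<$ty> = TopicDescriptor::new($name);)*
    };
}

#[derive(Clone, Copy)]
#[allow(dead_code)]
struct CmdVel {
    linear: f32,
    angular: f32,
}

topics! {
    pub CMD_VEL: CmdVel = "cmd_vel",
}

fn main() {
    // Both the name and the message type come from one definition.
    assert_eq!(CMD_VEL.name(), "cmd_vel");
}
```

A typo in the identifier (CMD_VELL) becomes a compile error instead of a silently dead topic.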
Communication Patterns
One-to-One
Single publisher, single subscriber (~18ns same-process, ~85ns cross-process).
struct PubNode {
data_pub: Topic<f32>,
}
impl Node for PubNode {
fn name(&self) -> &str { "PubNode" }
fn tick(&mut self) {
self.data_pub.send(42.0);
}
}
struct SubNode {
data_sub: Topic<f32>,
}
impl Node for SubNode {
fn name(&self) -> &str { "SubNode" }
fn tick(&mut self) {
if let Some(data) = self.data_sub.recv() {
println!("Got: {}", data);
}
}
}
One-to-Many (Broadcast)
Single publisher, multiple subscribers (~24ns same-process, ~70ns cross-process).
// One publisher
struct Broadcaster {
broadcast_pub: Topic<String>,
}
// Multiple subscribers — all receive the same messages
struct Listener1 { broadcast_sub: Topic<String> }
struct Listener2 { broadcast_sub: Topic<String> }
struct Listener3 { broadcast_sub: Topic<String> }
Many-to-One (Aggregation)
Multiple publishers, single subscriber (~26ns same-process, ~65ns cross-process).
// Multiple publishers
struct Sensor1 { reading_pub: Topic<f32> }
struct Sensor2 { reading_pub: Topic<f32> }
// Single aggregator
struct Aggregator {
reading_sub: Topic<f32>,
}
impl Node for Aggregator {
fn name(&self) -> &str { "Aggregator" }
fn tick(&mut self) {
if let Some(reading) = self.reading_sub.recv() {
self.process(reading);
}
}
}
Many-to-Many
Multiple publishers and subscribers (~36ns same-process, ~167ns cross-process).
struct Agent1 {
state_pub: Topic<RobotState>,
state_sub: Topic<RobotState>,
}
struct Agent2 {
state_pub: Topic<RobotState>,
state_sub: Topic<RobotState>,
}
Topic Naming
Use Dots, Not Slashes
Important: HORUS uses dots (.) for topic hierarchy, not slashes (/).
// CORRECT - Use dots
let topic = Topic::new("sensors.lidar")?;
let topic = Topic::new("robot.cmd_vel")?;
// WRONG - Do NOT use slashes
let topic = Topic::new("sensors/lidar")?; // Will cause errors!
let topic = Topic::new("robot/cmd_vel")?; // Will cause errors!
Why dots instead of slashes?
Slashes in topic names cause errors because they conflict with the underlying storage format. Always use dots.
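These rules (dots for hierarchy, no slashes, no leading underscore, no special characters; see also Reserved Topic Names below) can be captured in an illustrative validator. This is not a HORUS API, just a plain-Rust restatement of the naming conventions:

```rust
// Hypothetical helper: accepts alphanumerics, dots, and interior
// underscores; rejects slashes and reserved leading underscores.
fn is_valid_topic_name(name: &str) -> bool {
    !name.is_empty()
        && !name.starts_with('_')
        && name
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '_')
}

fn main() {
    assert!(is_valid_topic_name("sensors.lidar"));
    assert!(is_valid_topic_name("robot1.cmd_vel"));
    assert!(!is_valid_topic_name("sensors/lidar")); // slash conflicts with storage
    assert!(!is_valid_topic_name("_internal"));     // leading underscore is reserved
}
```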
Coming from ROS?
| Framework | Topic Separator | Example |
|---|---|---|
| ROS/ROS2 | / | /sensor/lidar |
| HORUS | . | sensor.lidar |
Best Practices
// Descriptive names
let topic = Topic::new("cmd_vel")?;         // Good
let topic = Topic::new("data")?;            // Too vague
// Hierarchical naming
let topic = Topic::new("sensor.lidar")?;    // Hierarchical
let topic = Topic::new("robot1.cmd_vel")?;  // Namespaced
let topic = Topic::new("diagnostics.cpu")?; // Categorized
Reserved Topic Names
Avoid using these patterns:
- Topics starting with _ (reserved for internal use)
- Topics containing / (causes file path errors)
- Topics containing /dev/ (conflicts with device paths)
- Topics with special characters: !@#$%^&*()
Error Handling
Send Behavior
send() is infallible — it uses ring buffer "keep last" semantics. When the buffer is full, the oldest message is overwritten:
// send() always succeeds — no error handling needed
topic.send(data);
For explicit error handling, use try_send():
match topic.try_send(data) {
Ok(()) => { /* success */ }
Err(msg) => { /* buffer full — msg returned to caller */ }
}
Receive Behavior
recv() returns None when no message is available — this is normal, not an error:
match topic.recv() {
Some(data) => {
// Process data
}
None => {
// No data available - this is normal
}
}
Message Types
What Types Can I Use?
Most types work with Topic — just add standard derives:
use serde::{Serialize, Deserialize};
// Primitive types work out of the box
let topic: Topic<f32> = Topic::new("float_topic")?;
let topic: Topic<bool> = Topic::new("bool_topic")?;
// Custom structs — just add derives
#[derive(Clone, Serialize, Deserialize)]
struct MyMessage {
x: f32,
y: f32,
name: String,
}
let topic: Topic<MyMessage> = Topic::new("my_topic")?;
// Collections work too
#[derive(Clone, Serialize, Deserialize)]
struct SensorBatch {
readings: Vec<f32>,
timestamp: u64,
}
let topic: Topic<SensorBatch> = Topic::new("batch_topic")?;
Required Traits
Your message types need these traits (all auto-derived):
| Trait | Why | How to Get It |
|---|---|---|
| Clone | Messages may be copied between backends | #[derive(Clone)] |
| Serialize | For serialized shared memory path | #[derive(Serialize)] |
| Deserialize | For serialized shared memory path | #[derive(Deserialize)] |
Additionally, types must be Send + Sync + 'static — satisfied automatically by most types.
use serde::{Serialize, Deserialize};
#[derive(Clone, Serialize, Deserialize)]
struct MyMessage {
// your fields
}
Advanced Usage
Conditional Publishing
Only publish when certain conditions are met:
impl Node for ConditionalPublisher {
fn tick(&mut self) {
let data = self.read_sensor();
if data > self.threshold {
self.alert_pub.send(data);
}
}
}
Message Buffering
Cache the last received message using read_latest() or manual buffering:
struct BufferedSubscriber {
data_sub: Topic<f32>,
last_value: Option<f32>,
}
impl Node for BufferedSubscriber {
fn tick(&mut self) {
if let Some(value) = self.data_sub.recv() {
self.last_value = Some(value);
}
if let Some(value) = self.last_value {
self.process(value);
}
}
}
For Copy types, use read_latest() instead — it reads the latest message without consuming it:
// Requires T: Copy — reads latest without advancing consumer position
if let Some(transform) = self.tf_sub.read_latest() {
self.apply_transform(transform);
}
Rate Limiting
Publish at a specific rate:
struct RateLimitedPublisher {
data_pub: Topic<f32>,
tick_count: u32,
publish_every_n_ticks: u32,
}
impl Node for RateLimitedPublisher {
fn tick(&mut self) {
self.tick_count += 1;
if self.tick_count % self.publish_every_n_ticks == 0 {
self.data_pub.send(42.0);
}
}
}
Message Filtering
Filter messages before processing:
impl Node for FilteringSubscriber {
fn tick(&mut self) {
if let Some(data) = self.data_sub.recv() {
if data.is_valid() && data.quality > 0.8 {
self.process(data);
}
}
}
}
Memory and Capacity
Memory Usage Per Topic
By default, Topic::new() auto-sizes capacity based on message type size:
| Message Type | Auto Capacity | Approximate Memory |
|---|---|---|
| f32 (4 bytes) | 1024 slots | ~73 KB |
| CmdVel (16 bytes) | 256 slots | ~19 KB |
| Pose2D (32 bytes) | 128 slots | ~10 KB |
| Twist (56 bytes) | 73 slots | ~6 KB |
| Imu (~300 bytes) | 16 slots | ~129 KB |
For most robotics applications, memory usage per topic is under 1 MB.
Cleaning Up
Shared memory files persist after processes exit (by design — allows new processes to join existing topics). Clean up between sessions:
# Clean shared memory only (recommended)
horus clean --shm
# Preview what would be cleaned
horus clean --shm --dry-run
# Clean everything (shared memory + build cache)
horus clean --all
HORUS also performs automatic stale-topic cleanup — files with no active process are removed automatically.
Troubleshooting
"No space left on device" — Shared memory is full:
horus clean --shm # Clean up
Type mismatch — Ensure publisher and subscriber use the exact same type for a topic name:
// Both sides MUST use the same type
let pub_topic: Topic<CmdVel> = Topic::new("cmd_vel")?;
let sub_topic: Topic<CmdVel> = Topic::new("cmd_vel")?;
Performance
Latency by Topology (16B message)
| Scenario | Latency |
|---|---|
| Same thread | ~3ns |
| Same process, 1:1 | ~18ns |
| Same process, many-to-many | ~36ns |
| Cross-process | ~50-167ns |
Latency by Message Size (cross-process)
| Message Type | Size | Latency |
|---|---|---|
| CmdVel | 16B | ~167ns |
| IMU | 304B | ~940ns |
| LaserScan | 1.5KB | ~2.2µs |
| PointCloud | 120KB | ~360µs |
Latency scales linearly with message size.
Throughput
- Millions of messages per second for small messages
- Gigabytes per second for large messages
- Deterministic latency regardless of system load
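A back-of-envelope check connects the throughput claim to the latency table above: if each 16-byte cross-process message takes ~167ns, back-to-back sends yield on the order of millions of messages per second. The numbers below are derived from the figures in this guide, not from a new benchmark:

```rust
fn main() {
    // Per-message latency for a 16B cross-process send (from the table).
    let latency_ns = 167.0_f64;

    // One second of back-to-back messages at that latency.
    let msgs_per_sec = 1e9 / latency_ns;

    // Roughly 6 million messages per second.
    assert!(msgs_per_sec > 5.9e6 && msgs_per_sec < 6.1e6);
    println!("~{:.1}M msgs/sec", msgs_per_sec / 1e6);
}
```

Real throughput depends on batching, core placement, and contention, so treat this as an order-of-magnitude estimate.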
Best Practices
Use simple fixed-size types when possible — they get the fastest path automatically:
Topic::<[f32; 3]>::new("position")?; // Fixed-size: fastest path
Topic::<Vec<f32>>::new("position")?; // Dynamic: still fast, but uses serialization
Keep messages small — latency scales linearly with size.
Check recv() every tick — don't skip ticks:
fn tick(&mut self) {
if let Some(msg) = self.sub.recv() {
self.process(msg);
}
}
Use topics! macro for shared topic definitions across modules:
topics! {
pub CMD_VEL: CmdVel = "cmd_vel",
pub ODOM: Odometry = "odom",
}
send() is infallible — no error handling needed:
self.pub_topic.send(data);
Next Steps
- Learn about the Scheduler for orchestrating nodes
- Explore Message Types for standard robotics messages
- Read the Topic API Reference for complete method documentation