Topic and Pub/Sub

Key Takeaways

After reading this guide, you will understand:

  • How Topic provides ultra-fast pub/sub communication (~3ns to ~167ns latency) with automatic optimization
  • The send() and recv() methods for publishing and subscribing to typed topics
  • Communication patterns (one-to-one, one-to-many, many-to-one, many-to-many) for different architectures
  • When to use Topic for real-time, single-machine communication vs network-based messaging

The Topic is HORUS's ultra-low latency publish-subscribe (pub/sub) communication system. It enables nodes to exchange messages through shared memory IPC with automatic optimization — from ~3ns (same-thread) to ~167ns (cross-process, many-to-many) depending on topology.

What is a Topic?

A Topic<T> is a typed communication channel that connects publishers and subscribers. The system automatically detects the optimal backend based on:

  • Where participants are (same thread, same process, or different processes)
  • How many publishers and subscribers exist

You just call send() and recv() — HORUS handles the rest.

Key Features

Automatic Optimization: The fastest communication path is auto-detected at runtime

Zero-Copy Communication: Simple fixed-size types are shared directly in memory without serialization

Type Safety: Compile-time guarantees for message types

Live Migration: Communication paths upgrade/downgrade transparently as participants join or leave

Automatic Optimization

HORUS auto-selects the fastest communication path based on your topology. You never need to configure this — it happens automatically when participants call send() or recv().

ScenarioLatency
Same thread~3ns
Same process, 1:1~18ns
Same process, many-to-many~36ns
Cross-process~50-167ns

Note: Latencies are for small messages (16B). Larger messages scale linearly with size.

If topology changes (e.g., a second subscriber joins), HORUS automatically migrates to the optimal path without dropping messages.

// All of these use the same API — optimization is automatic
let topic: Topic<f32> = Topic::new("velocity")?;
topic.send(1.5);
let msg = topic.recv();

Tip: Simple fixed-size structs (no String, Vec, Box) automatically get a faster zero-copy path. Both types work with the same API — HORUS picks the fastest path for you.

Basic Usage

Creating a Topic

use horus::prelude::*;

// Create a Topic for f32 values on topic "velocity"
let topic: Topic<f32> = Topic::new("velocity")?;

Publishing Messages

use horus::prelude::*;

struct Publisher {
    velocity_pub: Topic<f32>,
}

impl Publisher {
    fn new() -> Result<Self> {
        Ok(Self {
            velocity_pub: Topic::new("velocity")?,
        })
    }
}

impl Node for Publisher {
    fn name(&self) -> &str {
        "Publisher"
    }

    fn tick(&mut self) {
        let velocity = 1.5;

        // Send message — infallible, non-blocking
        self.velocity_pub.send(velocity);
    }
}

Subscribing to Messages

use horus::prelude::*;

struct Subscriber {
    velocity_sub: Topic<f32>,
}

impl Subscriber {
    fn new() -> Result<Self> {
        Ok(Self {
            velocity_sub: Topic::new("velocity")?,
        })
    }
}

impl Node for Subscriber {
    fn name(&self) -> &str {
        "Subscriber"
    }

    fn tick(&mut self) {
        if let Some(velocity) = self.velocity_sub.recv() {
            println!("Received velocity: {}", velocity);
        }
    }
}

Transport

Topics use local shared memory for ultra-fast communication between processes on the same machine.

// Automatically uses local shared memory
let topic: Topic<SensorData> = Topic::new("sensors")?;

Performance: ~3ns to ~167ns depending on topology Use case: All nodes on same machine Pros: Ultra-fast, deterministic, zero-copy

For multi-machine communication, HORUS supports zero-configuration peer discovery via mDNS (requires the mdns feature flag).

API Reference

Note: HORUS separates introspection from the hot path. The send() and recv() methods have zero logging overhead by default.

send()

pub fn send(&self, msg: T)

Publishes a message to the topic. Infallible — the oldest unread message is overwritten when the buffer is full.

// send() always succeeds — no error handling needed
topic.send(data);

recv()

pub fn recv(&self) -> Option<T>

Receives the next message from the topic. Returns None if no message is available. Non-blocking.

if let Some(data) = topic.recv() {
    // Process the received message
}

try_send()

pub fn try_send(&self, msg: T) -> Result<(), T>

Attempts to send a message, returning it on failure. Unlike send() which silently handles errors, try_send() gives you explicit control over failure cases.

match topic.try_send(data) {
    Ok(()) => { /* sent successfully */ }
    Err(returned_data) => { /* buffer full, message returned */ }
}

try_recv()

pub fn try_recv(&self) -> Option<T>

Attempts to receive a message. Functionally identical to recv() for most use cases — both are non-blocking and return Option<T>.

send_blocking()

pub fn send_blocking(&self, msg: T, timeout: Duration) -> Result<(), SendBlockingError>

Sends a message, blocking until buffer space is available or the timeout expires. Unlike send() which overwrites the oldest message when full, send_blocking() guarantees delivery or returns an explicit error.

Use this for critical command topics (emergency stop, motor setpoints) where message loss is unacceptable.

use std::time::Duration;

// Block up to 1ms waiting for buffer space
match topic.send_blocking(emergency_stop_cmd, Duration::from_millis(1)) {
    Ok(()) => { /* delivered */ }
    Err(_) => { hlog!(error, "Failed to deliver emergency stop!"); }
}

read_latest()

pub fn read_latest(&self) -> Option<T>
where
    T: Copy,

Returns the most recent message without advancing the consumer position. Calling it multiple times returns the same message until a new one is published. Useful for reading infrequently-updated data like static transforms or configuration.

T: Copy requirement: read_latest() requires T: Copy to guarantee safe concurrent reads. Types with heap allocations (String, Vec, etc.) should use recv() instead.

// Read latest transform (doesn't consume it)
if let Some(transform) = tf_topic.read_latest() {
    self.apply_transform(transform);
}

has_message()

pub fn has_message(&self) -> bool

Checks if at least one message is available without consuming it.

pending_count()

pub fn pending_count(&self) -> u64

Returns the number of messages waiting to be consumed.

Introspection Methods

Topic provides several methods for monitoring and debugging at runtime:

// Topic name
let name = topic.name();  // "velocity"

// Check message availability
if topic.has_message() {
    println!("{} messages pending", topic.pending_count());
}

// Message loss tracking
let dropped = topic.dropped_count();
if dropped > 0 {
    hlog!(warn, "Dropped {} messages on '{}'", dropped, topic.name());
}

// Connection monitoring
println!("Publishers: {}, Subscribers: {}", topic.pub_count(), topic.sub_count());

// Backend info (auto-selected by HORUS)
println!("Using backend: {}", topic.backend_name());

// Detailed metrics
let metrics = topic.metrics();
println!("Sent: {}, Received: {}", metrics.messages_sent(), metrics.messages_received());
println!("Send failures: {}, Recv failures: {}", metrics.send_failures(), metrics.recv_failures());
MethodReturnsDescription
name()&strTopic name
metrics()TopicMetricsMessage counts and failure stats
dropped_count()u64Messages dropped (buffer overflows)
has_message()boolAt least one message available
pending_count()u64Messages waiting to be consumed
backend_name()&'static strAuto-selected IPC backend name
pub_count()u32Number of active publishers
sub_count()u32Number of active subscribers

TopicMetrics provides aggregated statistics:

MethodReturnsDescription
messages_sent()u64Total messages published
messages_received()u64Total messages consumed
send_failures()u64Failed send attempts
recv_failures()u64Failed receive attempts

Runtime Debug Logging

Debug logging is toggled at runtime from the TUI monitor — no code changes or recompilation needed. Select a topic in the Topics tab and press Enter to start logging; press Esc to stop.

When debug logging is active, every send() and recv() records timing and message summaries to the log buffer. If the message type implements LogSummary, rich summaries are logged; otherwise, metadata-only entries (topic name, direction, IPC latency) are produced.

horus monitor --tui    # Open TUI, navigate to Topics tab, press Enter on a topic

Debug logging adds no overhead when disabled — introspection is fully separated from the hot path.

with_capacity()

pub fn with_capacity(name: &str, capacity: u32, slot_size: Option<usize>) -> Result<Self>

Creates a topic with a custom ring buffer capacity. By default, HORUS auto-sizes capacity based on the message type size. Use this when you need more buffering (e.g., bursty producers).

// 256-slot ring buffer (rounded up to next power of 2)
let topic: Topic<f32> = Topic::with_capacity("velocity", 256, None)?;

Type-Safe Topic Descriptors

The topics! macro defines compile-time topic descriptors that prevent topic name typos and type mismatches across your codebase.

Defining Topics

use horus::prelude::*;

// Define topics with compile-time type checking
topics! {
    pub CMD_VEL: CmdVel = "cmd_vel",
    pub SENSOR_DATA: SensorReading = "sensor.data",
    pub MOTOR_STATUS: MotorState = "motor.status",
}

Using Descriptors

// Publisher — type is enforced at compile time
let pub_topic: Topic<CmdVel> = Topic::new(CMD_VEL.name())?;
pub_topic.send(CmdVel::new(1.0, 0.5));

// Subscriber — same type enforced
let sub_topic: Topic<CmdVel> = Topic::new(CMD_VEL.name())?;
if let Some(cmd) = sub_topic.recv() {
    // cmd is guaranteed to be CmdVel
}

This prevents common errors like:

  • Typos in topic names (caught at compile time)
  • Type mismatches between publishers and subscribers
  • Inconsistent topic names across modules

Communication Patterns

One-to-One

Single publisher, single subscriber (~18ns same-process, ~85ns cross-process).

struct PubNode {
    data_pub: Topic<f32>,
}

impl Node for PubNode {
    fn name(&self) -> &str { "PubNode" }

    fn tick(&mut self) {
        self.data_pub.send(42.0);
    }
}

struct SubNode {
    data_sub: Topic<f32>,
}

impl Node for SubNode {
    fn name(&self) -> &str { "SubNode" }

    fn tick(&mut self) {
        if let Some(data) = self.data_sub.recv() {
            println!("Got: {}", data);
        }
    }
}

One-to-Many (Broadcast)

Single publisher, multiple subscribers (~24ns same-process, ~70ns cross-process).

// One publisher
struct Broadcaster {
    broadcast_pub: Topic<String>,
}

// Multiple subscribers — all receive the same messages
struct Listener1 { broadcast_sub: Topic<String> }
struct Listener2 { broadcast_sub: Topic<String> }
struct Listener3 { broadcast_sub: Topic<String> }

Many-to-One (Aggregation)

Multiple publishers, single subscriber (~26ns same-process, ~65ns cross-process).

// Multiple publishers
struct Sensor1 { reading_pub: Topic<f32> }
struct Sensor2 { reading_pub: Topic<f32> }

// Single aggregator
struct Aggregator {
    reading_sub: Topic<f32>,
}

impl Node for Aggregator {
    fn name(&self) -> &str { "Aggregator" }

    fn tick(&mut self) {
        if let Some(reading) = self.reading_sub.recv() {
            self.process(reading);
        }
    }
}

Many-to-Many

Multiple publishers and subscribers (~36ns same-process, ~167ns cross-process).

struct Agent1 {
    state_pub: Topic<RobotState>,
    state_sub: Topic<RobotState>,
}

struct Agent2 {
    state_pub: Topic<RobotState>,
    state_sub: Topic<RobotState>,
}

Topic Naming

Use Dots, Not Slashes

Important: HORUS uses dots (.) for topic hierarchy, not slashes (/).

// CORRECT - Use dots
let topic = Topic::new("sensors.lidar");
let topic = Topic::new("robot.cmd_vel");

// WRONG - Do NOT use slashes
let topic = Topic::new("sensors/lidar");   // Will cause errors!
let topic = Topic::new("robot/cmd_vel");   // Will cause errors!

Why dots instead of slashes?

Slashes in topic names cause errors because they conflict with the underlying storage format. Always use dots.

Coming from ROS?

FrameworkTopic SeparatorExample
ROS/ROS2//sensor/lidar
HORUS.sensor.lidar

Best Practices

// Descriptive names
let topic = Topic::new("cmd_vel");           // Good
let topic = Topic::new("data");              // Too vague

// Hierarchical naming
let topic = Topic::new("sensor.lidar");      // Hierarchical
let topic = Topic::new("robot1.cmd_vel");    // Namespaced
let topic = Topic::new("diagnostics.cpu");   // Categorized

Reserved Topic Names

Avoid using these patterns:

  • Topics starting with _ (internal use)
  • Topics containing / (causes file path errors)
  • Topics containing /dev/ (conflicts with paths)
  • Topics with special characters: !@#$%^&*()

Error Handling

Send Behavior

send() is infallible — it uses ring buffer "keep last" semantics. When the buffer is full, the oldest message is overwritten:

// send() always succeeds — no error handling needed
topic.send(data);

For explicit error handling, use try_send():

match topic.try_send(data) {
    Ok(()) => { /* success */ }
    Err(msg) => { /* buffer full — msg returned to caller */ }
}

Receive Behavior

recv() returns None when no message is available — this is normal, not an error:

match topic.recv() {
    Some(data) => {
        // Process data
    }
    None => {
        // No data available - this is normal
    }
}

Message Types

What Types Can I Use?

Most types work with Topic — just add standard derives:

use serde::{Serialize, Deserialize};

// Primitive types work out of the box
let topic: Topic<f32> = Topic::new("float_topic")?;
let topic: Topic<bool> = Topic::new("bool_topic")?;

// Custom structs — just add derives
#[derive(Clone, Serialize, Deserialize)]
struct MyMessage {
    x: f32,
    y: f32,
    name: String,
}

let topic: Topic<MyMessage> = Topic::new("my_topic")?;

// Collections work too
#[derive(Clone, Serialize, Deserialize)]
struct SensorBatch {
    readings: Vec<f32>,
    timestamp: u64,
}

let topic: Topic<SensorBatch> = Topic::new("batch_topic")?;

Required Traits

Your message types need these traits (all auto-derived):

TraitWhyHow to Get It
CloneMessages may be copied between backends#[derive(Clone)]
SerializeFor serialized shared memory path#[derive(Serialize)]
DeserializeFor serialized shared memory path#[derive(Deserialize)]

Additionally, types must be Send + Sync + 'static — satisfied automatically by most types.

use serde::{Serialize, Deserialize};

#[derive(Clone, Serialize, Deserialize)]
struct MyMessage {
    // your fields
}

Advanced Usage

Conditional Publishing

Only publish when certain conditions are met:

impl Node for ConditionalPublisher {
    fn tick(&mut self) {
        let data = self.read_sensor();

        if data > self.threshold {
            self.alert_pub.send(data);
        }
    }
}

Message Buffering

Cache the last received message using read_latest() or manual buffering:

struct BufferedSubscriber {
    data_sub: Topic<f32>,
    last_value: Option<f32>,
}

impl Node for BufferedSubscriber {
    fn tick(&mut self) {
        if let Some(value) = self.data_sub.recv() {
            self.last_value = Some(value);
        }

        if let Some(value) = self.last_value {
            self.process(value);
        }
    }
}

For Copy types, use read_latest() instead — it reads the latest message without consuming it:

// Requires T: Copy — reads latest without advancing consumer position
if let Some(transform) = self.tf_sub.read_latest() {
    self.apply_transform(transform);
}

Rate Limiting

Publish at a specific rate:

struct RateLimitedPublisher {
    data_pub: Topic<f32>,
    tick_count: u32,
    publish_every_n_ticks: u32,
}

impl Node for RateLimitedPublisher {
    fn tick(&mut self) {
        self.tick_count += 1;

        if self.tick_count % self.publish_every_n_ticks == 0 {
            self.data_pub.send(42.0);
        }
    }
}

Message Filtering

Filter messages before processing:

impl Node for FilteringSubscriber {
    fn tick(&mut self) {
        if let Some(data) = self.data_sub.recv() {
            if data.is_valid() && data.quality > 0.8 {
                self.process(data);
            }
        }
    }
}

Memory and Capacity

Memory Usage Per Topic

By default, Topic::new() auto-sizes capacity based on message type size:

Message TypeAuto CapacityApproximate Memory
f32 (4 bytes)1024 slots~73 KB
CmdVel (16 bytes)256 slots~19 KB
Pose2D (32 bytes)128 slots~10 KB
Twist (56 bytes)73 slots~6 KB
Imu (~300 bytes)16 slots~129 KB

For most robotics applications, memory usage per topic is under 1 MB.

Cleaning Up

Shared memory files persist after processes exit (by design — allows new processes to join existing topics). Clean up between sessions:

# Clean shared memory only (recommended)
horus clean --shm

# Preview what would be cleaned
horus clean --shm --dry-run

# Clean everything (shared memory + build cache)
horus clean --all

HORUS also performs automatic stale-topic cleanup — files with no active process are removed automatically.

Troubleshooting

"No space left on device" — Shared memory is full:

horus clean --shm            # Clean up

Type mismatch — Ensure publisher and subscriber use the exact same type for a topic name:

// Both sides MUST use the same type
let pub_topic: Topic<CmdVel> = Topic::new("cmd_vel")?;
let sub_topic: Topic<CmdVel> = Topic::new("cmd_vel")?;

Performance

Latency by Topology (16B message)

ScenarioLatency
Same thread~3ns
Same process, 1:1~18ns
Same process, many-to-many~36ns
Cross-process~50-167ns

Latency by Message Size (cross-process)

Message TypeSizeLatency
CmdVel16B~167ns
IMU304B~940ns
LaserScan1.5KB~2.2µs
PointCloud120KB~360µs

Latency scales linearly with message size.

Throughput

  • Millions of messages per second for small messages
  • Gigabytes per second for large messages
  • Deterministic latency regardless of system load

Best Practices

Use simple fixed-size types when possible — they get the fastest path automatically:

Topic::<[f32; 3]>::new("position")?;   // Fixed-size: fastest path
Topic::<Vec<f32>>::new("position")?;   // Dynamic: still fast, but uses serialization

Keep messages small — latency scales linearly with size.

Check recv() every tick — don't skip ticks:

fn tick(&mut self) {
    if let Some(msg) = self.sub.recv() {
        self.process(msg);
    }
}

Use topics! macro for shared topic definitions across modules:

topics! {
    pub CMD_VEL: CmdVel = "cmd_vel",
    pub ODOM: Odometry = "odom",
}

send() is infallible — no error handling needed:

self.pub_topic.send(data);

Next Steps