Topic API

Topics are how nodes communicate in HORUS. Topic<T> provides typed publish/subscribe channels backed by shared memory for zero-copy IPC. Two Topic instances with the same name and type connect automatically — one publishes, the other subscribes. Backend selection (in-process ring buffer vs cross-process SHM) is automatic based on whether publisher and subscriber live in the same process.

Python: Available via horus.Topic("name", Type). See Python Bindings.

New to topics? Start with Topics: How Nodes Talk for a 5-minute introduction.

// simplified
use horus::prelude::*;

Quick Reference — Constructors

ConstructorReturnsDescription
Topic::<T>::new(name)HorusResult<Topic<T>>Creates with auto-computed capacity (16–1024 based on message size) and auto-sized slots
Topic::<T>::with_capacity(name, capacity, slot_size)HorusResult<Topic<T>>Creates with explicit buffer configuration

Quick Reference — Sending

MethodReturnsDescription
send(msg)()Publishes a message. Non-blocking. Drops oldest if full.
try_send(msg)Result<(), T>Publishes if space available. Returns message back on failure.
send_blocking(msg, timeout)Result<(), SendBlockingError>Blocks until space available or timeout.

Quick Reference — Receiving

MethodReturnsDescription
recv()Option<T>Takes the next unread message (FIFO). Consumes it.
try_recv()Option<T>Same as recv(). Provided for API symmetry with try_send().
read_latest()Option<T>Reads the newest message, skipping older ones. Requires T: Copy.

Quick Reference — State & Metrics

MethodReturnsDescription
name()&strTopic name
has_message()boolWhether at least one unread message exists
pending_count()u64Number of unread messages in the buffer
dropped_count()u64Total messages dropped (buffer-full overwrites)
pub_count()u32Number of active publishers
sub_count()u32Number of active subscribers
is_same_process()boolWhether all pub/sub are in the same process (#[cfg(test)] only)
is_same_thread()boolWhether all pub/sub are on the same thread (#[cfg(test)] only)
metrics()TopicMetricsAggregate send/receive statistics

Constructor Methods

new(name)

Creates a topic with default capacity and auto-sized slots.

Signature

// simplified
pub fn new(name: impl Into<String>) -> HorusResult<Self>

Parameters

NameTypeRequiredDescription
nameimpl Into<String>yesTopic identifier. Case-sensitive. Use dot-delimited hierarchical names: "sensors.imu", "cmd_vel". Two Topic instances with the same name and type T connect automatically. Note: Dots are preferred over slashes for macOS compatibility (shm_open does not allow / in names).

Returns

HorusResult<Topic<T>> — connected to the named shared memory channel.

Errors

ErrorCondition
HorusErrorSHM region creation failed (permissions, disk full)

Behavior

  • Default capacity: auto-computed (16–1024 slots based on message size; smaller messages get more slots)
  • Default slot size: size_of::<T>() rounded up to page alignment
  • Backend auto-selection: if publisher and subscriber are in the same process, uses an in-process ring buffer (no SHM overhead). Cross-process uses SHM.
  • Creating two Topic<T> with the same name connects them — no broker, no configuration

Example

// simplified
use horus::prelude::*;

let pub_topic = Topic::<CmdVel>::new("cmd_vel")?;
let sub_topic = Topic::<CmdVel>::new("cmd_vel")?; // connects to same channel

pub_topic.send(CmdVel::new(1.0, 0.0));
let msg = sub_topic.recv(); // Some(CmdVel { linear: 1.0, angular: 0.0 })

with_capacity(name, capacity, slot_size)

Creates a topic with explicit buffer configuration.

Signature

// simplified
pub fn with_capacity(name: &str, capacity: u32, slot_size: Option<usize>) -> HorusResult<Self>

Parameters

NameTypeRequiredDescription
name&stryesTopic identifier (same rules as new).
capacityu32yesNumber of slots in the ring buffer. Must be a power of 2. Typical: 4, 8, 16.
slot_sizeOption<usize>noBytes per slot. None = auto-sized from size_of::<T>(). Set explicitly for variable-size messages.

Returns

HorusResult<Topic<T>>

When to use

  • Large messages that need bigger slots (e.g., LaserScan with 360+ ranges)
  • High-frequency topics where you need more buffering to prevent drops
  • Small messages where you want to reduce memory footprint

When NOT to use

  • Most cases — Topic::new() auto-sizes correctly
  • Pool-backed types (Image, PointCloud, DepthImage, Tensor) — they manage their own memory

Example

// simplified
use horus::prelude::*;

// 8 slots, 4KB each — for large LiDAR scans
let scan = Topic::<LaserScan>::with_capacity("lidar.scan", 8, Some(4096))?;

// 16 slots, auto-sized — high-frequency IMU with extra buffering
let imu = Topic::<Imu>::with_capacity("imu", 16, None)?;

Sending Methods

send(msg)

Publishes a message. Non-blocking. Overwrites the oldest unread message if the buffer is full.

Signature

// simplified
pub fn send(&self, msg: T)

Parameters

NameTypeRequiredDescription
msgTyesMessage to publish. Moved into the ring buffer.

Returns

Nothing (()). Always succeeds — never blocks, never returns an error.

Behavior

  • If all slots are occupied, the oldest unread message is silently overwritten
  • Use dropped_count() to detect overwrites
  • Notifies any event-driven nodes (.on("topic")) that new data arrived
  • Fast path: ~3ns for same-thread publisher+subscriber (inlined ring write, no pointer chase)
  • Cross-process: serializes via SHM, ~150ns typical

When to use

  • Default for real-time robotics — you always want the latest data
  • Control loops where blocking is unacceptable
  • Fire-and-forget telemetry

When NOT to use

  • When you need to know if the message was received — use try_send() instead
  • When dropping messages is unacceptable — use send_blocking() instead

Example

// simplified
use horus::prelude::*;

let topic = Topic::<CmdVel>::new("cmd_vel")?;

// Fire-and-forget (overwrites oldest if full)
topic.send(CmdVel::new(1.0, 0.0));

// Check if messages were dropped
if topic.dropped_count() > 0 {
    hlog!(warn, "{} messages dropped — subscriber too slow", topic.dropped_count());
}

try_send(msg)

Attempts to publish without overwriting. Returns the message back if the buffer is full.

Signature

// simplified
pub fn try_send(&self, msg: T) -> Result<(), T>

Parameters

NameTypeRequiredDescription
msgTyesMessage to publish. Returned on failure.

Returns

  • Ok(()) — message published successfully
  • Err(msg) — buffer full, message returned to caller (not consumed)

Behavior

  • Non-blocking. Checks buffer space and either publishes or returns immediately.
  • The returned message can be retried or discarded — your choice.

When to use

  • When you need to detect drops — implement backpressure, count losses, or log warnings
  • When the message is expensive to create and you don't want to waste it

When NOT to use

  • Normal control loops — use send() (simpler, always succeeds)

Example

// simplified
use horus::prelude::*;

let topic = Topic::<CmdVel>::new("cmd_vel")?;

match topic.try_send(CmdVel::new(1.0, 0.0)) {
    Ok(()) => { /* sent */ }
    Err(_returned) => {
        hlog!(warn, "Buffer full — message not sent");
    }
}

send_blocking(msg, timeout)

Blocks until buffer space is available or the timeout elapses.

Signature

// simplified
pub fn send_blocking(&self, msg: T, timeout: Duration) -> Result<(), SendBlockingError>

Parameters

NameTypeRequiredDescription
msgTyesMessage to publish. Consumed on success.
timeoutDurationyesMaximum time to wait for space. Create with .ms(): 10_u64.ms().

Returns

  • Ok(()) — message published
  • Err(SendBlockingError::Timeout) — buffer stayed full for the entire timeout

Behavior

Uses a graduated wait strategy for low latency:

  1. Immediate try_send (0 latency)
  2. Spin loop — 256 iterations (~sub-microsecond)
  3. Yield — 8 thread yields (~microseconds)
  4. Sleep — 100us increments until deadline

When to use

  • When dropping messages is unacceptable and brief blocking is acceptable
  • Logging pipelines, recording, non-RT data transfer

When NOT to use

  • Real-time control loops — blocking in tick() causes deadline misses
  • Any node with .rate() or .budget() — use send() instead

Example

// simplified
use horus::prelude::*;

let topic = Topic::<DiagnosticReport>::new("diagnostics")?;

match topic.send_blocking(report, 10_u64.ms()) {
    Ok(()) => { /* sent */ }
    Err(SendBlockingError::Timeout) => {
        hlog!(warn, "Diagnostic buffer full for 10ms — dropping report");
    }
}

Receiving Methods

recv()

Takes the next unread message in FIFO order. Consumes it from the buffer.

Signature

// simplified
pub fn recv(&self) -> Option<T>

Parameters

None.

Returns

  • Some(T) — the oldest unread message (removed from buffer)
  • None — no unread messages available

Behavior

  • Non-blocking. Returns immediately.
  • Each message is delivered to each subscriber exactly once. After recv() returns it, the message is consumed.
  • Fast path: ~3ns for same-thread (inlined ring read)

When to use

  • Command streams where every message matters (velocity commands, goal updates)
  • Processing pipelines where order matters (image frames, sensor sequences)

When NOT to use

  • State-like data where you only care about the latest value — use read_latest() instead

Example

// simplified
use horus::prelude::*;

let topic = Topic::<Imu>::new("imu")?;

// IMPORTANT: drain every tick to avoid stale data accumulation
while let Some(msg) = topic.recv() {
    process(msg);
}

try_recv()

Non-blocking receive. Functionally identical to recv().

Signature

// simplified
pub fn try_recv(&self) -> Option<T>

Returns

Same as recv(). Provided for API symmetry with try_send().


read_latest()

Reads the most recent message, skipping all older ones.

Signature

// simplified
pub fn read_latest(&self) -> Option<T>
where
    T: Copy

Parameters

None.

Returns

  • Some(T) — copy of the newest message
  • None — no messages available

Behavior

  • Reads without consuming — multiple calls return the same value until a new message arrives
  • Skips all older messages — only the latest matters
  • Requires T: Copy because it reads (copies) without removing from the buffer

When to use

  • State-like data where you only care about the current value: poses, sensor readings, configuration
  • Slow subscribers that can't keep up with a fast publisher

When NOT to use

  • Command streams where every message matters — use recv() instead
  • Messages that aren't Copy (e.g., types with Vec, String)

Example

// simplified
use horus::prelude::*;

let odom = Topic::<Odometry>::new("odom")?;

// Good: pose is state — only the latest matters
if let Some(pose) = odom.read_latest() {
    current_position = pose;
}

State & Metrics Methods

name()

Returns the topic name.

Signature

// simplified
pub fn name(&self) -> &str

Returns

&str — the name passed to new() or with_capacity().


has_message()

Checks whether at least one unread message exists.

Signature

// simplified
pub fn has_message(&self) -> bool

Returns

true if recv() would return Some. false if the buffer is empty.


pending_count()

Returns the number of unread messages in the buffer.

Signature

// simplified
pub fn pending_count(&self) -> u64

Returns

u64 — count of messages waiting to be consumed by recv().


dropped_count()

Returns the total number of messages dropped due to buffer-full overwrites.

Signature

// simplified
pub fn dropped_count(&self) -> u64

Returns

u64 — cumulative count since topic creation. Incremented by send() when it overwrites the oldest slot.

When to use

  • Monitor for backpressure — a rising dropped_count() means the subscriber is too slow
  • Log warnings when drops exceed a threshold

Example

// simplified
use horus::prelude::*;

let topic = Topic::<LaserScan>::new("scan")?;

if topic.dropped_count() > 0 {
    hlog!(warn, "Dropped {} scans — subscriber can't keep up", topic.dropped_count());
}

pub_count()

Returns the number of active publishers on this topic.

Advanced: These methods are #[doc(hidden)] in the Rust API — they may change without notice. For monitoring, prefer horus topic list --verbose.

Signature

// simplified
pub fn pub_count(&self) -> u32

sub_count()

Returns the number of active subscribers on this topic.

Advanced: These methods are #[doc(hidden)] in the Rust API — they may change without notice. For monitoring, prefer horus topic list --verbose.

Signature

// simplified
pub fn sub_count(&self) -> u32

is_same_process()#[cfg(test)] only

Checks whether all publishers and subscribers are in the same process. Available only in test builds.

Signature

// simplified
#[cfg(test)]
pub fn is_same_process(&self) -> bool

Returns

true if using in-process ring buffer (fastest path, ~3ns). false if cross-process SHM.

Note: This method is gated behind #[cfg(test)] and is not available in production builds. It exists for internal testing and assertions only.


is_same_thread()#[cfg(test)] only

Checks whether all publishers and subscribers are on the same thread. Available only in test builds.

Signature

// simplified
#[cfg(test)]
pub fn is_same_thread(&self) -> bool

Returns

true if using the ultra-fast same-thread path (inlined ring operations, no atomics).

Note: This method is gated behind #[cfg(test)] and is not available in production builds. It exists for internal testing and assertions only.


metrics()

Returns aggregate send/receive statistics.

Signature

// simplified
pub fn metrics(&self) -> TopicMetrics

Returns

TopicMetrics — see Types section below.

Example

// simplified
use horus::prelude::*;

let topic = Topic::<CmdVel>::new("cmd_vel")?;

let m = topic.metrics();
println!("Sent: {}, Received: {}, Failures: {}",
    m.messages_sent(), m.messages_received(), m.send_failures());

Pool-Backed Types (Zero-Copy)

For large data types (Image, PointCloud, DepthImage, Tensor), HORUS uses pool-backed allocation. The Topic API works the same — send(), recv(), try_send() — but the data is transferred via shared memory pools instead of copying through the ring buffer.

// simplified
use horus::prelude::*;

let camera = Topic::<Image>::new("camera.rgb")?;

// Send (moves the Image into the pool slot)
camera.send(image);

// IMPORTANT: call recv() every tick to drain — images are large and stale frames waste pool slots
if let Some(img) = camera.recv() {
    println!("{}x{} image received", img.width(), img.height());
}

See Image API, PointCloud API, DepthImage API, Tensor API for type-specific methods.


Types

TopicMetrics

Aggregate statistics returned by topic.metrics().

MethodReturnsDescription
.messages_sent()u64Total messages published on this topic
.messages_received()u64Total messages consumed from this topic
.send_failures()u64Failed send attempts (e.g., try_send on a full buffer)
.recv_failures()u64Failed receive attempts (e.g., recv on empty buffer)

SendBlockingError

Error returned by send_blocking().

VariantDescription
TimeoutThe ring buffer stayed full for the entire timeout duration

Production Example

Multi-sensor fusion node subscribing to two topics and publishing a fused pose:

// simplified
use horus::prelude::*;

struct FusionNode {
    imu_sub: Topic<Imu>,
    odom_sub: Topic<Odometry>,
    pose_pub: Topic<Pose3D>,
    last_imu: Option<Imu>,
    last_odom: Option<Odometry>,
}

impl Node for FusionNode {
    fn name(&self) -> &str { "Fusion" }

    fn tick(&mut self) {
        // IMPORTANT: always drain both topics every tick
        if let Some(imu) = self.imu_sub.recv() { self.last_imu = Some(imu); }
        if let Some(odom) = self.odom_sub.recv() { self.last_odom = Some(odom); }

        if let (Some(imu), Some(odom)) = (&self.last_imu, &self.last_odom) {
            let fused = self.fuse(imu, odom);
            self.pose_pub.send(fused);
        }
    }
}

See Also