Topic API

Topic<T> is the primary communication primitive in HORUS. It provides typed publish/subscribe channels backed by shared memory for zero-copy IPC within a single machine.

Creating a Topic

use horus::prelude::*;

// Default capacity and slot size
let topic = Topic::<Twist>::new("cmd_vel");

// Custom capacity (number of slots) and slot size (bytes per slot)
let topic = Topic::<LaserScan>::with_capacity("scan", 8, 4096);
ConstructorReturnsDescription
Topic::<T>::new(name)Topic<T>Create with default capacity and slot size
Topic::<T>::with_capacity(name, capacity, slot_size)Topic<T>Create with explicit capacity and slot size

The name string identifies the shared-memory channel. Two Topic instances with the same name and type connect automatically — one publishes, the other subscribes. Backend selection (in-process ring buffer vs cross-process SHM) is automatic based on whether publisher and subscriber live in the same process.


Sending Messages

MethodReturnsDescription
send(msg)()Publish a message (non-blocking, drops oldest if full)
try_send(msg)Result<(), T>Publish if space available; returns message back on failure
send_blocking(msg, timeout)Result<(), SendBlockingError>Block until space is available or timeout elapses

SendBlockingError

VariantDescription
TimeoutTimed out waiting for space in the ring buffer

Example

let topic = Topic::<CmdVel>::new("cmd_vel");

// Fire-and-forget (overwrites oldest if full)
topic.send(CmdVel { linear: 1.0, angular: 0.0 });

// Try without blocking
match topic.try_send(CmdVel { linear: 0.5, angular: 0.1 }) {
    Ok(()) => { /* sent */ }
    Err(msg) => { /* buffer full, msg returned */ }
}

// Block up to 10ms
use horus::prelude::*;
match topic.send_blocking(CmdVel { linear: 1.0, angular: 0.0 }, 10_u64.ms()) {
    Ok(()) => { /* sent */ }
    Err(SendBlockingError::Timeout) => { /* timed out */ }
}

Receiving Messages

MethodReturnsDescription
recv()Option<T>Take the next unread message (FIFO order)
read_latest()Option<T>Read the most recent message, skipping older ones (requires T: Copy)

recv() returns messages in order. Each message is delivered to each subscriber exactly once. read_latest() is useful for state-like data (sensor readings, poses) where you only care about the newest value.

Example

let topic = Topic::<Imu>::new("imu");

// Process all pending messages
while let Some(msg) = topic.recv() {
    println!("Accel: ({}, {}, {})", msg.linear_accel.x, msg.linear_accel.y, msg.linear_accel.z);
}

// Or just get the latest reading
if let Some(latest) = topic.read_latest() {
    println!("Latest orientation: {:?}", latest.orientation);
}

State & Metrics

MethodReturnsDescription
name()&strThe topic name
has_message()boolWhether there is at least one unread message
pending_count()usizeNumber of unread messages in the buffer
dropped_count()u64Total messages dropped due to full buffer
metrics()TopicMetricsAggregate send/receive statistics

TopicMetrics

MethodReturnsDescription
messages_sent()u64Total messages published on this topic
messages_received()u64Total messages consumed from this topic
send_failures()u64Failed send attempts (e.g., try_send on a full buffer)
recv_failures()u64Failed receive attempts (e.g., recv on empty buffer)

Example

let topic = Topic::<Twist>::new("cmd_vel");

// Check state
if topic.has_message() {
    println!("{} messages pending", topic.pending_count());
}

// Check metrics
let m = topic.metrics();
println!("Sent: {}, Received: {}", m.messages_sent(), m.messages_received());
println!("Dropped: {}", topic.dropped_count());

Pool-Backed Types (Zero-Copy)

For large data types (Image, PointCloud, DepthImage, Tensor), HORUS uses pool-backed allocation to avoid copying payloads through the ring buffer. The Topic API provides specialized methods for these types.

Image Topics

let topic = Topic::<Image>::new("camera/rgb");

// Send (moves the Image into the pool slot)
topic.send(image);

// Try send (returns the Image on failure)
match topic.try_send(image) {
    Ok(()) => {}
    Err(img) => { /* buffer full */ }
}

// Receive
if let Some(img) = topic.recv() {
    println!("{}x{} image received", img.width(), img.height());
}

PointCloud Topics

let topic = Topic::<PointCloud>::new("lidar/points");

topic.send(cloud);

if let Some(cloud) = topic.recv() {
    println!("{} points received", cloud.len());
}

DepthImage Topics

let topic = Topic::<DepthImage>::new("camera/depth");

topic.send(depth);

if let Some(depth) = topic.recv() {
    println!("{}x{} depth image", depth.width(), depth.height());
}

Tensor Topics

Tensor topics use explicit handle-based allocation for maximum control:

let topic = Topic::<Tensor>::new("model/output");

// Access the backing pool
let pool = topic.pool();

// Allocate a tensor from the pool
let handle = topic.alloc_tensor(&[1, 3, 224, 224], TensorDtype::F32, Device::cpu());

// Send by handle (zero-copy — only the handle crosses the ring buffer)
topic.send_handle(handle);

// Receive by handle
if let Some(handle) = topic.recv_handle() {
    println!("Tensor shape: {:?}", handle.shape());
}
MethodReturnsDescription
pool()&TensorPoolAccess the backing tensor pool
alloc_tensor(shape, dtype, device)TensorHandleAllocate a tensor from the pool
send_handle(handle)()Publish a tensor handle (zero-copy)
recv_handle()Option<TensorHandle>Receive a tensor handle

Using Topics in Nodes

Topics are typically used inside Node implementations registered with a Scheduler:

use horus::prelude::*;

struct LidarProcessor {
    scan_in: Topic<LaserScan>,
    cmd_out: Topic<CmdVel>,
}

impl LidarProcessor {
    fn new() -> Self {
        Self {
            scan_in: Topic::new("scan"),
            cmd_out: Topic::new("cmd_vel"),
        }
    }
}

impl Node for LidarProcessor {
    fn name(&self) -> &str { "LidarProcessor" }

    fn tick(&mut self) {
        if let Some(scan) = self.scan_in.recv() {
            // Find closest obstacle
            let min_range = scan.ranges.iter()
                .copied()
                .filter(|r| *r > scan.range_min && *r < scan.range_max)
                .fold(f32::MAX, f32::min);

            // Stop if too close
            let speed = if min_range < 0.5 { 0.0 } else { 1.0 };
            self.cmd_out.send(CmdVel { linear: speed, angular: 0.0 });
        }
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(LidarProcessor::new())
        .rate(50.hz())
        .build()?;
    scheduler.run()
}

See Also