Topic<T> is the primary communication primitive in HORUS. It provides typed publish/subscribe channels backed by shared memory for zero-copy IPC within a single machine.
Creating a Topic
use horus::prelude::*;
// Default capacity and slot size
let topic = Topic::<Twist>::new("cmd_vel");
// Custom capacity (number of slots) and slot size (bytes per slot)
let topic = Topic::<LaserScan>::with_capacity("scan", 8, 4096);
The name string identifies the shared-memory channel. Two Topic instances with the same name and type connect automatically — one publishes, the other subscribes. Backend selection (in-process ring buffer vs cross-process SHM) is automatic based on whether publisher and subscriber live in the same process.
Sending Messages
Method
Returns
Description
send(msg)
()
Publish a message (non-blocking, drops oldest if full)
try_send(msg)
Result<(), T>
Publish if space available; returns message back on failure
send_blocking(msg, timeout)
Result<(), SendBlockingError>
Block until space is available or timeout elapses
SendBlockingError
Variant
Description
Timeout
Timed out waiting for space in the ring buffer
Example
let topic = Topic::<CmdVel>::new("cmd_vel");
// Fire-and-forget (overwrites oldest if full)
topic.send(CmdVel { linear: 1.0, angular: 0.0 });
// Try without blocking
match topic.try_send(CmdVel { linear: 0.5, angular: 0.1 }) {
Ok(()) => { /* sent */ }
Err(msg) => { /* buffer full, msg returned */ }
}
// Block up to 10ms
use horus::prelude::*;
match topic.send_blocking(CmdVel { linear: 1.0, angular: 0.0 }, 10_u64.ms()) {
Ok(()) => { /* sent */ }
Err(SendBlockingError::Timeout) => { /* timed out */ }
}
Receiving Messages
Method
Returns
Description
recv()
Option<T>
Take the next unread message (FIFO order)
read_latest()
Option<T>
Read the most recent message, skipping older ones (requires T: Copy)
recv() returns messages in order. Each message is delivered to each subscriber exactly once. read_latest() is useful for state-like data (sensor readings, poses) where you only care about the newest value.
Example
let topic = Topic::<Imu>::new("imu");
// Process all pending messages
while let Some(msg) = topic.recv() {
println!("Accel: ({}, {}, {})", msg.linear_accel.x, msg.linear_accel.y, msg.linear_accel.z);
}
// Or just get the latest reading
if let Some(latest) = topic.read_latest() {
println!("Latest orientation: {:?}", latest.orientation);
}
State & Metrics
Method
Returns
Description
name()
&str
The topic name
has_message()
bool
Whether there is at least one unread message
pending_count()
usize
Number of unread messages in the buffer
dropped_count()
u64
Total messages dropped due to full buffer
metrics()
TopicMetrics
Aggregate send/receive statistics
TopicMetrics
Method
Returns
Description
messages_sent()
u64
Total messages published on this topic
messages_received()
u64
Total messages consumed from this topic
send_failures()
u64
Failed send attempts (e.g., try_send on a full buffer)
recv_failures()
u64
Failed receive attempts (e.g., recv on empty buffer)
Example
let topic = Topic::<Twist>::new("cmd_vel");
// Check state
if topic.has_message() {
println!("{} messages pending", topic.pending_count());
}
// Check metrics
let m = topic.metrics();
println!("Sent: {}, Received: {}", m.messages_sent(), m.messages_received());
println!("Dropped: {}", topic.dropped_count());
Pool-Backed Types (Zero-Copy)
For large data types (Image, PointCloud, DepthImage, Tensor), HORUS uses pool-backed allocation to avoid copying payloads through the ring buffer. The Topic API provides specialized methods for these types.
Image Topics
let topic = Topic::<Image>::new("camera/rgb");
// Send (moves the Image into the pool slot)
topic.send(image);
// Try send (returns the Image on failure)
match topic.try_send(image) {
Ok(()) => {}
Err(img) => { /* buffer full */ }
}
// Receive
if let Some(img) = topic.recv() {
println!("{}x{} image received", img.width(), img.height());
}
PointCloud Topics
let topic = Topic::<PointCloud>::new("lidar/points");
topic.send(cloud);
if let Some(cloud) = topic.recv() {
println!("{} points received", cloud.len());
}
DepthImage Topics
let topic = Topic::<DepthImage>::new("camera/depth");
topic.send(depth);
if let Some(depth) = topic.recv() {
println!("{}x{} depth image", depth.width(), depth.height());
}
Tensor Topics
Tensor topics use explicit handle-based allocation for maximum control:
let topic = Topic::<Tensor>::new("model/output");
// Access the backing pool
let pool = topic.pool();
// Allocate a tensor from the pool
let handle = topic.alloc_tensor(&[1, 3, 224, 224], TensorDtype::F32, Device::cpu());
// Send by handle (zero-copy — only the handle crosses the ring buffer)
topic.send_handle(handle);
// Receive by handle
if let Some(handle) = topic.recv_handle() {
println!("Tensor shape: {:?}", handle.shape());
}
Method
Returns
Description
pool()
&TensorPool
Access the backing tensor pool
alloc_tensor(shape, dtype, device)
TensorHandle
Allocate a tensor from the pool
send_handle(handle)
()
Publish a tensor handle (zero-copy)
recv_handle()
Option<TensorHandle>
Receive a tensor handle
Using Topics in Nodes
Topics are typically used inside Node implementations registered with a Scheduler: