Topic API
Topics are how nodes communicate in HORUS. Topic<T> provides typed publish/subscribe channels backed by shared memory for zero-copy IPC. Two Topic instances with the same name and type connect automatically — one publishes, the other subscribes. Backend selection (in-process ring buffer vs cross-process SHM) is automatic based on whether publisher and subscriber live in the same process.
Python: Available via
horus.Topic("name", Type). See Python Bindings.New to topics? Start with Topics: How Nodes Talk for a 5-minute introduction.
// simplified
use horus::prelude::*;
Quick Reference — Constructors
| Constructor | Returns | Description |
|---|---|---|
Topic::<T>::new(name) | HorusResult<Topic<T>> | Creates with auto-computed capacity (16–1024 based on message size) and auto-sized slots |
Topic::<T>::with_capacity(name, capacity, slot_size) | HorusResult<Topic<T>> | Creates with explicit buffer configuration |
Quick Reference — Sending
| Method | Returns | Description |
|---|---|---|
send(msg) | () | Publishes a message. Non-blocking. Drops oldest if full. |
try_send(msg) | Result<(), T> | Publishes if space available. Returns message back on failure. |
send_blocking(msg, timeout) | Result<(), SendBlockingError> | Blocks until space available or timeout. |
Quick Reference — Receiving
| Method | Returns | Description |
|---|---|---|
recv() | Option<T> | Takes the next unread message (FIFO). Consumes it. |
try_recv() | Option<T> | Same as recv(). Provided for API symmetry with try_send(). |
read_latest() | Option<T> | Reads the newest message, skipping older ones. Requires T: Copy. |
Quick Reference — State & Metrics
| Method | Returns | Description |
|---|---|---|
name() | &str | Topic name |
has_message() | bool | Whether at least one unread message exists |
pending_count() | u64 | Number of unread messages in the buffer |
dropped_count() | u64 | Total messages dropped (buffer-full overwrites) |
pub_count() | u32 | Number of active publishers |
sub_count() | u32 | Number of active subscribers |
is_same_process() | bool | Whether all pub/sub are in the same process (#[cfg(test)] only) |
is_same_thread() | bool | Whether all pub/sub are on the same thread (#[cfg(test)] only) |
metrics() | TopicMetrics | Aggregate send/receive statistics |
Constructor Methods
new(name)
Creates a topic with default capacity and auto-sized slots.
Signature
// simplified
pub fn new(name: impl Into<String>) -> HorusResult<Self>
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
name | impl Into<String> | yes | Topic identifier. Case-sensitive. Use dot-delimited hierarchical names: "sensors.imu", "cmd_vel". Two Topic instances with the same name and type T connect automatically. Note: Dots are preferred over slashes for macOS compatibility (shm_open does not allow / in names). |
Returns
HorusResult<Topic<T>> — connected to the named shared memory channel.
Errors
| Error | Condition |
|---|---|
HorusError | SHM region creation failed (permissions, disk full) |
Behavior
- Default capacity: auto-computed (16–1024 slots based on message size; smaller messages get more slots)
- Default slot size:
size_of::<T>()rounded up to page alignment - Backend auto-selection: if publisher and subscriber are in the same process, uses an in-process ring buffer (no SHM overhead). Cross-process uses SHM.
- Creating two
Topic<T>with the same name connects them — no broker, no configuration
Example
// simplified
use horus::prelude::*;
let pub_topic = Topic::<CmdVel>::new("cmd_vel")?;
let sub_topic = Topic::<CmdVel>::new("cmd_vel")?; // connects to same channel
pub_topic.send(CmdVel::new(1.0, 0.0));
let msg = sub_topic.recv(); // Some(CmdVel { linear: 1.0, angular: 0.0 })
with_capacity(name, capacity, slot_size)
Creates a topic with explicit buffer configuration.
Signature
// simplified
pub fn with_capacity(name: &str, capacity: u32, slot_size: Option<usize>) -> HorusResult<Self>
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
name | &str | yes | Topic identifier (same rules as new). |
capacity | u32 | yes | Number of slots in the ring buffer. Must be a power of 2. Typical: 4, 8, 16. |
slot_size | Option<usize> | no | Bytes per slot. None = auto-sized from size_of::<T>(). Set explicitly for variable-size messages. |
Returns
HorusResult<Topic<T>>
When to use
- Large messages that need bigger slots (e.g.,
LaserScanwith 360+ ranges) - High-frequency topics where you need more buffering to prevent drops
- Small messages where you want to reduce memory footprint
When NOT to use
- Most cases —
Topic::new()auto-sizes correctly - Pool-backed types (
Image,PointCloud,DepthImage,Tensor) — they manage their own memory
Example
// simplified
use horus::prelude::*;
// 8 slots, 4KB each — for large LiDAR scans
let scan = Topic::<LaserScan>::with_capacity("lidar.scan", 8, Some(4096))?;
// 16 slots, auto-sized — high-frequency IMU with extra buffering
let imu = Topic::<Imu>::with_capacity("imu", 16, None)?;
Sending Methods
send(msg)
Publishes a message. Non-blocking. Overwrites the oldest unread message if the buffer is full.
Signature
// simplified
pub fn send(&self, msg: T)
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
msg | T | yes | Message to publish. Moved into the ring buffer. |
Returns
Nothing (()). Always succeeds — never blocks, never returns an error.
Behavior
- If all slots are occupied, the oldest unread message is silently overwritten
- Use
dropped_count()to detect overwrites - Notifies any event-driven nodes (
.on("topic")) that new data arrived - Fast path: ~3ns for same-thread publisher+subscriber (inlined ring write, no pointer chase)
- Cross-process: serializes via SHM, ~150ns typical
When to use
- Default for real-time robotics — you always want the latest data
- Control loops where blocking is unacceptable
- Fire-and-forget telemetry
When NOT to use
- When you need to know if the message was received — use
try_send()instead - When dropping messages is unacceptable — use
send_blocking()instead
Example
// simplified
use horus::prelude::*;
let topic = Topic::<CmdVel>::new("cmd_vel")?;
// Fire-and-forget (overwrites oldest if full)
topic.send(CmdVel::new(1.0, 0.0));
// Check if messages were dropped
if topic.dropped_count() > 0 {
hlog!(warn, "{} messages dropped — subscriber too slow", topic.dropped_count());
}
try_send(msg)
Attempts to publish without overwriting. Returns the message back if the buffer is full.
Signature
// simplified
pub fn try_send(&self, msg: T) -> Result<(), T>
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
msg | T | yes | Message to publish. Returned on failure. |
Returns
Ok(())— message published successfullyErr(msg)— buffer full, message returned to caller (not consumed)
Behavior
- Non-blocking. Checks buffer space and either publishes or returns immediately.
- The returned message can be retried or discarded — your choice.
When to use
- When you need to detect drops — implement backpressure, count losses, or log warnings
- When the message is expensive to create and you don't want to waste it
When NOT to use
- Normal control loops — use
send()(simpler, always succeeds)
Example
// simplified
use horus::prelude::*;
let topic = Topic::<CmdVel>::new("cmd_vel")?;
match topic.try_send(CmdVel::new(1.0, 0.0)) {
Ok(()) => { /* sent */ }
Err(_returned) => {
hlog!(warn, "Buffer full — message not sent");
}
}
send_blocking(msg, timeout)
Blocks until buffer space is available or the timeout elapses.
Signature
// simplified
pub fn send_blocking(&self, msg: T, timeout: Duration) -> Result<(), SendBlockingError>
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
msg | T | yes | Message to publish. Consumed on success. |
timeout | Duration | yes | Maximum time to wait for space. Create with .ms(): 10_u64.ms(). |
Returns
Ok(())— message publishedErr(SendBlockingError::Timeout)— buffer stayed full for the entire timeout
Behavior
Uses a graduated wait strategy for low latency:
- Immediate
try_send(0 latency) - Spin loop — 256 iterations (~sub-microsecond)
- Yield — 8 thread yields (~microseconds)
- Sleep — 100us increments until deadline
When to use
- When dropping messages is unacceptable and brief blocking is acceptable
- Logging pipelines, recording, non-RT data transfer
When NOT to use
- Real-time control loops — blocking in
tick()causes deadline misses - Any node with
.rate()or.budget()— usesend()instead
Example
// simplified
use horus::prelude::*;
let topic = Topic::<DiagnosticReport>::new("diagnostics")?;
match topic.send_blocking(report, 10_u64.ms()) {
Ok(()) => { /* sent */ }
Err(SendBlockingError::Timeout) => {
hlog!(warn, "Diagnostic buffer full for 10ms — dropping report");
}
}
Receiving Methods
recv()
Takes the next unread message in FIFO order. Consumes it from the buffer.
Signature
// simplified
pub fn recv(&self) -> Option<T>
Parameters
None.
Returns
Some(T)— the oldest unread message (removed from buffer)None— no unread messages available
Behavior
- Non-blocking. Returns immediately.
- Each message is delivered to each subscriber exactly once. After
recv()returns it, the message is consumed. - Fast path: ~3ns for same-thread (inlined ring read)
When to use
- Command streams where every message matters (velocity commands, goal updates)
- Processing pipelines where order matters (image frames, sensor sequences)
When NOT to use
- State-like data where you only care about the latest value — use
read_latest()instead
Example
// simplified
use horus::prelude::*;
let topic = Topic::<Imu>::new("imu")?;
// IMPORTANT: drain every tick to avoid stale data accumulation
while let Some(msg) = topic.recv() {
process(msg);
}
try_recv()
Non-blocking receive. Functionally identical to recv().
Signature
// simplified
pub fn try_recv(&self) -> Option<T>
Returns
Same as recv(). Provided for API symmetry with try_send().
read_latest()
Reads the most recent message, skipping all older ones.
Signature
// simplified
pub fn read_latest(&self) -> Option<T>
where
T: Copy
Parameters
None.
Returns
Some(T)— copy of the newest messageNone— no messages available
Behavior
- Reads without consuming — multiple calls return the same value until a new message arrives
- Skips all older messages — only the latest matters
- Requires
T: Copybecause it reads (copies) without removing from the buffer
When to use
- State-like data where you only care about the current value: poses, sensor readings, configuration
- Slow subscribers that can't keep up with a fast publisher
When NOT to use
- Command streams where every message matters — use
recv()instead - Messages that aren't
Copy(e.g., types withVec,String)
Example
// simplified
use horus::prelude::*;
let odom = Topic::<Odometry>::new("odom")?;
// Good: pose is state — only the latest matters
if let Some(pose) = odom.read_latest() {
current_position = pose;
}
State & Metrics Methods
name()
Returns the topic name.
Signature
// simplified
pub fn name(&self) -> &str
Returns
&str — the name passed to new() or with_capacity().
has_message()
Checks whether at least one unread message exists.
Signature
// simplified
pub fn has_message(&self) -> bool
Returns
true if recv() would return Some. false if the buffer is empty.
pending_count()
Returns the number of unread messages in the buffer.
Signature
// simplified
pub fn pending_count(&self) -> u64
Returns
u64 — count of messages waiting to be consumed by recv().
dropped_count()
Returns the total number of messages dropped due to buffer-full overwrites.
Signature
// simplified
pub fn dropped_count(&self) -> u64
Returns
u64 — cumulative count since topic creation. Incremented by send() when it overwrites the oldest slot.
When to use
- Monitor for backpressure — a rising
dropped_count()means the subscriber is too slow - Log warnings when drops exceed a threshold
Example
// simplified
use horus::prelude::*;
let topic = Topic::<LaserScan>::new("scan")?;
if topic.dropped_count() > 0 {
hlog!(warn, "Dropped {} scans — subscriber can't keep up", topic.dropped_count());
}
pub_count()
Returns the number of active publishers on this topic.
Advanced: These methods are
#[doc(hidden)]in the Rust API — they may change without notice. For monitoring, preferhorus topic list --verbose.
Signature
// simplified
pub fn pub_count(&self) -> u32
sub_count()
Returns the number of active subscribers on this topic.
Advanced: These methods are
#[doc(hidden)]in the Rust API — they may change without notice. For monitoring, preferhorus topic list --verbose.
Signature
// simplified
pub fn sub_count(&self) -> u32
is_same_process() — #[cfg(test)] only
Checks whether all publishers and subscribers are in the same process. Available only in test builds.
Signature
// simplified
#[cfg(test)]
pub fn is_same_process(&self) -> bool
Returns
true if using in-process ring buffer (fastest path, ~3ns). false if cross-process SHM.
Note: This method is gated behind
#[cfg(test)]and is not available in production builds. It exists for internal testing and assertions only.
is_same_thread() — #[cfg(test)] only
Checks whether all publishers and subscribers are on the same thread. Available only in test builds.
Signature
// simplified
#[cfg(test)]
pub fn is_same_thread(&self) -> bool
Returns
true if using the ultra-fast same-thread path (inlined ring operations, no atomics).
Note: This method is gated behind
#[cfg(test)]and is not available in production builds. It exists for internal testing and assertions only.
metrics()
Returns aggregate send/receive statistics.
Signature
// simplified
pub fn metrics(&self) -> TopicMetrics
Returns
TopicMetrics — see Types section below.
Example
// simplified
use horus::prelude::*;
let topic = Topic::<CmdVel>::new("cmd_vel")?;
let m = topic.metrics();
println!("Sent: {}, Received: {}, Failures: {}",
m.messages_sent(), m.messages_received(), m.send_failures());
Pool-Backed Types (Zero-Copy)
For large data types (Image, PointCloud, DepthImage, Tensor), HORUS uses pool-backed allocation. The Topic API works the same — send(), recv(), try_send() — but the data is transferred via shared memory pools instead of copying through the ring buffer.
// simplified
use horus::prelude::*;
let camera = Topic::<Image>::new("camera.rgb")?;
// Send (moves the Image into the pool slot)
camera.send(image);
// IMPORTANT: call recv() every tick to drain — images are large and stale frames waste pool slots
if let Some(img) = camera.recv() {
println!("{}x{} image received", img.width(), img.height());
}
See Image API, PointCloud API, DepthImage API, Tensor API for type-specific methods.
Types
TopicMetrics
Aggregate statistics returned by topic.metrics().
| Method | Returns | Description |
|---|---|---|
.messages_sent() | u64 | Total messages published on this topic |
.messages_received() | u64 | Total messages consumed from this topic |
.send_failures() | u64 | Failed send attempts (e.g., try_send on a full buffer) |
.recv_failures() | u64 | Failed receive attempts (e.g., recv on empty buffer) |
SendBlockingError
Error returned by send_blocking().
| Variant | Description |
|---|---|
Timeout | The ring buffer stayed full for the entire timeout duration |
Production Example
Multi-sensor fusion node subscribing to two topics and publishing a fused pose:
// simplified
use horus::prelude::*;
struct FusionNode {
imu_sub: Topic<Imu>,
odom_sub: Topic<Odometry>,
pose_pub: Topic<Pose3D>,
last_imu: Option<Imu>,
last_odom: Option<Odometry>,
}
impl Node for FusionNode {
fn name(&self) -> &str { "Fusion" }
fn tick(&mut self) {
// IMPORTANT: always drain both topics every tick
if let Some(imu) = self.imu_sub.recv() { self.last_imu = Some(imu); }
if let Some(odom) = self.odom_sub.recv() { self.last_odom = Some(odom); }
if let (Some(imu), Some(odom)) = (&self.last_imu, &self.last_odom) {
let fused = self.fuse(imu, odom);
self.pose_pub.send(fused);
}
}
}
See Also
- Topics Concept — Architecture, backends, and design patterns
- Topics for Beginners — Gentle introduction
- Communication Overview — When to use topics vs services vs actions
- Image API — Pool-backed camera images
- PointCloud API — Pool-backed 3D point clouds
- DepthImage API — Pool-backed depth images
- Tensor API — Tensor descriptor and DLPack
- Scheduler API — Running nodes that use topics
- Python Topic API — Python topic bindings