Ring Buffer Semantics (Python)
You have a LiDAR node publishing scans at 40 Hz and a path planner consuming them at 10 Hz. Four scans arrive between each planner tick. Where do the extra three go? They sit in the ring buffer. When the planner calls node.recv("scan"), it gets the oldest unread scan -- not the latest. If the planner falls further behind and 1024 scans pile up, the buffer is full. The next scan overwrites the oldest one. No error. No exception. No backpressure. The publisher keeps publishing at full speed.
This is the fundamental contract of HORUS topic communication: publishers never block, subscribers never stall, and when the buffer fills, the oldest message is silently dropped.
Understanding this contract is the difference between a system that works reliably and one where "my subscriber misses messages" becomes a recurring mystery.
How the Ring Buffer Works
Every topic subscription creates a ring buffer. The buffer is a fixed-size circular queue in shared memory. Publishers write to the head. Subscribers read from the tail.
Buffer capacity: 8 (shown small for illustration; default is 1024)

```
After 5 publishes, 0 reads:
[msg1] [msg2] [msg3] [msg4] [msg5] [    ] [    ] [    ]
  tail                                head
  ^ recv() returns msg1

After 5 publishes, 2 reads:
[    ] [    ] [msg3] [msg4] [msg5] [    ] [    ] [    ]
               tail                   head
               ^ recv() returns msg3

Buffer full (8 publishes, 0 reads):
[msg1] [msg2] [msg3] [msg4] [msg5] [msg6] [msg7] [msg8]
  tail/head (next write wraps to slot 0)

9th publish -- msg1 is dropped, msg9 takes its slot:
[msg9] [msg2] [msg3] [msg4] [msg5] [msg6] [msg7] [msg8]
         tail/head
         ^ recv() returns msg2 (msg1 is gone)
```
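The overwrite-oldest behavior in the diagram can be modeled in a few lines of plain Python (a toy sketch of the semantics, not the HORUS shared-memory implementation):

```python
class MiniRing:
    """Toy overwrite-oldest ring buffer mirroring the diagram above."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.buf = []

    def publish(self, msg):
        if len(self.buf) == self.capacity:
            self.buf.pop(0)   # Full: silently drop the oldest, no error
        self.buf.append(msg)  # Publisher never blocks

    def recv(self):
        # Oldest unread first (FIFO); None when empty, never blocks
        return self.buf.pop(0) if self.buf else None


ring = MiniRing(8)
for i in range(1, 6):                 # 5 publishes, 0 reads
    ring.publish(f"msg{i}")
assert ring.recv() == "msg1"          # oldest unread, not the latest

ring = MiniRing(8)
for i in range(1, 10):                # 9 publishes into capacity 8
    ring.publish(f"msg{i}")
assert ring.recv() == "msg2"          # msg1 was overwritten by msg9
```

The real buffer is a fixed-size circular array with head/tail indices rather than a shifting list, but the observable contract is the same.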
Key Properties
| Property | Behavior |
|---|---|
| Default capacity | 1024 messages per topic |
| Overflow policy | Oldest message dropped (no backpressure) |
| `recv()` when empty | Returns `None` |
| `recv()` when available | Returns oldest unread message (FIFO) |
| `recv_all()` | Returns all buffered messages as a list, drains the buffer |
| `has_msg()` | Returns `True` if at least one message is available, does not consume it |
| Publisher behavior on full buffer | Overwrites oldest, continues at full speed |
| Subscriber behavior on empty buffer | Returns immediately with `None` |
The Core API
recv() -- Read One Message
```python
def tick(node):
    msg = node.recv("scan")
    if msg is None:
        return  # Nothing available this tick
    # Process the oldest unread message
    process(msg)
```
recv() returns the oldest unread message from the buffer and removes it. If the buffer is empty, it returns None immediately -- it never blocks. Each call consumes exactly one message.
recv_all() -- Drain Everything
```python
def tick(node):
    messages = node.recv_all("commands")
    # messages is [] if nothing was buffered
    for msg in messages:
        execute(msg)
```
recv_all() returns every buffered message as a Python list and empties the buffer. Returns an empty list if nothing is available. Use this when you need to process every message and cannot afford to drop any.
has_msg() -- Peek Without Consuming
```python
def tick(node):
    if node.has_msg("emergency.stop"):
        stop_msg = node.recv("emergency.stop")
        handle_emergency(stop_msg)
```
has_msg() checks if at least one message is available without consuming it. The message stays in the buffer for the next recv() call. Use this for conditional processing -- check first, then decide whether to consume.
"My Subscriber Misses Messages"
This is the most common question from developers new to HORUS. Here is exactly what happens and why.
The Scenario
A camera node publishes images at 30 Hz. A detection node subscribes and processes them at 10 Hz. Over 3 seconds:
- Camera publishes 90 images
- Detection processes 30 images (one per tick at 10 Hz)
- 60 images sit in the buffer (90 - 30 = 60)
If this continues indefinitely, the buffer fills to 1024. After that, every new publish drops the oldest unread image.
This is not a bug. The detection node is processing images slower than they arrive. The ring buffer absorbs the burst, and when full, it keeps only the most recent 1024 images.
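A quick back-of-envelope for this scenario (assuming the detection node consumes one image per tick via recv()):

```python
pub_hz = 30        # camera publish rate
sub_hz = 10        # detection tick rate, one recv() per tick
capacity = 1024    # default buffer capacity

backlog_growth = pub_hz - sub_hz           # net messages added per second: 20
seconds_to_full = capacity / backlog_growth
print(f"buffer full after ~{seconds_to_full:.0f} s")  # ~51 s, then drops begin
```

After roughly 51 seconds the buffer is full and every new publish silently evicts the oldest unread image.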
When recv() Returns None
recv() returns None only when the buffer is truly empty -- the subscriber has consumed everything and the publisher has not written anything new since the last read. This happens when:
- The publisher has not started yet (startup race)
- The publisher runs slower than the subscriber
- The subscriber just called `recv_all()` and drained everything
recv() does not return None when messages were dropped due to overflow. Overflow is invisible to the subscriber -- it simply never sees the dropped messages.
How to Know If Messages Were Dropped
There is no built-in per-message drop counter on the Python API. Instead, use these approaches:
Approach 1: Check publish rate vs. receive rate with CLI tools
```sh
# In one terminal: check how fast a topic is being published
horus topic hz scan

# In another: check how many messages are buffered
horus topic info scan
```
If the publish rate is significantly higher than your subscriber's tick rate and the buffer is near capacity, drops are happening.
Approach 2: Use message timestamps
If your messages include a timestamp_ns field (all typed messages do), check for gaps:
```python
last_ts = None

def tick(node):
    global last_ts
    msg = node.recv("imu")
    if msg is None:
        return
    if last_ts is not None:
        gap_ms = (msg.timestamp_ns - last_ts) / 1_000_000
        expected_ms = 10.0  # 100 Hz publisher
        if gap_ms > expected_ms * 2:
            node.log_warning(f"IMU gap: {gap_ms:.1f}ms (expected ~{expected_ms:.1f}ms)")
    last_ts = msg.timestamp_ns
```
A timestamp gap larger than the expected publish interval means either messages were dropped or the publisher hiccupped.
Approach 3: Count what you receive
```python
received = 0

def tick(node):
    global received
    msgs = node.recv_all("data")
    received += len(msgs)

# After the run:
# If publisher sent 10000 and you received 8500, 1500 were dropped
```
Tuning Buffer Capacity
Setting Capacity
Set the default buffer capacity for all topics auto-created by a node using default_capacity on the Node constructor:
```python
import horus

# Small buffer: only keep the 16 most recent images
camera_sub = horus.Node(
    name="detector",
    subs=[horus.Image],
    tick=detect_tick,
    rate=10,
    default_capacity=16,
)

# Large buffer: keep 4096 commands to never drop any
command_processor = horus.Node(
    name="cmd_processor",
    subs=["commands"],
    tick=process_tick,
    rate=100,
    default_capacity=4096,
)
```
Choosing the Right Capacity
The right capacity depends on two numbers: the publisher's rate and the subscriber's rate.
Minimum to avoid drops: `capacity >= (publisher_rate / subscriber_rate) * max_burst_duration`, where the ratio is the number of messages arriving per subscriber tick and `max_burst_duration` is the worst-case stall measured in subscriber ticks.
For a 100 Hz publisher and a 10 Hz subscriber, each subscriber tick processes 1 message while 10 arrive. If the subscriber occasionally takes 2x longer (e.g., ML inference on a complex frame), 20 messages arrive in that period. A capacity of 64 gives comfortable headroom for occasional slowdowns.
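That sizing rule can be sketched as a small helper (a hypothetical function, not part of HORUS -- just the arithmetic above with a safety factor, rounded up to a power of two):

```python
import math

def recommended_capacity(pub_hz, sub_hz, stall_ticks=2, headroom=2.0):
    """Sketch of the sizing rule: messages arriving per subscriber tick,
    times the worst-case stall (in ticks), times a safety factor,
    rounded up to the next power of two. Illustration only."""
    per_tick = pub_hz / sub_hz              # messages arriving per tick
    worst_case = per_tick * stall_ticks     # backlog during a slow stretch
    return 2 ** math.ceil(math.log2(worst_case * headroom))

print(recommended_capacity(100, 10))  # 100 Hz pub, 10 Hz sub, 2x stall -> 64
```

This reproduces the worked example: 10 messages per tick, a 2x slowdown, and 2x headroom land on a capacity of 64.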
Rule of thumb:
| Publisher rate | Subscriber rate | Recommended capacity |
|---|---|---|
| Same or lower | Any | Default (1024) is fine |
| 2-10x faster | Any | Default (1024) is fine |
| 10-100x faster | Any | Consider larger if every message matters |
| Any | Bursty (variable processing time) | 2-4x the steady-state backlog |
Memory Impact
Each slot in the ring buffer holds one message. For typed (Pod) messages, the memory per slot is the message size (e.g., 48 bytes for CmdVel, 64 bytes for Imu). For generic messages (dicts), each slot holds a serialized MessagePack blob.
| Capacity | CmdVel (48 B) | Imu (64 B) | Image 640x480 RGB (921 KB) |
|---|---|---|---|
| 64 | 3 KB | 4 KB | 57 MB |
| 1024 (default) | 48 KB | 64 KB | 922 MB |
| 4096 | 192 KB | 256 KB | 3.6 GB |
For large messages like images, reduce the buffer capacity. A 1024-slot buffer for 640x480 RGB images uses nearly 1 GB. Set default_capacity=8 or default_capacity=16 for image topics -- you almost certainly want the latest frame, not a backlog of 1024 frames.
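The table's figures are just capacity times per-slot message size, which is worth checking before picking a capacity (plain arithmetic, not a HORUS API):

```python
def buffer_bytes(capacity, msg_bytes):
    # Memory for one topic's ring buffer: capacity * per-slot message size
    return capacity * msg_bytes

IMAGE_640x480_RGB = 640 * 480 * 3  # 921,600 bytes per frame

print(buffer_bytes(1024, IMAGE_640x480_RGB) / 2**20)  # 900.0 MiB at the default
print(buffer_bytes(16, IMAGE_640x480_RGB) / 2**20)    # ~14 MiB with capacity 16
print(buffer_bytes(1024, 64) / 1024)                  # 64.0 KiB for Imu messages
```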
When Drops Are OK vs. When They Are Not
Drops Are Fine
Video frames: A perception node running at 10 Hz on a 30 Hz camera stream should process the most recent frame, not a queue of old ones. Dropped frames are not just acceptable -- they are desirable. Processing a 3-second-old frame is worse than skipping it.
Sensor readings for display: A dashboard showing IMU data at 1 Hz does not need every 100 Hz reading. The latest reading is sufficient.
Telemetry / logging: If the logger falls behind, it is better to lose some data points than to build up an ever-growing backlog that delays all future data.
Redundant readings: If a temperature sensor publishes at 100 Hz but the value changes every few seconds, dropping 99% of the readings loses no information.
For these cases, use recv() to get one message per tick, and accept that earlier messages were overwritten. Or even better, drain with recv_all() and only use the last one:
```python
def tick(node):
    # Get all buffered frames, use only the latest
    frames = node.recv_all("camera.rgb")
    if frames:
        latest = frames[-1]
        process(latest)
```
Drops Are Not OK
Motor commands: If a trajectory planner sends a sequence of 100 velocity commands and the motor controller drops 20, the robot follows the wrong trajectory. Every command in the sequence matters.
State machine transitions: If a state manager sends "arm -> disarm -> arm" and the middle message is dropped, the system never disarms -- it stays armed straight through a window where it was supposed to be disarmed, skipping the safety check entirely.
Configuration updates: If a parameter server sends updated PID gains and the message is dropped, the controller runs with stale gains indefinitely.
Sequential protocols: Any communication where message ordering and completeness matter (handshakes, acknowledgments, multi-step commands).
For these cases:
```python
# Use recv_all() to process every message
def motor_tick(node):
    commands = node.recv_all("trajectory")
    for cmd in commands:
        apply_velocity(cmd)

# And increase capacity if needed
motor = horus.Node(
    name="motor",
    subs=["trajectory"],
    tick=motor_tick,
    rate=100,
    default_capacity=4096,
)
```
Patterns
Latest-Only (Skip Backlog)
When you only care about the most recent value:
```python
def tick(node):
    msgs = node.recv_all("camera.rgb")
    if msgs:
        process(msgs[-1])  # Only the newest
```
Process All (Never Drop)
When every message must be handled:
```python
def tick(node):
    for cmd in node.recv_all("commands"):
        execute(cmd)
```
Conditional Processing
When some messages need immediate handling:
```python
def tick(node):
    # Always check for emergency stop first
    if node.has_msg("emergency.stop"):
        stop = node.recv("emergency.stop")
        handle_emergency(stop)
        return
    # Normal processing
    msg = node.recv("data")
    if msg:
        process(msg)
```
Accumulate and Batch
When you want to collect messages over time and process as a batch:
```python
accumulated = []

def tick(node):
    global accumulated
    accumulated.extend(node.recv_all("measurements"))
    if len(accumulated) >= 100:
        result = analyze_batch(accumulated)
        node.send("analysis", result)
        accumulated.clear()
```
Common Mistakes
Mistake 1: Calling recv() Once When Multiple Messages Are Buffered
```python
# WRONG: processes only 1 message per tick, even if 50 are buffered
def tick(node):
    msg = node.recv("commands")
    if msg:
        execute(msg)
```
If the publisher is faster than the subscriber, this creates an ever-growing backlog. The buffer eventually fills and messages are dropped. If every message matters, use recv_all():
```python
# CORRECT: processes all buffered messages each tick
def tick(node):
    for msg in node.recv_all("commands"):
        execute(msg)
```
Mistake 2: Assuming recv() Returns the Latest Message
```python
# WRONG assumption: "recv gives me the newest"
def tick(node):
    latest = node.recv("scan")  # This is the OLDEST unread, not the newest
```
recv() returns the oldest unread message (FIFO order). If you want the latest, drain and use the last:
```python
# CORRECT: get the latest
def tick(node):
    scans = node.recv_all("scan")
    if scans:
        latest = scans[-1]
```
Mistake 3: Not Checking for None
```python
# WRONG: crashes when buffer is empty
def tick(node):
    msg = node.recv("data")
    process(msg["value"])  # TypeError: 'NoneType' object is not subscriptable
```
Always check:
```python
# CORRECT
def tick(node):
    msg = node.recv("data")
    if msg is not None:
        process(msg["value"])
```
Mistake 4: Using Default Capacity for Large Messages
```python
# DANGEROUS: 1024 * 921 KB = ~922 MB for one topic
node = horus.Node(
    subs=[horus.Image],
    tick=process_tick,
    rate=10,
    # default_capacity=1024 (implicit)
)
```
Reduce capacity for large messages:
```python
# CORRECT: 16 * 921 KB = ~14 MB
node = horus.Node(
    subs=[horus.Image],
    tick=process_tick,
    rate=10,
    default_capacity=16,
)
```
Monitoring Buffer Health
Use the HORUS CLI to inspect topic buffer state at runtime:
```sh
# Check publish rate
horus topic hz scan
# Output: scan: 40.0 Hz

# List active topics and their metadata
horus topic list

# Watch a topic's contents
horus topic echo scan --count 5
```
If you see the publish rate is much higher than your subscriber's tick rate and you cannot afford drops, either increase the subscriber's rate, increase the buffer capacity, or use recv_all() to drain each tick.
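You can also estimate your own receive rate from inside the subscriber and compare it against `horus topic hz`. A minimal sliding-window tracker (a hypothetical helper sketched here, not part of the HORUS API) that you would feed from `recv_all()` each tick:

```python
import time

class RateTracker:
    """Estimates messages received per second over a sliding window."""

    def __init__(self, window_s=5.0):
        self.window_s = window_s
        self.events = []  # (timestamp, message count) pairs

    def record(self, count, now=None):
        now = time.monotonic() if now is None else now
        self.events.append((now, count))
        cutoff = now - self.window_s
        self.events = [(t, c) for t, c in self.events if t >= cutoff]

    def hz(self):
        if len(self.events) < 2:
            return 0.0
        span = self.events[-1][0] - self.events[0][0]
        total = sum(c for _, c in self.events[1:])  # counts after the first sample
        return total / span if span > 0 else 0.0

# Inside a tick: tracker.record(len(node.recv_all("scan")))
# If tracker.hz() is well below `horus topic hz scan`, you are dropping.
```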
Design Decisions
Why a fixed-size ring buffer instead of an unbounded queue? Unbounded queues leak memory. A LiDAR publishing at 40 Hz on a 1 Hz subscriber would accumulate 40 messages per second indefinitely. After an hour, that is 144,000 messages consuming memory that will never be reclaimed because the subscriber will never catch up. A fixed-size ring buffer caps memory at capacity * message_size, and the publisher keeps running at full speed regardless of subscriber behavior. In robotics, predictable memory usage is a requirement, not a luxury.
Why drop the oldest message instead of the newest? When the buffer is full and a new message arrives, the system has two choices: drop the new message (keep old data) or drop the oldest message (keep new data). In robotics, newer data is almost always more valuable -- the robot has moved since the old reading was taken. Keeping old data at the expense of new data means the subscriber is working with increasingly stale information. Dropping the oldest keeps the buffer as fresh as possible.
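Python's own `collections.deque` with `maxlen` implements the same drop-oldest policy, which makes the two choices easy to compare side by side (illustration only, unrelated to HORUS internals):

```python
from collections import deque

# Drop-oldest (HORUS policy): a maxlen deque discards from the left on overflow
ring = deque(maxlen=3)
for reading in [1, 2, 3, 4, 5]:
    ring.append(reading)
print(list(ring))  # [3, 4, 5] -- the freshest data survives

# Drop-newest (the rejected alternative): refuse writes when full
stale = []
for reading in [1, 2, 3, 4, 5]:
    if len(stale) < 3:
        stale.append(reading)
print(stale)       # [1, 2, 3] -- the subscriber is stuck with stale data
```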
Why no backpressure? Backpressure means the publisher slows down or blocks when the subscriber cannot keep up. In robotics, publishers are often tied to physical sensors (a camera produces frames at 30 Hz regardless of what the subscriber does). Blocking the sensor read thread causes hardware buffer overflows, dropped interrupts, or device resets. Even for software publishers, backpressure from a slow logger should never cause a motor controller to miss its deadline. The ring buffer decouples publisher and subscriber timing completely.
Why recv() returns the oldest, not the latest? FIFO ordering preserves causality. If a trajectory planner sends commands [1, 2, 3, 4, 5], the motor controller should execute them in that order. Returning the latest would skip commands. Developers who want only the latest can use the recv_all()[-1] pattern, but the default behavior preserves message ordering for safety-critical communication.
Why 1024 as the default capacity? It is large enough to absorb multi-second bursts between a fast publisher and a slow subscriber (e.g., 100 Hz pub, 10 Hz sub = 10 messages per subscriber tick; 1024 slots cover roughly 10 seconds of uncollected backlog). It is small enough that memory usage stays reasonable for small messages (1024 * 64 bytes = 64 KB per topic). Large messages (images, point clouds) should explicitly reduce the capacity.
Trade-offs
| Gain | Cost |
|---|---|
| Publishers never block -- sensor nodes run at full speed regardless of subscribers | Subscribers can miss messages with no notification |
| Fixed memory -- capacity * message_size, never grows | Must choose capacity at construction time; too small drops messages, too large wastes memory |
| FIFO ordering -- messages arrive in publish order | Subscribers that only need the latest must drain the buffer themselves |
| No backpressure -- fast publisher cannot stall slow subscriber | No built-in mechanism to signal "subscriber is falling behind" |
| Zero-copy shared memory -- publisher and subscriber share the same buffer | Buffer is per-subscriber; N subscribers means N copies of each message |
| Default capacity 1024 -- works for most small-message topics without tuning | Image topics at 1024 slots use nearly 1 GB; must tune manually |
See Also
- Python Bindings -- Full `recv()`, `recv_all()`, `has_msg()`, and `send()` reference
- Topics: How Nodes Talk -- Beginner introduction to topic communication
- Topics -- Full Reference -- Topic architecture, zero-copy IPC, and shared memory
- Shared Memory -- How HORUS uses shared memory for zero-copy transport
- Python Examples -- Complete working examples