Publisher and Subscriber API

HORUS uses shared memory (SHM) for inter-process communication. Publisher<T> writes messages, Subscriber<T> reads them -- both zero-copy. The loan pattern lets you write directly into shared memory without any intermediate buffers or serialization.

Rust: See Topic<T> for the Rust unified pub/sub API. Python: See node.send() / node.recv() for the Python wrapper.

// simplified
#include <horus/topic.hpp>
#include <horus/messages.hpp>

Quick Reference -- Publisher<T>

MethodReturnsDescription
.loan()LoanedSample<T>Get a writable buffer from SHM (zero-copy)
.publish(LoanedSample<T>&&)voidMake a loaned sample visible to subscribers
.send(const T&)voidSend a message by copy (simpler API)
.name()const string&Get the topic name

Quick Reference -- Subscriber<T>

MethodReturnsDescription
.recv()optional<BorrowedSample<T>>Receive the next message (nullopt if none)
.has_msg()boolCheck if a message is available without consuming
.name()const string&Get the topic name

Quick Reference -- LoanedSample<T>

MethodReturnsDescription
operator->()T* / const T*Direct SHM pointer access (zero-copy write)
operator*()T& / const T&Dereference to the data
.get()T* / const T*Raw pointer to the data

Quick Reference -- BorrowedSample<T>

MethodReturnsDescription
operator->()const T*Read-only pointer access
operator*()const T&Read-only dereference
.get()const T*Raw read-only pointer

Creating Topics

Topics are created on the Scheduler or inside a Node / LambdaNode. Topic names use dots as separators (not slashes), required for macOS shm_open compatibility.

On the Scheduler (for Lambda Nodes)

auto cmd_pub  = sched.advertise<horus::msg::CmdVel>("motor.cmd");
auto scan_sub = sched.subscribe<horus::msg::LaserScan>("lidar.scan");

Inside a Node Subclass (Constructor)

class Controller : public horus::Node {
public:
    Controller() : Node("controller") {
        scan_sub_ = subscribe<horus::msg::LaserScan>("lidar.scan");
        cmd_pub_  = advertise<horus::msg::CmdVel>("motor.cmd");
    }
    // ...
private:
    horus::Subscriber<horus::msg::LaserScan>* scan_sub_;
    horus::Publisher<horus::msg::CmdVel>* cmd_pub_;
};

Inside a LambdaNode (Builder)

auto node = horus::LambdaNode("controller")
    .sub<horus::msg::LaserScan>("lidar.scan")
    .pub<horus::msg::CmdVel>("motor.cmd");

Zero-Copy Loan Pattern

The loan pattern is the primary way to publish messages in HORUS. It eliminates all copies between producer and consumer.

How It Works

1. pub.loan()                    --> SHM allocates a slot, returns LoanedSample<T>
2. sample->field = value         --> writes directly into SHM (0 ns copy overhead)
3. pub.publish(std::move(sample))--> flips visibility flag, subscribers can read

Full Example

auto pub = sched.advertise<horus::msg::Odometry>("odom");

sched.add("localizer")
    .rate(100_hz)
    .tick([&] {
        // Step 1: Loan a writable slot from shared memory
        auto sample = pub.loan();

        // Step 2: Write directly into SHM (zero copy)
        sample->pose.x = current_x;
        sample->pose.y = current_y;
        sample->pose.theta = current_theta;
        sample->timestamp_ns = now_ns();

        // Step 3: Publish (consumes the sample via move)
        pub.publish(std::move(sample));
    })
    .build();

After publish(), the LoanedSample is consumed (moved). Attempting to use it after publish is a compile error.

LoanedSample Access Patterns

auto sample = pub.loan();

// Arrow operator -- most common
sample->linear = 0.5f;
sample->angular = 0.1f;

// Dereference operator
horus::msg::CmdVel& ref = *sample;
ref.linear = 0.5f;

// Raw pointer
horus::msg::CmdVel* ptr = sample.get();
ptr->linear = 0.5f;

Send-by-Copy Pattern

For simplicity or small messages, send() copies the message into shared memory in one call:

auto pub = sched.advertise<horus::msg::CmdVel>("motor.cmd");

// Single-call publish -- copies msg into SHM
pub.send(horus::msg::CmdVel{0, 0.5f, 0.1f});

This is equivalent to loan() + write + publish() but performs one copy. For small messages like CmdVel (16 bytes), the overhead is negligible. For large messages like LaserScan (1468 bytes) or Odometry, prefer the loan pattern.

PatternCopy CountBest For
loan() + publish()0Large messages, high-frequency topics
send()1Small messages, simple code

Receiving Messages

Basic Receive

recv() returns std::optional<BorrowedSample<T>>. If no message is available, it returns std::nullopt:

auto sub = sched.subscribe<horus::msg::LaserScan>("lidar.scan");

sched.add("processor")
    .rate(50_hz)
    .tick([&] {
        auto scan = sub.recv();
        if (!scan) return;  // no message this tick

        // Access fields through arrow operator (read-only)
        float front_range = scan->ranges[0];
        float min_range = scan->min_range;

        // Dereference
        const horus::msg::LaserScan& data = *scan;

        // Raw pointer
        const horus::msg::LaserScan* ptr = scan->get();
    })
    .build();

Check Before Consuming

Use has_msg() to check availability without consuming the message:

sched.add("conditional_processor")
    .rate(50_hz)
    .tick([&] {
        if (!scan_sub.has_msg()) {
            // No scan available -- do something else
            return;
        }
        auto scan = scan_sub.recv();
        // guaranteed to have a value
    })
    .build();

BorrowedSample Lifetime

The BorrowedSample is valid until it goes out of scope. RAII releases the SHM slot automatically:

{
    auto scan = sub.recv();
    if (scan) {
        process(*scan);   // valid here
    }
}   // BorrowedSample released here -- SHM slot freed

Move Semantics

All topic types are move-only. Copy constructors and copy assignment operators are deleted. This enforces single ownership, matching Rust's ownership model:

auto pub = sched.advertise<horus::msg::CmdVel>("cmd");
// auto pub2 = pub;                 // COMPILE ERROR: copy deleted
auto pub2 = std::move(pub);         // OK: ownership transferred

auto sample = pub2.loan();
// auto sample2 = sample;           // COMPILE ERROR: copy deleted
pub2.publish(std::move(sample));     // OK: consumed by publish
// sample->linear = 1.0;            // COMPILE ERROR: sample was moved

Supported Message Types (51)

All message types live in the horus::msg namespace. They are #[repr(C)] compatible with Rust for zero-copy SHM IPC. Each type gets a Publisher<T> and Subscriber<T> template specialization generated internally via HORUS_TOPIC_IMPL in impl/topic_impl.hpp.

Geometry (12): Point3, Vector3, Quaternion, Twist, Pose2D, TransformStamped, Pose3D, PoseStamped, PoseWithCovariance, TwistWithCovariance, Accel, AccelStamped

Sensor (11): LaserScan, Imu, Odometry, RangeSensor, BatteryState, NavSatFix, MagneticField, Temperature, FluidPressure, Illuminance, JointState

Control (7): CmdVel, MotorCommand, DifferentialDriveCommand, ServoCommand, PidConfig, TrajectoryPoint, JointCommand

Navigation (5): NavGoal, GoalResult, Waypoint, VelocityObstacle, PathPlan

Diagnostics (8): Heartbeat, DiagnosticStatus, EmergencyStop, ResourceUsage, NodeHeartbeat, SafetyStatus, DiagnosticValue, DiagnosticReport

Detection (12): BoundingBox2D, BoundingBox3D, Detection, Detection3D, Landmark, Landmark3D, LandmarkArray, TrackedObject, TrackingHeader, PlaneDetection, SegmentationMask, PointField

Vision (3): CameraInfo, RegionOfInterest, StereoInfo

Force (5): WrenchStamped, ImpedanceParameters, ForceCommand, ContactInfo, HapticFeedback

Time (4): Clock, TimeReference, SimSync, RateRequest

Input (3): KeyboardInput, JoystickInput, AudioFrame


Common Patterns

Multi-Topic Node

A single node can publish and subscribe to multiple topics:

auto imu_sub   = sched.subscribe<horus::msg::Imu>("imu.data");
auto odom_sub  = sched.subscribe<horus::msg::Odometry>("odom");
auto cmd_pub   = sched.advertise<horus::msg::CmdVel>("motor.cmd");
auto diag_pub  = sched.advertise<horus::msg::Heartbeat>("health.ctrl");

sched.add("fusion_controller")
    .rate(100_hz)
    .tick([&] {
        auto imu = imu_sub.recv();
        auto odom = odom_sub.recv();

        if (imu && odom) {
            auto cmd = cmd_pub.loan();
            cmd->linear = compute_speed(*odom);
            cmd->angular = compute_turn(*imu);
            cmd_pub.publish(std::move(cmd));
        }

        // Always publish heartbeat
        uint64_t seq = 0;
        diag_pub.send(horus::msg::Heartbeat{
            {}, seq++, true, 0.0f, 0.0f, 0
        });
    })
    .build();

Pipeline Pattern

Chain nodes through topics to form a processing pipeline:

auto raw_sub  = sched.subscribe<horus::msg::LaserScan>("lidar.raw");
auto filt_pub = sched.advertise<horus::msg::LaserScan>("lidar.filtered");
auto filt_sub = sched.subscribe<horus::msg::LaserScan>("lidar.filtered");
auto cmd_pub  = sched.advertise<horus::msg::CmdVel>("motor.cmd");

// Stage 1: Filter raw scan
sched.add("filter")
    .rate(50_hz).order(1)
    .tick([&] {
        auto raw = raw_sub.recv();
        if (!raw) return;
        auto filtered = filt_pub.loan();
        *filtered = *raw;  // copy, then modify in place
        for (int i = 0; i < 360; ++i) {
            if (filtered->ranges[i] < 0.05f)
                filtered->ranges[i] = 0.0f;
        }
        filt_pub.publish(std::move(filtered));
    })
    .build();

// Stage 2: React to filtered scan
sched.add("controller")
    .rate(50_hz).order(2)
    .tick([&] {
        auto scan = filt_sub.recv();
        if (!scan) return;
        cmd_pub.send(horus::msg::CmdVel{0, 0.3f, 0.0f});
    })
    .build();

Topic Name Conventions

Use dot-separated hierarchical names:

sensor.lidar.scan        # LiDAR data
sensor.imu.data          # IMU readings
motor.cmd                # Motor commands
nav.goal                 # Navigation goal
safety.alert             # Safety alerts
diagnostics.heartbeat    # Heartbeat

See Also