Basic Examples

Learn HORUS fundamentals through simple, focused examples. Each example is complete and runnable with horus run.

Estimated time: 30-45 minutes

Prerequisites


1. Basic Publisher-Subscriber

The foundational pattern in HORUS: one node publishes data, another subscribes.

Publisher Node

File: publisher.rs

use horus::prelude::*;

// Define publisher node
struct SensorNode {
    data_pub: Topic<f32>,
    counter: f32,
}

impl SensorNode {
    fn new() -> Result<Self> {
        Ok(Self {
            data_pub: Topic::new("sensor_data")?,
            counter: 0.0,
        })
    }
}

impl Node for SensorNode {
    fn name(&self) -> &str {
        "SensorNode"
    }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Sensor initialized");
        Ok(())
    }

    fn tick(&mut self) {
        // Simulate sensor reading
        let reading = self.counter.sin() * 10.0;

        // Publish data
        self.data_pub.send(reading);
        hlog!(debug, "Published: {:.2}", reading);

        self.counter += 0.1;
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Sensor shutdown");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(SensorNode::new()?).order(0).build()?;
    scheduler.run()?;
    Ok(())
}

Run it:

horus run publisher.rs

Subscriber Node

File: subscriber.rs

use horus::prelude::*;

struct ProcessorNode {
    data_sub: Topic<f32>,
}

impl ProcessorNode {
    fn new() -> Result<Self> {
        Ok(Self {
            data_sub: Topic::new("sensor_data")?,
        })
    }
}

impl Node for ProcessorNode {
    fn name(&self) -> &str {
        "ProcessorNode"
    }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Processor initialized");
        Ok(())
    }

    fn tick(&mut self) {
        if let Some(data) = self.data_sub.recv() {
            // Process received data
            let processed = data * 2.0;
            hlog!(debug, "Processed: {:.2}", processed);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Processor shutdown");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(ProcessorNode::new()?).order(0).build()?;
    scheduler.run()?;
    Ok(())
}

Run it:

HORUS uses a flat namespace (like ROS), so processes automatically share topics:

# Terminal 1
horus run publisher.rs

# Terminal 2 (automatically connects!)
horus run subscriber.rs

Both use the same topic name ("sensor_data") → communication works automatically!

Combined Application

File: pubsub.rs

use horus::prelude::*;

// Publisher
struct SensorNode {
    data_pub: Topic<f32>,
    counter: f32,
}

impl Node for SensorNode {
    fn name(&self) -> &str { "SensorNode" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Sensor online");
        Ok(())
    }

    fn tick(&mut self) {
        let reading = self.counter.sin() * 10.0;
        self.data_pub.send(reading);
        self.counter += 0.1;
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Sensor offline");
        Ok(())
    }
}

// Subscriber
struct ProcessorNode {
    data_sub: Topic<f32>,
}

impl Node for ProcessorNode {
    fn name(&self) -> &str { "ProcessorNode" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Processor online");
        Ok(())
    }

    fn tick(&mut self) {
        if let Some(data) = self.data_sub.recv() {
            let processed = data * 2.0;
            hlog!(info, "Received: {:.2} -> {:.2}", data, processed);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Processor offline");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    // Add both nodes
    scheduler.add(SensorNode {
        data_pub: Topic::new("sensor_data")?,
        counter: 0.0,
    }).order(0).build()?;

    scheduler.add(ProcessorNode {
        data_sub: Topic::new("sensor_data")?,
    }).order(1).build()?;

    // Run both nodes together
    scheduler.run()?;
    Ok(())
}

Run it:

horus run pubsub.rs

Key Concepts:

  • Publisher uses Topic::new("topic") to create publisher
  • Subscriber uses same topic name "sensor_data"
  • Priority matters: Publisher (0) runs before Subscriber (1)
  • recv() returns Option<T> - handle None gracefully

2. Robot Velocity Controller

Control a robot using standard CmdVel messages.

File: robot_controller.rs

use horus::prelude::*;

// Keyboard input  velocity commands
struct TeleopNode {
    cmd_pub: Topic<CmdVel>,
}

impl TeleopNode {
    fn new() -> Result<Self> {
        Ok(Self {
            cmd_pub: Topic::new("cmd_vel")?,
        })
    }
}

impl Node for TeleopNode {
    fn name(&self) -> &str { "TeleopNode" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Teleop ready - sending movement commands");
        Ok(())
    }

    fn tick(&mut self) {
        // Simulate keyboard input (w/a/s/d)
        // In real code, read from device::Input
        let cmd = CmdVel::new(1.0, 0.5);  // Forward + turn right
        self.cmd_pub.send(cmd);
    }

    fn shutdown(&mut self) -> Result<()> {
        // Send stop command on shutdown
        let stop = CmdVel::zero();
        self.cmd_pub.send(stop);
        hlog!(info, "Teleop stopped");
        Ok(())
    }
}

// Velocity commands  motor control
struct MotorDriverNode {
    cmd_sub: Topic<CmdVel>,
}

impl MotorDriverNode {
    fn new() -> Result<Self> {
        Ok(Self {
            cmd_sub: Topic::new("cmd_vel")?,
        })
    }
}

impl Node for MotorDriverNode {
    fn name(&self) -> &str { "MotorDriverNode" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Motor driver initialized");
        Ok(())
    }

    fn tick(&mut self) {
        if let Some(cmd) = self.cmd_sub.recv() {
            // Convert to differential drive (left/right wheel speeds)
            let left_speed = cmd.linear - cmd.angular;
            let right_speed = cmd.linear + cmd.angular;

            // Send to motors
            hlog!(debug, "Motors: L={:.2} m/s, R={:.2} m/s",
                left_speed, right_speed);

            // In real code: send to hardware
            // motor_driver.set_speeds(left_speed, right_speed)?;
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Motors stopped");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    scheduler.add(TeleopNode::new()?).order(0).build()?;
    scheduler.add(MotorDriverNode::new()?).order(1).build()?;

    scheduler.run()?;
    Ok(())
}

Run it:

horus run robot_controller.rs

Key Concepts:

  • CmdVel is a standard robotics message type
  • CmdVel::new(linear, angular) creates velocity commands
  • Differential drive: left = linear - angular, right = linear + angular
  • Use shutdown() to send safe stop commands

3. Lidar Obstacle Detection

Process laser scan data to detect obstacles and stop the robot.

File: obstacle_detector.rs

use horus::prelude::*;

// Lidar  Scan data
struct LidarNode {
    scan_pub: Topic<LaserScan>,
    angle: f32,
}

impl LidarNode {
    fn new() -> Result<Self> {
        Ok(Self {
            scan_pub: Topic::new("scan")?,
            angle: 0.0,
        })
    }
}

impl Node for LidarNode {
    fn name(&self) -> &str { "LidarNode" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Lidar initialized");
        Ok(())
    }

    fn tick(&mut self) {
        let mut scan = LaserScan::new();

        // Simulate lidar readings (sine wave for demo)
        for i in 0..360 {
            scan.ranges[i] = 5.0 + (self.angle + i as f32 * 0.01).sin() * 2.0;
        }

        // Add one close obstacle in front
        scan.ranges[0] = 0.3;  // 30cm directly ahead!

        self.scan_pub.send(scan);
        self.angle += 0.1;
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Lidar offline");
        Ok(())
    }
}

// Scan data  Obstacle detection  Stop command
struct ObstacleDetector {
    scan_sub: Topic<LaserScan>,
    cmd_pub: Topic<CmdVel>,
    safety_distance: f32,
}

impl ObstacleDetector {
    fn new(safety_distance: f32) -> Result<Self> {
        Ok(Self {
            scan_sub: Topic::new("scan")?,
            cmd_pub: Topic::new("cmd_vel")?,
            safety_distance,
        })
    }
}

impl Node for ObstacleDetector {
    fn name(&self) -> &str { "ObstacleDetector" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Obstacle detector active - safety distance: {:.2}m",
            self.safety_distance);
        Ok(())
    }

    fn tick(&mut self) {
        if let Some(scan) = self.scan_sub.recv() {
            // Find minimum distance
            if let Some(min_dist) = scan.min_range() {
                if min_dist < self.safety_distance {
                    // Emergency stop!
                    let stop = CmdVel::zero();
                    self.cmd_pub.send(stop);
                    hlog!(warn, "[WARNING] Obstacle detected at {:.2}m - STOPPING!",
                        min_dist);
                } else {
                    // Safe to move
                    hlog!(debug, "Safe - closest obstacle: {:.2}m", min_dist);
                }
            }
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Obstacle detector offline");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    scheduler.add(LidarNode::new()?).order(0).build()?;

    // Obstacle detector runs with HIGH priority (1)
    scheduler.add(ObstacleDetector::new(0.5)?).order(1).build()?;

    scheduler.run()?;
    Ok(())
}

Run it:

horus run obstacle_detector.rs

Key Concepts:

  • LaserScan has 360 range readings (one per degree)
  • scan.min_range() finds closest obstacle
  • scan.is_range_valid(index) checks if reading is good
  • Safety nodes should run at HIGH priority

4. PID Controller

Implement a PID controller for position tracking.

File: pid_controller.rs

use horus::prelude::*;

struct PIDController {
    setpoint_sub: Topic<f32>,    // Desired position
    feedback_sub: Topic<f32>,    // Current position
    output_pub: Topic<f32>,      // Control output

    kp: f32,  // Proportional gain
    ki: f32,  // Integral gain
    kd: f32,  // Derivative gain

    integral: f32,
    last_error: f32,
}

impl PIDController {
    fn new(kp: f32, ki: f32, kd: f32) -> Result<Self> {
        Ok(Self {
            setpoint_sub: Topic::new("setpoint")?,
            feedback_sub: Topic::new("feedback")?,
            output_pub: Topic::new("control_output")?,
            kp, ki, kd,
            integral: 0.0,
            last_error: 0.0,
        })
    }
}

impl Node for PIDController {
    fn name(&self) -> &str { "PIDController" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "PID initialized - Kp: {}, Ki: {}, Kd: {}",
            self.kp, self.ki, self.kd);
        Ok(())
    }

    fn tick(&mut self) {
        // Get setpoint and current position
        let setpoint = self.setpoint_sub.recv().unwrap_or(0.0);
        let feedback = self.feedback_sub.recv().unwrap_or(0.0);

        // Calculate error
        let error = setpoint - feedback;

        // Integral term (accumulated error)
        self.integral += error;

        // Derivative term (rate of change)
        let derivative = error - self.last_error;

        // PID output
        let output = self.kp * error
                   + self.ki * self.integral
                   + self.kd * derivative;

        // Publish control output
        self.output_pub.send(output);

        hlog!(debug, "PID: setpoint={:.2}, feedback={:.2}, error={:.2}, output={:.2}",
            setpoint, feedback, error, output);

        // Update state
        self.last_error = error;
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "PID controller stopped");
        Ok(())
    }
}

// Simple test system node
struct TestSystem {
    output_sub: Topic<f32>,
    feedback_pub: Topic<f32>,
    position: f32,
}

impl TestSystem {
    fn new() -> Result<Self> {
        Ok(Self {
            output_sub: Topic::new("control_output")?,
            feedback_pub: Topic::new("feedback")?,
            position: 0.0,
        })
    }
}

impl Node for TestSystem {
    fn name(&self) -> &str { "TestSystem" }

    fn tick(&mut self) {
        // Apply control output to position
        if let Some(output) = self.output_sub.recv() {
            self.position += output * 0.01;  // Simple integration
        }

        // Publish current position
        self.feedback_pub.send(self.position);
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Test system stopped");
        Ok(())
    }
}

// Setpoint generator
struct SetpointNode {
    setpoint_pub: Topic<f32>,
}

impl Node for SetpointNode {
    fn name(&self) -> &str { "SetpointNode" }

    fn tick(&mut self) {
        // Target position
        let setpoint = 10.0;
        self.setpoint_pub.send(setpoint);
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Setpoint generator stopped");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    // Setpoint generator
    scheduler.add(SetpointNode {
        setpoint_pub: Topic::new("setpoint")?,
    }).order(0).build()?;

    // Test system (simulates plant)
    scheduler.add(TestSystem::new()?).order(1).build()?;

    // PID controller (Kp=0.5, Ki=0.01, Kd=0.1)
    scheduler.add(PIDController::new(0.5, 0.01, 0.1)?).order(2).build()?;

    scheduler.run()?;
    Ok(())
}

Run it:

horus run pid_controller.rs

Key Concepts:

  • PID = Proportional + Integral + Derivative
  • Proportional: immediate response to error
  • Integral: corrects accumulated error
  • Derivative: dampens oscillations
  • Tune gains (Kp, Ki, Kd) for your system

5. Multi-Node Pipeline

Chain multiple processing stages together.

File: pipeline.rs

use horus::prelude::*;

// Stage 1: Data acquisition
struct SensorNode {
    raw_pub: Topic<f32>,
    counter: f32,
}

impl SensorNode {
    fn new() -> Result<Self> {
        Ok(Self {
            raw_pub: Topic::new("raw_data")?,
            counter: 0.0,
        })
    }
}

impl Node for SensorNode {
    fn name(&self) -> &str { "SensorNode" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Stage 1: Sensor online");
        Ok(())
    }

    fn tick(&mut self) {
        // Simulate noisy sensor
        let raw_data = 50.0 + (self.counter * 0.5).sin() * 20.0;
        self.raw_pub.send(raw_data);

        hlog!(debug, "Raw: {:.2}", raw_data);

        self.counter += 0.1;
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Sensor offline");
        Ok(())
    }
}

// Stage 2: Filtering
struct FilterNode {
    raw_sub: Topic<f32>,
    filtered_pub: Topic<f32>,
    alpha: f32,  // Low-pass filter coefficient
    filtered_value: f32,
}

impl FilterNode {
    fn new(alpha: f32) -> Result<Self> {
        Ok(Self {
            raw_sub: Topic::new("raw_data")?,
            filtered_pub: Topic::new("filtered_data")?,
            alpha,
            filtered_value: 0.0,
        })
    }
}

impl Node for FilterNode {
    fn name(&self) -> &str { "FilterNode" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Stage 2: Filter online");
        Ok(())
    }

    fn tick(&mut self) {
        if let Some(raw) = self.raw_sub.recv() {
            // Exponential moving average
            self.filtered_value = self.alpha * raw
                                + (1.0 - self.alpha) * self.filtered_value;

            self.filtered_pub.send(self.filtered_value);
            hlog!(debug, "Filtered: {:.2}", self.filtered_value);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Filter offline");
        Ok(())
    }
}

// Stage 3: Decision making
struct ControllerNode {
    filtered_sub: Topic<f32>,
    cmd_pub: Topic<f32>,
    threshold: f32,
}

impl ControllerNode {
    fn new(threshold: f32) -> Result<Self> {
        Ok(Self {
            filtered_sub: Topic::new("filtered_data")?,
            cmd_pub: Topic::new("commands")?,
            threshold,
        })
    }
}

impl Node for ControllerNode {
    fn name(&self) -> &str { "ControllerNode" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Stage 3: Controller online");
        Ok(())
    }

    fn tick(&mut self) {
        if let Some(value) = self.filtered_sub.recv() {
            let command = if value > self.threshold { 1.0 } else { 0.0 };
            self.cmd_pub.send(command);

            hlog!(info, "Value: {:.2}, Threshold: {:.2}, Command: {:.0}",
                value, self.threshold, command);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Controller offline");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    // Add pipeline stages in priority order
    scheduler.add(SensorNode::new()?).order(0).build()?;
    scheduler.add(FilterNode::new(0.2)?).order(1).build()?;
    scheduler.add(ControllerNode::new(50.0)?).order(2).build()?;

    scheduler.run()?;
    Ok(())
}

Run it:

horus run pipeline.rs

Key Concepts:

  • Data flows: Sensor Filter Controller
  • Each stage has different priority (0, 1, 2)
  • Exponential moving average: filtered = α * new + (1-α) * old
  • Priorities ensure correct execution order

6. Camera Image Pipeline

Send and receive camera images using Image with zero-copy shared memory.

Camera Sender

File: camera_sender.rs

use horus::prelude::*;

struct CameraSender {
    topic: Topic<Image>,
}

impl Node for CameraSender {
    fn name(&self) -> &str { "CameraSender" }

    fn tick(&mut self) {
        // Create a 480x640 RGB8 image in shared memory
        let mut img = Image::new(480, 640, ImageEncoding::Rgb8)
            .expect("failed to allocate image");

        // Fill with a solid color (blue)
        img.fill(&[0, 0, 255]);

        // Set a red pixel at (100, 200)
        img.set_pixel(100, 200, &[255, 0, 0]);

        // Send — only a lightweight descriptor is transmitted.
        // The pixel data stays in shared memory (zero-copy).
        self.topic.send(&img);
        hlog!(debug, "Sent {}x{} image", img.width(), img.height());
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Camera sender stopped");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(CameraSender {
        topic: Topic::new("camera.rgb")?,
    }).order(0).build()?;
    scheduler.run()?;
    Ok(())
}

Camera Receiver

File: camera_receiver.rs

use horus::prelude::*;

struct CameraReceiver {
    topic: Topic<Image>,
}

impl Node for CameraReceiver {
    fn name(&self) -> &str { "CameraReceiver" }

    fn tick(&mut self) {
        if let Some(img) = self.topic.recv() {
            // Read a pixel
            if let Some(px) = img.pixel(100, 200) {
                hlog!(info, "Pixel at (100,200): R={} G={} B={}",
                    px[0], px[1], px[2]);
            }

            hlog!(debug, "Received {}x{} {:?} image ({} bytes)",
                img.width(), img.height(), img.encoding(), img.data().len());
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Camera receiver stopped");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(CameraReceiver {
        topic: Topic::new("camera.rgb")?,
    }).order(0).build()?;
    scheduler.run()?;
    Ok(())
}

Run it (two terminals):

# Terminal 1
horus run camera_sender.rs

# Terminal 2
horus run camera_receiver.rs

Key Concepts:

  • Image::new(width, height, encoding) allocates from a global shared memory pool
  • topic.send(&img) sends only a lightweight descriptor; pixel data stays in shared memory
  • topic.recv() returns Option<Image> — zero-copy access to the sender's data
  • pixel() / set_pixel() / fill() / roi() for pixel-level access

7. Point Cloud Processing

Create, send, and process 3D point clouds.

File: pointcloud_demo.rs

use horus::prelude::*;

struct PointCloudSender {
    topic: Topic<PointCloud>,
}

impl Node for PointCloudSender {
    fn name(&self) -> &str { "PointCloudSender" }

    fn tick(&mut self) {
        // Create a point cloud: 1000 points, 3 fields each (XYZ), float32
        let mut pc = PointCloud::new(1000, 3, TensorDtype::F32)
            .expect("failed to allocate point cloud");

        // Fill with sample data — a sphere of radius 1.0
        let floats: &mut [f32] = bytemuck::cast_slice_mut(pc.data_mut());
        for i in 0..1000 {
            let theta = (i as f32 / 1000.0) * std::f32::consts::TAU;
            let phi = (i as f32 / 1000.0) * std::f32::consts::PI;
            floats[i * 3] = phi.sin() * theta.cos();     // x
            floats[i * 3 + 1] = phi.sin() * theta.sin(); // y
            floats[i * 3 + 2] = phi.cos();               // z
        }

        pc.set_frame_id("lidar_frame");
        self.topic.send(&pc);
        hlog!(debug, "Sent {} points", pc.point_count());
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "PointCloud sender stopped");
        Ok(())
    }
}

struct PointCloudReceiver {
    topic: Topic<PointCloud>,
}

impl Node for PointCloudReceiver {
    fn name(&self) -> &str { "PointCloudReceiver" }

    fn tick(&mut self) {
        if let Some(pc) = self.topic.recv() {
            // Extract all XYZ points as Vec<[f32; 3]>
            if let Some(points) = pc.extract_xyz() {
                let centroid = points.iter().fold([0.0f32; 3], |acc, p| {
                    [acc[0] + p[0], acc[1] + p[1], acc[2] + p[2]]
                });
                let n = points.len() as f32;
                hlog!(info, "Received {} points, centroid: ({:.2}, {:.2}, {:.2})",
                    points.len(),
                    centroid[0] / n, centroid[1] / n, centroid[2] / n);
            }
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "PointCloud receiver stopped");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    scheduler.add(PointCloudSender {
        topic: Topic::new("lidar.points")?,
    }).order(0).build()?;

    scheduler.add(PointCloudReceiver {
        topic: Topic::new("lidar.points")?,
    }).order(1).build()?;

    scheduler.run()?;
    Ok(())
}

Run it:

horus run pointcloud_demo.rs

Key Concepts:

  • PointCloud::new(num_points, fields_per_point, dtype) — 3=XYZ, 4=XYZI, 6=XYZRGB
  • extract_xyz() returns Option<Vec<[f32; 3]>> for F32 clouds
  • point_count(), fields_per_point(), is_xyz() for metadata
  • Uses the same zero-copy shared memory transport as Image

Next Steps

Now that you understand basic patterns, explore:

Stuck? Check: