Basic Examples
Learn HORUS fundamentals through simple, focused examples. Each example is complete and runnable with horus run.
Estimated time: 30-45 minutes
Prerequisites
- HORUS installed (Installation Guide)
- Completed Quick Start
- Basic Rust knowledge
1. Basic Publisher-Subscriber
The foundational pattern in HORUS: one node publishes data, another subscribes.
Publisher Node
File: publisher.rs
use horus::prelude::*;
// Define publisher node
struct SensorNode {
data_pub: Topic<f32>,
counter: f32,
}
impl SensorNode {
fn new() -> Result<Self> {
Ok(Self {
data_pub: Topic::new("sensor_data")?,
counter: 0.0,
})
}
}
impl Node for SensorNode {
fn name(&self) -> &str {
"SensorNode"
}
fn init(&mut self) -> Result<()> {
hlog!(info, "Sensor initialized");
Ok(())
}
fn tick(&mut self) {
// Simulate sensor reading
let reading = self.counter.sin() * 10.0;
// Publish data
self.data_pub.send(reading);
hlog!(debug, "Published: {:.2}", reading);
self.counter += 0.1;
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Sensor shutdown");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(SensorNode::new()?).order(0).build()?;
scheduler.run()?;
Ok(())
}
Run it:
horus run publisher.rs
Subscriber Node
File: subscriber.rs
use horus::prelude::*;
struct ProcessorNode {
data_sub: Topic<f32>,
}
impl ProcessorNode {
fn new() -> Result<Self> {
Ok(Self {
data_sub: Topic::new("sensor_data")?,
})
}
}
impl Node for ProcessorNode {
fn name(&self) -> &str {
"ProcessorNode"
}
fn init(&mut self) -> Result<()> {
hlog!(info, "Processor initialized");
Ok(())
}
fn tick(&mut self) {
if let Some(data) = self.data_sub.recv() {
// Process received data
let processed = data * 2.0;
hlog!(debug, "Processed: {:.2}", processed);
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Processor shutdown");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(ProcessorNode::new()?).order(0).build()?;
scheduler.run()?;
Ok(())
}
Run it:
HORUS uses a flat namespace (like ROS), so processes automatically share topics:
# Terminal 1
horus run publisher.rs
# Terminal 2 (automatically connects!)
horus run subscriber.rs
Both use the same topic name ("sensor_data") → communication works automatically!
Combined Application
File: pubsub.rs
use horus::prelude::*;
// Publisher
struct SensorNode {
data_pub: Topic<f32>,
counter: f32,
}
impl Node for SensorNode {
fn name(&self) -> &str { "SensorNode" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Sensor online");
Ok(())
}
fn tick(&mut self) {
let reading = self.counter.sin() * 10.0;
self.data_pub.send(reading);
self.counter += 0.1;
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Sensor offline");
Ok(())
}
}
// Subscriber
struct ProcessorNode {
data_sub: Topic<f32>,
}
impl Node for ProcessorNode {
fn name(&self) -> &str { "ProcessorNode" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Processor online");
Ok(())
}
fn tick(&mut self) {
if let Some(data) = self.data_sub.recv() {
let processed = data * 2.0;
hlog!(info, "Received: {:.2} -> {:.2}", data, processed);
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Processor offline");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
// Add both nodes
scheduler.add(SensorNode {
data_pub: Topic::new("sensor_data")?,
counter: 0.0,
}).order(0).build()?;
scheduler.add(ProcessorNode {
data_sub: Topic::new("sensor_data")?,
}).order(1).build()?;
// Run both nodes together
scheduler.run()?;
Ok(())
}
Run it:
horus run pubsub.rs
Key Concepts:
- Publisher uses
Topic::new("topic")to create publisher - Subscriber uses same topic name
"sensor_data" - Priority matters: Publisher (0) runs before Subscriber (1)
recv()returnsOption<T>- handle None gracefully
2. Robot Velocity Controller
Control a robot using standard CmdVel messages.
File: robot_controller.rs
use horus::prelude::*;
// Keyboard input velocity commands
struct TeleopNode {
cmd_pub: Topic<CmdVel>,
}
impl TeleopNode {
fn new() -> Result<Self> {
Ok(Self {
cmd_pub: Topic::new("cmd_vel")?,
})
}
}
impl Node for TeleopNode {
fn name(&self) -> &str { "TeleopNode" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Teleop ready - sending movement commands");
Ok(())
}
fn tick(&mut self) {
// Simulate keyboard input (w/a/s/d)
// In real code, read from device::Input
let cmd = CmdVel::new(1.0, 0.5); // Forward + turn right
self.cmd_pub.send(cmd);
}
fn shutdown(&mut self) -> Result<()> {
// Send stop command on shutdown
let stop = CmdVel::zero();
self.cmd_pub.send(stop);
hlog!(info, "Teleop stopped");
Ok(())
}
}
// Velocity commands motor control
struct MotorDriverNode {
cmd_sub: Topic<CmdVel>,
}
impl MotorDriverNode {
fn new() -> Result<Self> {
Ok(Self {
cmd_sub: Topic::new("cmd_vel")?,
})
}
}
impl Node for MotorDriverNode {
fn name(&self) -> &str { "MotorDriverNode" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Motor driver initialized");
Ok(())
}
fn tick(&mut self) {
if let Some(cmd) = self.cmd_sub.recv() {
// Convert to differential drive (left/right wheel speeds)
let left_speed = cmd.linear - cmd.angular;
let right_speed = cmd.linear + cmd.angular;
// Send to motors
hlog!(debug, "Motors: L={:.2} m/s, R={:.2} m/s",
left_speed, right_speed);
// In real code: send to hardware
// motor_driver.set_speeds(left_speed, right_speed)?;
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Motors stopped");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(TeleopNode::new()?).order(0).build()?;
scheduler.add(MotorDriverNode::new()?).order(1).build()?;
scheduler.run()?;
Ok(())
}
Run it:
horus run robot_controller.rs
Key Concepts:
CmdVelis a standard robotics message typeCmdVel::new(linear, angular)creates velocity commands- Differential drive:
left = linear - angular,right = linear + angular - Use
shutdown()to send safe stop commands
3. Lidar Obstacle Detection
Process laser scan data to detect obstacles and stop the robot.
File: obstacle_detector.rs
use horus::prelude::*;
// Lidar Scan data
struct LidarNode {
scan_pub: Topic<LaserScan>,
angle: f32,
}
impl LidarNode {
fn new() -> Result<Self> {
Ok(Self {
scan_pub: Topic::new("scan")?,
angle: 0.0,
})
}
}
impl Node for LidarNode {
fn name(&self) -> &str { "LidarNode" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Lidar initialized");
Ok(())
}
fn tick(&mut self) {
let mut scan = LaserScan::new();
// Simulate lidar readings (sine wave for demo)
for i in 0..360 {
scan.ranges[i] = 5.0 + (self.angle + i as f32 * 0.01).sin() * 2.0;
}
// Add one close obstacle in front
scan.ranges[0] = 0.3; // 30cm directly ahead!
self.scan_pub.send(scan);
self.angle += 0.1;
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Lidar offline");
Ok(())
}
}
// Scan data Obstacle detection Stop command
struct ObstacleDetector {
scan_sub: Topic<LaserScan>,
cmd_pub: Topic<CmdVel>,
safety_distance: f32,
}
impl ObstacleDetector {
fn new(safety_distance: f32) -> Result<Self> {
Ok(Self {
scan_sub: Topic::new("scan")?,
cmd_pub: Topic::new("cmd_vel")?,
safety_distance,
})
}
}
impl Node for ObstacleDetector {
fn name(&self) -> &str { "ObstacleDetector" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Obstacle detector active - safety distance: {:.2}m",
self.safety_distance);
Ok(())
}
fn tick(&mut self) {
if let Some(scan) = self.scan_sub.recv() {
// Find minimum distance
if let Some(min_dist) = scan.min_range() {
if min_dist < self.safety_distance {
// Emergency stop!
let stop = CmdVel::zero();
self.cmd_pub.send(stop);
hlog!(warn, "[WARNING] Obstacle detected at {:.2}m - STOPPING!",
min_dist);
} else {
// Safe to move
hlog!(debug, "Safe - closest obstacle: {:.2}m", min_dist);
}
}
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Obstacle detector offline");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(LidarNode::new()?).order(0).build()?;
// Obstacle detector runs with HIGH priority (1)
scheduler.add(ObstacleDetector::new(0.5)?).order(1).build()?;
scheduler.run()?;
Ok(())
}
Run it:
horus run obstacle_detector.rs
Key Concepts:
LaserScanhas 360 range readings (one per degree)scan.min_range()finds closest obstaclescan.is_range_valid(index)checks if reading is good- Safety nodes should run at HIGH priority
4. PID Controller
Implement a PID controller for position tracking.
File: pid_controller.rs
use horus::prelude::*;
struct PIDController {
setpoint_sub: Topic<f32>, // Desired position
feedback_sub: Topic<f32>, // Current position
output_pub: Topic<f32>, // Control output
kp: f32, // Proportional gain
ki: f32, // Integral gain
kd: f32, // Derivative gain
integral: f32,
last_error: f32,
}
impl PIDController {
fn new(kp: f32, ki: f32, kd: f32) -> Result<Self> {
Ok(Self {
setpoint_sub: Topic::new("setpoint")?,
feedback_sub: Topic::new("feedback")?,
output_pub: Topic::new("control_output")?,
kp, ki, kd,
integral: 0.0,
last_error: 0.0,
})
}
}
impl Node for PIDController {
fn name(&self) -> &str { "PIDController" }
fn init(&mut self) -> Result<()> {
hlog!(info, "PID initialized - Kp: {}, Ki: {}, Kd: {}",
self.kp, self.ki, self.kd);
Ok(())
}
fn tick(&mut self) {
// Get setpoint and current position
let setpoint = self.setpoint_sub.recv().unwrap_or(0.0);
let feedback = self.feedback_sub.recv().unwrap_or(0.0);
// Calculate error
let error = setpoint - feedback;
// Integral term (accumulated error)
self.integral += error;
// Derivative term (rate of change)
let derivative = error - self.last_error;
// PID output
let output = self.kp * error
+ self.ki * self.integral
+ self.kd * derivative;
// Publish control output
self.output_pub.send(output);
hlog!(debug, "PID: setpoint={:.2}, feedback={:.2}, error={:.2}, output={:.2}",
setpoint, feedback, error, output);
// Update state
self.last_error = error;
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "PID controller stopped");
Ok(())
}
}
// Simple test system node
struct TestSystem {
output_sub: Topic<f32>,
feedback_pub: Topic<f32>,
position: f32,
}
impl TestSystem {
fn new() -> Result<Self> {
Ok(Self {
output_sub: Topic::new("control_output")?,
feedback_pub: Topic::new("feedback")?,
position: 0.0,
})
}
}
impl Node for TestSystem {
fn name(&self) -> &str { "TestSystem" }
fn tick(&mut self) {
// Apply control output to position
if let Some(output) = self.output_sub.recv() {
self.position += output * 0.01; // Simple integration
}
// Publish current position
self.feedback_pub.send(self.position);
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Test system stopped");
Ok(())
}
}
// Setpoint generator
struct SetpointNode {
setpoint_pub: Topic<f32>,
}
impl Node for SetpointNode {
fn name(&self) -> &str { "SetpointNode" }
fn tick(&mut self) {
// Target position
let setpoint = 10.0;
self.setpoint_pub.send(setpoint);
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Setpoint generator stopped");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
// Setpoint generator
scheduler.add(SetpointNode {
setpoint_pub: Topic::new("setpoint")?,
}).order(0).build()?;
// Test system (simulates plant)
scheduler.add(TestSystem::new()?).order(1).build()?;
// PID controller (Kp=0.5, Ki=0.01, Kd=0.1)
scheduler.add(PIDController::new(0.5, 0.01, 0.1)?).order(2).build()?;
scheduler.run()?;
Ok(())
}
Run it:
horus run pid_controller.rs
Key Concepts:
- PID = Proportional + Integral + Derivative
- Proportional: immediate response to error
- Integral: corrects accumulated error
- Derivative: dampens oscillations
- Tune gains (Kp, Ki, Kd) for your system
5. Multi-Node Pipeline
Chain multiple processing stages together.
File: pipeline.rs
use horus::prelude::*;
// Stage 1: Data acquisition
struct SensorNode {
raw_pub: Topic<f32>,
counter: f32,
}
impl SensorNode {
fn new() -> Result<Self> {
Ok(Self {
raw_pub: Topic::new("raw_data")?,
counter: 0.0,
})
}
}
impl Node for SensorNode {
fn name(&self) -> &str { "SensorNode" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Stage 1: Sensor online");
Ok(())
}
fn tick(&mut self) {
// Simulate noisy sensor
let raw_data = 50.0 + (self.counter * 0.5).sin() * 20.0;
self.raw_pub.send(raw_data);
hlog!(debug, "Raw: {:.2}", raw_data);
self.counter += 0.1;
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Sensor offline");
Ok(())
}
}
// Stage 2: Filtering
struct FilterNode {
raw_sub: Topic<f32>,
filtered_pub: Topic<f32>,
alpha: f32, // Low-pass filter coefficient
filtered_value: f32,
}
impl FilterNode {
fn new(alpha: f32) -> Result<Self> {
Ok(Self {
raw_sub: Topic::new("raw_data")?,
filtered_pub: Topic::new("filtered_data")?,
alpha,
filtered_value: 0.0,
})
}
}
impl Node for FilterNode {
fn name(&self) -> &str { "FilterNode" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Stage 2: Filter online");
Ok(())
}
fn tick(&mut self) {
if let Some(raw) = self.raw_sub.recv() {
// Exponential moving average
self.filtered_value = self.alpha * raw
+ (1.0 - self.alpha) * self.filtered_value;
self.filtered_pub.send(self.filtered_value);
hlog!(debug, "Filtered: {:.2}", self.filtered_value);
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Filter offline");
Ok(())
}
}
// Stage 3: Decision making
struct ControllerNode {
filtered_sub: Topic<f32>,
cmd_pub: Topic<f32>,
threshold: f32,
}
impl ControllerNode {
fn new(threshold: f32) -> Result<Self> {
Ok(Self {
filtered_sub: Topic::new("filtered_data")?,
cmd_pub: Topic::new("commands")?,
threshold,
})
}
}
impl Node for ControllerNode {
fn name(&self) -> &str { "ControllerNode" }
fn init(&mut self) -> Result<()> {
hlog!(info, "Stage 3: Controller online");
Ok(())
}
fn tick(&mut self) {
if let Some(value) = self.filtered_sub.recv() {
let command = if value > self.threshold { 1.0 } else { 0.0 };
self.cmd_pub.send(command);
hlog!(info, "Value: {:.2}, Threshold: {:.2}, Command: {:.0}",
value, self.threshold, command);
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Controller offline");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
// Add pipeline stages in priority order
scheduler.add(SensorNode::new()?).order(0).build()?;
scheduler.add(FilterNode::new(0.2)?).order(1).build()?;
scheduler.add(ControllerNode::new(50.0)?).order(2).build()?;
scheduler.run()?;
Ok(())
}
Run it:
horus run pipeline.rs
Key Concepts:
- Data flows: Sensor Filter Controller
- Each stage has different priority (0, 1, 2)
- Exponential moving average:
filtered = α * new + (1-α) * old - Priorities ensure correct execution order
6. Camera Image Pipeline
Send and receive camera images using Image with zero-copy shared memory.
Camera Sender
File: camera_sender.rs
use horus::prelude::*;
struct CameraSender {
topic: Topic<Image>,
}
impl Node for CameraSender {
fn name(&self) -> &str { "CameraSender" }
fn tick(&mut self) {
// Create a 480x640 RGB8 image in shared memory
let mut img = Image::new(480, 640, ImageEncoding::Rgb8)
.expect("failed to allocate image");
// Fill with a solid color (blue)
img.fill(&[0, 0, 255]);
// Set a red pixel at (100, 200)
img.set_pixel(100, 200, &[255, 0, 0]);
// Send — only a lightweight descriptor is transmitted.
// The pixel data stays in shared memory (zero-copy).
self.topic.send(&img);
hlog!(debug, "Sent {}x{} image", img.width(), img.height());
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Camera sender stopped");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(CameraSender {
topic: Topic::new("camera.rgb")?,
}).order(0).build()?;
scheduler.run()?;
Ok(())
}
Camera Receiver
File: camera_receiver.rs
use horus::prelude::*;
struct CameraReceiver {
topic: Topic<Image>,
}
impl Node for CameraReceiver {
fn name(&self) -> &str { "CameraReceiver" }
fn tick(&mut self) {
if let Some(img) = self.topic.recv() {
// Read a pixel
if let Some(px) = img.pixel(100, 200) {
hlog!(info, "Pixel at (100,200): R={} G={} B={}",
px[0], px[1], px[2]);
}
hlog!(debug, "Received {}x{} {:?} image ({} bytes)",
img.width(), img.height(), img.encoding(), img.data().len());
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Camera receiver stopped");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(CameraReceiver {
topic: Topic::new("camera.rgb")?,
}).order(0).build()?;
scheduler.run()?;
Ok(())
}
Run it (two terminals):
# Terminal 1
horus run camera_sender.rs
# Terminal 2
horus run camera_receiver.rs
Key Concepts:
Image::new(width, height, encoding)allocates from a global shared memory pooltopic.send(&img)sends only a lightweight descriptor; pixel data stays in shared memorytopic.recv()returnsOption<Image>— zero-copy access to the sender's datapixel()/set_pixel()/fill()/roi()for pixel-level access
7. Point Cloud Processing
Create, send, and process 3D point clouds.
File: pointcloud_demo.rs
use horus::prelude::*;
struct PointCloudSender {
topic: Topic<PointCloud>,
}
impl Node for PointCloudSender {
fn name(&self) -> &str { "PointCloudSender" }
fn tick(&mut self) {
// Create a point cloud: 1000 points, 3 fields each (XYZ), float32
let mut pc = PointCloud::new(1000, 3, TensorDtype::F32)
.expect("failed to allocate point cloud");
// Fill with sample data — a sphere of radius 1.0
let floats: &mut [f32] = bytemuck::cast_slice_mut(pc.data_mut());
for i in 0..1000 {
let theta = (i as f32 / 1000.0) * std::f32::consts::TAU;
let phi = (i as f32 / 1000.0) * std::f32::consts::PI;
floats[i * 3] = phi.sin() * theta.cos(); // x
floats[i * 3 + 1] = phi.sin() * theta.sin(); // y
floats[i * 3 + 2] = phi.cos(); // z
}
pc.set_frame_id("lidar_frame");
self.topic.send(&pc);
hlog!(debug, "Sent {} points", pc.point_count());
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "PointCloud sender stopped");
Ok(())
}
}
struct PointCloudReceiver {
topic: Topic<PointCloud>,
}
impl Node for PointCloudReceiver {
fn name(&self) -> &str { "PointCloudReceiver" }
fn tick(&mut self) {
if let Some(pc) = self.topic.recv() {
// Extract all XYZ points as Vec<[f32; 3]>
if let Some(points) = pc.extract_xyz() {
let centroid = points.iter().fold([0.0f32; 3], |acc, p| {
[acc[0] + p[0], acc[1] + p[1], acc[2] + p[2]]
});
let n = points.len() as f32;
hlog!(info, "Received {} points, centroid: ({:.2}, {:.2}, {:.2})",
points.len(),
centroid[0] / n, centroid[1] / n, centroid[2] / n);
}
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "PointCloud receiver stopped");
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(PointCloudSender {
topic: Topic::new("lidar.points")?,
}).order(0).build()?;
scheduler.add(PointCloudReceiver {
topic: Topic::new("lidar.points")?,
}).order(1).build()?;
scheduler.run()?;
Ok(())
}
Run it:
horus run pointcloud_demo.rs
Key Concepts:
PointCloud::new(num_points, fields_per_point, dtype)— 3=XYZ, 4=XYZI, 6=XYZRGBextract_xyz()returnsOption<Vec<[f32; 3]>>for F32 cloudspoint_count(),fields_per_point(),is_xyz()for metadata- Uses the same zero-copy shared memory transport as Image
Next Steps
Now that you understand basic patterns, explore:
- Second Application Tutorial - Build a 3-node sensor pipeline
- Advanced Examples - Multi-process systems, Python integration
- Testing - Write tests for your nodes
- Using Prebuilt Nodes - Leverage the standard library
- Message Types - Complete message reference
Stuck? Check:
- Troubleshooting - Fix common issues
- Monitor - Debug with the web UI