Building Your Second Application

Now that you've built your first HORUS application, let's create something more practical: a 3-node sensor pipeline that reads temperature data, filters out noise, and displays the results.

What You'll Build

A real-time temperature monitoring system with:

  1. SensorNode: Publishes simulated temperature readings every second
  2. FilterNode: Subscribes to raw temperatures, filters noise, republishes clean data
  3. DisplayNode: Subscribes to filtered data, displays to console

This demonstrates:

  • Multi-node communication patterns
  • Data pipeline processing
  • Real-time filtering
  • Monitor monitoring

Architecture

Loading diagram...
Temperature pipeline: SensorNode → FilterNode → DisplayNode

Step 1: Create the Project

horus new temperature_pipeline
cd temperature_pipeline

Step 2: Write the Code

Replace main.rs with this complete, runnable code:

use horus::prelude::*;
use std::time::{Duration, Instant};

// ============================================================================
// Node 1: SensorNode - Publishes temperature readings
// ============================================================================

struct SensorNode {
    temp_pub: Topic<f32>,
    last_publish: Instant,
    reading: f32,
}

impl SensorNode {
    fn new() -> Result<Self> {
        Ok(Self {
            temp_pub: Topic::new("raw_temp")?,
            last_publish: Instant::now(),
            reading: 20.0,
        })
    }
}

impl Node for SensorNode {
    fn name(&self) -> &str {
        "SensorNode"
    }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Temperature sensor initialized");
        Ok(())
    }

    fn tick(&mut self) {
        // Publish every 1 second
        if self.last_publish.elapsed() >= Duration::from_secs(1) {
            // Simulate realistic temperature with noise
            // Base temperature oscillates between 20-30°C
            let base_temp = 25.0 + (self.reading * 0.1).sin() * 5.0;

            // Add random noise (+/- 2°C)
            let noise = (self.reading * 0.7).sin() * 2.0;
            let temperature = base_temp + noise;

            // Publish raw temperature
            self.temp_pub.send(temperature);

            hlog!(info, "Published raw temp: {:.2}°C", temperature);

            self.reading += 1.0;
            self.last_publish = Instant::now();
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Sensor shutdown complete");
        Ok(())
    }
}

// ============================================================================
// Node 2: FilterNode - Removes noise with exponential moving average
// ============================================================================

struct FilterNode {
    raw_sub: Topic<f32>,
    filtered_pub: Topic<f32>,
    filtered_value: Option<f32>,
    alpha: f32,  // Smoothing factor (0.0 - 1.0)
}

impl FilterNode {
    fn new() -> Result<Self> {
        Ok(Self {
            raw_sub: Topic::new("raw_temp")?,
            filtered_pub: Topic::new("filtered_temp")?,
            filtered_value: None,
            alpha: 0.3,  // 30% new data, 70% previous (smooth but responsive)
        })
    }
}

impl Node for FilterNode {
    fn name(&self) -> &str {
        "FilterNode"
    }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Filter initialized (alpha = {:.2})", self.alpha);
        Ok(())
    }

    fn tick(&mut self) {
        // Check for new temperature reading
        if let Some(raw_temp) = self.raw_sub.recv() {
            // Apply exponential moving average filter
            let filtered = match self.filtered_value {
                Some(prev) => self.alpha * raw_temp + (1.0 - self.alpha) * prev,
                None => raw_temp,  // First reading, no previous value
            };

            self.filtered_value = Some(filtered);

            // Publish filtered temperature
            self.filtered_pub.send(filtered);

            hlog!(info, "Filtered: {:.2}°C -> {:.2}°C (removed {:.2}°C noise)",
                    raw_temp, filtered, raw_temp - filtered);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Filter shutdown complete");
        Ok(())
    }
}

// ============================================================================
// Node 3: DisplayNode - Shows filtered temperature on console
// ============================================================================

struct DisplayNode {
    filtered_sub: Topic<f32>,
    display_counter: u32,
}

impl DisplayNode {
    fn new() -> Result<Self> {
        Ok(Self {
            filtered_sub: Topic::new("filtered_temp")?,
            display_counter: 0,
        })
    }
}

impl Node for DisplayNode {
    fn name(&self) -> &str {
        "DisplayNode"
    }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Display initialized");
        println!("\n========================================");
        println!("  Temperature Monitor - Press Ctrl+C to stop");
        println!("========================================\n");
        Ok(())
    }

    fn tick(&mut self) {
        if let Some(temp) = self.filtered_sub.recv() {
            self.display_counter += 1;

            // Display temperature with visual indicator
            let status = if temp < 22.0 {
                "COLD"
            } else if temp > 28.0 {
                "HOT"
            } else {
                "NORMAL"
            };

            println!(
                "[Reading #{}] Temperature: {:.1}°C - Status: {}",
                self.display_counter, temp, status
            );

            hlog!(debug, "Displayed reading #{}", self.display_counter);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        println!("\n========================================");
        println!("  Total readings displayed: {}", self.display_counter);
        println!("========================================\n");
        hlog!(info, "Display shutdown complete");
        Ok(())
    }
}

// ============================================================================
// Main Application - Configure and run the scheduler
// ============================================================================

fn main() -> Result<()> {
    println!("Starting Temperature Pipeline...\n");

    let mut scheduler = Scheduler::new();

    // Add nodes in priority order:
    // 1. SensorNode (order 0) - Runs first to generate data
    scheduler.add(SensorNode::new()?).order(0).build()?;

    // 2. FilterNode (order 1) - Runs second to process data
    scheduler.add(FilterNode::new()?).order(1).build()?;

    // 3. DisplayNode (order 2) - Runs last to display results
    scheduler.add(DisplayNode::new()?).order(2).build()?;

    println!("All nodes initialized. Running...\n");

    // Run the scheduler (blocks until Ctrl+C)
    scheduler.run()?;

    Ok(())
}

Step 3: Run the Application

horus run

Expected Output:

Starting Temperature Pipeline...

All nodes initialized. Running...

========================================
  Temperature Monitor - Press Ctrl+C to stop
========================================

[Reading #1] Temperature: 23.4°C - Status: NORMAL
[Reading #2] Temperature: 24.1°C - Status: NORMAL
[Reading #3] Temperature: 25.8°C - Status: NORMAL
[Reading #4] Temperature: 27.2°C - Status: NORMAL
[Reading #5] Temperature: 28.6°C - Status: HOT
[Reading #6] Temperature: 27.9°C - Status: NORMAL
[Reading #7] Temperature: 26.3°C - Status: NORMAL

Press Ctrl+C to stop:

^C
Ctrl+C received! Shutting down HORUS scheduler...

========================================
  Total readings displayed: 7
========================================

Step 4: Monitor with Monitor

Open a second terminal and run:

horus monitor

The monitor will show:

Nodes Tab

  • SensorNode: Publishing to raw_temp every ~1 second
  • FilterNode: Subscribing to raw_temp, publishing to filtered_temp
  • DisplayNode: Subscribing to filtered_temp

Topics Tab

  • raw_temp (f32): Noisy temperature readings
  • filtered_temp (f32): Smooth temperature readings

Metrics Tab

  • IPC Latency: ~85ns-437ns (sub-microsecond!)
  • Tick Duration: How long each node takes to execute
  • Message Counts: Total messages sent/received

Understanding the Code

SensorNode

// Publish every 1 second
if self.last_publish.elapsed() >= Duration::from_secs(1) {
    let temperature = 25.0 + noise;
    self.temp_pub.send(temperature);
}

Key Points:

  • Uses Instant to track time between publishes
  • Simulates realistic sensor data with noise
  • Publishes to "raw_temp" topic

FilterNode

// Exponential moving average filter
let filtered = self.alpha * raw_temp + (1.0 - self.alpha) * prev;
self.filtered_pub.send(filtered);

Key Points:

  • Subscribes to "raw_temp", publishes to "filtered_temp"
  • Implements exponential moving average (EMA) filter
  • alpha = 0.3 balances responsiveness vs smoothness

Filter Behavior:

  • High alpha (0.8): Fast response, less smoothing
  • Low alpha (0.2): Slow response, more smoothing

DisplayNode

if let Some(temp) = self.filtered_sub.recv() {
    println!("[Reading #{}] Temperature: {:.1}°C", count, temp);
}

Key Points:

  • Subscribes to "filtered_temp"
  • Only receives when new data is available
  • recv() returns None when no message (not an error!)

Common Issues & Fixes

Issue 1: No Output Displayed

Symptom:

Starting Temperature Pipeline...
All nodes initialized. Running...

========================================
  Temperature Monitor - Press Ctrl+C to stop
========================================

[Nothing appears]

Cause: Topics not connecting (typo in topic names)

Fix:

  • Check topic names match exactly: "raw_temp" and "filtered_temp"
  • Verify with monitor: horus monitor -> Topics tab
  • Ensure all nodes are running in same scheduler

Issue 2: Too Much/Too Little Smoothing

Symptom: Temperature changes too fast or too slow

Fix: Adjust the alpha value in FilterNode:

alpha: 0.3,  // Current: moderate smoothing

// Try these alternatives:
alpha: 0.7,  // More responsive, less smooth
alpha: 0.1,  // Very smooth, slower response

Issue 3: Monitor Shows No Nodes

Symptom: Monitor is empty

Cause: Application not running or monitor started before app

Fix:

  1. Start the application first: horus run
  2. Then start monitor in separate terminal: horus monitor
  3. Monitor auto-discovers running nodes

Issue 4: Build Errors

Symptom:

error[E0433]: failed to resolve: use of undeclared type `Topic`

Fix:

  • Ensure HORUS is installed: horus --help
  • Check import: use horus::prelude::*;
  • Run from project directory (where horus.toml is)

Experiments to Try

1. Change Update Rate

Make the sensor publish faster:

// In SensorNode::tick()
if self.last_publish.elapsed() >= Duration::from_millis(500) {  // 2 Hz instead of 1 Hz

2. Add Temperature Alerts

Add to DisplayNode:

if temp > 30.0 {
    println!("  WARNING: High temperature detected!");
}

3. Log Data to File

Add to DisplayNode::tick():

use std::fs::OpenOptions;
use std::io::Write;

let mut file = OpenOptions::new()
    .create(true)
    .append(true)
    .open("temperature_log.txt")
    .unwrap();

writeln!(file, "{:.1}", temp).ok();

4. Use Rate-Limited Logging

In a real robot running at 1kHz, you don't want every tick flooding the log. Use hlog_once! for one-time events and hlog_every! for periodic status:

fn tick(&mut self) {
    // Log once when sensor first connects
    hlog_once!(info, "Sensor online, streaming data");

    // Log status every 5 seconds (not every tick)
    hlog_every!(5000, info, "Filter running — last value: {:.1}°C", self.filtered_value.unwrap_or(0.0));

    // Log warnings at most once per second
    if let Some(temp) = self.raw_sub.recv() {
        if temp > 35.0 {
            hlog_every!(1000, warn, "High temperature: {:.1}°C", temp);
        }
    }
}

5. Add Multiple Sensors

Create a second sensor node:

// In main()
scheduler.add(SensorNode::new()?).order(0).build()?;  // Sensor 1
scheduler.add(SensorNode::new()?).order(0).build()?;  // Sensor 2

Both will publish to the same topic, and FilterNode will process both!

Key Concepts Demonstrated

Pipeline Pattern: Data flows through stages (Sensor -> Filter -> Display)

Pub/Sub Decoupling: Nodes don't know about each other, only topics

Real-Time Processing: Filtering happens as data arrives

Shared Memory IPC: Sub-microsecond communication between nodes

Priority Scheduling: Sensor runs before filter, filter before display

Shorter Version: node! Macro

The entire pipeline above can be written more concisely with the node! macro. Here's the same 3-node system in roughly half the code:

use horus::prelude::*;
use std::time::{Duration, Instant};

node! {
    SensorNode {
        pub { temp_pub: f32 -> "raw_temp" }
        data {
            last_publish: Instant = Instant::now(),
            reading: f32 = 20.0,
        }
        init {
            hlog!(info, "Temperature sensor initialized");
            Ok(())
        }
        tick {
            if self.last_publish.elapsed() >= Duration::from_secs(1) {
                let base_temp = 25.0 + (self.reading * 0.1).sin() * 5.0;
                let noise = (self.reading * 0.7).sin() * 2.0;
                self.temp_pub.send(base_temp + noise);
                self.reading += 1.0;
                self.last_publish = Instant::now();
            }
        }
    }
}

node! {
    FilterNode {
        sub { raw_sub: f32 -> "raw_temp" }
        pub { filtered_pub: f32 -> "filtered_temp" }
        data {
            filtered_value: Option<f32> = None,
            alpha: f32 = 0.3,
        }
        tick {
            if let Some(raw) = self.raw_sub.recv() {
                let filtered = match self.filtered_value {
                    Some(prev) => self.alpha * raw + (1.0 - self.alpha) * prev,
                    None => raw,
                };
                self.filtered_value = Some(filtered);
                self.filtered_pub.send(filtered);
            }
        }
    }
}

node! {
    DisplayNode {
        sub { filtered_sub: f32 -> "filtered_temp" }
        data { count: u32 = 0 }
        tick {
            if let Some(temp) = self.filtered_sub.recv() {
                self.count += 1;
                let status = if temp < 22.0 { "COLD" } else if temp > 28.0 { "HOT" } else { "NORMAL" };
                println!("[#{}] {:.1}°C - {}", self.count, temp, status);
            }
        }
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(SensorNode::new()).order(0).build()?;
    scheduler.add(FilterNode::new()).order(1).build()?;
    scheduler.add(DisplayNode::new()).order(2).build()?;
    scheduler.run()?;
    Ok(())
}

The node! macro generates the struct, constructor, and Node trait implementation. Both versions produce identical runtime behavior. See the node! Macro Guide for the full syntax.

Next Steps

Now that you've built a 3-node pipeline, try:

  1. Testing - Learn how to unit test your nodes
  2. Using Pre-Built Nodes - Use library nodes instead of writing from scratch
  3. node! Macro - Reduce boilerplate with macros
  4. Message Types - Use complex message types instead of primitives

Full Code

The complete code above is production-ready. To save it:

  1. Copy the entire code block
  2. Replace main.rs in your project
  3. Run with horus run

For additional examples, see Basic Examples.