Building Your Second Application

Now that you've built your first HORUS application, let's create something more practical: a 3-node sensor pipeline that reads temperature data, filters out noise, and displays the results.

Prerequisites

Time: ~20 minutes

What You'll Build

A real-time temperature monitoring system with:

  1. SensorNode: Publishes simulated temperature readings every second
  2. FilterNode: Subscribes to raw temperatures, filters noise, republishes clean data
  3. DisplayNode: Subscribes to filtered data, displays to console

This demonstrates:

  • Multi-node communication patterns
  • Data pipeline processing
  • Real-time filtering
  • Monitor monitoring

Architecture

Temperature pipeline: SensorNode → FilterNode → DisplayNode

Step 1: Create the Project

horus new temperature_pipeline
cd temperature_pipeline

Step 2: Write the Code

Replace src/main.rs with this complete, runnable code:

use horus::prelude::*;
use std::time::{Duration, Instant};

// ============================================================================
// Node 1: SensorNode - Publishes temperature readings
// ============================================================================

struct SensorNode {
    temp_pub: Topic<f32>,
    last_publish: Instant,
    reading: f32,
}

impl SensorNode {
    fn new() -> Result<Self> {
        Ok(Self {
            temp_pub: Topic::new("raw_temp")?,
            last_publish: Instant::now(),
            reading: 20.0,
        })
    }
}

impl Node for SensorNode {
    fn name(&self) -> &str {
        "SensorNode"
    }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Temperature sensor initialized");
        Ok(())
    }

    fn tick(&mut self) {
        // Publish every 1 second
        if self.last_publish.elapsed() >= Duration::from_secs(1) {
            // Simulate realistic temperature with noise
            // Base temperature oscillates between 20-30°C
            let base_temp = 25.0 + (self.reading * 0.1).sin() * 5.0;

            // Add random noise (+/- 2°C)
            let noise = (self.reading * 0.7).sin() * 2.0;
            let temperature = base_temp + noise;

            // Publish raw temperature
            self.temp_pub.send(temperature);

            hlog!(info, "Published raw temp: {:.2}°C", temperature);

            self.reading += 1.0;
            self.last_publish = Instant::now();
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Sensor shutdown complete");
        Ok(())
    }
}

// ============================================================================
// Node 2: FilterNode - Removes noise with exponential moving average
// ============================================================================

struct FilterNode {
    raw_sub: Topic<f32>,
    filtered_pub: Topic<f32>,
    filtered_value: Option<f32>,
    alpha: f32,  // Smoothing factor (0.0 - 1.0)
}

impl FilterNode {
    fn new() -> Result<Self> {
        Ok(Self {
            raw_sub: Topic::new("raw_temp")?,
            filtered_pub: Topic::new("filtered_temp")?,
            filtered_value: None,
            alpha: 0.3,  // 30% new data, 70% previous (smooth but responsive)
        })
    }
}

impl Node for FilterNode {
    fn name(&self) -> &str {
        "FilterNode"
    }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Filter initialized (alpha = {:.2})", self.alpha);
        Ok(())
    }

    fn tick(&mut self) {
        // Check for new temperature reading
        if let Some(raw_temp) = self.raw_sub.recv() {
            // Apply exponential moving average filter
            let filtered = match self.filtered_value {
                Some(prev) => self.alpha * raw_temp + (1.0 - self.alpha) * prev,
                None => raw_temp,  // First reading, no previous value
            };

            self.filtered_value = Some(filtered);

            // Publish filtered temperature
            self.filtered_pub.send(filtered);

            hlog!(info, "Filtered: {:.2}°C -> {:.2}°C (removed {:.2}°C noise)",
                    raw_temp, filtered, raw_temp - filtered);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Filter shutdown complete");
        Ok(())
    }
}

// ============================================================================
// Node 3: DisplayNode - Shows filtered temperature on console
// ============================================================================

struct DisplayNode {
    filtered_sub: Topic<f32>,
    display_counter: u32,
}

impl DisplayNode {
    fn new() -> Result<Self> {
        Ok(Self {
            filtered_sub: Topic::new("filtered_temp")?,
            display_counter: 0,
        })
    }
}

impl Node for DisplayNode {
    fn name(&self) -> &str {
        "DisplayNode"
    }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Display initialized");
        println!("\n========================================");
        println!("  Temperature Monitor - Press Ctrl+C to stop");
        println!("========================================\n");
        Ok(())
    }

    fn tick(&mut self) {
        if let Some(temp) = self.filtered_sub.recv() {
            self.display_counter += 1;

            // Display temperature with visual indicator
            let status = if temp < 22.0 {
                "COLD"
            } else if temp > 28.0 {
                "HOT"
            } else {
                "NORMAL"
            };

            println!(
                "[Reading #{}] Temperature: {:.1}°C - Status: {}",
                self.display_counter, temp, status
            );

            hlog!(debug, "Displayed reading #{}", self.display_counter);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        println!("\n========================================");
        println!("  Total readings displayed: {}", self.display_counter);
        println!("========================================\n");
        hlog!(info, "Display shutdown complete");
        Ok(())
    }
}

// ============================================================================
// Main Application - Configure and run the scheduler
// ============================================================================

fn main() -> Result<()> {
    println!("Starting Temperature Pipeline...\n");

    let mut scheduler = Scheduler::new();

    // Add nodes in priority order:
    // 1. SensorNode (order 0) - Runs first to generate data
    scheduler.add(SensorNode::new()?).order(0).build()?;

    // 2. FilterNode (order 1) - Runs second to process data
    scheduler.add(FilterNode::new()?).order(1).build()?;

    // 3. DisplayNode (order 2) - Runs last to display results
    scheduler.add(DisplayNode::new()?).order(2).build()?;

    // Optional: Enable watchdog for production use.
    // If any node hangs for more than 5 seconds, the scheduler
    // logs a warning (1x), skips the node (2x), then isolates it (3x).
    // See the Safety Monitor guide for details.

    println!("All nodes initialized. Running...\n");

    // Run the scheduler (blocks until Ctrl+C)
    scheduler.run()?;

    Ok(())
}

Step 3: Run the Application

horus run

Expected Output:

Starting Temperature Pipeline...

All nodes initialized. Running...

========================================
  Temperature Monitor - Press Ctrl+C to stop
========================================

[Reading #1] Temperature: 23.4°C - Status: NORMAL
[Reading #2] Temperature: 24.1°C - Status: NORMAL
[Reading #3] Temperature: 25.8°C - Status: NORMAL
[Reading #4] Temperature: 27.2°C - Status: NORMAL
[Reading #5] Temperature: 28.6°C - Status: HOT
[Reading #6] Temperature: 27.9°C - Status: NORMAL
[Reading #7] Temperature: 26.3°C - Status: NORMAL

Press Ctrl+C to stop:

^C
Ctrl+C received! Shutting down HORUS scheduler...

========================================
  Total readings displayed: 7
========================================

Step 4: Monitor with Monitor

Open a second terminal and run:

horus monitor

The monitor will show:

Nodes Tab

  • SensorNode: Publishing to raw_temp every ~1 second
  • FilterNode: Subscribing to raw_temp, publishing to filtered_temp
  • DisplayNode: Subscribing to filtered_temp

Topics Tab

  • raw_temp (f32): Noisy temperature readings
  • filtered_temp (f32): Smooth temperature readings

Metrics Tab

  • IPC Latency: ~85ns-437ns (sub-microsecond!)
  • Tick Duration: How long each node takes to execute
  • Message Counts: Total messages sent/received

Understanding the Code

SensorNode

// simplified
// Publish every 1 second
if self.last_publish.elapsed() >= Duration::from_secs(1) {
    let temperature = 25.0 + noise;
    self.temp_pub.send(temperature);
}

Key Points:

  • Uses Instant to track time between publishes
  • Simulates realistic sensor data with noise
  • Publishes to "raw_temp" topic

FilterNode

// simplified
// Exponential moving average filter
let filtered = self.alpha * raw_temp + (1.0 - self.alpha) * prev;
self.filtered_pub.send(filtered);

Key Points:

  • Subscribes to "raw_temp", publishes to "filtered_temp"
  • Implements exponential moving average (EMA) filter
  • alpha = 0.3 balances responsiveness vs smoothness

Filter Behavior:

  • High alpha (0.8): Fast response, less smoothing
  • Low alpha (0.2): Slow response, more smoothing

DisplayNode

// simplified
if let Some(temp) = self.filtered_sub.recv() {
    println!("[Reading #{}] Temperature: {:.1}°C", count, temp);
}

Key Points:

  • Subscribes to "filtered_temp"
  • Only receives when new data is available
  • recv() returns None when no message (not an error!)

Common Issues & Fixes

Issue 1: No Output Displayed

Symptom:

Starting Temperature Pipeline...
All nodes initialized. Running...

========================================
  Temperature Monitor - Press Ctrl+C to stop
========================================

[Nothing appears]

Cause: Topics not connecting (typo in topic names)

Fix:

  • Check topic names match exactly: "raw_temp" and "filtered_temp"
  • Verify with monitor: horus monitor -> Topics tab
  • Ensure all nodes are running in same scheduler

Issue 2: Too Much/Too Little Smoothing

Symptom: Temperature changes too fast or too slow

Fix: Adjust the alpha value in FilterNode:

// simplified
alpha: 0.3,  // Current: moderate smoothing

// Try these alternatives:
alpha: 0.7,  // More responsive, less smooth
alpha: 0.1,  // Very smooth, slower response

Issue 3: Monitor Shows No Nodes

Symptom: Monitor is empty

Cause: Application not running or monitor started before app

Fix:

  1. Start the application first: horus run
  2. Then start monitor in separate terminal: horus monitor
  3. Monitor auto-discovers running nodes

Issue 4: Build Errors

Symptom:

error[E0433]: failed to resolve: use of undeclared type `Topic`

Fix:

  • Ensure HORUS is installed: horus --help
  • Check import: use horus::prelude::*;
  • Run from project directory (where horus.toml is)

Experiments to Try

1. Change Update Rate

Make the sensor publish faster:

// simplified
// In SensorNode::tick()
if self.last_publish.elapsed() >= Duration::from_millis(500) {  // 2 Hz instead of 1 Hz

2. Add Temperature Alerts

Add to DisplayNode:

// simplified
if temp > 30.0 {
    println!("  WARNING: High temperature detected!");
}

3. Log Data to File

Add to DisplayNode::tick():

// simplified
use std::fs::OpenOptions;
use std::io::Write;

let mut file = OpenOptions::new()
    .create(true)
    .append(true)
    .open("temperature_log.txt")
    .unwrap();

writeln!(file, "{:.1}", temp).ok();

4. Use Rate-Limited Logging

In a real robot running at 1kHz, you don't want every tick flooding the log. Use hlog_once! for one-time events and hlog_every! for periodic status:

// simplified
fn tick(&mut self) {
    // Log once when sensor first connects
    hlog_once!(info, "Sensor online, streaming data");

    // Log status every 5 seconds (not every tick)
    hlog_every!(5000, info, "Filter running — last value: {:.1}°C", self.filtered_value.unwrap_or(0.0));

    // Log warnings at most once per second
    if let Some(temp) = self.raw_sub.recv() {
        if temp > 35.0 {
            hlog_every!(1000, warn, "High temperature: {:.1}°C", temp);
        }
    }
}

5. Add Multiple Sensors

Create a second sensor node:

// simplified
// In main()
scheduler.add(SensorNode::new()?).order(0).build()?;  // Sensor 1
scheduler.add(SensorNode::new()?).order(0).build()?;  // Sensor 2

Both will publish to the same topic, and FilterNode will process both!

Key Concepts Demonstrated

Pipeline Pattern: Data flows through stages (Sensor -> Filter -> Display)

Pub/Sub Decoupling: Nodes don't know about each other, only topics

Real-Time Processing: Filtering happens as data arrives

Shared Memory IPC: Sub-microsecond communication between nodes

Priority Scheduling: Sensor runs before filter, filter before display

Shorter Version: node! Macro

The entire pipeline above can be written more concisely with the node! macro. Here's the same 3-node system in roughly half the code:

use horus::prelude::*;
use std::time::{Duration, Instant};

node! {
    SensorNode {
        pub { temp_pub: f32 -> "raw_temp" }
        data {
            last_publish: Instant = Instant::now(),
            reading: f32 = 20.0,
        }
        init {
            hlog!(info, "Temperature sensor initialized");
            Ok(())
        }
        tick {
            if self.last_publish.elapsed() >= Duration::from_secs(1) {
                let base_temp = 25.0 + (self.reading * 0.1).sin() * 5.0;
                let noise = (self.reading * 0.7).sin() * 2.0;
                self.temp_pub.send(base_temp + noise);
                self.reading += 1.0;
                self.last_publish = Instant::now();
            }
        }
    }
}

node! {
    FilterNode {
        sub { raw_sub: f32 -> "raw_temp" }
        pub { filtered_pub: f32 -> "filtered_temp" }
        data {
            filtered_value: Option<f32> = None,
            alpha: f32 = 0.3,
        }
        tick {
            if let Some(raw) = self.raw_sub.recv() {
                let filtered = match self.filtered_value {
                    Some(prev) => self.alpha * raw + (1.0 - self.alpha) * prev,
                    None => raw,
                };
                self.filtered_value = Some(filtered);
                self.filtered_pub.send(filtered);
            }
        }
    }
}

node! {
    DisplayNode {
        sub { filtered_sub: f32 -> "filtered_temp" }
        data { count: u32 = 0 }
        tick {
            if let Some(temp) = self.filtered_sub.recv() {
                self.count += 1;
                let status = if temp < 22.0 { "COLD" } else if temp > 28.0 { "HOT" } else { "NORMAL" };
                println!("[#{}] {:.1}°C - {}", self.count, temp, status);
            }
        }
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(SensorNode::new()).order(0).build()?;
    scheduler.add(FilterNode::new()).order(1).build()?;
    scheduler.add(DisplayNode::new()).order(2).build()?;
    scheduler.run()?;
    Ok(())
}

The node! macro generates the struct, constructor, and Node trait implementation. Both versions produce identical runtime behavior. See the node! Macro Guide for the full syntax.

Step 5: Add a Watchdog (Production)

The pipeline above works great for learning, but a production robot needs safety monitoring. What if the FilterNode hangs — a bug causes an infinite loop, or a hardware sensor blocks? Without a watchdog, the scheduler keeps calling the broken node forever.

Add a watchdog to detect and respond to frozen nodes:

// simplified
fn main() -> Result<()> {
    let mut scheduler = Scheduler::new()
        .watchdog(5000_u64.ms());  // global: 5-second timeout

    scheduler.add(SensorNode::new()?).order(0).build()?;
    scheduler.add(FilterNode::new()?).order(1).build()?;
    scheduler.add(DisplayNode::new()?).order(2).build()?;

    scheduler.run()
}

With this change, if any node stops completing its tick() within 5 seconds:

  • At 5 seconds: Warning logged — "FilterNode watchdog warning"
  • At 10 seconds: Node marked Unhealthy, skipped. Sensor and Display keep running
  • At 15 seconds: Node Isolated, enter_safe_state() called (if implemented)

For safety-critical nodes (motors, actuators), you'd use a much tighter timeout and implement enter_safe_state(). See Safety Monitor for the full guide.

Key Takeaways

  • Pipeline pattern: Data flows through stages (Sensor -> Filter -> Display), each a separate node
  • Pub/sub decoupling: Nodes only know about topic names, not each other
  • Execution order: .order() controls which node runs first in each tick cycle
  • recv() is non-blocking: Returns None when no message is available -- not an error
  • Watchdog: .watchdog() detects frozen nodes and isolates them — critical for production
  • node! macro: Reduces boilerplate by ~75% while producing identical runtime behavior

Next Steps

Now that you've built a 3-node pipeline, try:

  1. Choosing Your Configuration — Progressive guide from prototype to production (Level 0→4)
  2. Execution Classes — When to use .compute(), .on("topic"), or .async_io() for different node types
  3. Testing — Unit test your nodes with tick_once()
  4. Custom Messages — Define your own message types
  5. node! Macro — Reduce boilerplate with macros

Production Features

When you're ready to move beyond prototyping:

Full Code

The complete code above is production-ready. To save it:

  1. Copy the entire code block
  2. Replace src/main.rs in your project
  3. Run with horus run

For additional examples, see Basic Examples.


See Also

  • Tutorials — Continue learning with guided tutorials
  • Recipes — Production-ready patterns (PID, sensor fusion, hardware, telemetry)
  • Core Concepts — Understand how HORUS works
  • Real-Time Systems — What real-time means and when you need it
  • Framework Clockhorus::now(), dt(), elapsed(), budget_remaining()