Building Your Second Application
Now that you've built your first HORUS application, let's create something more practical: a 3-node sensor pipeline that reads temperature data, filters out noise, and displays the results.
Prerequisites
- HORUS installed (Installation Guide)
- Quick Start completed (you know how to create a project and run it)
Time: ~20 minutes
What You'll Build
A real-time temperature monitoring system with:
- SensorNode: Publishes simulated temperature readings every second
- FilterNode: Subscribes to raw temperatures, filters noise, republishes clean data
- DisplayNode: Subscribes to filtered data, displays to console
This demonstrates:
- Multi-node communication patterns
- Data pipeline processing
- Real-time filtering
- Monitor monitoring
Architecture
Step 1: Create the Project
horus new temperature_pipeline
cd temperature_pipeline
Step 2: Write the Code
Replace src/main.rs with this complete, runnable code:
use horus::prelude::*;
use std::time::{Duration, Instant};
// ============================================================================
// Node 1: SensorNode - Publishes temperature readings
// ============================================================================
struct SensorNode {
temp_pub: Topic<f32>,
last_publish: Instant,
reading: f32,
}
impl SensorNode {
fn new() -> Result<Self> {
Ok(Self {
temp_pub: Topic::new("raw_temp")?,
last_publish: Instant::now(),
reading: 20.0,
})
}
}
impl Node for SensorNode {
fn name(&self) -> &str {
"SensorNode"
}
fn init(&mut self) -> Result<()> {
hlog!(info, "Temperature sensor initialized");
Ok(())
}
fn tick(&mut self) {
// Publish every 1 second
if self.last_publish.elapsed() >= Duration::from_secs(1) {
// Simulate realistic temperature with noise
// Base temperature oscillates between 20-30°C
let base_temp = 25.0 + (self.reading * 0.1).sin() * 5.0;
// Add random noise (+/- 2°C)
let noise = (self.reading * 0.7).sin() * 2.0;
let temperature = base_temp + noise;
// Publish raw temperature
self.temp_pub.send(temperature);
hlog!(info, "Published raw temp: {:.2}°C", temperature);
self.reading += 1.0;
self.last_publish = Instant::now();
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Sensor shutdown complete");
Ok(())
}
}
// ============================================================================
// Node 2: FilterNode - Removes noise with exponential moving average
// ============================================================================
struct FilterNode {
raw_sub: Topic<f32>,
filtered_pub: Topic<f32>,
filtered_value: Option<f32>,
alpha: f32, // Smoothing factor (0.0 - 1.0)
}
impl FilterNode {
fn new() -> Result<Self> {
Ok(Self {
raw_sub: Topic::new("raw_temp")?,
filtered_pub: Topic::new("filtered_temp")?,
filtered_value: None,
alpha: 0.3, // 30% new data, 70% previous (smooth but responsive)
})
}
}
impl Node for FilterNode {
fn name(&self) -> &str {
"FilterNode"
}
fn init(&mut self) -> Result<()> {
hlog!(info, "Filter initialized (alpha = {:.2})", self.alpha);
Ok(())
}
fn tick(&mut self) {
// Check for new temperature reading
if let Some(raw_temp) = self.raw_sub.recv() {
// Apply exponential moving average filter
let filtered = match self.filtered_value {
Some(prev) => self.alpha * raw_temp + (1.0 - self.alpha) * prev,
None => raw_temp, // First reading, no previous value
};
self.filtered_value = Some(filtered);
// Publish filtered temperature
self.filtered_pub.send(filtered);
hlog!(info, "Filtered: {:.2}°C -> {:.2}°C (removed {:.2}°C noise)",
raw_temp, filtered, raw_temp - filtered);
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Filter shutdown complete");
Ok(())
}
}
// ============================================================================
// Node 3: DisplayNode - Shows filtered temperature on console
// ============================================================================
struct DisplayNode {
filtered_sub: Topic<f32>,
display_counter: u32,
}
impl DisplayNode {
fn new() -> Result<Self> {
Ok(Self {
filtered_sub: Topic::new("filtered_temp")?,
display_counter: 0,
})
}
}
impl Node for DisplayNode {
fn name(&self) -> &str {
"DisplayNode"
}
fn init(&mut self) -> Result<()> {
hlog!(info, "Display initialized");
println!("\n========================================");
println!(" Temperature Monitor - Press Ctrl+C to stop");
println!("========================================\n");
Ok(())
}
fn tick(&mut self) {
if let Some(temp) = self.filtered_sub.recv() {
self.display_counter += 1;
// Display temperature with visual indicator
let status = if temp < 22.0 {
"COLD"
} else if temp > 28.0 {
"HOT"
} else {
"NORMAL"
};
println!(
"[Reading #{}] Temperature: {:.1}°C - Status: {}",
self.display_counter, temp, status
);
hlog!(debug, "Displayed reading #{}", self.display_counter);
}
}
fn shutdown(&mut self) -> Result<()> {
println!("\n========================================");
println!(" Total readings displayed: {}", self.display_counter);
println!("========================================\n");
hlog!(info, "Display shutdown complete");
Ok(())
}
}
// ============================================================================
// Main Application - Configure and run the scheduler
// ============================================================================
fn main() -> Result<()> {
println!("Starting Temperature Pipeline...\n");
let mut scheduler = Scheduler::new();
// Add nodes in priority order:
// 1. SensorNode (order 0) - Runs first to generate data
scheduler.add(SensorNode::new()?).order(0).build()?;
// 2. FilterNode (order 1) - Runs second to process data
scheduler.add(FilterNode::new()?).order(1).build()?;
// 3. DisplayNode (order 2) - Runs last to display results
scheduler.add(DisplayNode::new()?).order(2).build()?;
// Optional: Enable watchdog for production use.
// If any node hangs for more than 5 seconds, the scheduler
// logs a warning (1x), skips the node (2x), then isolates it (3x).
// See the Safety Monitor guide for details.
println!("All nodes initialized. Running...\n");
// Run the scheduler (blocks until Ctrl+C)
scheduler.run()?;
Ok(())
}
Step 3: Run the Application
horus run
Expected Output:
Starting Temperature Pipeline...
All nodes initialized. Running...
========================================
Temperature Monitor - Press Ctrl+C to stop
========================================
[Reading #1] Temperature: 23.4°C - Status: NORMAL
[Reading #2] Temperature: 24.1°C - Status: NORMAL
[Reading #3] Temperature: 25.8°C - Status: NORMAL
[Reading #4] Temperature: 27.2°C - Status: NORMAL
[Reading #5] Temperature: 28.6°C - Status: HOT
[Reading #6] Temperature: 27.9°C - Status: NORMAL
[Reading #7] Temperature: 26.3°C - Status: NORMAL
Press Ctrl+C to stop:
^C
Ctrl+C received! Shutting down HORUS scheduler...
========================================
Total readings displayed: 7
========================================
Step 4: Monitor with Monitor
Open a second terminal and run:
horus monitor
The monitor will show:
Nodes Tab
- SensorNode: Publishing to
raw_tempevery ~1 second - FilterNode: Subscribing to
raw_temp, publishing tofiltered_temp - DisplayNode: Subscribing to
filtered_temp
Topics Tab
- raw_temp (f32): Noisy temperature readings
- filtered_temp (f32): Smooth temperature readings
Metrics Tab
- IPC Latency: ~85ns-437ns (sub-microsecond!)
- Tick Duration: How long each node takes to execute
- Message Counts: Total messages sent/received
Understanding the Code
SensorNode
// simplified
// Publish every 1 second
if self.last_publish.elapsed() >= Duration::from_secs(1) {
let temperature = 25.0 + noise;
self.temp_pub.send(temperature);
}
Key Points:
- Uses
Instantto track time between publishes - Simulates realistic sensor data with noise
- Publishes to
"raw_temp"topic
FilterNode
// simplified
// Exponential moving average filter
let filtered = self.alpha * raw_temp + (1.0 - self.alpha) * prev;
self.filtered_pub.send(filtered);
Key Points:
- Subscribes to
"raw_temp", publishes to"filtered_temp" - Implements exponential moving average (EMA) filter
alpha = 0.3balances responsiveness vs smoothness
Filter Behavior:
- High alpha (0.8): Fast response, less smoothing
- Low alpha (0.2): Slow response, more smoothing
DisplayNode
// simplified
if let Some(temp) = self.filtered_sub.recv() {
println!("[Reading #{}] Temperature: {:.1}°C", count, temp);
}
Key Points:
- Subscribes to
"filtered_temp" - Only receives when new data is available
recv()returnsNonewhen no message (not an error!)
Common Issues & Fixes
Issue 1: No Output Displayed
Symptom:
Starting Temperature Pipeline...
All nodes initialized. Running...
========================================
Temperature Monitor - Press Ctrl+C to stop
========================================
[Nothing appears]
Cause: Topics not connecting (typo in topic names)
Fix:
- Check topic names match exactly:
"raw_temp"and"filtered_temp" - Verify with monitor:
horus monitor-> Topics tab - Ensure all nodes are running in same scheduler
Issue 2: Too Much/Too Little Smoothing
Symptom: Temperature changes too fast or too slow
Fix: Adjust the alpha value in FilterNode:
// simplified
alpha: 0.3, // Current: moderate smoothing
// Try these alternatives:
alpha: 0.7, // More responsive, less smooth
alpha: 0.1, // Very smooth, slower response
Issue 3: Monitor Shows No Nodes
Symptom: Monitor is empty
Cause: Application not running or monitor started before app
Fix:
- Start the application first:
horus run - Then start monitor in separate terminal:
horus monitor - Monitor auto-discovers running nodes
Issue 4: Build Errors
Symptom:
error[E0433]: failed to resolve: use of undeclared type `Topic`
Fix:
- Ensure HORUS is installed:
horus --help - Check import:
use horus::prelude::*; - Run from project directory (where
horus.tomlis)
Experiments to Try
1. Change Update Rate
Make the sensor publish faster:
// simplified
// In SensorNode::tick()
if self.last_publish.elapsed() >= Duration::from_millis(500) { // 2 Hz instead of 1 Hz
2. Add Temperature Alerts
Add to DisplayNode:
// simplified
if temp > 30.0 {
println!(" WARNING: High temperature detected!");
}
3. Log Data to File
Add to DisplayNode::tick():
// simplified
use std::fs::OpenOptions;
use std::io::Write;
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open("temperature_log.txt")
.unwrap();
writeln!(file, "{:.1}", temp).ok();
4. Use Rate-Limited Logging
In a real robot running at 1kHz, you don't want every tick flooding the log. Use hlog_once! for one-time events and hlog_every! for periodic status:
// simplified
fn tick(&mut self) {
// Log once when sensor first connects
hlog_once!(info, "Sensor online, streaming data");
// Log status every 5 seconds (not every tick)
hlog_every!(5000, info, "Filter running — last value: {:.1}°C", self.filtered_value.unwrap_or(0.0));
// Log warnings at most once per second
if let Some(temp) = self.raw_sub.recv() {
if temp > 35.0 {
hlog_every!(1000, warn, "High temperature: {:.1}°C", temp);
}
}
}
5. Add Multiple Sensors
Create a second sensor node:
// simplified
// In main()
scheduler.add(SensorNode::new()?).order(0).build()?; // Sensor 1
scheduler.add(SensorNode::new()?).order(0).build()?; // Sensor 2
Both will publish to the same topic, and FilterNode will process both!
Key Concepts Demonstrated
Pipeline Pattern: Data flows through stages (Sensor -> Filter -> Display)
Pub/Sub Decoupling: Nodes don't know about each other, only topics
Real-Time Processing: Filtering happens as data arrives
Shared Memory IPC: Sub-microsecond communication between nodes
Priority Scheduling: Sensor runs before filter, filter before display
Shorter Version: node! Macro
The entire pipeline above can be written more concisely with the node! macro. Here's the same 3-node system in roughly half the code:
use horus::prelude::*;
use std::time::{Duration, Instant};
node! {
SensorNode {
pub { temp_pub: f32 -> "raw_temp" }
data {
last_publish: Instant = Instant::now(),
reading: f32 = 20.0,
}
init {
hlog!(info, "Temperature sensor initialized");
Ok(())
}
tick {
if self.last_publish.elapsed() >= Duration::from_secs(1) {
let base_temp = 25.0 + (self.reading * 0.1).sin() * 5.0;
let noise = (self.reading * 0.7).sin() * 2.0;
self.temp_pub.send(base_temp + noise);
self.reading += 1.0;
self.last_publish = Instant::now();
}
}
}
}
node! {
FilterNode {
sub { raw_sub: f32 -> "raw_temp" }
pub { filtered_pub: f32 -> "filtered_temp" }
data {
filtered_value: Option<f32> = None,
alpha: f32 = 0.3,
}
tick {
if let Some(raw) = self.raw_sub.recv() {
let filtered = match self.filtered_value {
Some(prev) => self.alpha * raw + (1.0 - self.alpha) * prev,
None => raw,
};
self.filtered_value = Some(filtered);
self.filtered_pub.send(filtered);
}
}
}
}
node! {
DisplayNode {
sub { filtered_sub: f32 -> "filtered_temp" }
data { count: u32 = 0 }
tick {
if let Some(temp) = self.filtered_sub.recv() {
self.count += 1;
let status = if temp < 22.0 { "COLD" } else if temp > 28.0 { "HOT" } else { "NORMAL" };
println!("[#{}] {:.1}°C - {}", self.count, temp, status);
}
}
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(SensorNode::new()).order(0).build()?;
scheduler.add(FilterNode::new()).order(1).build()?;
scheduler.add(DisplayNode::new()).order(2).build()?;
scheduler.run()?;
Ok(())
}
The node! macro generates the struct, constructor, and Node trait implementation. Both versions produce identical runtime behavior. See the node! Macro Guide for the full syntax.
Step 5: Add a Watchdog (Production)
The pipeline above works great for learning, but a production robot needs safety monitoring. What if the FilterNode hangs — a bug causes an infinite loop, or a hardware sensor blocks? Without a watchdog, the scheduler keeps calling the broken node forever.
Add a watchdog to detect and respond to frozen nodes:
// simplified
fn main() -> Result<()> {
let mut scheduler = Scheduler::new()
.watchdog(5000_u64.ms()); // global: 5-second timeout
scheduler.add(SensorNode::new()?).order(0).build()?;
scheduler.add(FilterNode::new()?).order(1).build()?;
scheduler.add(DisplayNode::new()?).order(2).build()?;
scheduler.run()
}
With this change, if any node stops completing its tick() within 5 seconds:
- At 5 seconds: Warning logged — "FilterNode watchdog warning"
- At 10 seconds: Node marked Unhealthy, skipped. Sensor and Display keep running
- At 15 seconds: Node Isolated,
enter_safe_state()called (if implemented)
For safety-critical nodes (motors, actuators), you'd use a much tighter timeout and implement enter_safe_state(). See Safety Monitor for the full guide.
Key Takeaways
- Pipeline pattern: Data flows through stages (Sensor -> Filter -> Display), each a separate node
- Pub/sub decoupling: Nodes only know about topic names, not each other
- Execution order:
.order()controls which node runs first in each tick cycle - recv() is non-blocking: Returns
Nonewhen no message is available -- not an error - Watchdog:
.watchdog()detects frozen nodes and isolates them — critical for production - node! macro: Reduces boilerplate by ~75% while producing identical runtime behavior
Next Steps
Now that you've built a 3-node pipeline, try:
- Choosing Your Configuration — Progressive guide from prototype to production (Level 0→4)
- Execution Classes — When to use
.compute(),.on("topic"), or.async_io()for different node types - Testing — Unit test your nodes with
tick_once() - Custom Messages — Define your own message types
- node! Macro — Reduce boilerplate with macros
Production Features
When you're ready to move beyond prototyping:
- Safety Monitor — Watchdog, deadline enforcement, graduated degradation
- BlackBox — Flight recorder for post-mortem crash analysis
- Deterministic Mode — Reproducible execution for simulation and CI testing
- Record & Replay — Tick-perfect replay for reproducing field bugs
- Fault Tolerance — Per-node failure policies (restart, skip, fatal)
- Emergency Stop — Event-driven e-stop pattern for actuator safety
Full Code
The complete code above is production-ready. To save it:
- Copy the entire code block
- Replace
src/main.rsin your project - Run with
horus run
For additional examples, see Basic Examples.
See Also
- Tutorials — Continue learning with guided tutorials
- Recipes — Production-ready patterns (PID, sensor fusion, hardware, telemetry)
- Core Concepts — Understand how HORUS works
- Real-Time Systems — What real-time means and when you need it
- Framework Clock —
horus::now(),dt(),elapsed(),budget_remaining()