Building Your Second Application
Now that you've built your first HORUS application, let's create something more practical: a 3-node sensor pipeline that reads temperature data, filters out noise, and displays the results.
What You'll Build
A real-time temperature monitoring system with:
- SensorNode: Publishes simulated temperature readings every second
- FilterNode: Subscribes to raw temperatures, filters noise, republishes clean data
- DisplayNode: Subscribes to filtered data, displays to console
This demonstrates:
- Multi-node communication patterns
- Data pipeline processing
- Real-time filtering
- Monitor monitoring
Architecture
Step 1: Create the Project
horus new temperature_pipeline
cd temperature_pipeline
Step 2: Write the Code
Replace main.rs with this complete, runnable code:
use horus::prelude::*;
use std::time::{Duration, Instant};
// ============================================================================
// Node 1: SensorNode - Publishes temperature readings
// ============================================================================
struct SensorNode {
temp_pub: Topic<f32>,
last_publish: Instant,
reading: f32,
}
impl SensorNode {
fn new() -> Result<Self> {
Ok(Self {
temp_pub: Topic::new("raw_temp")?,
last_publish: Instant::now(),
reading: 20.0,
})
}
}
impl Node for SensorNode {
fn name(&self) -> &str {
"SensorNode"
}
fn init(&mut self) -> Result<()> {
hlog!(info, "Temperature sensor initialized");
Ok(())
}
fn tick(&mut self) {
// Publish every 1 second
if self.last_publish.elapsed() >= Duration::from_secs(1) {
// Simulate realistic temperature with noise
// Base temperature oscillates between 20-30°C
let base_temp = 25.0 + (self.reading * 0.1).sin() * 5.0;
// Add random noise (+/- 2°C)
let noise = (self.reading * 0.7).sin() * 2.0;
let temperature = base_temp + noise;
// Publish raw temperature
self.temp_pub.send(temperature);
hlog!(info, "Published raw temp: {:.2}°C", temperature);
self.reading += 1.0;
self.last_publish = Instant::now();
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Sensor shutdown complete");
Ok(())
}
}
// ============================================================================
// Node 2: FilterNode - Removes noise with exponential moving average
// ============================================================================
struct FilterNode {
raw_sub: Topic<f32>,
filtered_pub: Topic<f32>,
filtered_value: Option<f32>,
alpha: f32, // Smoothing factor (0.0 - 1.0)
}
impl FilterNode {
fn new() -> Result<Self> {
Ok(Self {
raw_sub: Topic::new("raw_temp")?,
filtered_pub: Topic::new("filtered_temp")?,
filtered_value: None,
alpha: 0.3, // 30% new data, 70% previous (smooth but responsive)
})
}
}
impl Node for FilterNode {
fn name(&self) -> &str {
"FilterNode"
}
fn init(&mut self) -> Result<()> {
hlog!(info, "Filter initialized (alpha = {:.2})", self.alpha);
Ok(())
}
fn tick(&mut self) {
// Check for new temperature reading
if let Some(raw_temp) = self.raw_sub.recv() {
// Apply exponential moving average filter
let filtered = match self.filtered_value {
Some(prev) => self.alpha * raw_temp + (1.0 - self.alpha) * prev,
None => raw_temp, // First reading, no previous value
};
self.filtered_value = Some(filtered);
// Publish filtered temperature
self.filtered_pub.send(filtered);
hlog!(info, "Filtered: {:.2}°C -> {:.2}°C (removed {:.2}°C noise)",
raw_temp, filtered, raw_temp - filtered);
}
}
fn shutdown(&mut self) -> Result<()> {
hlog!(info, "Filter shutdown complete");
Ok(())
}
}
// ============================================================================
// Node 3: DisplayNode - Shows filtered temperature on console
// ============================================================================
struct DisplayNode {
filtered_sub: Topic<f32>,
display_counter: u32,
}
impl DisplayNode {
fn new() -> Result<Self> {
Ok(Self {
filtered_sub: Topic::new("filtered_temp")?,
display_counter: 0,
})
}
}
impl Node for DisplayNode {
fn name(&self) -> &str {
"DisplayNode"
}
fn init(&mut self) -> Result<()> {
hlog!(info, "Display initialized");
println!("\n========================================");
println!(" Temperature Monitor - Press Ctrl+C to stop");
println!("========================================\n");
Ok(())
}
fn tick(&mut self) {
if let Some(temp) = self.filtered_sub.recv() {
self.display_counter += 1;
// Display temperature with visual indicator
let status = if temp < 22.0 {
"COLD"
} else if temp > 28.0 {
"HOT"
} else {
"NORMAL"
};
println!(
"[Reading #{}] Temperature: {:.1}°C - Status: {}",
self.display_counter, temp, status
);
hlog!(debug, "Displayed reading #{}", self.display_counter);
}
}
fn shutdown(&mut self) -> Result<()> {
println!("\n========================================");
println!(" Total readings displayed: {}", self.display_counter);
println!("========================================\n");
hlog!(info, "Display shutdown complete");
Ok(())
}
}
// ============================================================================
// Main Application - Configure and run the scheduler
// ============================================================================
fn main() -> Result<()> {
println!("Starting Temperature Pipeline...\n");
let mut scheduler = Scheduler::new();
// Add nodes in priority order:
// 1. SensorNode (order 0) - Runs first to generate data
scheduler.add(SensorNode::new()?).order(0).build()?;
// 2. FilterNode (order 1) - Runs second to process data
scheduler.add(FilterNode::new()?).order(1).build()?;
// 3. DisplayNode (order 2) - Runs last to display results
scheduler.add(DisplayNode::new()?).order(2).build()?;
println!("All nodes initialized. Running...\n");
// Run the scheduler (blocks until Ctrl+C)
scheduler.run()?;
Ok(())
}
Step 3: Run the Application
horus run
Expected Output:
Starting Temperature Pipeline...
All nodes initialized. Running...
========================================
Temperature Monitor - Press Ctrl+C to stop
========================================
[Reading #1] Temperature: 23.4°C - Status: NORMAL
[Reading #2] Temperature: 24.1°C - Status: NORMAL
[Reading #3] Temperature: 25.8°C - Status: NORMAL
[Reading #4] Temperature: 27.2°C - Status: NORMAL
[Reading #5] Temperature: 28.6°C - Status: HOT
[Reading #6] Temperature: 27.9°C - Status: NORMAL
[Reading #7] Temperature: 26.3°C - Status: NORMAL
Press Ctrl+C to stop:
^C
Ctrl+C received! Shutting down HORUS scheduler...
========================================
Total readings displayed: 7
========================================
Step 4: Monitor with Monitor
Open a second terminal and run:
horus monitor
The monitor will show:
Nodes Tab
- SensorNode: Publishing to
raw_tempevery ~1 second - FilterNode: Subscribing to
raw_temp, publishing tofiltered_temp - DisplayNode: Subscribing to
filtered_temp
Topics Tab
- raw_temp (f32): Noisy temperature readings
- filtered_temp (f32): Smooth temperature readings
Metrics Tab
- IPC Latency: ~85ns-437ns (sub-microsecond!)
- Tick Duration: How long each node takes to execute
- Message Counts: Total messages sent/received
Understanding the Code
SensorNode
// Publish every 1 second
if self.last_publish.elapsed() >= Duration::from_secs(1) {
let temperature = 25.0 + noise;
self.temp_pub.send(temperature);
}
Key Points:
- Uses
Instantto track time between publishes - Simulates realistic sensor data with noise
- Publishes to
"raw_temp"topic
FilterNode
// Exponential moving average filter
let filtered = self.alpha * raw_temp + (1.0 - self.alpha) * prev;
self.filtered_pub.send(filtered);
Key Points:
- Subscribes to
"raw_temp", publishes to"filtered_temp" - Implements exponential moving average (EMA) filter
alpha = 0.3balances responsiveness vs smoothness
Filter Behavior:
- High alpha (0.8): Fast response, less smoothing
- Low alpha (0.2): Slow response, more smoothing
DisplayNode
if let Some(temp) = self.filtered_sub.recv() {
println!("[Reading #{}] Temperature: {:.1}°C", count, temp);
}
Key Points:
- Subscribes to
"filtered_temp" - Only receives when new data is available
recv()returnsNonewhen no message (not an error!)
Common Issues & Fixes
Issue 1: No Output Displayed
Symptom:
Starting Temperature Pipeline...
All nodes initialized. Running...
========================================
Temperature Monitor - Press Ctrl+C to stop
========================================
[Nothing appears]
Cause: Topics not connecting (typo in topic names)
Fix:
- Check topic names match exactly:
"raw_temp"and"filtered_temp" - Verify with monitor:
horus monitor-> Topics tab - Ensure all nodes are running in same scheduler
Issue 2: Too Much/Too Little Smoothing
Symptom: Temperature changes too fast or too slow
Fix: Adjust the alpha value in FilterNode:
alpha: 0.3, // Current: moderate smoothing
// Try these alternatives:
alpha: 0.7, // More responsive, less smooth
alpha: 0.1, // Very smooth, slower response
Issue 3: Monitor Shows No Nodes
Symptom: Monitor is empty
Cause: Application not running or monitor started before app
Fix:
- Start the application first:
horus run - Then start monitor in separate terminal:
horus monitor - Monitor auto-discovers running nodes
Issue 4: Build Errors
Symptom:
error[E0433]: failed to resolve: use of undeclared type `Topic`
Fix:
- Ensure HORUS is installed:
horus --help - Check import:
use horus::prelude::*; - Run from project directory (where
horus.tomlis)
Experiments to Try
1. Change Update Rate
Make the sensor publish faster:
// In SensorNode::tick()
if self.last_publish.elapsed() >= Duration::from_millis(500) { // 2 Hz instead of 1 Hz
2. Add Temperature Alerts
Add to DisplayNode:
if temp > 30.0 {
println!(" WARNING: High temperature detected!");
}
3. Log Data to File
Add to DisplayNode::tick():
use std::fs::OpenOptions;
use std::io::Write;
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open("temperature_log.txt")
.unwrap();
writeln!(file, "{:.1}", temp).ok();
4. Use Rate-Limited Logging
In a real robot running at 1kHz, you don't want every tick flooding the log. Use hlog_once! for one-time events and hlog_every! for periodic status:
fn tick(&mut self) {
// Log once when sensor first connects
hlog_once!(info, "Sensor online, streaming data");
// Log status every 5 seconds (not every tick)
hlog_every!(5000, info, "Filter running — last value: {:.1}°C", self.filtered_value.unwrap_or(0.0));
// Log warnings at most once per second
if let Some(temp) = self.raw_sub.recv() {
if temp > 35.0 {
hlog_every!(1000, warn, "High temperature: {:.1}°C", temp);
}
}
}
5. Add Multiple Sensors
Create a second sensor node:
// In main()
scheduler.add(SensorNode::new()?).order(0).build()?; // Sensor 1
scheduler.add(SensorNode::new()?).order(0).build()?; // Sensor 2
Both will publish to the same topic, and FilterNode will process both!
Key Concepts Demonstrated
Pipeline Pattern: Data flows through stages (Sensor -> Filter -> Display)
Pub/Sub Decoupling: Nodes don't know about each other, only topics
Real-Time Processing: Filtering happens as data arrives
Shared Memory IPC: Sub-microsecond communication between nodes
Priority Scheduling: Sensor runs before filter, filter before display
Shorter Version: node! Macro
The entire pipeline above can be written more concisely with the node! macro. Here's the same 3-node system in roughly half the code:
use horus::prelude::*;
use std::time::{Duration, Instant};
node! {
SensorNode {
pub { temp_pub: f32 -> "raw_temp" }
data {
last_publish: Instant = Instant::now(),
reading: f32 = 20.0,
}
init {
hlog!(info, "Temperature sensor initialized");
Ok(())
}
tick {
if self.last_publish.elapsed() >= Duration::from_secs(1) {
let base_temp = 25.0 + (self.reading * 0.1).sin() * 5.0;
let noise = (self.reading * 0.7).sin() * 2.0;
self.temp_pub.send(base_temp + noise);
self.reading += 1.0;
self.last_publish = Instant::now();
}
}
}
}
node! {
FilterNode {
sub { raw_sub: f32 -> "raw_temp" }
pub { filtered_pub: f32 -> "filtered_temp" }
data {
filtered_value: Option<f32> = None,
alpha: f32 = 0.3,
}
tick {
if let Some(raw) = self.raw_sub.recv() {
let filtered = match self.filtered_value {
Some(prev) => self.alpha * raw + (1.0 - self.alpha) * prev,
None => raw,
};
self.filtered_value = Some(filtered);
self.filtered_pub.send(filtered);
}
}
}
}
node! {
DisplayNode {
sub { filtered_sub: f32 -> "filtered_temp" }
data { count: u32 = 0 }
tick {
if let Some(temp) = self.filtered_sub.recv() {
self.count += 1;
let status = if temp < 22.0 { "COLD" } else if temp > 28.0 { "HOT" } else { "NORMAL" };
println!("[#{}] {:.1}°C - {}", self.count, temp, status);
}
}
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
scheduler.add(SensorNode::new()).order(0).build()?;
scheduler.add(FilterNode::new()).order(1).build()?;
scheduler.add(DisplayNode::new()).order(2).build()?;
scheduler.run()?;
Ok(())
}
The node! macro generates the struct, constructor, and Node trait implementation. Both versions produce identical runtime behavior. See the node! Macro Guide for the full syntax.
Next Steps
Now that you've built a 3-node pipeline, try:
- Testing - Learn how to unit test your nodes
- Using Pre-Built Nodes - Use library nodes instead of writing from scratch
- node! Macro - Reduce boilerplate with macros
- Message Types - Use complex message types instead of primitives
Full Code
The complete code above is production-ready. To save it:
- Copy the entire code block
- Replace
main.rsin your project - Run with
horus run
For additional examples, see Basic Examples.