Building Your Second Application (Python)

Now that you have built your first HORUS application in Python, let's create something more practical: a 3-node sensor pipeline that reads temperature data, filters out noise, and displays the results.

Prerequisites

Time: ~20 minutes

What You'll Build

A real-time temperature monitoring system with three nodes:

  1. SensorNode: Publishes simulated temperature readings every second
  2. FilterNode: Subscribes to raw temperatures, filters noise, republishes clean data
  3. DisplayNode: Subscribes to filtered data, displays to console

This demonstrates:

  • Multi-node communication patterns
  • Data pipeline processing
  • Real-time filtering with state
  • CLI introspection tools

Architecture

Loading diagram...
Temperature pipeline: SensorNode → FilterNode → DisplayNode

Step 1: Create the Project

horus new temperature_pipeline -p
cd temperature_pipeline

The -p flag creates a Python project with src/main.py, horus.toml, and .horus/.

Step 2: Write the Code

Replace src/main.py with this complete, runnable code:

import math
import horus

# ============================================================================
# Node 1: SensorNode — Publishes temperature readings
# ============================================================================

def make_sensor():
    state = {"reading": 0.0}

    def init(node):
        node.log_info("Temperature sensor initialized")

    def tick(node):
        # Simulate realistic temperature with noise
        # Base temperature oscillates between 20-30°C
        base_temp = 25.0 + math.sin(state["reading"] * 0.1) * 5.0

        # Add random noise (+/- 2°C)
        noise = math.sin(state["reading"] * 0.7) * 2.0
        temperature = base_temp + noise

        # Publish raw temperature
        node.send("raw_temp", temperature)
        node.log_info(f"Published raw temp: {temperature:.2f}°C")

        state["reading"] += 1.0

    def shutdown(node):
        node.log_info("Sensor shutdown complete")

    return horus.Node(
        name="SensorNode",
        tick=tick,
        init=init,
        shutdown=shutdown,
        rate=1,            # 1 Hz — one reading per second
        order=0,           # Runs first in each tick cycle
        pubs=["raw_temp"],
    )


# ============================================================================
# Node 2: FilterNode — Removes noise with exponential moving average
# ============================================================================

def make_filter():
    state = {
        "filtered_value": None,
        "alpha": 0.3,  # Smoothing factor: 30% new data, 70% previous
    }

    def init(node):
        node.log_info(f"Filter initialized (alpha = {state['alpha']:.2f})")

    def tick(node):
        raw_temp = node.recv("raw_temp")
        if raw_temp is None:
            return

        alpha = state["alpha"]

        # Apply exponential moving average filter
        if state["filtered_value"] is None:
            filtered = raw_temp  # First reading, no previous value
        else:
            prev = state["filtered_value"]
            filtered = alpha * raw_temp + (1.0 - alpha) * prev

        state["filtered_value"] = filtered

        # Publish filtered temperature
        node.send("filtered_temp", filtered)

        noise_removed = raw_temp - filtered
        node.log_info(
            f"Filtered: {raw_temp:.2f}°C -> {filtered:.2f}°C "
            f"(removed {noise_removed:.2f}°C noise)"
        )

    def shutdown(node):
        node.log_info("Filter shutdown complete")

    return horus.Node(
        name="FilterNode",
        tick=tick,
        init=init,
        shutdown=shutdown,
        rate=1,
        order=1,           # Runs after SensorNode
        pubs=["filtered_temp"],
        subs=["raw_temp"],
    )


# ============================================================================
# Node 3: DisplayNode — Shows filtered temperature on console
# ============================================================================

def make_display():
    state = {"count": 0}

    def init(node):
        node.log_info("Display initialized")
        print("\n========================================")
        print("  Temperature Monitor — Press Ctrl+C to stop")
        print("========================================\n")

    def tick(node):
        temp = node.recv("filtered_temp")
        if temp is None:
            return

        state["count"] += 1

        # Display temperature with status indicator
        if temp < 22.0:
            status = "COLD"
        elif temp > 28.0:
            status = "HOT"
        else:
            status = "NORMAL"

        print(f"[Reading #{state['count']}] Temperature: {temp:.1f}°C — Status: {status}")
        node.log_debug(f"Displayed reading #{state['count']}")

    def shutdown(node):
        print(f"\n========================================")
        print(f"  Total readings displayed: {state['count']}")
        print(f"========================================\n")
        node.log_info("Display shutdown complete")

    return horus.Node(
        name="DisplayNode",
        tick=tick,
        init=init,
        shutdown=shutdown,
        rate=1,
        order=2,           # Runs last — after FilterNode
        subs=["filtered_temp"],
    )


# ============================================================================
# Main — Configure and run the pipeline
# ============================================================================

if __name__ == "__main__":
    print("Starting Temperature Pipeline...\n")
    horus.run(make_sensor(), make_filter(), make_display())

Step 3: Run the Application

horus run

Expected output:

Starting Temperature Pipeline...

========================================
  Temperature Monitor — Press Ctrl+C to stop
========================================

[Reading #1] Temperature: 23.4°C — Status: NORMAL
[Reading #2] Temperature: 24.1°C — Status: NORMAL
[Reading #3] Temperature: 25.8°C — Status: NORMAL
[Reading #4] Temperature: 27.2°C — Status: NORMAL
[Reading #5] Temperature: 28.6°C — Status: HOT
[Reading #6] Temperature: 27.9°C — Status: NORMAL
[Reading #7] Temperature: 26.3°C — Status: NORMAL

Press Ctrl+C to stop:

^C
Ctrl+C received! Shutting down HORUS scheduler...

========================================
  Total readings displayed: 7
========================================

Always use horus run — do NOT run python src/main.py directly.

horus run sets up the SHM namespace, environment variables, and build pipeline before executing your code. Running python src/main.py directly means topics won't connect to other processes, and CLI introspection tools like horus topic echo won't work.

Step 4: Inspect with CLI Tools

While the application is running, open a second terminal to inspect the live data.

List Active Topics

horus topic list

You should see both topics:

raw_temp        (active, 1 publisher)
filtered_temp   (active, 1 publisher)

Echo Raw Sensor Data

Watch the noisy sensor readings in real time:

horus topic echo raw_temp
[1] 23.42
[2] 25.87
[3] 27.14
[4] 24.63
...

Press Ctrl+C to stop echoing.

Echo Filtered Data

Compare with the smoothed output:

horus topic echo filtered_temp
[1] 23.42
[2] 25.14
[3] 25.74
[4] 25.41
...

Notice how the filtered values change more gradually than the raw values.

Measure Publishing Rate

Verify each topic publishes at the expected rate:

horus topic hz raw_temp
average rate: 1.00 Hz
    min: 0.998s, max: 1.002s, std dev: 0.001s
horus topic hz filtered_temp

The filtered topic should also show ~1 Hz since the FilterNode republishes every time it receives a new reading.

Monitor Dashboard

For a full overview of nodes, topics, and metrics:

horus monitor

The monitor will show:

  • Nodes tab: SensorNode, FilterNode, DisplayNode with their rates and states
  • Topics tab: raw_temp and filtered_temp with message counts
  • Metrics tab: IPC latency, tick duration, and message throughput

Understanding the Code

SensorNode

def tick(node):
    base_temp = 25.0 + math.sin(state["reading"] * 0.1) * 5.0
    noise = math.sin(state["reading"] * 0.7) * 2.0
    temperature = base_temp + noise
    node.send("raw_temp", temperature)

Key points:

  • Publishes to "raw_temp" topic at 1 Hz (set by rate=1)
  • State lives in a dictionary captured by the closure
  • node.send(topic, data) publishes any serializable value

FilterNode

def tick(node):
    raw_temp = node.recv("raw_temp")
    if raw_temp is None:
        return
    filtered = alpha * raw_temp + (1.0 - alpha) * prev
    node.send("filtered_temp", filtered)

Key points:

  • Subscribes to "raw_temp", publishes to "filtered_temp"
  • Implements an exponential moving average (EMA) filter
  • alpha = 0.3 balances responsiveness vs smoothness
  • node.recv() returns None when no message is available (not an error)

Filter behavior:

  • High alpha (0.8): Fast response, less smoothing
  • Low alpha (0.2): Slow response, more smoothing

DisplayNode

def tick(node):
    temp = node.recv("filtered_temp")
    if temp is None:
        return
    print(f"[Reading #{state['count']}] Temperature: {temp:.1f}°C — Status: {status}")

Key points:

  • Subscribes to "filtered_temp" only
  • Only prints when new data is available
  • Uses init and shutdown callbacks for banner display

State Management

Python nodes manage state through closures. Each make_*() factory function creates a state dictionary that the tick, init, and shutdown functions capture:

def make_sensor():
    state = {"reading": 0.0}  # Mutable state

    def tick(node):
        state["reading"] += 1.0  # Mutate via dict
        node.send("raw_temp", state["reading"])

    return horus.Node(name="SensorNode", tick=tick, ...)

Dictionaries work because tick closes over the dict reference, and dict values are mutable in place. You can also use a list ([0.0]) for single values or a class instance for complex state.

Execution Order

The order parameter controls when each node runs within a tick cycle:

make_sensor()   # order=0 — runs first, produces data
make_filter()   # order=1 — runs second, consumes and re-publishes
make_display()  # order=2 — runs third, consumes final output

Lower order values run first. This ensures data flows through the pipeline in a single tick cycle without a one-tick delay between stages.

Common Issues and Fixes

Issue: No Output Displayed

Symptom:

Starting Temperature Pipeline...

========================================
  Temperature Monitor — Press Ctrl+C to stop
========================================

[Nothing appears]

Cause: Topic names don't match between publisher and subscriber.

Fix:

  • Check topic names match exactly: "raw_temp" and "filtered_temp"
  • Verify with horus topic list in a second terminal
  • Ensure all three nodes are passed to horus.run()

Issue: Too Much or Too Little Smoothing

Symptom: Temperature changes too fast or too slow.

Fix: Adjust the alpha value in make_filter():

state = {
    "alpha": 0.3,  # Current: moderate smoothing

    # Try these alternatives:
    # "alpha": 0.7,  # More responsive, less smooth
    # "alpha": 0.1,  # Very smooth, slower response
}

Issue: ModuleNotFoundError: horus

Cause: Python bindings not installed.

Fix:

pip install horus-robotics

For source builds:

cd horus_py && maturin develop --release

Issue: horus topic echo Shows Nothing

Cause: Application not running, or running with python src/main.py instead of horus run.

Fix:

  1. Start the application with horus run (not python src/main.py)
  2. In a separate terminal, run horus topic echo raw_temp
  3. If still empty, run horus topic list to check which topics exist

Issue: Stale Shared Memory

Symptom: Failed to create Topic or topics from a previous run interfere.

Fix:

horus clean --shm

Experiments to Try

Change the Update Rate

Make the sensor publish faster (2 Hz instead of 1 Hz):

return horus.Node(
    name="SensorNode",
    tick=tick,
    rate=2,  # 2 Hz instead of 1 Hz
    ...
)

Remember to update FilterNode and DisplayNode rates to match, or they will only process every other reading.

Add Temperature Alerts

Add a warning when the temperature exceeds a threshold:

def tick(node):
    temp = node.recv("filtered_temp")
    if temp is None:
        return

    state["count"] += 1
    print(f"[Reading #{state['count']}] Temperature: {temp:.1f}°C")

    if temp > 30.0:
        print("  WARNING: High temperature detected!")
        node.log_warning(f"High temp alert: {temp:.1f}°C")

Log Data to a File

Add file logging to the DisplayNode:

def make_display():
    state = {"count": 0, "logfile": None}

    def init(node):
        state["logfile"] = open("temperature_log.csv", "w")
        state["logfile"].write("reading,temperature\n")

    def tick(node):
        temp = node.recv("filtered_temp")
        if temp is None:
            return
        state["count"] += 1
        state["logfile"].write(f"{state['count']},{temp:.2f}\n")
        state["logfile"].flush()
        print(f"[#{state['count']}] {temp:.1f}°C (logged)")

    def shutdown(node):
        if state["logfile"]:
            state["logfile"].close()
            node.log_info("Log file closed")

    return horus.Node(name="DisplayNode", tick=tick, init=init,
                      shutdown=shutdown, rate=1, order=2,
                      subs=["filtered_temp"])

Use a Class for State

If you prefer classes over closures:

class SensorState:
    def __init__(self):
        self.reading = 0.0

sensor = SensorState()

def sensor_tick(node):
    sensor.reading += 1.0
    base_temp = 25.0 + math.sin(sensor.reading * 0.1) * 5.0
    noise = math.sin(sensor.reading * 0.7) * 2.0
    node.send("raw_temp", base_temp + noise)

sensor_node = horus.Node(
    name="SensorNode", tick=sensor_tick, rate=1, order=0,
    pubs=["raw_temp"],
)

Both approaches are valid. Closures keep state co-located with the factory function; classes work better when state is complex or shared.

Use the Scheduler Directly

For more control over execution, use horus.Scheduler instead of horus.run():

sched = horus.Scheduler(tick_rate=100, watchdog_ms=500)
sched.add(make_sensor())
sched.add(make_filter())
sched.add(make_display())

# Run for 30 seconds, then stop automatically
sched.run(duration=30.0)

This gives you access to runtime mutation, safety configuration, and introspection methods. See the Python API Reference for the full Scheduler API.

Key Takeaways

  • Pipeline pattern: Data flows through stages (Sensor -> Filter -> Display), each a separate node
  • Pub/sub decoupling: Nodes only know about topic names, not each other
  • Execution order: order controls which node runs first in each tick cycle
  • recv() is non-blocking: Returns None when no message is available -- not an error
  • State in closures: Use dictionaries or class instances captured by tick functions
  • CLI introspection: horus topic echo, horus topic hz, and horus monitor work on live data from any terminal

Next Steps

Now that you have built a 3-node pipeline, try:

  1. Python API Reference -- Full horus.Node, horus.Scheduler, and horus.Topic docs
  2. Testing -- Learn how to unit test your nodes with tick_once()
  3. Message Types -- Use typed messages (horus.CmdVel, horus.Imu) instead of primitives
  4. Using Pre-Built Nodes -- Use library nodes instead of writing from scratch
  5. Choosing a Language -- When to use Python vs Rust

Full Code

The complete code above is production-ready. To save it:

  1. Copy the entire code block from Step 2
  2. Replace src/main.py in your project
  3. Run with horus run

For additional examples, see Python Examples.


See Also