Telemetry Logger (Python)

Subscribes to sensor and motor topics and writes timestamped data to a CSV file. An async def tick lets file I/O run on the scheduler's async I/O pool, so it never blocks the real-time control loop.

Problem

You need to log topic data to disk for post-run analysis without affecting the timing of real-time control nodes.

When To Use

  • Recording flight/drive data for post-analysis
  • Capturing sensor streams for offline algorithm development
  • Debugging intermittent issues that require long recordings

Prerequisites

horus.toml

[package]
name = "telemetry-logger-py"
version = "0.1.0"
description = "Non-blocking topic logger to CSV (Python)"
language = "python"

Complete Code

#!/usr/bin/env python3
"""Async telemetry logger — writes CSV without blocking the control loop."""

import time

import aiofiles  # third-party: pip install aiofiles

import horus
from horus import Node

# ── Configuration ────────────────────────────────────────────

LOG_FILE = f"telemetry_{int(time.time())}.csv"
CSV_HEADER = "tick,timestamp,odom_x,odom_y,odom_theta,odom_speed,imu_accel_z,imu_gyro_z\n"

# ── State ────────────────────────────────────────────────────

# One-element lists so the callbacks can mutate shared state without `global`
file_handle = [None]
tick_count = [0]
lines_written = [0]

# ── Node callbacks ───────────────────────────────────────────

async def logger_init(node):
    """Open log file and write CSV header — runs once before tick loop."""
    file_handle[0] = await aiofiles.open(LOG_FILE, mode="w")
    await file_handle[0].write(CSV_HEADER)
    await file_handle[0].flush()
    print(f"Logger: writing to {LOG_FILE}")

async def logger_tick(node):
    """Read topics and append a CSV line — async file I/O, never blocks RT nodes."""
    tick_count[0] += 1

    # IMPORTANT: always recv() every tick to drain buffers
    odom = node.recv("pose.fused")
    imu = node.recv("imu.raw")

    # Use defaults if topics haven't published yet
    odom_x = odom.x if odom else 0.0
    odom_y = odom.y if odom else 0.0
    odom_theta = odom.theta if odom else 0.0
    odom_speed = odom.linear_velocity if odom else 0.0
    imu_az = imu.accel_z if imu else 0.0
    imu_gz = imu.gyro_z if imu else 0.0

    # Write CSV line — async I/O so this never blocks the scheduler
    if file_handle[0] is not None:
        line = (
            f"{tick_count[0]},"
            f"{time.time():.6f},"
            f"{odom_x:.4f},"
            f"{odom_y:.4f},"
            f"{odom_theta:.4f},"
            f"{odom_speed:.4f},"
            f"{imu_az:.4f},"
            f"{imu_gz:.4f}\n"
        )
        await file_handle[0].write(line)
        lines_written[0] += 1

        # Flush every 100 lines to balance performance and data safety
        if lines_written[0] % 100 == 0:
            await file_handle[0].flush()

async def logger_shutdown(node):
    """Flush and close the log file."""
    if file_handle[0] is not None:
        await file_handle[0].flush()
        await file_handle[0].close()
        file_handle[0] = None
    print(f"Logger: wrote {lines_written[0]} lines to {LOG_FILE}")

# ── Main ─────────────────────────────────────────────────────

logger_node = Node(
    name="Logger",
    tick=logger_tick,              # async def — auto-detected, runs on async I/O pool
    init=logger_init,
    shutdown=logger_shutdown,
    rate=10,                       # 10 Hz logging — enough for post-analysis
    order=99,                      # Runs AFTER all data-producing nodes
    subs=["pose.fused", "imu.raw"],
    pubs=[],
)

if __name__ == "__main__":
    horus.run(logger_node)

Expected Output

[HORUS] Scheduler running — tick_rate: 1000 Hz
Logger: writing to telemetry_1711036800.csv
[HORUS] Node "Logger" started (10 Hz)
^C
Logger: wrote 150 lines to telemetry_1711036800.csv
[HORUS] Shutting down...
[HORUS] Node "Logger" shutdown complete

Generated CSV:

tick,timestamp,odom_x,odom_y,odom_theta,odom_speed,imu_accel_z,imu_gyro_z
1,1711036800.100000,0.0000,0.0000,0.0000,0.0000,9.8100,0.0000
2,1711036800.200000,0.0100,0.0000,0.0100,0.5000,9.8100,0.0500
3,1711036800.300000,0.0200,0.0010,0.0200,0.5000,9.8100,0.0500
...
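For post-run analysis, the generated CSV can be loaded with nothing but the standard library. The sketch below assumes the file name from the example run above (adjust to your own run); the column names come from CSV_HEADER.

```python
# Post-run analysis sketch (stdlib only). The file name is the one
# produced by the example run above; substitute your own log file.
import csv
import pathlib

def load_telemetry(path):
    """Return a list of rows with every field converted to float."""
    with open(path, newline="") as f:
        return [{k: float(v) for k, v in row.items()} for row in csv.DictReader(f)]

path = pathlib.Path("telemetry_1711036800.csv")
if path.exists():
    rows = load_telemetry(path)
    avg_speed = sum(r["odom_speed"] for r in rows) / len(rows)
    print(f"{len(rows)} samples, mean speed {avg_speed:.2f} m/s")
```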

Synchronous Alternative

If you do not need async I/O (or aiofiles is not available), use a synchronous tick with compute=True to run on a thread pool:

import horus
from horus import Node

log_file = [None]

def sync_init(node):
    log_file[0] = open("telemetry.csv", "w")
    log_file[0].write("tick,data\n")

def sync_tick(node):
    data = node.recv("sensor.data")
    if log_file[0] and data:
        log_file[0].write(f"{data}\n")

def sync_shutdown(node):
    if log_file[0]:
        log_file[0].flush()
        log_file[0].close()

logger = Node(
    name="SyncLogger",
    tick=sync_tick,
    init=sync_init,
    shutdown=sync_shutdown,
    rate=10,
    order=99,
    compute=True,                  # thread pool — file I/O won't block RT nodes
    subs=["sensor.data"],
)

if __name__ == "__main__":
    horus.run(logger)

Key Points

  • async def tick is auto-detected by horus.Node — marks the node for async I/O execution class
  • aiofiles provides non-blocking file writes — the scheduler thread is never blocked by disk I/O
  • init() opens the file once before the tick loop starts
  • shutdown() flushes and closes the file — prevents data loss on Ctrl+C
  • Periodic flush (every 100 lines) balances write performance with crash-safety
  • 10 Hz logging is typical for post-flight analysis; use 100 Hz+ for real-time debugging
  • order=99 ensures the logger runs after all data-producing nodes have published
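When choosing a logging rate, file growth can be estimated from the line size. The figure of roughly 80 bytes per CSV row is an assumption based on the format above, not a measured value:

```python
# Rough disk-usage estimate for CSV logging (assumes ~80-byte lines).
def log_growth_mb_per_hour(rate_hz, bytes_per_line=80):
    """Approximate file growth in MB/hour for a given logging rate."""
    return rate_hz * bytes_per_line * 3600 / 1e6

print(f"10 Hz:  {log_growth_mb_per_hour(10):.1f} MB/h")   # ~2.9 MB/h
print(f"100 Hz: {log_growth_mb_per_hour(100):.1f} MB/h")  # ~28.8 MB/h
```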

Variations

  • Binary format: Replace CSV with MessagePack or struct packing for smaller files
  • Rotating logs: Create a new file every N minutes or N MB to prevent single-file bloat
  • Selective logging: Add a log.enable topic to toggle logging on/off at runtime
  • Network streaming: Replace file writes with UDP socket sends for live ground station telemetry
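The rotating-logs variation can be sketched as a small standalone writer. The class and file-naming scheme below are illustrative, not part of the horus API:

```python
# Sketch of the "rotating logs" variation: start a new file every
# max_lines rows so no single file grows without bound.
class RotatingCsvWriter:
    def __init__(self, header, max_lines=10_000):
        self.header = header
        self.max_lines = max_lines
        self.part = 0          # current file number
        self.lines = 0         # data lines written to the current file
        self.handle = None

    def _rotate(self):
        """Close the current file (if any) and open the next part."""
        if self.handle is not None:
            self.handle.close()
        self.part += 1
        self.handle = open(f"telemetry_part{self.part:03d}.csv", "w")
        self.handle.write(self.header)
        self.lines = 0

    def write_line(self, line):
        """Append one CSV line, rotating to a new file when full."""
        if self.handle is None or self.lines >= self.max_lines:
            self._rotate()
        self.handle.write(line)
        self.lines += 1

    def close(self):
        if self.handle is not None:
            self.handle.flush()
            self.handle.close()
            self.handle = None
```

In the logger above, logger_tick would call write_line() in place of the direct write, and logger_shutdown would call close().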

Common Errors

Symptom                       | Cause                                      | Fix
File is empty after run       | shutdown() did not flush                   | Always call flush() and close() in shutdown()
CSV has all zeros             | Topics not publishing before logger starts | Use defaults for missing data and check that upstream nodes are active
Disk fills up quickly         | Logging at too high a rate                 | Reduce rate or switch to binary format
RT nodes slowed down          | Logger not using async or compute mode     | Use async def tick or compute=True
ModuleNotFoundError: aiofiles | aiofiles not installed                     | Run pip install aiofiles or use the synchronous alternative

See Also