Telemetry Logger (Python)
Subscribes to sensor and motor topics and writes timestamped data to a CSV file. Uses async def tick so file I/O runs on an async thread pool and never blocks the real-time control loop.
Problem
You need to log topic data to disk for post-run analysis without affecting the timing of real-time control nodes.
When To Use
- Recording flight/drive data for post-analysis
- Capturing sensor streams for offline algorithm development
- Debugging intermittent issues that require long recordings
Prerequisites
- HORUS installed (Installation Guide)
- Upstream nodes publishing topics you want to log
horus.toml
[package]
name = "telemetry-logger-py"
version = "0.1.0"
description = "Non-blocking topic logger to CSV (Python)"
language = "python"
Complete Code
#!/usr/bin/env python3
"""Async telemetry logger — writes CSV without blocking the control loop."""
import os
import time
import aiofiles
import asyncio
import horus
from horus import Node, Odometry, Imu, us, ms
# ── Configuration ────────────────────────────────────────────
LOG_FILE = f"telemetry_{int(time.time())}.csv"
CSV_HEADER = "tick,timestamp,odom_x,odom_y,odom_theta,odom_speed,imu_accel_z,imu_gyro_z\n"
# ── State ────────────────────────────────────────────────────
file_handle = [None]
tick_count = [0]
lines_written = [0]
# ── Node callbacks ───────────────────────────────────────────
async def logger_init(node):
    """Open log file and write CSV header; runs once before the tick loop."""
    file_handle[0] = await aiofiles.open(LOG_FILE, mode="w")
    await file_handle[0].write(CSV_HEADER)
    await file_handle[0].flush()
    print(f"Logger: writing to {LOG_FILE}")


async def logger_tick(node):
    """Read topics and append a CSV line; async file I/O, never blocks RT nodes."""
    tick_count[0] += 1
    # IMPORTANT: always recv() every tick to drain buffers
    odom = node.recv("pose.fused")
    imu = node.recv("imu.raw")
    # Use defaults if topics haven't published yet
    odom_x = odom.x if odom else 0.0
    odom_y = odom.y if odom else 0.0
    odom_theta = odom.theta if odom else 0.0
    odom_speed = odom.linear_velocity if odom else 0.0
    imu_az = imu.accel_z if imu else 0.0
    imu_gz = imu.gyro_z if imu else 0.0
    # Write CSV line using async I/O so this never blocks the scheduler
    if file_handle[0] is not None:
        line = (
            f"{tick_count[0]},"
            f"{time.time():.6f},"
            f"{odom_x:.4f},"
            f"{odom_y:.4f},"
            f"{odom_theta:.4f},"
            f"{odom_speed:.4f},"
            f"{imu_az:.4f},"
            f"{imu_gz:.4f}\n"
        )
        await file_handle[0].write(line)
        lines_written[0] += 1
        # Flush every 100 lines to balance performance and data safety
        if lines_written[0] % 100 == 0:
            await file_handle[0].flush()


async def logger_shutdown(node):
    """Flush and close the log file."""
    if file_handle[0] is not None:
        await file_handle[0].flush()
        await file_handle[0].close()
        file_handle[0] = None
    print(f"Logger: wrote {lines_written[0]} lines to {LOG_FILE}")
# ── Main ─────────────────────────────────────────────────────
logger_node = Node(
    name="Logger",
    tick=logger_tick,  # async def: auto-detected, runs on async I/O pool
    init=logger_init,
    shutdown=logger_shutdown,
    rate=10,   # 10 Hz logging, enough for post-analysis
    order=99,  # Runs AFTER all data-producing nodes
    subs=["pose.fused", "imu.raw"],
    pubs=[],
)

if __name__ == "__main__":
    horus.run(logger_node)
Expected Output
[HORUS] Scheduler running — tick_rate: 1000 Hz
Logger: writing to telemetry_1711036800.csv
[HORUS] Node "Logger" started (10 Hz)
^C
Logger: wrote 150 lines to telemetry_1711036800.csv
[HORUS] Shutting down...
[HORUS] Node "Logger" shutdown complete
Generated CSV:
tick,timestamp,odom_x,odom_y,odom_theta,odom_speed,imu_accel_z,imu_gyro_z
1,1711036800.100000,0.0000,0.0000,0.0000,0.0000,9.8100,0.0000
2,1711036800.200000,0.0100,0.0000,0.0100,0.5000,9.8100,0.0500
3,1711036800.300000,0.0200,0.0010,0.0200,0.5000,9.8100,0.0500
...
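For post-run analysis, the generated CSV can be read back with the standard library alone. The sketch below assumes the exact header shown above; `load_telemetry` and `mean_speed` are hypothetical helper names and not part of HORUS. Rows logged before the upstream topics start publishing contain the default zeros, so they pull the average down unless filtered out.

```python
import csv
import io

# Column names taken from the CSV header the logger writes.
FIELDS = ["tick", "timestamp", "odom_x", "odom_y", "odom_theta",
          "odom_speed", "imu_accel_z", "imu_gyro_z"]


def load_telemetry(stream):
    """Parse logger CSV rows into dicts: tick as int, all other fields as float."""
    rows = []
    for raw in csv.DictReader(stream):
        row = {name: float(raw[name]) for name in FIELDS}
        row["tick"] = int(raw["tick"])
        rows.append(row)
    return rows


def mean_speed(rows):
    """Average odom_speed over all rows (0.0 for an empty log)."""
    if not rows:
        return 0.0
    return sum(r["odom_speed"] for r in rows) / len(rows)


# Sample rows copied from the generated CSV above.
SAMPLE = (
    "tick,timestamp,odom_x,odom_y,odom_theta,odom_speed,imu_accel_z,imu_gyro_z\n"
    "1,1711036800.100000,0.0000,0.0000,0.0000,0.0000,9.8100,0.0000\n"
    "2,1711036800.200000,0.0100,0.0000,0.0100,0.5000,9.8100,0.0500\n"
    "3,1711036800.300000,0.0200,0.0010,0.0200,0.5000,9.8100,0.0500\n"
)

rows = load_telemetry(io.StringIO(SAMPLE))
print(len(rows), round(mean_speed(rows), 4))
```

To read a real log, pass an open file instead of the in-memory sample, for example `with open(path, newline="") as f: rows = load_telemetry(f)`.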
Synchronous Alternative
If you do not need async I/O (or aiofiles is not available), use a synchronous tick with compute=True to run on a thread pool:
import horus
from horus import Node
log_file = [None]
tick_count = [0]


def sync_init(node):
    """Open the log file and write the CSV header."""
    log_file[0] = open("telemetry.csv", "w")
    log_file[0].write("tick,data\n")


def sync_tick(node):
    """Append one row per tick when new data is available."""
    tick_count[0] += 1
    data = node.recv("sensor.data")
    if log_file[0] and data:
        # Write both columns declared in the header
        log_file[0].write(f"{tick_count[0]},{data}\n")


def sync_shutdown(node):
    """Flush and close the log file."""
    if log_file[0]:
        log_file[0].flush()
        log_file[0].close()


logger = Node(
    name="SyncLogger",
    tick=sync_tick,
    init=sync_init,
    shutdown=sync_shutdown,
    rate=10,
    order=99,
    compute=True,  # thread pool: file I/O won't block RT nodes
    subs=["sensor.data"],
)

if __name__ == "__main__":
    horus.run(logger)
Key Points
- async def tick is auto-detected by horus.Node, which marks the node for the async I/O execution class
- aiofiles provides non-blocking file writes, so the scheduler thread is never blocked by disk I/O
- init() opens the file once before the tick loop starts
- shutdown() flushes and closes the file, which prevents data loss on Ctrl+C
- Periodic flush (every 100 lines) balances write performance with crash-safety
- 10 Hz logging is typical for post-flight analysis; use 100 Hz+ for real-time debugging
- order=99 ensures the logger runs after all data-producing nodes have published
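The periodic-flush trade-off can be isolated in a small wrapper. This is a minimal synchronous sketch, not the logger's own implementation: `PeriodicFlushWriter` is a hypothetical name, and an in-memory stream stands in for the log file. At 10 Hz, flushing every 100 lines means roughly the last 10 seconds of data may still sit in the buffer if the process dies. Note also that flush() only hands data to the operating system; it does not force it onto the disk.

```python
import io


class PeriodicFlushWriter:
    """Wrap a text stream and flush it after every `flush_every` lines."""

    def __init__(self, stream, flush_every=100):
        self.stream = stream
        self.flush_every = flush_every
        self.lines_written = 0
        self.flushed_up_to = 0  # line count at the most recent flush
        self.flush_count = 0

    def write_line(self, line):
        self.stream.write(line)
        self.lines_written += 1
        if self.lines_written % self.flush_every == 0:
            self.flush()

    def flush(self):
        self.stream.flush()
        self.flushed_up_to = self.lines_written
        self.flush_count += 1

    def unflushed_lines(self):
        """Lines written since the last flush, i.e. at risk if the process crashes."""
        return self.lines_written - self.flushed_up_to


buffer = io.StringIO()
writer = PeriodicFlushWriter(buffer, flush_every=100)
for i in range(250):
    writer.write_line(f"{i}\n")
print(writer.flush_count, writer.unflushed_lines())
```

A smaller `flush_every` shrinks the window of data at risk at the cost of more frequent flush calls.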
Variations
- Binary format: Replace CSV with MessagePack or struct packing for smaller files
- Rotating logs: Create a new file every N minutes or N MB to prevent single-file bloat
- Selective logging: Add a log.enable topic to toggle logging on/off at runtime
- Network streaming: Replace file writes with UDP socket sends for live ground station telemetry
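As one possible take on the binary-format variation, the sketch below packs each row with Python's struct module. The record layout is an assumption chosen to mirror the CSV columns (a 32-bit tick, a 64-bit timestamp, six 32-bit floats); `RECORD`, `pack_record`, and `unpack_records` are hypothetical names. The timestamp stays in double precision because a 32-bit float cannot resolve sub-second changes in an epoch time.

```python
import struct

# Assumed layout, little-endian with no padding:
# tick (uint32), timestamp (float64), then six float32 values.
RECORD = struct.Struct("<Id6f")


def pack_record(tick, timestamp, odom_x, odom_y, odom_theta,
                odom_speed, imu_az, imu_gz):
    """Encode one telemetry row as a fixed-size binary record."""
    return RECORD.pack(tick, timestamp, odom_x, odom_y, odom_theta,
                       odom_speed, imu_az, imu_gz)


def unpack_records(blob):
    """Decode all complete records; a truncated trailing record is ignored."""
    last_start = len(blob) - RECORD.size
    return [RECORD.unpack_from(blob, offset)
            for offset in range(0, last_start + 1, RECORD.size)]


blob = (
    pack_record(1, 1711036800.1, 0.0, 0.0, 0.0, 0.0, 9.81, 0.0)
    + pack_record(2, 1711036800.2, 0.01, 0.0, 0.01, 0.5, 9.81, 0.05)
)
records = unpack_records(blob)
print(RECORD.size, len(records))
```

Each record here is a fixed 36 bytes, compared with roughly 60 bytes for the sample CSV lines. Skipping an incomplete trailing record lets a file that was cut off mid-write still be read.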
Common Errors
| Symptom | Cause | Fix |
|---|---|---|
| File is empty after run | shutdown() did not flush | Always call flush() and close() in shutdown() |
| CSV has all zeros | Topics not publishing before logger starts | Use defaults for missing data and check that upstream nodes are active |
| Disk fills up quickly | Logging at too high a rate | Reduce rate or switch to binary format |
| RT nodes slowed down | Logger not using async or compute mode | Use async def tick or compute=True |
| ModuleNotFoundError: aiofiles | aiofiles not installed | Run pip install aiofiles or use the synchronous alternative |
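For the "Disk fills up quickly" case, a rough estimate of log growth helps when choosing a rate. The sketch below is back-of-the-envelope arithmetic, not a HORUS utility: `bytes_per_hour` is a hypothetical helper, and the line size is measured from one sample row of the generated CSV. Real lines grow slightly as the tick counter gains digits or values turn negative.

```python
def bytes_per_hour(rate_hz, bytes_per_line):
    """Approximate log growth per hour at a fixed logging rate."""
    return rate_hz * bytes_per_line * 3600


# One sample row from the generated CSV, including the trailing newline.
SAMPLE_LINE = "2,1711036800.200000,0.0100,0.0000,0.0100,0.5000,9.8100,0.0500\n"
line_size = len(SAMPLE_LINE.encode("utf-8"))

for rate in (10, 100, 1000):
    megabytes = bytes_per_hour(rate, line_size) / 1e6
    print(f"{rate:>5} Hz -> {megabytes:.1f} MB/hour")
```

Growth scales linearly with the rate, so moving from 10 Hz to 100 Hz multiplies the file size by ten for the same recording length.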
See Also
- Telemetry Logger (Rust): Rust version with .async_io()
- Multi-Sensor Fusion (Python): Produces the pose.fused topic