Telemetry Logger
Subscribes to topics and writes them to a CSV log file using .async_io() execution class. File I/O happens on a Tokio blocking thread — it never blocks or delays the real-time control loop.
horus.toml
[package]
name = "telemetry-logger"
version = "0.1.0"
description = "Non-blocking topic logger to CSV"
Complete Code
use horus::prelude::*;
use std::fs::File;
use std::io::Write as IoWrite;
/// Pose data to log
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, LogSummary)]
#[repr(C)]
struct FusedPose {
x: f32,
y: f32,
theta: f32,
speed: f32,
confidence: f32,
}
/// Motor telemetry to log
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, LogSummary)]
#[repr(C)]
struct MotorTelemetry {
left_rpm: f32,
right_rpm: f32,
battery_voltage: f32,
}
// ── Logger Node ─────────────────────────────────────────────
struct LoggerNode {
pose_sub: Topic<FusedPose>,
motor_sub: Topic<MotorTelemetry>,
file: Option<File>,
line_count: u64,
}
impl LoggerNode {
fn new() -> Result<Self> {
Ok(Self {
pose_sub: Topic::new("pose.fused")?,
motor_sub: Topic::new("motor.telemetry")?,
file: None,
line_count: 0,
})
}
}
impl Node for LoggerNode {
fn name(&self) -> &str { "Logger" }
fn init(&mut self) -> Result<()> {
// Open log file in init() — runs once before tick loop
let mut f = File::create("telemetry.csv")
.map_err(|e| Error::Config(
format!("Failed to create log file: {}", e)
)))?;
writeln!(f, "tick,x,y,theta,speed,confidence,left_rpm,right_rpm,battery_v")
.map_err(|e| Error::Config(
format!("Failed to write header: {}", e)
)))?;
self.file = Some(f);
Ok(())
}
fn tick(&mut self) {
self.line_count += 1;
// IMPORTANT: always recv() every tick to drain buffers
let pose = self.pose_sub.recv().unwrap_or_default();
let motor = self.motor_sub.recv().unwrap_or_default();
// Write CSV line — file I/O is safe here because we use async_io()
if let Some(ref mut f) = self.file {
let _ = writeln!(
f,
"{},{:.4},{:.4},{:.4},{:.4},{:.2},{:.1},{:.1},{:.2}",
self.line_count,
pose.x, pose.y, pose.theta, pose.speed, pose.confidence,
motor.left_rpm, motor.right_rpm, motor.battery_voltage,
);
}
}
fn shutdown(&mut self) -> Result<()> {
// Flush and close the file
if let Some(ref mut f) = self.file {
let _ = f.flush();
}
self.file = None;
Ok(())
}
}
fn main() -> Result<()> {
let mut scheduler = Scheduler::new();
// Execution order: logger runs on async I/O thread pool — never blocks RT nodes
scheduler.add(LoggerNode::new()?)
.order(99) // runs after all data-producing nodes
.async_io() // Tokio blocking pool — file I/O is safe
.rate(10_u64.hz()) // 10Hz logging — enough for post-analysis
.build()?;
scheduler.run()
}
Expected Output
[HORUS] Scheduler running — tick_rate: 10 Hz
[HORUS] Node "Logger" started (AsyncIo, 10 Hz)
^C
[HORUS] Shutting down...
[HORUS] Node "Logger" shutdown complete
Generated telemetry.csv:
tick,x,y,theta,speed,confidence,left_rpm,right_rpm,battery_v
1,0.0000,0.0000,0.0000,0.0000,0.00,0.0,0.0,0.00
2,0.0100,0.0000,0.0100,0.5000,0.90,45.0,47.0,12.40
...
Key Points
.async_io()runs the node on a Tokio blocking thread pool — file writes never block the RT schedulerinit()opens the file once before the tick loop startsshutdown()flushes and closes the file — prevents data lossunwrap_or_default()on recv — logger uses default (zeros) if a topic hasn't published yet- 10Hz logging is typical for post-flight analysis; use 100Hz+ for real-time debugging
- Combine with any other recipe — just match the topic names and message types