AI Context — Complete Framework Reference
Mental Model
HORUS is a tick-based robotics runtime. Applications are composed of Nodes (units of computation) that exchange data through Topics (typed pub/sub channels backed by shared memory). A Scheduler orchestrates node execution in priority order each tick cycle. Communication is local shared memory IPC with latencies from ~3ns (same-thread) to ~167ns (cross-process). Systems can be single-process (all nodes in one scheduler) or multi-process (nodes in separate processes sharing topics via /dev/shm/horus/). There are no callbacks — nodes implement tick() which the scheduler calls each cycle.
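The tick model described above can be illustrated with a toy scheduler written against the standard library only. This is a sketch of the idea, not the HORUS implementation: `ToyNode`, `ToyScheduler`, and `Counter` are hypothetical names, and real scheduling (rates, RT classes, shared memory) is left out.

```rust
// Toy model of a tick-based runtime; all names here are hypothetical.

/// Minimal stand-in for a node: a name plus a tick() method.
trait ToyNode {
    fn name(&self) -> &str;
    fn tick(&mut self);
}

/// Holds nodes together with their execution order (lower runs first).
struct ToyScheduler {
    nodes: Vec<(u32, Box<dyn ToyNode>)>,
}

impl ToyScheduler {
    fn new() -> Self {
        Self { nodes: Vec::new() }
    }

    /// Register a node and keep the list sorted by order.
    /// The sort is stable, so equal orders keep insertion order.
    fn add(&mut self, order: u32, node: Box<dyn ToyNode>) {
        self.nodes.push((order, node));
        self.nodes.sort_by_key(|(order, _)| *order);
    }

    /// Run one tick cycle and return the node names in execution order.
    fn tick_once(&mut self) -> Vec<String> {
        let mut ran = Vec::new();
        for (_, node) in self.nodes.iter_mut() {
            node.tick();
            ran.push(node.name().to_string());
        }
        ran
    }
}

/// Example node that counts how many times it has been ticked.
struct Counter {
    label: &'static str,
    ticks: u64,
}

impl ToyNode for Counter {
    fn name(&self) -> &str {
        self.label
    }
    fn tick(&mut self) {
        self.ticks += 1;
    }
}
```

Adding nodes with orders 50, 0, and 10 and calling `tick_once()` runs them as 0, 10, 50, regardless of the order in which they were added.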
Node Trait (Complete)
All nodes implement the Node trait. Only name() and tick() are required; every other method has a default.
use horus::prelude::*;
| Method | Signature | Default | Description |
|---|---|---|---|
| name | fn name(&self) -> &str | Required | Unique node identifier |
| tick | fn tick(&mut self) | Required | Called every scheduler cycle; do work here |
| init | fn init(&mut self) -> Result<()> | Ok(()) | Called once before first tick; open hardware, allocate buffers |
| shutdown | fn shutdown(&mut self) -> Result<()> | Ok(()) | Called on Ctrl+C / scheduler stop; release hardware, zero actuators |
| publishers | fn publishers(&self) -> Vec<String> | vec![] | Declare published topic names for introspection |
| subscribers | fn subscribers(&self) -> Vec<String> | vec![] | Declare subscribed topic names for introspection |
| on_error | fn on_error(&mut self, error: &Error) | no-op | Called when tick() panics or returns error |
| is_safe_state | fn is_safe_state(&self) -> bool | true | Query whether node is in a safe state |
| enter_safe_state | fn enter_safe_state(&mut self) | no-op | Called by safety monitor on deadline miss with Miss::SafeMode |
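The lifecycle implied by the table (init once, tick repeatedly, shutdown on stop) can be sketched without HORUS itself. The trait below is a local stand-in rather than the real Node trait; `LifecycleNode`, `run_lifecycle`, and `ToyMotor` are hypothetical names, and error handling after a failed init is not modeled.

```rust
/// Local stand-in for a node trait with defaulted lifecycle hooks.
trait LifecycleNode {
    fn name(&self) -> &str;
    fn tick(&mut self);
    fn init(&mut self) -> Result<(), String> {
        Ok(())
    }
    fn shutdown(&mut self) -> Result<(), String> {
        Ok(())
    }
}

/// Drive one node through init -> N ticks -> shutdown.
fn run_lifecycle(node: &mut dyn LifecycleNode, ticks: u32) -> Result<(), String> {
    node.init()?;
    for _ in 0..ticks {
        node.tick();
    }
    node.shutdown()
}

/// Hypothetical actuator node that records what happened to it.
struct ToyMotor {
    velocity: f64,
    ticks: u32,
    initialized: bool,
}

impl LifecycleNode for ToyMotor {
    fn name(&self) -> &str {
        "ToyMotor"
    }

    fn init(&mut self) -> Result<(), String> {
        self.initialized = true; // a real node would open hardware here
        Ok(())
    }

    fn tick(&mut self) {
        self.velocity = 1.0; // pretend we are commanding motion
        self.ticks += 1;
    }

    fn shutdown(&mut self) -> Result<(), String> {
        self.velocity = 0.0; // zero the actuator on the way out
        Ok(())
    }
}
```

After `run_lifecycle(&mut motor, 3)` the motor has been initialized, ticked three times, and left at zero velocity, which is the behavior the shutdown row asks actuator nodes to guarantee.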
NodeBuilder API (Complete)
Chain after scheduler.add(node). Finalize with .build().
| Method | Parameter | Effect |
|---|---|---|
| .order(n) | u32 | Execution priority; lower = runs first. 0-9 critical, 10-49 high, 50-99 normal, 100-199 low, 200+ background |
| .rate(freq) | Frequency | Per-node tick rate. Auto-derives budget (80% period), deadline (95% period). Auto-marks as RT |
| .budget(dur) | Duration | Max allowed tick execution time. Auto-marks as RT |
| .deadline(dur) | Duration | Hard deadline for tick completion. Auto-marks as RT |
| .on_miss(policy) | Miss | Deadline miss policy: Warn, Skip, SafeMode, Stop |
| .compute() | (none) | Mark as CPU-heavy compute node (may use worker threads) |
| .on(topic) | &str | Event-driven: node ticks only when topic receives a message |
| .async_io() | (none) | Mark as async I/O node (non-blocking network/file) |
| .monitoring(bool) | bool | Enable per-node monitoring |
| .prefer_rt() | (none) | Request RT features, degrade gracefully |
| .require_rt() | (none) | Require RT features, panic if unavailable |
| .deterministic(bool) | bool | Enable deterministic execution mode |
| .max_deadline_misses(n) | u32 | Emergency stop after n misses |
| .cores(list) | &[usize] | Pin node to specific CPU cores |
| .with_profiling() | (none) | Enable per-node profiling |
| .with_blackbox(mb) | usize | Per-node blackbox buffer in MB |
| .verbose(bool) | bool | Enable/disable logging for this node |
| .failure_policy(p) | FailurePolicy | Override failure handling: Fatal, Restart, Skip, Ignore |
| .build() | (none) | Finalize and register the node. Returns Result<()> |
RT auto-detection: Setting .budget(), .deadline(), or .rate(Frequency) automatically sets is_rt=true and ExecutionClass::Rt. There is no .rt() method. This is order-independent via deferred finalize().
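A minimal sketch of how order-independent RT detection through a deferred finalize step might work, using only the standard library. `ToyNodeBuilder` and `ToyNodeConfig` are hypothetical, the rate is an integer in Hz for exact arithmetic, and the rule that an explicit budget or deadline overrides the value derived from the rate is an assumption, not documented HORUS behavior.

```rust
use std::time::Duration;

/// Hypothetical builder: setters only record values, finalize() resolves them.
#[derive(Default)]
struct ToyNodeBuilder {
    rate_hz: Option<u32>,
    budget: Option<Duration>,
    deadline: Option<Duration>,
}

/// Resolved settings produced by finalize().
#[derive(Debug, PartialEq)]
struct ToyNodeConfig {
    is_rt: bool,
    budget: Option<Duration>,
    deadline: Option<Duration>,
}

impl ToyNodeBuilder {
    fn rate(mut self, hz: u32) -> Self {
        assert!(hz > 0, "rate must be non-zero");
        self.rate_hz = Some(hz);
        self
    }

    fn budget(mut self, d: Duration) -> Self {
        self.budget = Some(d);
        self
    }

    fn deadline(mut self, d: Duration) -> Self {
        self.deadline = Some(d);
        self
    }

    /// Resolve everything at the end so the call order does not matter.
    fn finalize(self) -> ToyNodeConfig {
        let period = self.rate_hz.map(|hz| Duration::from_secs(1) / hz);
        ToyNodeConfig {
            // Any timing parameter marks the node as real-time.
            is_rt: self.rate_hz.is_some() || self.budget.is_some() || self.deadline.is_some(),
            // Assumption: an explicit value wins over the 80% / 95% defaults.
            budget: self.budget.or(period.map(|p| p * 80 / 100)),
            deadline: self.deadline.or(period.map(|p| p * 95 / 100)),
        }
    }
}
```

With this shape, `.rate(50).budget(d)` and `.budget(d).rate(50)` resolve to the same configuration, because nothing is decided until `finalize()` runs. A 100 Hz rate alone yields an 8 ms budget and a 9.5 ms deadline.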
Topic API (Complete)
Topic<T> requires T: Clone + Serialize + Deserialize + Send + Sync + 'static.
| Method | Signature | Behavior |
|---|---|---|
| new | Topic::new(name: &str) -> Result<Self> | Create/connect to a named topic. Auto-selects optimal IPC backend |
| with_capacity | Topic::with_capacity(name: &str, capacity: u32, slot_size: Option<usize>) -> Result<Self> | Custom ring buffer capacity |
| send | fn send(&self, msg: T) | Publish message. Infallible. Overwrites oldest on buffer full |
| try_send | fn try_send(&self, msg: T) -> Result<(), T> | Attempt send; returns message on failure |
| send_blocking | fn send_blocking(&self, msg: T, timeout: Duration) -> Result<(), SendBlockingError> | Block until space available or timeout. For critical commands |
| recv | fn recv(&self) -> Option<T> | Non-blocking receive. Returns None if empty |
| try_recv | fn try_recv(&self) -> Option<T> | Same as recv() for most use cases |
| read_latest | fn read_latest(&self) -> Option<T> where T: Copy | Read latest without advancing consumer position. Requires T: Copy |
| has_message | fn has_message(&self) -> bool | Check if messages are available without consuming |
| pending_count | fn pending_count(&self) -> u64 | Number of messages waiting |
| name | fn name(&self) -> &str | Topic name |
| metrics | fn metrics(&self) -> TopicMetrics | Message counts, send/recv failures |
| dropped_count | fn dropped_count(&self) -> u64 | Messages lost to buffer overflow |
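The difference between send (overwrite oldest) and try_send (hand the message back) can be shown with a small in-process ring. This is a single-threaded sketch built on `VecDeque`, not the shared-memory backend; `ToyRing` is a hypothetical type whose method names merely echo the table.

```rust
use std::collections::VecDeque;

/// Toy bounded queue with "overwrite oldest" semantics on send().
struct ToyRing<T> {
    buf: VecDeque<T>,
    capacity: usize,
    dropped: u64,
}

impl<T> ToyRing<T> {
    fn with_capacity(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { buf: VecDeque::with_capacity(capacity), capacity, dropped: 0 }
    }

    /// Infallible send: when full, evict the oldest message and count the loss.
    fn send(&mut self, msg: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.dropped += 1;
        }
        self.buf.push_back(msg);
    }

    /// Fallible send: when full, return the message instead of overwriting.
    fn try_send(&mut self, msg: T) -> Result<(), T> {
        if self.buf.len() == self.capacity {
            Err(msg)
        } else {
            self.buf.push_back(msg);
            Ok(())
        }
    }

    /// Non-blocking receive of the oldest pending message.
    fn recv(&mut self) -> Option<T> {
        self.buf.pop_front()
    }

    fn pending_count(&self) -> u64 {
        self.buf.len() as u64
    }

    fn dropped_count(&self) -> u64 {
        self.dropped
    }
}
```

With capacity 2, sending 1, 2, 3 leaves 2 and 3 pending and a dropped count of 1. A consumer that does not drain the ring every tick loses the oldest data in exactly this way.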
Scheduler API
| Method | Signature | Description |
|---|---|---|
| new | Scheduler::new() -> Scheduler | Create with auto-detected capabilities (~30-100us) |
| .tick_rate(freq) | Frequency | Global tick rate (default: 100 Hz) |
| .prefer_rt() | (none) | Try RT features, degrade gracefully |
| .require_rt() | (none) | Enable RT features, panic if unavailable |
| .watchdog(dur) | Duration | Frozen node detection, auto-creates safety monitor |
| .deterministic(bool) | bool | Enable deterministic mode |
| .with_blackbox(mb) | usize | BlackBox flight recorder buffer |
| .with_recording() | (none) | Enable record/replay |
| .max_deadline_misses(n) | u32 | Emergency stop after n deadline misses (default: 100) |
| .verbose(bool) | bool | Enable/disable non-emergency logging |
| .add(node) | impl Node | Add node, returns chainable builder |
| .run() | -> Result<()> | Main loop until Ctrl+C |
| .run_for(dur) | Duration -> Result<()> | Run for specific duration |
| .tick_once() | -> Result<()> | Execute exactly one tick cycle |
| .tick(names) | &[&str] -> Result<()> | One tick cycle for named nodes only |
| .set_node_rate(name, freq) | &str, Frequency | Change per-node rate at runtime |
| .stop() | (none) | Stop the scheduler |
| .is_running() | -> bool | Check if running |
| .metrics() | -> Vec<NodeMetrics> | Per-node performance metrics |
| .safety_stats() | -> Option<SafetyStats> | Budget overruns, deadline misses, watchdog expirations |
| .node_list() | -> Vec<String> | Registered node names |
DurationExt and Frequency
| Method | Type | Example | Result |
|---|---|---|---|
| .ns() | Duration | 500.ns() | 500 nanoseconds |
| .us() | Duration | 200.us() | 200 microseconds |
| .ms() | Duration | 10.ms() | 10 milliseconds |
| .secs() | Duration | 1.secs() | 1 second |
| .hz() | Frequency | 100.hz() | 100 Hz (period = 10ms) |
Frequency methods: value() -> f64, period() -> Duration, budget_default() -> Duration (80% period), deadline_default() -> Duration (95% period).
Validation: Frequency panics on 0, negative, NaN, or infinity.
These extension methods work on u64, f64, and i32. Import them with use horus::prelude::*;
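Unit suffixes of this kind are usually provided by an extension trait. The following is a sketch of that pattern on top of `std::time::Duration`, implemented for u64 only; `ToyUnits` and `ToyFrequency` are hypothetical and are not the HORUS types, though the validation mirrors the rule stated above.

```rust
use std::time::Duration;

/// Toy frequency wrapper; rejects values that cannot define a period.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ToyFrequency(f64);

impl ToyFrequency {
    /// Panics on zero, negative, NaN, or infinite input.
    fn new(hz: f64) -> Self {
        assert!(hz.is_finite() && hz > 0.0, "frequency must be finite and > 0");
        Self(hz)
    }

    fn value(self) -> f64 {
        self.0
    }

    fn period(self) -> Duration {
        Duration::from_secs_f64(1.0 / self.0)
    }
}

/// Hypothetical extension trait that adds unit suffixes to integers.
trait ToyUnits {
    fn ns(self) -> Duration;
    fn us(self) -> Duration;
    fn ms(self) -> Duration;
    fn secs(self) -> Duration;
    fn hz(self) -> ToyFrequency;
}

impl ToyUnits for u64 {
    fn ns(self) -> Duration {
        Duration::from_nanos(self)
    }
    fn us(self) -> Duration {
        Duration::from_micros(self)
    }
    fn ms(self) -> Duration {
        Duration::from_millis(self)
    }
    fn secs(self) -> Duration {
        Duration::from_secs(self)
    }
    fn hz(self) -> ToyFrequency {
        ToyFrequency::new(self as f64)
    }
}
```

With the trait in scope, `10_u64.ms()` is a 10 ms `Duration` and `100_u64.hz().period()` is also 10 ms. Supporting f64 and i32 as well would need one more `impl` per type.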
Execution Classes
| Class | When Used | Thread Model | How Triggered |
|---|---|---|---|
| Rt | Real-time nodes with timing guarantees | Priority-scheduled, optional CPU pinning | Auto: set .budget(), .deadline(), or .rate() |
| Compute | CPU-heavy work (ML inference, path planning) | May use worker thread pool | .compute() |
| Event | React to incoming data | Wakes on topic message | .on("topic.name") |
| AsyncIo | Network, file, database I/O | Non-blocking async runtime | .async_io() |
| BestEffort | Default; no special scheduling | Runs in tick order | No method called (default) |
Execution classes are mutually exclusive per node. RT is always implicit from timing parameters.
Miss (Deadline Policy)
| Policy | What Happens | Use For |
|---|---|---|
| Miss::Warn | Log warning, continue normally | Soft RT (logging, UI). Default |
| Miss::Skip | Skip this node for current tick | Firm RT (video encoding) |
| Miss::SafeMode | Call enter_safe_state() on the node | Motor controllers, safety nodes |
| Miss::Stop | Stop entire scheduler | Hard RT safety-critical |
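How a scheduler might map a deadline miss onto these policies can be sketched as a plain match. `ToyMiss`, `ToyAction`, and `MissCounter` are local stand-ins, and the interaction with the miss limit (stop once the n-th miss is recorded, regardless of policy) is an assumption made for illustration.

```rust
use std::time::Duration;

/// Local stand-in for the four documented policies.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyMiss {
    Warn,
    Skip,
    SafeMode,
    Stop,
}

/// What the toy scheduler does in response to a miss.
#[derive(Debug, PartialEq)]
enum ToyAction {
    LogAndContinue,
    SkipThisTick,
    EnterSafeState,
    StopScheduler,
}

/// True when a tick ran longer than its deadline.
fn missed_deadline(elapsed: Duration, deadline: Duration) -> bool {
    elapsed > deadline
}

/// Map a policy to the action described in the table.
fn action_for(policy: ToyMiss) -> ToyAction {
    match policy {
        ToyMiss::Warn => ToyAction::LogAndContinue,
        ToyMiss::Skip => ToyAction::SkipThisTick,
        ToyMiss::SafeMode => ToyAction::EnterSafeState,
        ToyMiss::Stop => ToyAction::StopScheduler,
    }
}

/// Miss bookkeeping with an emergency-stop threshold.
struct MissCounter {
    misses: u32,
    max_misses: u32,
}

impl MissCounter {
    /// Record one miss; assumption: the limit overrides the per-node policy.
    fn record(&mut self, policy: ToyMiss) -> ToyAction {
        self.misses += 1;
        if self.misses >= self.max_misses {
            ToyAction::StopScheduler
        } else {
            action_for(policy)
        }
    }
}
```

With `max_misses` set to 2, the first miss under `ToyMiss::Warn` only logs and the second one stops the scheduler.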
Safety Rules (CRITICAL)
Rule one: Always implement shutdown() for actuators. Without it, motors continue at last velocity on Ctrl+C.
fn shutdown(&mut self) -> Result<()> {
    self.motor.set_velocity(0.0);
    Ok(())
}
Rule two: Always call recv() every tick. Ring buffers overwrite old messages. Skipping ticks loses data.
fn tick(&mut self) {
    // ALWAYS recv, cache result
    if let Some(msg) = self.sub.recv() {
        self.cached = Some(msg);
    }
    // Then use cached value
}
Rule three: Never sleep() in tick(). It blocks the entire scheduler. All nodes share the tick cycle.
// BAD: std::thread::sleep(Duration::from_millis(100));
// GOOD: Use scheduler rate control instead
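When a node must do something less often than it is ticked, counting ticks avoids any blocking call. This is a generic sketch rather than a HORUS facility (`EveryN` is a hypothetical helper); where it fits, the per-node `.rate()` setting listed in the NodeBuilder table is the documented route.

```rust
/// Fires on every n-th call; a non-blocking replacement for sleep-based pacing.
struct EveryN {
    n: u64,
    count: u64,
}

impl EveryN {
    fn new(n: u64) -> Self {
        assert!(n > 0, "n must be non-zero");
        Self { n, count: 0 }
    }

    /// Returns true on calls 0, n, 2n, ... and never blocks.
    fn should_run(&mut self) -> bool {
        let fire = self.count == 0;
        // Keep the counter in 0..n so it can never overflow.
        self.count = (self.count + 1) % self.n;
        fire
    }
}
```

Inside a tick body, `if self.every_100.should_run() { ... }` would run the guarded work on every hundredth tick while the scheduler keeps its cadence; `every_100` is a hypothetical field holding `EveryN::new(100)`.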
Rule four: Never do blocking I/O in tick(). File reads, network calls, and database queries belong in init() or in an .async_io() node.
// BAD: let data = std::fs::read_to_string("file.txt")?;
// GOOD: Read in init(), use cached data in tick()
Rule five: Use dots not slashes in topic names. Slashes conflict with shared memory paths and fail on macOS.
// CORRECT: Topic::new("sensors.lidar")
// WRONG: Topic::new("sensors/lidar")
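A project can enforce the naming rule before a topic is ever created. The helpers below are hypothetical and not part of HORUS; the only check taken from the rule above is the rejection of slashes, and the empty-name check is an added assumption.

```rust
/// Hypothetical pre-flight check: reject empty names and names containing '/'.
fn check_topic_name(name: &str) -> Result<(), String> {
    if name.is_empty() {
        return Err("topic name is empty".to_string());
    }
    if name.contains('/') {
        return Err(format!("topic name '{}' contains '/'; use '.' instead", name));
    }
    Ok(())
}

/// Convert a slash-separated name to dotted form, dropping outer slashes.
fn to_dotted(name: &str) -> String {
    name.trim_matches('/').replace('/', ".")
}
```

For example, `to_dotted("/sensors/lidar")` yields `sensors.lidar`, which then passes `check_topic_name`.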
Common Patterns
Pattern: Publisher
use horus::prelude::*;
struct TempSensor {
    pub_topic: Topic<f32>,
}
impl TempSensor {
    fn new() -> Result<Self> {
        Ok(Self { pub_topic: Topic::new("sensor.temperature")? })
    }
}
impl Node for TempSensor {
    fn name(&self) -> &str { "TempSensor" }
    fn tick(&mut self) {
        // read_temperature() stands in for a hardware read; it is not defined in this snippet
        self.pub_topic.send(self.read_temperature());
    }
}
Pattern: Subscriber
use horus::prelude::*;
struct Logger {
    sub: Topic<f32>,
}
impl Logger {
    fn new() -> Result<Self> {
        Ok(Self { sub: Topic::new("sensor.temperature")? })
    }
}
impl Node for Logger {
    fn name(&self) -> &str { "Logger" }
    fn tick(&mut self) {
        if let Some(temp) = self.sub.recv() {
            hlog!(info, "Temperature: {:.1}C", temp);
        }
    }
}
Pattern: Pub+Sub Pipeline
use horus::prelude::*;
struct Filter {
    scan_sub: Topic<LaserScan>,
    cmd_pub: Topic<CmdVel>,
}
impl Filter {
    fn new() -> Result<Self> {
        Ok(Self {
            scan_sub: Topic::new("scan")?,
            cmd_pub: Topic::new("cmd_vel")?,
        })
    }
}
impl Node for Filter {
    fn name(&self) -> &str { "Filter" }
    fn tick(&mut self) {
        if let Some(scan) = self.scan_sub.recv() {
            if let Some(min) = scan.min_range() {
                if min < 0.5 {
                    self.cmd_pub.send(CmdVel::zero());
                } else {
                    self.cmd_pub.send(CmdVel::new(1.0, 0.0));
                }
            }
        }
    }
}
Pattern: Multi-Rate System
use horus::prelude::*;
fn main() -> Result<()> {
    let mut scheduler = Scheduler::new()
        .tick_rate(1000.hz())
        .watchdog(500.ms());
    scheduler.add(ImuSensor::new()?).order(0).rate(100.hz()).build()?;
    scheduler.add(PidController::new()?).order(10).rate(50.hz())
        .budget(500.us()).on_miss(Miss::Skip).build()?;
    scheduler.add(PathPlanner::new()?).order(50).rate(10.hz())
        .compute().build()?;
    scheduler.add(TelemetryLogger::new()?).order(200).rate(1.hz())
        .async_io().build()?;
    scheduler.run()
}
Pattern: State Machine
Always call recv() unconditionally, outside the state match.
use horus::prelude::*;
struct StateMachine {
    cmd_sub: Topic<CmdVel>,
    motor_pub: Topic<MotorCommand>,
    state: State,
    last_cmd: Option<CmdVel>,
}
enum State { Idle, Moving, Stopping }
impl Node for StateMachine {
    fn name(&self) -> &str { "StateMachine" }
    fn tick(&mut self) {
        // ALWAYS recv first, regardless of state
        if let Some(cmd) = self.cmd_sub.recv() {
            self.last_cmd = Some(cmd);
        }
        match self.state {
            State::Idle => {
                if let Some(ref cmd) = self.last_cmd {
                    if cmd.linear.abs() > 0.01 {
                        self.state = State::Moving;
                    }
                }
            }
            State::Moving => {
                if let Some(ref cmd) = self.last_cmd {
                    self.motor_pub.send(MotorCommand::from_cmd_vel(cmd));
                    if cmd.linear.abs() < 0.01 {
                        self.state = State::Stopping;
                    }
                }
            }
            State::Stopping => {
                self.motor_pub.send(MotorCommand::stop());
                self.state = State::Idle;
                self.last_cmd = None;
            }
        }
    }
    fn shutdown(&mut self) -> Result<()> {
        self.motor_pub.send(MotorCommand::stop());
        Ok(())
    }
}
Pattern: Aggregator with Caching
Synchronize multiple topics by caching the latest value from each.
use horus::prelude::*;
struct Aggregator {
    imu_sub: Topic<Imu>,
    odom_sub: Topic<Odometry>,
    fused_pub: Topic<Pose2D>,
    last_imu: Option<Imu>,
    last_odom: Option<Odometry>,
}
impl Node for Aggregator {
    fn name(&self) -> &str { "Aggregator" }
    fn tick(&mut self) {
        // Always drain both topics
        if let Some(imu) = self.imu_sub.recv() {
            self.last_imu = Some(imu);
        }
        if let Some(odom) = self.odom_sub.recv() {
            self.last_odom = Some(odom);
        }
        // Fuse when both are available; matching on references already binds by
        // reference, so no `ref` keyword is needed. fuse() is not defined in this snippet.
        if let (Some(imu), Some(odom)) = (&self.last_imu, &self.last_odom) {
            let fused = self.fuse(imu, odom);
            self.fused_pub.send(fused);
        }
    }
}
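The caching logic of the aggregator can be exercised in isolation. In this sketch the message types are deliberately tiny local stand-ins (`ToyImu`, `ToyOdom`, `ToyPose`), and the fusion rule (position from odometry, heading from the IMU) is an assumption chosen only to make the example concrete; the fuse() used above is not specified in this document.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct ToyImu {
    yaw: f64,
}

#[derive(Debug, Clone, Copy, PartialEq)]
struct ToyOdom {
    x: f64,
    y: f64,
}

#[derive(Debug, Clone, Copy, PartialEq)]
struct ToyPose {
    x: f64,
    y: f64,
    theta: f64,
}

/// Latest-value cache for two inputs that arrive at different times.
#[derive(Default)]
struct LatestCache {
    last_imu: Option<ToyImu>,
    last_odom: Option<ToyOdom>,
}

impl LatestCache {
    /// Store whatever arrived this tick; None leaves the cached value untouched.
    fn update(&mut self, imu: Option<ToyImu>, odom: Option<ToyOdom>) {
        if imu.is_some() {
            self.last_imu = imu;
        }
        if odom.is_some() {
            self.last_odom = odom;
        }
    }

    /// Fuse once both inputs have been seen at least once.
    fn fused(&self) -> Option<ToyPose> {
        let imu = self.last_imu?;
        let odom = self.last_odom?;
        Some(ToyPose { x: odom.x, y: odom.y, theta: imu.yaw })
    }
}
```

A tick that receives nothing still produces a fused pose from the cached values, which is what lets inputs published at different rates be combined every cycle.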
Standard Message Types
All types are available via use horus::prelude::*; and all fixed-size types support zero-copy shared memory transport.
Geometry
| Type | Key Fields |
|---|---|
| Twist | linear: [f64; 3], angular: [f64; 3], timestamp_ns: u64 |
| CmdVel | linear: f32, angular: f32, timestamp_ns: u64 |
| Pose2D | x: f64, y: f64, theta: f64, timestamp_ns: u64 |
| Pose3D | position: Point3, orientation: Quaternion, timestamp_ns: u64 |
| PoseStamped | pose: Pose3D, frame_id: [u8; 32] |
| PoseWithCovariance | pose: Pose3D, covariance: [f64; 36] |
| TwistWithCovariance | twist: Twist, covariance: [f64; 36] |
| Point3 | x: f64, y: f64, z: f64 |
| Vector3 | x: f64, y: f64, z: f64 |
| Quaternion | x: f64, y: f64, z: f64, w: f64 |
| TransformStamped | translation: [f64; 3], rotation: [f64; 4] |
| Accel / AccelStamped | linear: [f64; 3], angular: [f64; 3] |
Sensors
| Type | Key Fields |
|---|---|
| Imu | orientation: [f64; 4], angular_velocity: [f64; 3], linear_acceleration: [f64; 3], covariance arrays |
| LaserScan | ranges: [f32; 360], angle_min/max: f32, range_min/max: f32 |
| Odometry | pose: Pose2D, twist: Twist, covariance arrays, frame IDs |
| JointState | names: [[u8;32];16], positions/velocities/efforts: [f64;16], joint_count: u8 |
| NavSatFix | latitude/longitude/altitude: f64, status: u8, satellites_visible: u16 |
| BatteryState | voltage/current/charge/capacity: f32, percentage: f32, temperature: f32 |
| RangeSensor | range: f32, sensor_type: u8, field_of_view: f32 |
| Temperature | temperature: f64, variance: f64 |
| FluidPressure | fluid_pressure: f64, variance: f64 |
| Illuminance | illuminance: f64, variance: f64 |
| MagneticField | magnetic_field: [f64; 3], covariance: [f64; 9] |
Control
| Type | Key Fields |
|---|---|
| MotorCommand | motor_id: u8, mode: u8 (0=vel,1=pos,2=torque,3=voltage), target: f64 |
| ServoCommand | servo_id: u8, position: f32 (rad), speed: f32 (0-1) |
| JointCommand | Joint-level position/velocity/torque commands |
| PidConfig | kp/ki/kd: f64, integral_limit: f64, output_limit: f64 |
| DifferentialDriveCommand | left_velocity: f64, right_velocity: f64 (rad/s) |
| TrajectoryPoint | position: [f64;3], velocity: [f64;3], orientation: [f64;4], time_from_start: f64 |
Navigation
| Type | Key Fields |
|---|---|
| NavGoal | target_pose: Pose2D, tolerance_position/angle: f64, timeout_seconds: f64 |
| NavPath | waypoints: [Waypoint; 256], waypoint_count: u16, total_length: f64 |
| OccupancyGrid | Grid-based occupancy map (variable-size) |
| CostMap | Navigation cost map (variable-size) |
| PathPlan | Planned path with algorithm metadata |
Vision
| Type | Key Fields |
|---|---|
| Image | Pool-backed RAII. Image::new(w, h, encoding)?. Zero-copy via shared memory pool |
| DepthImage | Pool-backed RAII. F32 or U16 depth data |
| CompressedImage | format: [u8;8], data: Vec<u8> (variable-size, MessagePack serialized) |
| CameraInfo | width/height: u32, camera_matrix: [f64;9], distortion_coefficients: [f64;8] |
| RegionOfInterest | x_offset/y_offset/width/height: u32, do_rectify: bool |
Perception
| Type | Key Fields |
|---|---|
| Detection | bbox: BoundingBox2D, confidence: f32, class_id: u32 |
| Detection3D | bbox: BoundingBox3D, confidence: f32, velocity_x/y/z: f32 |
| BoundingBox2D | x/y/width/height: f32 (pixels) |
| BoundingBox3D | cx/cy/cz/length/width/height: f32 (meters), roll/pitch/yaw: f32 |
| PointCloud | Pool-backed RAII. PointXYZ, PointXYZI, PointXYZRGB formats |
| Landmark / Landmark3D | x/y(/z): f32, visibility: f32, index: u32 |
| LandmarkArray | Up to N landmarks. Presets: coco_pose(), mediapipe_pose/hand/face() |
| SegmentationMask | width/height: u32, num_classes: u32, mask_type: u32 |
| PlaneDetection | coefficients: [f64;4], center: Point3, normal: Vector3 |
| TrackedObject | Object tracking with ID and velocity |
Force and Impedance
| Type | Key Fields |
|---|---|
| WrenchStamped | force: [f64;3], torque: [f64;3], frame_id |
| ForceCommand | Force/torque command for compliant control |
| ImpedanceParameters | Stiffness, damping, inertia for impedance control |
Diagnostics
| Type | Key Fields |
|---|---|
| DiagnosticReport | component: [u8;32], up to 16 DiagnosticValue entries |
| DiagnosticStatus | Status level + message |
| DiagnosticValue | Typed key-value: string(), int(), float(), bool() |
| EmergencyStop | Emergency stop command |
| Heartbeat / NodeHeartbeat | Periodic health signal with tick count and rate |
| SafetyStatus | Overall system safety state |
| ResourceUsage | CPU, memory, disk usage |
Input
| Type | Key Fields |
|---|---|
| JoystickInput | Axes, buttons, hat switches for teleoperation |
| KeyboardInput | Key events for HID control |
Clock
| Type | Key Fields |
|---|---|
| Clock | Simulation/wall time. Sources: SOURCE_WALL, SOURCE_SIM, SOURCE_REPLAY |
| TimeReference | Time synchronization reference |
Python API Quick Reference
Functional Node
import horus
def my_tick(node):
    node.send("temperature", 25.5)
sensor = horus.Node(
    name="temp_sensor",
    pubs="temperature",
    tick=my_tick,
    rate=10
)
horus.run(sensor, duration=30)
Stateful Node (class container)
import horus
from horus import CmdVel
class DriveState:
    def tick(self, node):
        node.send("cmd_vel", CmdVel(linear=0.5, angular=0.0))
    def shutdown(self, node):
        node.send("cmd_vel", CmdVel(linear=0.0, angular=0.0))
drive = DriveState()
node = horus.Node(name="drive_node", tick=drive.tick,
                  shutdown=drive.shutdown, pubs=["cmd_vel"],
                  rate=50, order=0)
Run
horus.run(node)
One-Liner
horus.run(sensor_node, controller_node, duration=60)
Topic
from horus import Topic, CmdVel
pub = Topic("cmd_vel", CmdVel)
pub.send(CmdVel(linear=1.0, angular=0.0))
sub = Topic("cmd_vel", CmdVel)
msg = sub.recv() # Returns None if empty
Functional recv
def process(node):
    if node.has_msg("scan"):
        scan = node.recv("scan")
    all_scans = node.recv_all("scan")
CLI Quick Reference
| Command | Usage |
|---|---|
| horus new <name> | Create new project with horus.toml + src/ |
| horus run [files...] | Build and run application |
| horus build | Build without running |
| horus test [filter] | Run tests |
| horus check | Validate horus.toml and workspace |
| horus clean --shm | Clean stale shared memory regions |
| horus monitor | Web + TUI monitoring dashboard |
| horus topic list | List active topics |
| horus topic echo <name> | Print messages on a topic |
| horus node list | List running nodes |
| horus tf tree | Print transform frame tree |
| horus install <pkg> | Install package from registry |
| horus launch <file.yaml> | Launch multi-node system from YAML |
| horus param get <key> | Get runtime parameter |
| horus deploy [target] | Deploy to remote robot |
| horus doctor | Comprehensive health check |
| horus fmt | Format code (Rust + Python) |
| horus lint | Lint code (clippy + ruff) |
horus.toml Format
horus.toml is the single source of truth. Native build files (Cargo.toml, pyproject.toml) are generated into .horus/ automatically.
[package]
name = "my-robot"
version = "0.1.0"
description = "My robot project"
authors = ["Name <email>"]
[dependencies]
# Rust deps (auto-detected as crates.io)
serde = { version = "1.0", source = "crates.io", features = ["derive"] }
nalgebra = "0.32"
# Python deps (specify source = "pypi")
numpy = { version = ">=1.24", source = "pypi" }
torch = { version = ">=2.0", source = "pypi" }
# System deps
libudev = { version = "*", source = "system" }
# Path deps
my_lib = { path = "../my_lib" }
# Git deps
some_crate = { git = "https://github.com/user/repo", branch = "main" }
[dev-dependencies]
criterion = { version = "0.5", source = "crates.io" }
pytest = { version = ">=7.0", source = "pypi" }
[scripts]
sim = "horus sim start --world warehouse"
deploy = "horus deploy pi@robot --release"
test-hw = "horus run tests/hardware_check.rs"
[drivers]
realsense = { version = "0.3" }
dynamixel = { version = "0.2" }
[hooks]
pre_run = ["fmt", "lint"]
post_build = ["test"]
Dependency sources: crates.io (Rust, default), pypi (Python), system, path, git.
Generated files: horus build creates .horus/Cargo.toml and .horus/pyproject.toml. Never edit these directly.
Import Pattern
use horus::prelude::*;
// Provides: Node, Topic, Scheduler, DurationExt, Frequency, Miss,
// all message types, error types, macros, services, actions, etc.
// 165+ types in one import.
Custom Messages
use serde::{Serialize, Deserialize};
#[derive(Clone, Serialize, Deserialize)]
struct MyMessage {
    x: f32,
    y: f32,
    label: String,
}
let topic: Topic<MyMessage> = Topic::new("my.data")?;
Macros
| Macro | Purpose |
|---|---|
| message! | Define custom message types |
| service! | Define request/response service types |
| action! | Define long-running action types (goal/feedback/result) |
| node! | Define node with automatic topic registration |
| topics! | Compile-time topic name + type descriptors |
| hlog!(level, ...) | Structured node logging |
| hlog_once!(level, ...) | Log once per execution |
| hlog_every!(n, level, ...) | Throttled logging every n calls |
Error Types
use horus::prelude::*;
// Error, Result<T>, HorusError
// Variants: CommunicationError, ConfigError, MemoryError, NodeError,
// NotFoundError, ParseError, ResourceError, SerializationError,
// TimeoutError, TransformError, ValidationError
// Helpers: retry_transient(), RetryConfig
Performance Reference
| Metric | Value |
|---|---|
| Same-thread topic | ~3 ns |
| Same-process 1:1 | ~18 ns |
| Same-process N:M | ~36 ns |
| Cross-process | ~50-167 ns |
| Scheduler tick overhead | ~50-100 ns |
| Shared memory allocation | ~100 ns |
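To put these figures in perspective, a short calculation relates a fixed per-tick cost to the tick period. The functions are illustrative helpers, not part of HORUS, and they treat the table values as simple upper bounds rather than measured distributions.

```rust
/// Fraction of one tick period consumed by a fixed per-tick cost.
fn overhead_fraction(tick_rate_hz: f64, cost_ns: f64) -> f64 {
    let period_ns = 1.0e9 / tick_rate_hz;
    cost_ns / period_ns
}

/// Total transport cost for `messages` sends per tick at `latency_ns` each.
fn transport_cost_ns(messages: u32, latency_ns: f64) -> f64 {
    messages as f64 * latency_ns
}
```

At a 1 kHz tick rate the period is 1 ms, so a 100 ns scheduler overhead is about 0.01% of the period. Ten cross-process messages at the upper figure of 167 ns each cost 1.67 microseconds, or roughly 0.17% of the same period.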