AI Context — Complete Framework Reference

Mental Model

HORUS is a tick-based robotics runtime. Applications are composed of Nodes (units of computation) that exchange data through Topics (typed pub/sub channels backed by shared memory). A Scheduler orchestrates node execution in priority order each tick cycle. Communication is local shared memory IPC with latencies from ~3ns (same-thread) to ~167ns (cross-process). Systems can be single-process (all nodes in one scheduler) or multi-process (nodes in separate processes sharing topics via /dev/shm/horus/). There are no callbacks — nodes implement tick() which the scheduler calls each cycle.


Node Trait (Complete)

All nodes implement the Node trait. Only tick() is required; all others have defaults.

use horus::prelude::*;
MethodSignatureDefaultDescription
namefn name(&self) -> &strRequiredUnique node identifier
tickfn tick(&mut self)RequiredCalled every scheduler cycle; do work here
initfn init(&mut self) -> Result<()>Ok(())Called once before first tick; open hardware, allocate buffers
shutdownfn shutdown(&mut self) -> Result<()>Ok(())Called on Ctrl+C / scheduler stop; release hardware, zero actuators
publishersfn publishers(&self) -> Vec<String>vec![]Declare published topic names for introspection
subscribersfn subscribers(&self) -> Vec<String>vec![]Declare subscribed topic names for introspection
on_errorfn on_error(&mut self, error: &Error)no-opCalled when tick() panics or returns error
is_safe_statefn is_safe_state(&self) -> booltrueQuery whether node is in a safe state
enter_safe_statefn enter_safe_state(&mut self)no-opCalled by safety monitor on deadline miss with Miss::SafeMode

NodeBuilder API (Complete)

Chain after scheduler.add(node). Finalize with .build().

MethodParameterEffect
.order(n)u32Execution priority; lower = runs first. 0-9 critical, 10-49 high, 50-99 normal, 100-199 low, 200+ background
.rate(freq)FrequencyPer-node tick rate. Auto-derives budget (80% period), deadline (95% period). Auto-marks as RT
.budget(dur)DurationMax allowed tick execution time. Auto-marks as RT
.deadline(dur)DurationHard deadline for tick completion. Auto-marks as RT
.on_miss(policy)MissDeadline miss policy: Warn, Skip, SafeMode, Stop
.compute()Mark as CPU-heavy compute node (may use worker threads)
.on(topic)&strEvent-driven: node ticks only when topic receives a message
.async_io()Mark as async I/O node (non-blocking network/file)
.monitoring(bool)boolEnable per-node monitoring
.prefer_rt()Request RT features, degrade gracefully
.require_rt()Require RT features, panic if unavailable
.deterministic(bool)boolEnable deterministic execution mode
.max_deadline_misses(n)u32Emergency stop after n misses
.cores(list)&[usize]Pin node to specific CPU cores
.with_profiling()Enable per-node profiling
.with_blackbox(mb)usizePer-node blackbox buffer in MB
.verbose(bool)boolEnable/disable logging for this node
.failure_policy(p)FailurePolicyOverride failure handling: Fatal, Restart, Skip, Ignore
.build()Finalize and register the node. Returns Result<()>

RT auto-detection: Setting .budget(), .deadline(), or .rate(Frequency) automatically sets is_rt=true and ExecutionClass::Rt. There is no .rt() method. This is order-independent via deferred finalize().


Topic API (Complete)

Topic<T> requires T: Clone + Serialize + Deserialize + Send + Sync + 'static.

MethodSignatureBehavior
newTopic::new(name: &str) -> Result<Self>Create/connect to a named topic. Auto-selects optimal IPC backend
with_capacityTopic::with_capacity(name: &str, capacity: u32, slot_size: Option<usize>) -> Result<Self>Custom ring buffer capacity
sendfn send(&self, msg: T)Publish message. Infallible. Overwrites oldest on buffer full
try_sendfn try_send(&self, msg: T) -> Result<(), T>Attempt send; returns message on failure
send_blockingfn send_blocking(&self, msg: T, timeout: Duration) -> Result<(), SendBlockingError>Block until space available or timeout. For critical commands
recvfn recv(&self) -> Option<T>Non-blocking receive. Returns None if empty
try_recvfn try_recv(&self) -> Option<T>Same as recv() for most use cases
read_latestfn read_latest(&self) -> Option<T> where T: CopyRead latest without advancing consumer position. Requires T: Copy
has_messagefn has_message(&self) -> boolCheck if messages are available without consuming
pending_countfn pending_count(&self) -> u64Number of messages waiting
namefn name(&self) -> &strTopic name
metricsfn metrics(&self) -> TopicMetricsMessage counts, send/recv failures
dropped_countfn dropped_count(&self) -> u64Messages lost to buffer overflow

Scheduler API

MethodSignatureDescription
newScheduler::new() -> SchedulerCreate with auto-detected capabilities (~30-100us)
.tick_rate(freq)FrequencyGlobal tick rate (default: 100 Hz)
.prefer_rt()Try RT features, degrade gracefully
.require_rt()Enable RT features, panic if unavailable
.watchdog(dur)DurationFrozen node detection, auto-creates safety monitor
.deterministic(bool)boolEnable deterministic mode
.with_blackbox(mb)usizeBlackBox flight recorder buffer
.with_recording()Enable record/replay
.max_deadline_misses(n)u32Emergency stop after n deadline misses (default: 100)
.verbose(bool)boolEnable/disable non-emergency logging
.add(node)impl NodeAdd node, returns chainable builder
.run()-> Result<()>Main loop until Ctrl+C
.run_for(dur)Duration -> Result<()>Run for specific duration
.tick_once()-> Result<()>Execute exactly one tick cycle
.tick(names)&[&str] -> Result<()>One tick cycle for named nodes only
.set_node_rate(name, freq)&str, FrequencyChange per-node rate at runtime
.stop()Stop the scheduler
.is_running()-> boolCheck if running
.metrics()-> Vec<NodeMetrics>Per-node performance metrics
.safety_stats()-> Option<SafetyStats>Budget overruns, deadline misses, watchdog expirations
.node_list()-> Vec<String>Registered node names

DurationExt and Frequency

MethodTypeExampleResult
.ns()Duration500.ns()500 nanoseconds
.us()Duration200.us()200 microseconds
.ms()Duration10.ms()10 milliseconds
.secs()Duration1.secs()1 second
.hz()Frequency100.hz()100 Hz (period = 10ms)

Frequency methods: value() -> f64, period() -> Duration, budget_default() -> Duration (80% period), deadline_default() -> Duration (95% period).

Validation: Frequency panics on 0, negative, NaN, or infinity.

Works on u64, f64, i32. Import via use horus::prelude::*;.


Execution Classes

ClassWhen UsedThread ModelHow Triggered
RtReal-time nodes with timing guaranteesPriority-scheduled, optional CPU pinningAuto: set .budget(), .deadline(), or .rate()
ComputeCPU-heavy work (ML inference, path planning)May use worker thread pool.compute()
EventReact to incoming dataWakes on topic message.on("topic.name")
AsyncIoNetwork, file, database I/ONon-blocking async runtime.async_io()
BestEffortDefault; no special schedulingRuns in tick orderNo method called (default)

Execution classes are mutually exclusive per node. RT is always implicit from timing parameters.


Miss (Deadline Policy)

PolicyWhat HappensUse For
Miss::WarnLog warning, continue normallySoft RT (logging, UI). Default
Miss::SkipSkip this node for current tickFirm RT (video encoding)
Miss::SafeModeCall enter_safe_state() on the nodeMotor controllers, safety nodes
Miss::StopStop entire schedulerHard RT safety-critical

Safety Rules (CRITICAL)

Rule one: Always implement shutdown() for actuators. Without it, motors continue at last velocity on Ctrl+C.

fn shutdown(&mut self) -> Result<()> {
    self.motor.set_velocity(0.0);
    Ok(())
}

Rule two: Always call recv() every tick. Ring buffers overwrite old messages. Skipping ticks loses data.

fn tick(&mut self) {
    // ALWAYS recv, cache result
    if let Some(msg) = self.sub.recv() {
        self.cached = Some(msg);
    }
    // Then use cached value
}

Rule three: Never sleep() in tick(). It blocks the entire scheduler. All nodes share the tick cycle.

// BAD: std::thread::sleep(Duration::from_millis(100));
// GOOD: Use scheduler rate control instead

Rule four: Never do blocking I/O in tick(). File reads, network calls, and database queries belong in init() or in an .async_io() node.

// BAD: let data = std::fs::read_to_string("file.txt")?;
// GOOD: Read in init(), use cached data in tick()

Rule five: Use dots not slashes in topic names. Slashes conflict with shared memory paths and fail on macOS.

// CORRECT: Topic::new("sensors.lidar")
// WRONG:   Topic::new("sensors/lidar")

Common Patterns

Pattern: Publisher

use horus::prelude::*;

struct TempSensor {
    pub_topic: Topic<f32>,
}

impl TempSensor {
    fn new() -> Result<Self> {
        Ok(Self { pub_topic: Topic::new("sensor.temperature")? })
    }
}

impl Node for TempSensor {
    fn name(&self) -> &str { "TempSensor" }
    fn tick(&mut self) {
        self.pub_topic.send(self.read_temperature());
    }
}

Pattern: Subscriber

use horus::prelude::*;

struct Logger {
    sub: Topic<f32>,
}

impl Logger {
    fn new() -> Result<Self> {
        Ok(Self { sub: Topic::new("sensor.temperature")? })
    }
}

impl Node for Logger {
    fn name(&self) -> &str { "Logger" }
    fn tick(&mut self) {
        if let Some(temp) = self.sub.recv() {
            hlog!(info, "Temperature: {:.1}C", temp);
        }
    }
}

Pattern: Pub+Sub Pipeline

use horus::prelude::*;

struct Filter {
    scan_sub: Topic<LaserScan>,
    cmd_pub: Topic<CmdVel>,
}

impl Filter {
    fn new() -> Result<Self> {
        Ok(Self {
            scan_sub: Topic::new("scan")?,
            cmd_pub: Topic::new("cmd_vel")?,
        })
    }
}

impl Node for Filter {
    fn name(&self) -> &str { "Filter" }
    fn tick(&mut self) {
        if let Some(scan) = self.scan_sub.recv() {
            if let Some(min) = scan.min_range() {
                if min < 0.5 {
                    self.cmd_pub.send(CmdVel::zero());
                } else {
                    self.cmd_pub.send(CmdVel::new(1.0, 0.0));
                }
            }
        }
    }
}

Pattern: Multi-Rate System

use horus::prelude::*;

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new()
        .tick_rate(1000.hz())
        .watchdog(500.ms());

    scheduler.add(ImuSensor::new()?).order(0).rate(100.hz()).build()?;
    scheduler.add(PidController::new()?).order(10).rate(50.hz())
        .budget(500.us()).on_miss(Miss::Skip).build()?;
    scheduler.add(PathPlanner::new()?).order(50).rate(10.hz())
        .compute().build()?;
    scheduler.add(TelemetryLogger::new()?).order(200).rate(1.hz())
        .async_io().build()?;

    scheduler.run()
}

Pattern: State Machine

Always call recv() unconditionally, outside the state match.

use horus::prelude::*;

struct StateMachine {
    cmd_sub: Topic<CmdVel>,
    motor_pub: Topic<MotorCommand>,
    state: State,
    last_cmd: Option<CmdVel>,
}

enum State { Idle, Moving, Stopping }

impl Node for StateMachine {
    fn name(&self) -> &str { "StateMachine" }

    fn tick(&mut self) {
        // ALWAYS recv first, regardless of state
        if let Some(cmd) = self.cmd_sub.recv() {
            self.last_cmd = Some(cmd);
        }

        match self.state {
            State::Idle => {
                if let Some(ref cmd) = self.last_cmd {
                    if cmd.linear.abs() > 0.01 {
                        self.state = State::Moving;
                    }
                }
            }
            State::Moving => {
                if let Some(ref cmd) = self.last_cmd {
                    self.motor_pub.send(MotorCommand::from_cmd_vel(cmd));
                    if cmd.linear.abs() < 0.01 {
                        self.state = State::Stopping;
                    }
                }
            }
            State::Stopping => {
                self.motor_pub.send(MotorCommand::stop());
                self.state = State::Idle;
                self.last_cmd = None;
            }
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        self.motor_pub.send(MotorCommand::stop());
        Ok(())
    }
}

Pattern: Aggregator with Caching

Synchronize multiple topics by caching the latest value from each.

use horus::prelude::*;

struct Aggregator {
    imu_sub: Topic<Imu>,
    odom_sub: Topic<Odometry>,
    fused_pub: Topic<Pose2D>,
    last_imu: Option<Imu>,
    last_odom: Option<Odometry>,
}

impl Node for Aggregator {
    fn name(&self) -> &str { "Aggregator" }

    fn tick(&mut self) {
        // Always drain both topics
        if let Some(imu) = self.imu_sub.recv() {
            self.last_imu = Some(imu);
        }
        if let Some(odom) = self.odom_sub.recv() {
            self.last_odom = Some(odom);
        }

        // Fuse when both are available
        if let (Some(ref imu), Some(ref odom)) = (&self.last_imu, &self.last_odom) {
            let fused = self.fuse(imu, odom);
            self.fused_pub.send(fused);
        }
    }
}

Standard Message Types

All types available via use horus::prelude::*;. All fixed-size types support zero-copy shared memory transport.

Geometry

TypeKey Fields
Twistlinear: [f64; 3], angular: [f64; 3], timestamp_ns: u64
CmdVellinear: f32, angular: f32, timestamp_ns: u64
Pose2Dx: f64, y: f64, theta: f64, timestamp_ns: u64
Pose3Dposition: Point3, orientation: Quaternion, timestamp_ns: u64
PoseStampedpose: Pose3D, frame_id: [u8; 32]
PoseWithCovariancepose: Pose3D, covariance: [f64; 36]
TwistWithCovariancetwist: Twist, covariance: [f64; 36]
Point3x: f64, y: f64, z: f64
Vector3x: f64, y: f64, z: f64
Quaternionx: f64, y: f64, z: f64, w: f64
TransformStampedtranslation: [f64; 3], rotation: [f64; 4]
Accel / AccelStampedlinear: [f64; 3], angular: [f64; 3]

Sensors

TypeKey Fields
Imuorientation: [f64; 4], angular_velocity: [f64; 3], linear_acceleration: [f64; 3], covariance arrays
LaserScanranges: [f32; 360], angle_min/max: f32, range_min/max: f32
Odometrypose: Pose2D, twist: Twist, covariance arrays, frame IDs
JointStatenames: [[u8;32];16], positions/velocities/efforts: [f64;16], joint_count: u8
NavSatFixlatitude/longitude/altitude: f64, status: u8, satellites_visible: u16
BatteryStatevoltage/current/charge/capacity: f32, percentage: f32, temperature: f32
RangeSensorrange: f32, sensor_type: u8, field_of_view: f32
Temperaturetemperature: f64, variance: f64
FluidPressurefluid_pressure: f64, variance: f64
Illuminanceilluminance: f64, variance: f64
MagneticFieldmagnetic_field: [f64; 3], covariance: [f64; 9]

Control

TypeKey Fields
MotorCommandmotor_id: u8, mode: u8 (0=vel,1=pos,2=torque,3=voltage), target: f64
ServoCommandservo_id: u8, position: f32 (rad), speed: f32 (0-1)
JointCommandJoint-level position/velocity/torque commands
PidConfigkp/ki/kd: f64, integral_limit: f64, output_limit: f64
DifferentialDriveCommandleft_velocity: f64, right_velocity: f64 (rad/s)
TrajectoryPointposition: [f64;3], velocity: [f64;3], orientation: [f64;4], time_from_start: f64
TypeKey Fields
NavGoaltarget_pose: Pose2D, tolerance_position/angle: f64, timeout_seconds: f64
NavPathwaypoints: [Waypoint; 256], waypoint_count: u16, total_length: f64
OccupancyGridGrid-based occupancy map (variable-size)
CostMapNavigation cost map (variable-size)
PathPlanPlanned path with algorithm metadata

Vision

TypeKey Fields
ImagePool-backed RAII. Image::new(w, h, encoding)?. Zero-copy via shared memory pool
DepthImagePool-backed RAII. F32 or U16 depth data
CompressedImageformat: [u8;8], data: Vec<u8> (variable-size, MessagePack serialized)
CameraInfowidth/height: u32, camera_matrix: [f64;9], distortion_coefficients: [f64;8]
RegionOfInterestx_offset/y_offset/width/height: u32, do_rectify: bool

Perception

TypeKey Fields
Detectionbbox: BoundingBox2D, confidence: f32, class_id: u32
Detection3Dbbox: BoundingBox3D, confidence: f32, velocity_x/y/z: f32
BoundingBox2Dx/y/width/height: f32 (pixels)
BoundingBox3Dcx/cy/cz/length/width/height: f32 (meters), roll/pitch/yaw: f32
PointCloudPool-backed RAII. PointXYZ, PointXYZI, PointXYZRGB formats
Landmark / Landmark3Dx/y(/z): f32, visibility: f32, index: u32
LandmarkArrayUp to N landmarks. Presets: coco_pose(), mediapipe_pose/hand/face()
SegmentationMaskwidth/height: u32, num_classes: u32, mask_type: u32
PlaneDetectioncoefficients: [f64;4], center: Point3, normal: Vector3
TrackedObjectObject tracking with ID and velocity

Force and Impedance

TypeKey Fields
WrenchStampedforce: [f64;3], torque: [f64;3], frame_id
ForceCommandForce/torque command for compliant control
ImpedanceParametersStiffness, damping, inertia for impedance control

Diagnostics

TypeKey Fields
DiagnosticReportcomponent: [u8;32], up to 16 DiagnosticValue entries
DiagnosticStatusStatus level + message
DiagnosticValueTyped key-value: string(), int(), float(), bool()
EmergencyStopEmergency stop command
Heartbeat / NodeHeartbeatPeriodic health signal with tick count and rate
SafetyStatusOverall system safety state
ResourceUsageCPU, memory, disk usage

Input

TypeKey Fields
JoystickInputAxes, buttons, hat switches for teleoperation
KeyboardInputKey events for HID control

Clock

TypeKey Fields
ClockSimulation/wall time. Sources: SOURCE_WALL, SOURCE_SIM, SOURCE_REPLAY
TimeReferenceTime synchronization reference

Python API Quick Reference

Functional Node

import horus

def my_tick(node):
    node.send("temperature", 25.5)

sensor = horus.Node(
    name="temp_sensor",
    pubs="temperature",
    tick=my_tick,
    rate=10
)
horus.run(sensor, duration=30)

Stateful Node (class container)

import horus
from horus import CmdVel

class DriveState:
    def tick(self, node):
        node.send("cmd_vel", CmdVel(linear=0.5, angular=0.0))

    def shutdown(self, node):
        node.send("cmd_vel", CmdVel(linear=0.0, angular=0.0))

drive = DriveState()
node = horus.Node(name="drive_node", tick=drive.tick,
                  shutdown=drive.shutdown, pubs=["cmd_vel"],
                  rate=50, order=0)

Run

horus.run(node)

One-Liner

horus.run(sensor_node, controller_node, duration=60)

Topic

from horus import Topic, CmdVel

pub = Topic("cmd_vel", CmdVel)
pub.send(CmdVel(linear=1.0, angular=0.0))

sub = Topic("cmd_vel", CmdVel)
msg = sub.recv()  # Returns None if empty

Functional recv

def process(node):
    if node.has_msg("scan"):
        scan = node.recv("scan")
        all_scans = node.recv_all("scan")

CLI Quick Reference

CommandUsage
horus new <name>Create new project with horus.toml + src/
horus run [files...]Build and run application
horus buildBuild without running
horus test [filter]Run tests
horus checkValidate horus.toml and workspace
horus clean --shmClean stale shared memory regions
horus monitorWeb + TUI monitoring dashboard
horus topic listList active topics
horus topic echo <name>Print messages on a topic
horus node listList running nodes
horus tf treePrint transform frame tree
horus install <pkg>Install package from registry
horus launch <file.yaml>Launch multi-node system from YAML
horus param get <key>Get runtime parameter
horus deploy [target]Deploy to remote robot
horus doctorComprehensive health check
horus fmtFormat code (Rust + Python)
horus lintLint code (clippy + ruff)

horus.toml Format

horus.toml is the single source of truth. Native build files (Cargo.toml, pyproject.toml) are generated into .horus/ automatically.

[package]
name = "my-robot"
version = "0.1.0"
description = "My robot project"
authors = ["Name <email>"]

[dependencies]
# Rust deps (auto-detected as crates.io)
serde = { version = "1.0", source = "crates.io", features = ["derive"] }
nalgebra = "0.32"

# Python deps (specify source = "pypi")
numpy = { version = ">=1.24", source = "pypi" }
torch = { version = ">=2.0", source = "pypi" }

# System deps
libudev = { version = "*", source = "system" }

# Path deps
my_lib = { path = "../my_lib" }

# Git deps
some_crate = { git = "https://github.com/user/repo", branch = "main" }

[dev-dependencies]
criterion = { version = "0.5", source = "crates.io" }
pytest = { version = ">=7.0", source = "pypi" }

[scripts]
sim = "horus sim start --world warehouse"
deploy = "horus deploy pi@robot --release"
test-hw = "horus run tests/hardware_check.rs"

[drivers]
realsense = { version = "0.3" }
dynamixel = { version = "0.2" }

[hooks]
pre_run = ["fmt", "lint"]
post_build = ["test"]

Dependency sources: crates.io (Rust, default), pypi (Python), system, path, git.

Generated files: horus build creates .horus/Cargo.toml and .horus/pyproject.toml. Never edit these directly.


Import Pattern

use horus::prelude::*;
// Provides: Node, Topic, Scheduler, DurationExt, Frequency, Miss,
// all message types, error types, macros, services, actions, etc.
// 165+ types in one import.

Custom Messages

use serde::{Serialize, Deserialize};

#[derive(Clone, Serialize, Deserialize)]
struct MyMessage {
    x: f32,
    y: f32,
    label: String,
}

let topic: Topic<MyMessage> = Topic::new("my.data")?;

Macros

MacroPurpose
message!Define custom message types
service!Define request/response service types
action!Define long-running action types (goal/feedback/result)
node!Define node with automatic topic registration
topics!Compile-time topic name + type descriptors
hlog!(level, ...)Structured node logging
hlog_once!(level, ...)Log once per execution
hlog_every!(n, level, ...)Throttled logging every n calls

Error Types

use horus::prelude::*;
// Error, Result<T>, HorusError
// Variants: CommunicationError, ConfigError, MemoryError, NodeError,
// NotFoundError, ParseError, ResourceError, SerializationError,
// TimeoutError, TransformError, ValidationError
// Helpers: retry_transient(), RetryConfig

Performance Reference

MetricValue
Same-thread topic~3 ns
Same-process 1:1~18 ns
Same-process N:M~36 ns
Cross-process~50-167 ns
Scheduler tick overhead~50-100 ns
Shared memory allocation~100 ns