AI Integration

HORUS's sub-microsecond IPC makes it well-suited for combining real-time control with AI inference. This guide covers patterns for integrating ML models into HORUS applications.

Overview

HORUS supports AI integration through two main approaches:

Python ML Nodes (Recommended for Prototyping)

  • Use any Python ML library (PyTorch, TensorFlow, ONNX, etc.)
  • Hardware nodes handle camera/sensor capture
  • Pub/sub connects ML pipeline to control nodes
  • 10-100ms typical inference latency

Rust Inference (For Production)

  • ONNX Runtime via ort crate
  • Tract (pure Rust inference engine)
  • 1-50ms typical inference latency

Architecture Pattern

AI pipeline: Sensor → ML inference → Real-time control

The key insight: keep AI inference in dedicated nodes. HORUS topics decouple the fast control loop from slower ML processing, so a slow inference step doesn't block motor commands.
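One way to picture this decoupling: the slow ML node overwrites a single "latest value" slot, and the fast control loop always reads the most recent detection without waiting. A minimal stdlib sketch of that idea (LatestTopic is a hypothetical illustration, not a HORUS API):

```python
import threading

class LatestTopic:
    """Single-slot mailbox: writers overwrite, readers never block."""
    def __init__(self):
        self._lock = threading.Lock()
        self._value = None

    def send(self, value):
        with self._lock:
            self._value = value  # overwrite; slow consumers just miss frames

    def get(self):
        with self._lock:
            return self._value  # most recent value, or None if nothing published

detections = LatestTopic()
detections.send({"obstacle_detected": False})
detections.send({"obstacle_detected": True})  # overwrites the older message
assert detections.get() == {"obstacle_detected": True}
```

The control loop never stalls on inference: if the ML node falls behind, the controller keeps acting on the last published detection.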


Python ML Integration

The fastest way to add AI to a HORUS application is through Python nodes. Python has the richest ML ecosystem, and HORUS's Python bindings give you full access to the pub/sub system.

Camera + ML Pipeline

from horus import Node, Scheduler
import numpy as np

# Simulated camera node (replace with your camera capture logic)
def camera_tick(node):
    # In a real robot, capture from camera hardware here
    frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    node.send("cam.image_raw", frame.tolist())  # tolist() is slow for full frames; fine for a demo

cam = Node(name="camera", pubs=["cam.image_raw"], tick=camera_tick, rate=30)

# ML processing node
def ml_tick(node):
    if node.has_msg("cam.image_raw"):
        frame = node.get("cam.image_raw")
        # Run your ML model here
        # e.g., model.predict(frame), torch inference, etc.
        result = process_frame(frame)
        node.send("detections", result)

ml_node = Node(
    name="ml_processor",
    subs=["cam.image_raw"],
    pubs=["detections"],
    tick=ml_tick,
    rate=10  # Process at 10 Hz
)

# Control node reacts to detections
def control_tick(node):
    if node.has_msg("detections"):
        detections = node.get("detections")
        # React to ML output
        if detections.get("obstacle_detected"):
            node.send("cmd_vel", {"linear": 0.0, "angular": 0.5})
        else:
            node.send("cmd_vel", {"linear": 1.0, "angular": 0.0})

controller = Node(
    name="controller",
    subs=["detections"],
    pubs=["cmd_vel"],
    tick=control_tick,
    rate=30
)

scheduler = Scheduler()
scheduler.add(cam, order=0)
scheduler.add(ml_node, order=1)
scheduler.add(controller, order=2)
scheduler.run()

PyTorch Example

from horus import Node, run
import numpy as np
import torch

# Load model once at startup, not inside the tick
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
model.eval()

def detect_tick(node):
    if node.has_msg("cam.image_raw"):
        # Topic payloads arrive as nested lists; convert back to an image array
        frame = np.array(node.get("cam.image_raw"), dtype=np.uint8)
        results = model(frame)
        detections = results.pandas().xyxy[0].to_dict('records')
        node.send("detections", detections)

node = Node(
    name="yolo_detector",
    subs=["cam.image_raw"],
    pubs=["detections"],
    tick=detect_tick,
    rate=10
)

run(node)

ONNX Runtime (Python)

from horus import Node, run
import onnxruntime as ort
import numpy as np

session = ort.InferenceSession("model.onnx")
input_name = session.get_inputs()[0].name

def inference_tick(node):
    if node.has_msg("cam.image_raw"):
        frame = node.get("cam.image_raw")
        # Preprocess
        input_data = np.array(frame).astype(np.float32)
        input_data = np.expand_dims(input_data, axis=0)
        # Run inference
        outputs = session.run(None, {input_name: input_data})
        node.send("ml.output", outputs[0].tolist())

node = Node(
    name="onnx_inference",
    subs=["cam.image_raw"],
    pubs=["ml.output"],
    tick=inference_tick,
    rate=15
)

run(node)

ML Utilities

HORUS provides Python ML utilities for common patterns:

from horus.ml_utils import ONNXInferenceNode, PerformanceMonitor

# Pre-built ONNX inference node
class MyDetector(ONNXInferenceNode):
    def __init__(self):
        super().__init__(
            model_path="models/detector.onnx",
            input_topic="cam.image_raw",
            output_topic="detections"
        )

    def preprocess(self, frame):
        # Resize, normalize, etc.
        return processed_frame

    def postprocess(self, output):
        # Parse model output into detections
        return detections

# Performance monitoring
monitor = PerformanceMonitor(window_size=100)
monitor.record(12.5)  # Record inference time in ms
stats = monitor.get_stats()
print(f"Avg: {stats['avg_latency_ms']:.1f}ms, P95: {stats['p95_latency_ms']:.1f}ms, FPS: {stats['fps']:.0f}")

See ML Utilities for the full API.
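If you are not using horus.ml_utils, the rolling-window statistics that PerformanceMonitor reports are easy to compute by hand. A minimal sketch (the class and field names mirror the example above but are otherwise assumptions):

```python
from collections import deque

class RollingStats:
    """Rolling-window inference timing: average, p95 latency, effective FPS."""
    def __init__(self, window_size=100):
        self.samples = deque(maxlen=window_size)  # old samples fall off the window

    def record(self, latency_ms):
        self.samples.append(latency_ms)

    def get_stats(self):
        if not self.samples:
            return {"avg_latency_ms": 0.0, "p95_latency_ms": 0.0, "fps": 0.0}
        ordered = sorted(self.samples)
        n = len(ordered)
        avg = sum(ordered) / n
        p95 = ordered[min(n - 1, int(0.95 * n))]  # nearest-rank 95th percentile
        return {
            "avg_latency_ms": avg,
            "p95_latency_ms": p95,
            "fps": 1000.0 / avg,  # sustainable rate if inference is the bottleneck
        }

stats = RollingStats(window_size=10)
for ms in (10, 12, 11, 50):  # one slow outlier inflates the average
    stats.record(ms)
```

Tracking p95 alongside the average matters in control loops: a single slow inference (garbage collection, thermal throttling) is what causes a missed deadline, not the mean.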


Rust Inference

For production deployments where you need maximum performance, integrate ML inference directly in Rust.

ONNX Runtime (ort crate)

The ort crate provides Rust bindings for ONNX Runtime:

Add to your Cargo.toml:

[dependencies]
horus = { path = "..." }
horus_library = { path = "..." }
ort = "2.0"
ndarray = "0.15"

use horus::prelude::*;
use ort::{GraphOptimizationLevel, Session};
use ndarray::Array;

struct InferenceNode {
    session: Session,
    input_name: String,
}

impl InferenceNode {
    fn new(model_path: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let session = Session::builder()?
            .with_optimization_level(GraphOptimizationLevel::Level3)?
            .commit_from_file(model_path)?;

        let input_name = session.inputs[0].name.clone();

        Ok(Self { session, input_name })
    }

    fn infer(&self, input: &[f32]) -> Option<Vec<f32>> {
        let input_array = Array::from_shape_vec((1, input.len()), input.to_vec()).ok()?;
        let outputs = self.session.run(
            ort::inputs![&self.input_name => input_array.view()].ok()?
        ).ok()?;
        let output = outputs[0].try_extract_tensor::<f32>().ok()?;
        Some(output.view().iter().copied().collect())
    }
}

Tract (Pure Rust)

Tract runs ONNX models with zero external dependencies:

Add to your Cargo.toml:

[dependencies]
horus = { path = "..." }
horus_library = { path = "..." }
tract-onnx = "0.21"

use tract_onnx::prelude::*;

// Alias for the optimized, runnable model plan to keep signatures readable
type Model = SimplePlan<TypedFact, Box<dyn TypedOp>, Graph<TypedFact, Box<dyn TypedOp>>>;

fn load_model(path: &str) -> TractResult<Model> {
    tract_onnx::onnx()
        .model_for_path(path)?
        .into_optimized()?
        .into_runnable()
}

fn run_inference(model: &Model, input: &[f32]) -> Option<Vec<f32>> {
    let input_tensor = tract_ndarray::arr1(input).into_dyn();
    let result = model.run(tvec!(input_tensor.into())).ok()?;
    let output = result[0].to_array_view::<f32>().ok()?;
    Some(output.iter().copied().collect())
}

Model Format Comparison

Format   Crate        Use Case                        External Deps
ONNX     ort          General (PyTorch, TF exports)   ONNX Runtime C lib
ONNX     tract-onnx   Pure Rust inference             None
TFLite   tflite       Edge/mobile models              TFLite C lib

Cloud API Integration

For complex reasoning tasks (task planning, scene understanding, natural language), call cloud APIs from HORUS nodes.

from horus import Node, run
import requests
import os

API_KEY = os.environ["OPENAI_API_KEY"]

def planner_tick(node):
    if node.has_msg("user.goal"):
        goal = node.get("user.goal")
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={
                "model": "gpt-4",
                "messages": [
                    {"role": "system", "content": "Generate robot action plans as JSON."},
                    {"role": "user", "content": goal}
                ],
                "max_tokens": 500
            }
        )
        plan = response.json()["choices"][0]["message"]["content"]
        node.send("robot.plan", plan)

node = Node(
    name="planner",
    subs=["user.goal"],
    pubs=["robot.plan"],
    tick=planner_tick,
    rate=1  # Check for goals once per second
)

run(node)
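LLM replies are free-form text even when the prompt asks for JSON, so parse defensively before publishing a plan to the robot. A small stdlib sketch (parse_plan is a hypothetical helper, not part of HORUS):

```python
import json

def parse_plan(raw: str, fallback=None):
    """Extract a JSON action plan from an LLM reply, with a safe fallback."""
    if fallback is None:
        fallback = {"actions": []}  # empty plan: robot does nothing
    # Models often wrap JSON in prose or code fences; grab the outermost braces.
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        return fallback
    try:
        return json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return fallback

plan = parse_plan('Here is the plan:\n{"actions": [{"move": "forward"}]}')
assert plan == {"actions": [{"move": "forward"}]}
```

Publishing the fallback (an empty plan) rather than raising keeps the downstream control node running when the API returns malformed output.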

Rust (reqwest)

use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};

#[derive(Serialize)]
struct ChatRequest {
    model: String,
    messages: Vec<ChatMessage>,
    max_tokens: u32,
}

#[derive(Serialize, Deserialize)]
struct ChatMessage {
    role: String,
    content: String,
}

#[derive(Deserialize)]
struct ChatResponse {
    choices: Vec<ChatChoice>,
}

#[derive(Deserialize)]
struct ChatChoice {
    message: ChatMessage,
}

fn call_llm(client: &Client, api_key: &str, prompt: &str) -> Option<String> {
    let request = ChatRequest {
        model: "gpt-4".to_string(),
        messages: vec![ChatMessage {
            role: "user".to_string(),
            content: prompt.to_string(),
        }],
        max_tokens: 500,
    };

    let response = client
        .post("https://api.openai.com/v1/chat/completions")
        .header("Authorization", format!("Bearer {}", api_key))
        .json(&request)
        .send()
        .ok()?;

    let chat_response: ChatResponse = response.json().ok()?;
    let choice = chat_response.choices.into_iter().next()?;  // avoid panicking on an empty choices array
    Some(choice.message.content)
}

Performance Considerations

Latency Budget

Typical robotics control loop at 100Hz (10ms cycle):

Sensor capture:  ~1-16ms  (hardware dependent)
ML inference:    ~5-50ms  (model dependent)
Topic transfer:  ~85ns    (HORUS shared memory)
Control logic:   ~1μs     (HORUS node tick)
Motor command:   ~1ms     (hardware actuator)

ML inference is typically the bottleneck. Strategies to manage this:

Throttle Inference

Process every Nth frame instead of every frame:

frame_count = 0

def ml_tick(node):
    global frame_count
    if node.has_msg("cam.image_raw"):
        frame_count += 1
        if frame_count % 5 == 0:  # Every 5th frame
            frame = node.get("cam.image_raw")
            result = model.predict(frame)
            node.send("detections", result)
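Counting frames ties the inference rate to the camera rate. An alternative is to throttle by wall-clock time, which keeps a steady inference cadence even if the camera rate changes. A sketch (Throttle and min_interval_s are assumed names, not HORUS APIs; the injectable clock is for testability):

```python
import time

class Throttle:
    """Allow an action at most once per min_interval_s seconds."""
    def __init__(self, min_interval_s, clock=time.monotonic):
        self.min_interval_s = min_interval_s
        self.clock = clock
        self._last = float("-inf")  # first call is always allowed

    def ready(self):
        now = self.clock()
        if now - self._last >= self.min_interval_s:
            self._last = now
            return True
        return False

throttle = Throttle(min_interval_s=0.2)  # at most 5 inferences per second

def ml_tick(node):
    # node and model as in the earlier examples
    if node.has_msg("cam.image_raw") and throttle.ready():
        frame = node.get("cam.image_raw")
        result = model.predict(frame)
        node.send("detections", result)
```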

Async Processing

Run ML in a background thread so the control loop isn't blocked:

from horus import AsyncNode, AsyncTopic
import asyncio

class AsyncMLNode(AsyncNode):
    async def async_init(self):
        self.input = AsyncTopic(dict)
        self.output = AsyncTopic(dict)

    async def async_tick(self):
        frame = await self.input.try_recv()
        if frame is not None:  # explicit None check; an empty dict is still a valid message
            # Run in thread pool to avoid blocking
            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(None, model.predict, frame)
            await self.output.send(result)

Use Appropriate Models

Task               CPU Model            GPU Model      Cloud API
Object Detection   YOLOv8n (ONNX)       YOLOv8x        GPT-4 Vision
Classification     MobileNet (TFLite)   EfficientNet   Cloud Vision
Pose Estimation    MediaPipe            OpenPose       -
Task Planning      Phi-3 Mini           Llama 3        GPT-4 / Claude
Depth Estimation   MiDaS Small          MiDaS Large    -

Best Practices

  1. Separate concerns: Keep AI inference in dedicated nodes. Don't mix ML code with control logic.

  2. Handle failures gracefully: AI inference can fail. Always have a safe fallback:

    def control_tick(node):
        if node.has_msg("detections"):
            detections = node.get("detections")
            react_to(detections)
        else:
            # Safe default when no detections available
            node.send("cmd_vel", {"linear": 0.0, "angular": 0.0})
    
  3. Monitor performance: Use horus monitor to watch node timing and message flow:

    horus monitor  # See which nodes are slow
    
  4. Start with Python: Prototype in Python first, then move performance-critical inference to Rust if needed.

  5. Cache results: For cloud APIs, cache common responses to reduce latency and cost.
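Point 5 can be as simple as a time-stamped dictionary in front of the API call. A minimal sketch (TTLCache is a hypothetical name; the injectable clock is for testability):

```python
import time

class TTLCache:
    """Cache call results for ttl_s seconds to cut cloud-API latency and cost."""
    def __init__(self, ttl_s=300.0, clock=time.monotonic):
        self.ttl_s = ttl_s
        self.clock = clock
        self._store = {}  # key -> (timestamp, value)

    def get_or_call(self, key, fn):
        hit = self._store.get(key)
        if hit is not None and self.clock() - hit[0] < self.ttl_s:
            return hit[1]  # fresh cached response, no API round-trip
        value = fn()       # cache miss or expired entry: call the API
        self._store[key] = (self.clock(), value)
        return value

cache = TTLCache(ttl_s=300.0)
# plan = cache.get_or_call(goal, lambda: call_llm(client, api_key, goal))
```

For a planner node ticking at 1 Hz, this turns repeated identical goals into dictionary lookups instead of multi-second network calls.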


See Also