AI Integration

You need to run AI/ML inference (object detection, path planning, scene understanding) alongside real-time robot control. Here is how to integrate ML models into HORUS nodes without blocking the control loop.

When To Use This

  • Adding object detection, classification, or pose estimation to a robot
  • Running ONNX or PyTorch models alongside real-time control nodes
  • Calling cloud APIs (GPT-4, Claude) for task planning from HORUS
  • Prototyping ML pipelines in Python with HORUS pub/sub

Use AI-Assisted Development instead if you want AI coding agents to write HORUS code, rather than embedding ML models in your robot.

Prerequisites

  • A HORUS project (Rust or Python)
  • For Python ML: PyTorch, ONNX Runtime, or your preferred ML library installed
  • For Rust ML: ort or tract-onnx crate added to dependencies
  • Familiarity with Topics (ML nodes communicate via pub/sub)

Overview

HORUS supports AI integration through two main approaches:

Python ML Nodes (Recommended for Prototyping)

  • Use any Python ML library (PyTorch, TensorFlow, ONNX, etc.)
  • Hardware nodes handle camera/sensor capture
  • Pub/sub connects ML pipeline to control nodes
  • 10-100ms typical inference latency

Rust Inference (For Production)

  • ONNX Runtime via ort crate
  • Tract (pure Rust inference engine)
  • 1-50ms typical inference latency

Architecture Pattern

AI pipeline: Sensor → ML inference → Real-time control

The key insight: keep AI inference in dedicated nodes. HORUS topics decouple the fast control loop from slower ML processing, so a slow inference step doesn't block motor commands.
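
As a minimal sketch of this pattern (using the same simplified node! style as the examples later on this page; the topic names are illustrative), a detector node and a control node share nothing but a topic:

// simplified sketch (illustrative topic names)
node! {
    DetectorNode {
        sub { image: Vec<u8> -> "cam.image_raw" }
        pub { detections: Vec<f32> -> "detections" }

        tick {
            // Slow path: inference here may take tens of milliseconds
            if let Some(frame) = self.image.recv() {
                let result = run_inference(&frame);
                self.detections.send(result);
            }
        }
    }
}

node! {
    ControlNode {
        sub { detections: Vec<f32> -> "detections" }

        tick {
            // Fast path: uses the latest detections if any have arrived,
            // and never waits on the detector
            if let Some(d) = self.detections.recv() {
                // ... react to d ...
            }
        }
    }
}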


Rust ML Inference

For production deployments where you need maximum performance, integrate ML inference directly in Rust.

ONNX Runtime (ort crate)

The ort crate provides Rust bindings for ONNX Runtime:

Add to your horus.toml:

[dependencies]
ort = "2.0"
ndarray = "0.15"

Then build the session once at startup and reuse it for every inference:

// simplified
use horus::prelude::*;
use ort::{GraphOptimizationLevel, Session};
use ndarray::Array;

struct InferenceNode {
    session: Session,
    input_name: String,
}

impl InferenceNode {
    fn new(model_path: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let session = Session::builder()?
            .with_optimization_level(GraphOptimizationLevel::Level3)?
            .commit_from_file(model_path)?;

        let input_name = session.inputs[0].name.clone();

        Ok(Self { session, input_name })
    }

    // Run one forward pass. Every fallible step maps to None so a bad
    // frame degrades gracefully instead of panicking inside the node.
    fn infer(&self, input: &[f32]) -> Option<Vec<f32>> {
        // Reshape the flat slice to the (batch, features) layout the model expects
        let input_array = Array::from_shape_vec((1, input.len()), input.to_vec()).ok()?;
        let outputs = self.session.run(
            ort::inputs![&self.input_name => input_array.view()].ok()?
        ).ok()?;
        let output = outputs[0].try_extract_tensor::<f32>().ok()?;
        Some(output.view().iter().copied().collect())
    }
}
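
Loading a model is slow, so construct the session once at startup and reuse it every tick. A usage sketch (preprocess is a hypothetical helper that flattens a camera frame into the model's input layout):

// simplified (hypothetical usage; preprocess() is a placeholder)
let inference = InferenceNode::new("models/detector.onnx")?;

// Inside the node's tick:
let input: Vec<f32> = preprocess(&frame);
if let Some(scores) = inference.infer(&input) {
    self.detections.send(scores);
}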

Tract (Pure Rust)

Tract runs ONNX models with zero external dependencies:

Add to your horus.toml:

[dependencies]
tract-onnx = "0.21"

Then load the model and compile it into a runnable plan:

// simplified
use tract_onnx::prelude::*;

// TypedSimplePlan<TypedModel> is tract's alias for the verbose SimplePlan type
type OnnxModel = TypedSimplePlan<TypedModel>;

fn load_model(path: &str) -> TractResult<OnnxModel> {
    tract_onnx::onnx()
        .model_for_path(path)?
        .into_optimized()?
        .into_runnable()
}

fn run_inference(model: &OnnxModel, input: &[f32]) -> Option<Vec<f32>> {
    // Wrap the raw slice in a rank-1 tensor; adjust the shape to match your model
    let input_tensor = tract_ndarray::arr1(input).into_dyn();
    let result = model.run(tvec!(input_tensor.into())).ok()?;
    let output = result[0].to_array_view::<f32>().ok()?;
    Some(output.iter().copied().collect())
}
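
Usage follows the same load-once, run-per-tick shape (the model path and input length here are illustrative):

// simplified (illustrative path and input length)
let model = load_model("models/classifier.onnx")?;

let input = vec![0.0f32; 1024];  // must match the model's input shape
if let Some(output) = run_inference(&model, &input) {
    // e.g. pick the index of the highest score as the predicted class
    let best_class = output
        .iter()
        .enumerate()
        .max_by(|a, b| a.1.total_cmp(b.1))
        .map(|(i, _)| i);
}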

Model Format Comparison

Format   Crate        Use Case                        External Deps
ONNX     ort          General (PyTorch, TF exports)   ONNX Runtime C lib
ONNX     tract-onnx   Pure Rust inference             None
TFLite   tflite       Edge/mobile models              TFLite C lib

Cloud API Integration

For complex reasoning tasks (task planning, scene understanding, natural language), call cloud APIs from HORUS nodes.

// simplified
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};

#[derive(Serialize)]
struct ChatRequest {
    model: String,
    messages: Vec<ChatMessage>,
    max_tokens: u32,
}

#[derive(Serialize, Deserialize)]
struct ChatMessage {
    role: String,
    content: String,
}

#[derive(Deserialize)]
struct ChatResponse {
    choices: Vec<ChatChoice>,
}

#[derive(Deserialize)]
struct ChatChoice {
    message: ChatMessage,
}

fn call_llm(client: &Client, api_key: &str, prompt: &str) -> Option<String> {
    let request = ChatRequest {
        model: "gpt-4".to_string(),
        messages: vec![ChatMessage {
            role: "user".to_string(),
            content: prompt.to_string(),
        }],
        max_tokens: 500,
    };

    let response = client
        .post("https://api.openai.com/v1/chat/completions")
        .header("Authorization", format!("Bearer {}", api_key))
        .json(&request)
        .send()
        .ok()?;

    let chat_response: ChatResponse = response.json().ok()?;
    // Avoid indexing: an empty choices list yields None instead of a panic
    chat_response.choices.into_iter().next().map(|c| c.message.content)
}
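
The request blocks on the network, so run it from a node scheduled with .async_io() (see Async Processing below) and keep it off the control path. A usage sketch, assuming the key comes from an environment variable:

// simplified (hypothetical usage from a planning node)
let client = Client::new();
let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY not set");

if let Some(plan) = call_llm(&client, &api_key, "Plan a route to the charging dock") {
    // Publish on a hypothetical "plan" topic for downstream nodes to act on
    self.plan.send(plan);
}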

Performance Considerations

Latency Budget

Typical robotics control loop at 100Hz (10ms cycle):

Sensor capture:  ~1-16ms  (hardware dependent)
ML inference:    ~5-50ms  (model dependent)
Topic transfer:  ~85ns    (HORUS shared memory)
Control logic:   ~1μs     (HORUS node tick)
Motor command:   ~1ms     (hardware actuator)

ML inference is typically the bottleneck: a 30ms inference can keep up with at most ~33Hz, well short of a 100Hz control loop. Strategies to manage this:

Throttle Inference

Process every Nth frame instead of every frame:

// simplified
node! {
    ThrottledMlNode {
        sub { image: Vec<u8> -> "cam.image_raw" }
        pub { detections: Vec<f32> -> "detections" }
        data { frame_count: u32 = 0 }

        tick {
            if let Some(frame) = self.image.recv() {
                self.frame_count += 1;
                if self.frame_count % 5 == 0 {  // Every 5th frame
                    let result = run_inference(&frame);
                    self.detections.send(result);
                }
            }
        }
    }
}

Async Processing

Run ML in a background thread so the control loop isn't blocked:

// simplified
// Use .async_io() execution class for nodes that do heavy work
scheduler.add(ml_node)
    .async_io()  // Runs in its own thread, won't block the scheduler
    .rate(10_u64.hz())
    .build()?;

Use Appropriate Models

Task               CPU Model            GPU Model      Cloud API
Object Detection   YOLOv8n (ONNX)       YOLOv8x        GPT-4 Vision
Classification     MobileNet (TFLite)   EfficientNet   Cloud Vision
Pose Estimation    MediaPipe            OpenPose       -
Task Planning      Phi-3 Mini           Llama 3        GPT-4 / Claude
Depth Estimation   MiDaS Small          MiDaS Large    -

Best Practices

  1. Separate concerns: Keep AI inference in dedicated nodes. Don't mix ML code with control logic.

  2. Handle failures gracefully: AI inference can fail. Always have a safe fallback:

    def control_tick(node):
        if node.has_msg("detections"):
            detections = node.recv("detections")
            react_to(detections)
        else:
            # Safe default when no detections available
            node.send("cmd_vel", {"linear": 0.0, "angular": 0.0})
    
  3. Monitor performance: Use horus monitor to watch node timing and message flow:

    horus monitor  # See which nodes are slow
    
  4. Start with Python: Prototype in Python first, then move performance-critical inference to Rust if needed.

  5. Cache results: For cloud APIs, cache common responses to reduce latency and cost.
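
    A minimal sketch, assuming prompts repeat verbatim (a real cache might add expiry or prompt normalization). It wraps call_llm from the Cloud API Integration section above:

    // simplified (exact-match cache over call_llm)
    use std::collections::HashMap;
    use reqwest::blocking::Client;

    struct CachedLlm {
        cache: HashMap<String, String>,
    }

    impl CachedLlm {
        fn ask(&mut self, client: &Client, api_key: &str, prompt: &str) -> Option<String> {
            // Repeated prompts are served from memory: no round trip, no API cost
            if let Some(hit) = self.cache.get(prompt) {
                return Some(hit.clone());
            }
            let answer = call_llm(client, api_key, prompt)?;
            self.cache.insert(prompt.to_string(), answer.clone());
            Some(answer)
        }
    }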

Common Errors

Symptom                                   Cause                                           Fix
Control loop stutters when ML node runs   ML inference blocking the scheduler             Run the ML node at a lower rate (rate=10) or use the .compute() / .async_io() execution class
CUDA out of memory                        Model too large for GPU                         Use a smaller model variant (YOLOv8n instead of YOLOv8x) or offload to CPU
Python ML node not receiving messages     Topic name mismatch or wrong subscription       Verify topic names match exactly (use dots, e.g. cam.image_raw)
Cloud API timeout                         Network latency or API rate limit               Add retry logic, use .async_io() for network nodes, cache common responses
ONNX model fails to load                  Wrong ONNX opset version or missing operators   Check model compatibility with your ort or tract version

See Also