AI Integration
You need to run AI/ML inference (object detection, path planning, scene understanding) alongside real-time robot control. Here is how to integrate ML models into HORUS nodes without blocking the control loop.
When To Use This
- Adding object detection, classification, or pose estimation to a robot
- Running ONNX or PyTorch models alongside real-time control nodes
- Calling cloud APIs (GPT-4, Claude) for task planning from HORUS
- Prototyping ML pipelines in Python with HORUS pub/sub
Use AI-Assisted Development instead if you want AI coding agents to write HORUS code, rather than embedding ML models in your robot.
Prerequisites
- A HORUS project (Rust or Python)
- For Python ML: PyTorch, ONNX Runtime, or your preferred ML library installed
- For Rust ML: `ort` or `tract-onnx` crate added to dependencies
- Familiarity with Topics (ML nodes communicate via pub/sub)
Overview
HORUS supports AI integration through two main approaches:
Python ML Nodes (Recommended for Prototyping)
- Use any Python ML library (PyTorch, TensorFlow, ONNX, etc.)
- Hardware nodes handle camera/sensor capture
- Pub/sub connects ML pipeline to control nodes
- 10-100ms typical inference latency
Rust Inference (For Production)
- ONNX Runtime via the `ort` crate
- Tract (pure Rust inference engine)
- 1-50ms typical inference latency
Architecture Pattern
The key insight: keep AI inference in dedicated nodes. HORUS topics decouple the fast control loop from slower ML processing, so a slow inference step doesn't block motor commands.
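As a minimal sketch of this pattern (mirroring the node! fields used in the throttling example later in this guide; the topic names, payload types, and run_inference are illustrative):
// simplified
node! {
    DetectorNode {
        sub { image: Vec<u8> -> "cam.image_raw" }      // frames from the camera hardware node
        pub { detections: Vec<f32> -> "detections" }   // results for any downstream consumer
        tick {
            if let Some(frame) = self.image.recv() {
                // Slow work stays here, isolated from the control loop
                let result = run_inference(&frame);
                self.detections.send(result);
            }
        }
    }
}

node! {
    ControlNode {
        sub { detections: Vec<f32> -> "detections" }
        pub { cmd_vel: Vec<f32> -> "cmd_vel" }
        tick {
            // Always runs at the control rate; uses the latest detections if any arrived
            if let Some(d) = self.detections.recv() {
                // ...react to d
            }
            self.cmd_vel.send(vec![0.0, 0.0]);         // safe default command
        }
    }
}
Because the two nodes share only a topic, the scheduler can run ControlNode at 100Hz while DetectorNode runs at 10Hz or in its own thread (see Performance Considerations below).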
Rust ML Inference
For production deployments where you need maximum performance, integrate ML inference directly in Rust.
ONNX Runtime (ort crate)
The ort crate provides Rust bindings for ONNX Runtime:
Add to your horus.toml:
[dependencies]
ort = "2.0"
ndarray = "0.15"
// simplified
use horus::prelude::*;
use ort::{GraphOptimizationLevel, Session};
use ndarray::Array;
struct InferenceNode {
    session: Session,
    input_name: String,
}

impl InferenceNode {
    fn new(model_path: &str) -> Result<Self, Box<dyn std::error::Error>> {
        // Load and optimize the ONNX model once, at construction time
        let session = Session::builder()?
            .with_optimization_level(GraphOptimizationLevel::Level3)?
            .commit_from_file(model_path)?;
        let input_name = session.inputs[0].name.clone();
        Ok(Self { session, input_name })
    }

    fn infer(&self, input: &[f32]) -> Option<Vec<f32>> {
        // Shape the flat input as a single-sample batch: (1, N)
        let input_array = Array::from_shape_vec((1, input.len()), input.to_vec()).ok()?;
        let outputs = self.session.run(
            ort::inputs![&self.input_name => input_array.view()].ok()?
        ).ok()?;
        let output = outputs[0].try_extract_tensor::<f32>().ok()?;
        Some(output.view().iter().copied().collect())
    }
}
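A hypothetical usage sketch; the model path and the preprocess step are placeholders for your own pipeline, not HORUS APIs:
// simplified
let detector = InferenceNode::new("models/yolov8n.onnx")?;   // load the model once at startup

// Inside a node's tick:
let features: Vec<f32> = preprocess(&frame);                 // placeholder feature extraction
if let Some(scores) = detector.infer(&features) {
    // publish scores on a topic such as "detections"
}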
Tract (Pure Rust)
Tract runs ONNX models with zero external dependencies:
Add to your horus.toml:
[dependencies]
tract-onnx = "0.21"
// simplified
use tract_onnx::prelude::*;

// Alias for the optimized-plan type so the signatures stay readable
type OnnxPlan = SimplePlan<TypedFact, Box<dyn TypedOp>, Graph<TypedFact, Box<dyn TypedOp>>>;

fn load_model(path: &str) -> TractResult<OnnxPlan> {
    tract_onnx::onnx()
        .model_for_path(path)?
        .into_optimized()?
        .into_runnable()
}

fn run_inference(model: &OnnxPlan, input: &[f32]) -> Option<Vec<f32>> {
    // Convert the slice into a tract Tensor, then run the optimized plan
    let input_tensor: Tensor = tract_ndarray::arr1(input).into_dyn().into();
    let result = model.run(tvec!(input_tensor.into())).ok()?;
    let output = result[0].to_array_view::<f32>().ok()?;
    Some(output.iter().copied().collect())
}
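Usage follows the same shape: load the plan once, then reuse it on every tick (the model path is illustrative):
// simplified
let model = load_model("models/yolov8n.onnx")?;    // optimize and compile once
if let Some(output) = run_inference(&model, &input) {
    // forward output to a topic, e.g. "detections"
}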
Model Format Comparison
| Format | Crate | Use Case | External Deps |
|---|---|---|---|
| ONNX | ort | General (PyTorch, TF exports) | ONNX Runtime C lib |
| ONNX | tract-onnx | Pure Rust inference | None |
| TFLite | tflite | Edge/mobile models | TFLite C lib |
Cloud API Integration
For complex reasoning tasks (task planning, scene understanding, natural language), call cloud APIs from HORUS nodes.
// simplified
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};

#[derive(Serialize)]
struct ChatRequest {
    model: String,
    messages: Vec<ChatMessage>,
    max_tokens: u32,
}

#[derive(Serialize, Deserialize)]
struct ChatMessage {
    role: String,
    content: String,
}

#[derive(Deserialize)]
struct ChatResponse {
    choices: Vec<ChatChoice>,
}

#[derive(Deserialize)]
struct ChatChoice {
    message: ChatMessage,
}

fn call_llm(client: &Client, api_key: &str, prompt: &str) -> Option<String> {
    let request = ChatRequest {
        model: "gpt-4".to_string(),
        messages: vec![ChatMessage {
            role: "user".to_string(),
            content: prompt.to_string(),
        }],
        max_tokens: 500,
    };

    let response = client
        .post("https://api.openai.com/v1/chat/completions")
        .header("Authorization", format!("Bearer {}", api_key))
        .json(&request)
        .send()
        .ok()?;

    let chat_response: ChatResponse = response.json().ok()?;
    // Avoid indexing into an empty choices array if the API returns an error payload
    chat_response.choices.into_iter().next().map(|choice| choice.message.content)
}
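One reasonable way to construct the blocking client is with an explicit timeout and the key read from the environment (the environment variable name and the prompt are illustrative):
// simplified
use std::time::Duration;

let api_key = std::env::var("OPENAI_API_KEY").expect("set OPENAI_API_KEY");
let client = Client::builder()
    .timeout(Duration::from_secs(10))   // bound worst-case request latency
    .build()
    .expect("failed to build HTTP client");

if let Some(plan) = call_llm(&client, &api_key, "Plan a route to the charging dock") {
    // publish the plan on a topic for the task-planning node
}
Because a blocking HTTP call can take hundreds of milliseconds, schedule the node that makes it with .async_io() (see Async Processing below) so it never stalls the control loop.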
Performance Considerations
Latency Budget
Typical robotics control loop at 100Hz (10ms cycle):
- Sensor capture: ~1-16ms (hardware dependent)
- ML inference: ~5-50ms (model dependent)
- Topic transfer: ~85ns (HORUS shared memory)
- Control logic: ~1μs (HORUS node tick)
- Motor command: ~1ms (hardware actuator)
ML inference is typically the bottleneck. Strategies to manage this:
Throttle Inference
Process every Nth frame instead of every frame:
// simplified
node! {
    ThrottledMlNode {
        sub { image: Vec<u8> -> "cam.image_raw" }
        pub { detections: Vec<f32> -> "detections" }
        data { frame_count: u32 = 0 }
        tick {
            if let Some(frame) = self.image.recv() {
                self.frame_count += 1;
                if self.frame_count % 5 == 0 { // Every 5th frame
                    let result = run_inference(&frame);
                    self.detections.send(result);
                }
            }
        }
    }
}
Async Processing
Run ML in a background thread so the control loop isn't blocked:
// simplified
// Use .async_io() execution class for nodes that do heavy work
scheduler.add(ml_node)
    .async_io()          // Runs in its own thread, won't block the scheduler
    .rate(10_u64.hz())
    .build()?;
Use Appropriate Models
| Task | CPU Model | GPU Model | Cloud API |
|---|---|---|---|
| Object Detection | YOLOv8n (ONNX) | YOLOv8x | GPT-4 Vision |
| Classification | MobileNet (TFLite) | EfficientNet | Cloud Vision |
| Pose Estimation | MediaPipe | OpenPose | - |
| Task Planning | Phi-3 Mini | Llama 3 | GPT-4 / Claude |
| Depth Estimation | MiDaS Small | MiDaS Large | - |
Best Practices
- Separate concerns: Keep AI inference in dedicated nodes. Don't mix ML code with control logic.
- Handle failures gracefully: AI inference can fail. Always have a safe fallback:

  def control_tick(node):
      if node.has_msg("detections"):
          detections = node.recv("detections")
          react_to(detections)
      else:
          # Safe default when no detections available
          node.send("cmd_vel", {"linear": 0.0, "angular": 0.0})

- Monitor performance: Use `horus monitor` to watch node timing and message flow:

  horus monitor  # See which nodes are slow

- Start with Python: Prototype in Python first, then move performance-critical inference to Rust if needed.
- Cache results: For cloud APIs, cache common responses to reduce latency and cost (a sketch follows below).
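A minimal sketch of that caching idea, reusing call_llm and Client from the Cloud API Integration section; CachedLlm is a hypothetical helper, not part of HORUS:
// simplified
use std::collections::HashMap;
use reqwest::blocking::Client;

struct CachedLlm {
    cache: HashMap<String, String>,   // prompt -> cached response
}

impl CachedLlm {
    fn ask(&mut self, client: &Client, api_key: &str, prompt: &str) -> Option<String> {
        if let Some(hit) = self.cache.get(prompt) {
            return Some(hit.clone());              // cache hit: no network round trip
        }
        let answer = call_llm(client, api_key, prompt)?;
        self.cache.insert(prompt.to_string(), answer.clone());
        Some(answer)
    }
}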
Common Errors
| Symptom | Cause | Fix |
|---|---|---|
| Control loop stutters when ML node runs | ML inference blocking the scheduler | Run ML node at a lower rate (rate=10) or use .compute() / .async_io() execution class |
| CUDA out of memory | Model too large for GPU | Use a smaller model variant (YOLOv8n instead of YOLOv8x) or offload to CPU |
| Python ML node not receiving messages | Topic name mismatch or wrong subscription | Verify topic names match exactly (use dots, e.g., cam.image_raw) |
| Cloud API timeout | Network latency or API rate limit | Add retry logic, use .async_io() for network nodes, cache common responses |
| ONNX model fails to load | Wrong ONNX opset version or missing operators | Check model compatibility with your ort or tract version |
See Also
- Python Bindings — Core Python API for HORUS nodes
- ML Utilities — Pre-built ONNX inference nodes and performance monitoring
- Execution Classes — Understanding `.compute()` and `.async_io()` for ML workloads
- AI-Assisted Development — Using AI agents to write HORUS code
- Telemetry Export — Export ML inference metrics to external dashboards