# AI Integration
HORUS's sub-microsecond IPC makes it well-suited for combining real-time control with AI inference. This guide covers patterns for integrating ML models into HORUS applications.
## Overview

HORUS supports AI integration through two main approaches:

### Python ML Nodes (Recommended for Prototyping)

- Use any Python ML library (PyTorch, TensorFlow, ONNX, etc.)
- Hardware nodes handle camera/sensor capture
- Pub/sub connects the ML pipeline to control nodes
- 10-100ms typical inference latency

### Rust Inference (For Production)

- ONNX Runtime via the `ort` crate
- Tract (pure Rust inference engine)
- 1-50ms typical inference latency
## Architecture Pattern
The key insight: keep AI inference in dedicated nodes. HORUS topics decouple the fast control loop from slower ML processing, so a slow inference step doesn't block motor commands.
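Outside of HORUS, the same decoupling can be sketched with a bounded queue: the inference worker publishes its latest result, and the control loop reads non-blockingly, reusing the previous detection when nothing new has arrived. This is a generic Python sketch of the pattern, not HORUS API; `slow_inference` is a stand-in for your model.

```python
import queue
import threading
import time

detections = queue.Queue(maxsize=1)  # holds only the most recent result

def slow_inference(frame):
    time.sleep(0.05)  # stand-in for a ~50ms model
    return {"obstacle_detected": False}

def ml_worker(frame):
    result = slow_inference(frame)
    try:
        detections.put_nowait(result)  # drop the result if one is already waiting
    except queue.Full:
        pass

def control_step(last_detection):
    # Non-blocking read: fall back to the previous detection if none is ready
    try:
        return detections.get_nowait()
    except queue.Empty:
        return last_detection
```

The control loop keeps running at full rate; a slow model only affects how fresh the detections are.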
## Python ML Integration
The fastest way to add AI to a HORUS application is through Python nodes. Python has the richest ML ecosystem, and HORUS's Python bindings give you full access to the pub/sub system.
### Camera + ML Pipeline

```python
from horus import Node, Scheduler
import numpy as np

# Simulated camera node (replace with your camera capture logic)
def camera_tick(node):
    # In a real robot, capture from camera hardware here
    frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    node.send("cam.image_raw", frame.tolist())

cam = Node(name="camera", pubs=["cam.image_raw"], tick=camera_tick, rate=30)

# ML processing node
def ml_tick(node):
    if node.has_msg("cam.image_raw"):
        frame = node.get("cam.image_raw")
        # Run your ML model here
        # e.g., model.predict(frame), torch inference, etc.
        result = process_frame(frame)
        node.send("detections", result)

ml_node = Node(
    name="ml_processor",
    subs=["cam.image_raw"],
    pubs=["detections"],
    tick=ml_tick,
    rate=10  # Process at 10 FPS
)

# Control node reacts to detections
def control_tick(node):
    if node.has_msg("detections"):
        detections = node.get("detections")
        # React to ML output
        if detections.get("obstacle_detected"):
            node.send("cmd_vel", {"linear": 0.0, "angular": 0.5})
        else:
            node.send("cmd_vel", {"linear": 1.0, "angular": 0.0})

controller = Node(
    name="controller",
    subs=["detections"],
    pubs=["cmd_vel"],
    tick=control_tick,
    rate=30
)

scheduler = Scheduler()
scheduler.add(cam, order=0)
scheduler.add(ml_node, order=1)
scheduler.add(controller, order=2)
scheduler.run()
```
### PyTorch Example

```python
from horus import Node, run
import torch

# Load model once at startup, not inside the tick
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
model.eval()

def detect_tick(node):
    if node.has_msg("cam.image_raw"):
        frame = node.get("cam.image_raw")
        results = model(frame)
        detections = results.pandas().xyxy[0].to_dict('records')
        node.send("detections", detections)

node = Node(
    name="yolo_detector",
    subs=["cam.image_raw"],
    pubs=["detections"],
    tick=detect_tick,
    rate=10
)
run(node)
```
### ONNX Runtime (Python)

```python
from horus import Node, run
import onnxruntime as ort
import numpy as np

session = ort.InferenceSession("model.onnx")
input_name = session.get_inputs()[0].name

def inference_tick(node):
    if node.has_msg("cam.image_raw"):
        frame = node.get("cam.image_raw")
        # Preprocess
        input_data = np.array(frame).astype(np.float32)
        input_data = np.expand_dims(input_data, axis=0)
        # Run inference
        outputs = session.run(None, {input_name: input_data})
        node.send("ml.output", outputs[0].tolist())

node = Node(
    name="onnx_inference",
    subs=["cam.image_raw"],
    pubs=["ml.output"],
    tick=inference_tick,
    rate=15
)
run(node)
```
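The preprocessing above assumes the model accepts the raw frame shape. Most vision models instead expect a resized, normalized NCHW float tensor; the sketch below shows that conversion generically (the 224x224 target size and [0, 1] scaling are assumptions that depend on your model, and the nearest-neighbour resize avoids a cv2 dependency).

```python
import numpy as np

def preprocess(frame, size=(224, 224)):
    """Convert an HxWx3 uint8 frame to a 1x3xHxW float32 tensor in [0, 1]."""
    arr = np.asarray(frame, dtype=np.float32) / 255.0
    # Nearest-neighbour resize via index sampling (no cv2 dependency)
    h, w = arr.shape[:2]
    rows = np.linspace(0, h - 1, size[0]).astype(int)
    cols = np.linspace(0, w - 1, size[1]).astype(int)
    arr = arr[rows][:, cols]
    arr = arr.transpose(2, 0, 1)        # HWC -> CHW
    return np.expand_dims(arr, axis=0)  # add batch dim -> NCHW
```

Check your model's expected input shape with `session.get_inputs()[0].shape` before committing to a layout.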
### ML Utilities

HORUS provides Python ML utilities for common patterns:

```python
from horus.ml_utils import ONNXInferenceNode, PerformanceMonitor

# Pre-built ONNX inference node
class MyDetector(ONNXInferenceNode):
    def __init__(self):
        super().__init__(
            model_path="models/detector.onnx",
            input_topic="cam.image_raw",
            output_topic="detections"
        )

    def preprocess(self, frame):
        # Resize, normalize, etc.
        return processed_frame

    def postprocess(self, output):
        # Parse model output into detections
        return detections

# Performance monitoring
monitor = PerformanceMonitor(window_size=100)
monitor.record(12.5)  # Record inference time in ms
stats = monitor.get_stats()
print(f"Avg: {stats['avg_latency_ms']:.1f}ms, P95: {stats['p95_latency_ms']:.1f}ms, FPS: {stats['fps']:.0f}")
```

See ML Utilities for the full API.
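If you are not using `horus.ml_utils` (for instance, when timing a standalone script or the Rust path), a rolling-window monitor is easy to hand-roll. This sketch mirrors the usage shown above but is not the library implementation:

```python
from collections import deque

class RollingPerfMonitor:
    """Rolling-window latency statistics (hand-rolled sketch, not horus.ml_utils)."""

    def __init__(self, window_size=100):
        self.samples = deque(maxlen=window_size)

    def record(self, latency_ms):
        self.samples.append(latency_ms)

    def get_stats(self):
        if not self.samples:
            return {"avg_latency_ms": 0.0, "p95_latency_ms": 0.0, "fps": 0.0}
        ordered = sorted(self.samples)
        avg = sum(ordered) / len(ordered)
        p95 = ordered[min(len(ordered) - 1, int(len(ordered) * 0.95))]
        return {"avg_latency_ms": avg, "p95_latency_ms": p95, "fps": 1000.0 / avg}
```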
## Rust Inference

For production deployments where you need maximum performance, integrate ML inference directly in Rust.

### ONNX Runtime (ort crate)

The `ort` crate provides Rust bindings for ONNX Runtime. Add to your `Cargo.toml`:

```toml
[dependencies]
horus = { path = "..." }
horus_library = { path = "..." }
ort = "2.0"
ndarray = "0.15"
```
```rust
use horus::prelude::*;
use ndarray::Array;
use ort::{GraphOptimizationLevel, Session};

struct InferenceNode {
    session: Session,
    input_name: String,
}

impl InferenceNode {
    fn new(model_path: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let session = Session::builder()?
            .with_optimization_level(GraphOptimizationLevel::Level3)?
            .commit_from_file(model_path)?;
        let input_name = session.inputs[0].name.clone();
        Ok(Self { session, input_name })
    }

    fn infer(&self, input: &[f32]) -> Option<Vec<f32>> {
        let input_array = Array::from_shape_vec((1, input.len()), input.to_vec()).ok()?;
        let outputs = self.session.run(
            ort::inputs![&self.input_name => input_array.view()].ok()?
        ).ok()?;
        let output = outputs[0].try_extract_tensor::<f32>().ok()?;
        Some(output.view().iter().copied().collect())
    }
}
```
### Tract (Pure Rust)

Tract runs ONNX models with no external C library dependencies. Add to your `Cargo.toml`:

```toml
[dependencies]
horus = { path = "..." }
horus_library = { path = "..." }
tract-onnx = "0.21"
```

```rust
use tract_onnx::prelude::*;

// Alias the verbose plan type so the signatures stay readable
type Model = SimplePlan<TypedFact, Box<dyn TypedOp>, Graph<TypedFact, Box<dyn TypedOp>>>;

fn load_model(path: &str) -> TractResult<Model> {
    tract_onnx::onnx()
        .model_for_path(path)?
        .into_optimized()?
        .into_runnable()
}

fn run_inference(model: &Model, input: &[f32]) -> Option<Vec<f32>> {
    let input_tensor = tract_ndarray::arr1(input).into_dyn();
    let result = model.run(tvec!(input_tensor.into())).ok()?;
    let output = result[0].to_array_view::<f32>().ok()?;
    Some(output.iter().copied().collect())
}
```
### Model Format Comparison

| Format | Crate | Use Case | External Deps |
|---|---|---|---|
| ONNX | `ort` | General (PyTorch, TF exports) | ONNX Runtime C lib |
| ONNX | `tract-onnx` | Pure Rust inference | None |
| TFLite | `tflite` | Edge/mobile models | TFLite C lib |
## Cloud API Integration

For complex reasoning tasks (task planning, scene understanding, natural language), call cloud APIs from HORUS nodes.

### Python (Recommended)

```python
from horus import Node, run
import requests
import os

API_KEY = os.environ["OPENAI_API_KEY"]

def planner_tick(node):
    if node.has_msg("user.goal"):
        goal = node.get("user.goal")
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={
                "model": "gpt-4",
                "messages": [
                    {"role": "system", "content": "Generate robot action plans as JSON."},
                    {"role": "user", "content": goal}
                ],
                "max_tokens": 500
            },
            timeout=30  # never let a hung request block the node forever
        )
        plan = response.json()["choices"][0]["message"]["content"]
        node.send("robot.plan", plan)

node = Node(
    name="planner",
    subs=["user.goal"],
    pubs=["robot.plan"],
    tick=planner_tick,
    rate=1  # Check for goals once per second
)
run(node)
```
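Cloud calls can still fail or time out, and an unhandled exception in `planner_tick` would take the node down with it. One pattern is to wrap the call so a failure degrades to a safe default. `with_fallback` is a hypothetical helper sketched here, not a HORUS API:

```python
def with_fallback(fn, fallback):
    """Return a wrapper that calls fn and returns fallback on any exception."""
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            return fallback
    return wrapper

# Hypothetical usage inside planner_tick, where request_plan wraps the HTTP call:
# plan = with_fallback(request_plan, fallback='{"action": "hold"}')(goal)
```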
### Rust (reqwest)

```rust
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};

#[derive(Serialize)]
struct ChatRequest {
    model: String,
    messages: Vec<ChatMessage>,
    max_tokens: u32,
}

#[derive(Serialize, Deserialize)]
struct ChatMessage {
    role: String,
    content: String,
}

#[derive(Deserialize)]
struct ChatResponse {
    choices: Vec<ChatChoice>,
}

#[derive(Deserialize)]
struct ChatChoice {
    message: ChatMessage,
}

fn call_llm(client: &Client, api_key: &str, prompt: &str) -> Option<String> {
    let request = ChatRequest {
        model: "gpt-4".to_string(),
        messages: vec![ChatMessage {
            role: "user".to_string(),
            content: prompt.to_string(),
        }],
        max_tokens: 500,
    };
    let response = client
        .post("https://api.openai.com/v1/chat/completions")
        .header("Authorization", format!("Bearer {}", api_key))
        .json(&request)
        .send()
        .ok()?;
    let chat_response: ChatResponse = response.json().ok()?;
    // first()? avoids a panic if the API returns no choices
    Some(chat_response.choices.first()?.message.content.clone())
}
```
## Performance Considerations

### Latency Budget

Typical robotics control loop at 100Hz (10ms cycle):

- Sensor capture: ~1-16ms (hardware dependent)
- ML inference: ~5-50ms (model dependent)
- Topic transfer: ~85ns (HORUS shared memory)
- Control logic: ~1μs (HORUS node tick)
- Motor command: ~1ms (hardware actuator)
ML inference is typically the bottleneck. Strategies to manage this:

### Throttle Inference

Process every Nth frame instead of every frame:

```python
frame_count = 0

def ml_tick(node):
    global frame_count
    if node.has_msg("cam.image_raw"):
        frame_count += 1
        if frame_count % 5 == 0:  # Every 5th frame
            frame = node.get("cam.image_raw")
            result = model.predict(frame)
            node.send("detections", result)
```
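Count-based skipping assumes a steady camera rate. If the frame rate varies, a time-based throttle is more predictable; this is a generic sketch (the `Throttle` helper is hypothetical, not part of HORUS; `now` is injectable only to make the logic testable):

```python
import time

class Throttle:
    """Allow at most one action per min_interval seconds."""

    def __init__(self, min_interval):
        self.min_interval = min_interval
        self.last = float("-inf")

    def ready(self, now=None):
        # now can be injected for testing; defaults to a monotonic clock
        now = time.monotonic() if now is None else now
        if now - self.last >= self.min_interval:
            self.last = now
            return True
        return False
```

In `ml_tick`, run inference only when `throttle.ready()` returns True, bounding the inference rate regardless of how fast frames arrive.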
### Async Processing

Run ML in a background thread so the control loop isn't blocked:

```python
from horus import AsyncNode, AsyncTopic
import asyncio

class AsyncMLNode(AsyncNode):
    async def async_init(self):
        self.input = AsyncTopic(dict)
        self.output = AsyncTopic(dict)

    async def async_tick(self):
        frame = await self.input.try_recv()
        if frame:
            # Run in a thread pool to avoid blocking the event loop
            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(None, model.predict, frame)
            await self.output.send(result)
```
### Use Appropriate Models

| Task | CPU Model | GPU Model | Cloud API |
|---|---|---|---|
| Object Detection | YOLOv8n (ONNX) | YOLOv8x | GPT-4 Vision |
| Classification | MobileNet (TFLite) | EfficientNet | Cloud Vision |
| Pose Estimation | MediaPipe | OpenPose | - |
| Task Planning | Phi-3 Mini | Llama 3 | GPT-4 / Claude |
| Depth Estimation | MiDaS Small | MiDaS Large | - |
## Best Practices

- **Separate concerns**: Keep AI inference in dedicated nodes. Don't mix ML code with control logic.

- **Handle failures gracefully**: AI inference can fail. Always have a safe fallback:

  ```python
  def control_tick(node):
      if node.has_msg("detections"):
          detections = node.get("detections")
          react_to(detections)
      else:
          # Safe default when no detections available
          node.send("cmd_vel", {"linear": 0.0, "angular": 0.0})
  ```

- **Monitor performance**: Use `horus monitor` to watch node timing and message flow:

  ```bash
  horus monitor  # See which nodes are slow
  ```

- **Start with Python**: Prototype in Python first, then move performance-critical inference to Rust if needed.

- **Cache results**: For cloud APIs, cache common responses to reduce latency and cost.
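For the last point, a time-bounded cache keyed on the prompt is often enough. A minimal sketch (`ttl_cached` is illustrative, not a HORUS utility; the `clock` parameter exists only so the expiry logic is testable):

```python
import time

def ttl_cached(fn, ttl_s=300.0, clock=time.monotonic):
    """Memoize fn(prompt), expiring entries after ttl_s seconds."""
    cache = {}

    def wrapper(prompt):
        now = clock()
        hit = cache.get(prompt)
        if hit is not None and now - hit[0] < ttl_s:
            return hit[1]  # fresh cached response; skip the API call
        result = fn(prompt)
        cache[prompt] = (now, result)
        return result

    return wrapper
```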
## See Also
- Python Examples - Complete example applications
- Message Library - Available message types
- Python Bindings - Core Python API