Python ML Utilities
HORUS provides Python ML utility classes in horus.ml_utils for easy integration with popular machine learning frameworks. These helper classes handle model loading, preprocessing, inference, and performance tracking.
Overview
The ml_utils module provides:
| Class | Framework | Use Case |
|---|---|---|
| `MLNodeBase` | Any | Base class for ML inference nodes |
| `PyTorchInferenceNode` | PyTorch | TorchScript and checkpoint models |
| `TensorFlowInferenceNode` | TensorFlow/Keras | SavedModel and .h5 files |
| `ONNXInferenceNode` | ONNX Runtime | Cross-platform ONNX models |
| `PerformanceMonitor` | Any | Track inference latency and throughput |
Base Class: MLNodeBase
All ML inference nodes inherit from MLNodeBase:
from horus.ml_utils import MLNodeBase
class MLNodeBase:
    """Base class for HORUS ML inference nodes.

    Subclasses override load_model/preprocess/infer/postprocess;
    run_inference chains them and reports per-call latency.
    Bodies shown as ``...`` are implemented by the library.
    """

    def __init__(self, model_path: str, input_topic: str, output_topic: str):
        """
        Args:
            model_path: Path to the model file
            input_topic: Topic to subscribe to for inputs
            output_topic: Topic to publish predictions to
        """
        ...

    def load_model(self):
        """Override to load your model"""
        raise NotImplementedError()

    def preprocess(self, data):
        """Override to preprocess input data"""
        return data  # identity by default

    def infer(self, data):
        """Override to run inference"""
        raise NotImplementedError()

    def postprocess(self, output):
        """Override to postprocess model output"""
        return output  # identity by default

    # NOTE(review): ``Any`` must come from ``typing`` — confirm the real
    # module imports it; this listing omits the import.
    def run_inference(self, data) -> tuple[Any, float]:
        """Run full pipeline, returns (result, latency_ms)"""
        ...

    def get_stats(self) -> dict:
        """Get inference statistics"""
        ...
PyTorch Integration
PyTorchInferenceNode
from horus import Node, Topic
from horus.ml_utils import PyTorchInferenceNode
class ObjectDetector(PyTorchInferenceNode):
    """YOLO object detector backed by a TorchScript model."""

    def __init__(self):
        super().__init__(
            model_path="models/yolov8n.pt",
            input_topic="camera.raw",
            output_topic="detections",
            device="cuda:0"  # or "cpu"
        )

    def load_model(self):
        """Load the TorchScript module onto the configured device."""
        import torch
        self.model = torch.jit.load(self.model_path, map_location=self.device)
        self.model.eval()

    def preprocess(self, image_data):
        """Scale pixel values to [0, 1] and reorder HWC -> NCHW."""
        import torch
        batch = torch.from_numpy(image_data).float() / 255.0
        batch = batch.permute(2, 0, 1).unsqueeze(0)  # add batch axis
        return batch.to(self.device)

    def infer(self, tensor):
        """Forward pass with gradient tracking disabled."""
        import torch
        with torch.no_grad():
            return self.model(tensor)

    def postprocess(self, output):
        """Move detections to host memory as a plain Python list."""
        return output[0].cpu().numpy().tolist()
Usage with HORUS Node
from horus import Node, Scheduler
from horus.ml_utils import PyTorchInferenceNode

def vision_tick(node):
    # Pull the newest camera frame (if any), run it through the detector,
    # and publish the result. `detector` is the module-level instance below.
    if node.has_msg("camera.raw"):
        frame = node.get("camera.raw")
        result, latency_ms = detector.run_inference(frame)
        node.send("detections", result)
        print(f"Inference latency: {latency_ms:.2f}ms")

# ObjectDetector is the PyTorchInferenceNode subclass defined earlier.
# Load the model once at startup, not on every tick.
detector = ObjectDetector()
detector.load_model()

vision_node = Node(
    name="vision",
    subs=["camera.raw"],
    pubs=["detections"],
    tick=vision_tick,
    rate=30,  # ticks per second
)

# Run
scheduler = Scheduler()
scheduler.add(vision_node, order=0)
scheduler.run()
TensorFlow Integration
TensorFlowInferenceNode
from horus.ml_utils import TensorFlowInferenceNode
class ImageClassifier(TensorFlowInferenceNode):
    """MobileNetV2 image classifier (SavedModel directory or Keras .h5)."""

    def __init__(self):
        super().__init__(
            model_path="models/mobilenet_v2",
            input_topic="camera.raw",
            output_topic="classification",
            use_gpu=True
        )

    def load_model(self):
        """Load a Keras .h5 checkpoint or fall back to SavedModel format."""
        import tensorflow as tf
        is_keras_h5 = self.model_path.endswith(".h5")
        self.model = (
            tf.keras.models.load_model(self.model_path)
            if is_keras_h5
            else tf.saved_model.load(self.model_path)
        )

    def preprocess(self, image_data):
        """Resize to 224x224, scale to [0, 1], and add a batch axis."""
        import tensorflow as tf
        resized = tf.image.resize(image_data, [224, 224]) / 255.0
        return tf.expand_dims(resized, 0)
ONNX Integration
ONNXInferenceNode
ONNX Runtime provides cross-platform inference for models exported from PyTorch, TensorFlow, and other frameworks.
from horus.ml_utils import ONNXInferenceNode
import numpy as np
class YOLODetector(ONNXInferenceNode):
    """YOLOv8 detector running through ONNX Runtime."""

    def __init__(self):
        super().__init__(
            model_path="models/yolov8n.onnx",
            input_topic="camera.raw",
            output_topic="detections",
            use_gpu=True  # Uses CUDAExecutionProvider if available
        )

    def load_model(self):
        """Create the inference session and cache input/output tensor names."""
        import onnxruntime as ort
        if self.use_gpu:
            providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
        else:
            providers = ["CPUExecutionProvider"]
        self.session = ort.InferenceSession(self.model_path, providers=providers)
        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]

    def preprocess(self, image_data):
        """Normalize to [0, 1] and lay out as a (1, C, H, W) float32 batch.

        NOTE(review): no resize is performed here — the input is assumed to
        already be at the model's expected resolution (e.g. 640x640); confirm.
        """
        arr = np.asarray(image_data, dtype=np.float32) / 255.0
        arr = arr.transpose(2, 0, 1)  # HWC -> CHW
        return arr[np.newaxis, ...]

    def infer(self, data):
        """Run the session; unwrap the output list when there is one tensor."""
        outputs = self.session.run(self.output_names, {self.input_name: data})
        if len(outputs) == 1:
            return outputs[0]
        return outputs
Preprocessing Utilities
The module includes common preprocessing functions:
ImageNet Preprocessing
from horus.ml_utils import preprocess_image_imagenet
# Standard ImageNet preprocessing for classification models
preprocessed = preprocess_image_imagenet(
image,
target_size=(224, 224),
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
# Returns (1, C, H, W) normalized tensor
YOLO Preprocessing
from horus.ml_utils import preprocess_image_yolo
# Letterbox preprocessing for YOLO models
preprocessed, scale, pad_w, pad_h = preprocess_image_yolo(
image,
target_size=640
)
# Returns preprocessed image and transform params for postprocessing
Non-Maximum Suppression
from horus.ml_utils import nms
# Filter overlapping detections
keep_indices = nms(
boxes, # (N, 4) as [x, y, w, h]
scores, # (N,) confidence scores
iou_threshold=0.45
)
Performance Monitoring
Track inference performance in real-time:
from horus.ml_utils import PerformanceMonitor
monitor = PerformanceMonitor(window_size=100)
# In your inference loop
for frame in frames:
result, latency_ms = model.run_inference(frame)
monitor.record(latency_ms)
# Get statistics
stats = monitor.get_stats()
print(f"Avg latency: {stats['avg_latency_ms']:.2f}ms")
print(f"P95 latency: {stats['p95_latency_ms']:.2f}ms")
print(f"FPS: {stats['fps']:.1f}")
# Or print summary
monitor.print_stats()
Available Statistics
| Metric | Description |
|---|---|
| `avg_latency_ms` | Average inference time |
| `min_latency_ms` | Fastest inference |
| `max_latency_ms` | Slowest inference |
| `std_latency_ms` | Standard deviation |
| `p50_latency_ms` | Median latency |
| `p95_latency_ms` | 95th percentile |
| `p99_latency_ms` | 99th percentile |
| `fps` | Frames per second |
Zero-Copy Tensor Integration
Note:
`TensorPool` and `TensorHandle` are not directly exposed in Python. They are used internally by the `Image`, `PointCloud`, and `DepthImage` domain types. Use these domain types for zero-copy data sharing.
Combine ML utilities with HORUS domain types for zero-copy data sharing:
import horus
import numpy as np
# Image supports zero-copy numpy access
img = horus.Image(height=480, width=640, encoding="rgb8")
# Zero-copy to numpy
np_array = img.numpy() # No copy!
# Device info
print(f"Image is CPU-backed: {img.is_cpu()}")
Complete Example
from horus import Node, Scheduler
from horus.ml_utils import PyTorchInferenceNode, PerformanceMonitor
import numpy as np

def camera_tick(node):
    """Simulate camera frame"""
    # Random 480x640 RGB frame standing in for a real camera driver.
    frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    node.send("camera.raw", frame)

# ObjectDetector is the PyTorchInferenceNode subclass defined earlier
# in this guide; load its weights once at startup.
detector = ObjectDetector()
detector.load_model()
monitor = PerformanceMonitor()

def detector_tick(node):
    # Run detection on each incoming frame and record per-call latency.
    if node.has_msg("camera.raw"):
        frame = node.get("camera.raw")
        result, latency = detector.run_inference(frame)
        monitor.record(latency)
        node.send("detections", result)
        # Print stats every 100 frames
        # NOTE(review): assumes get_stats() exposes a "samples" count,
        # which the statistics table above does not list — confirm.
        if monitor.get_stats()["samples"] % 100 == 0:
            monitor.print_stats()

def main():
    # Two nodes on one scheduler: camera publishes, detector consumes.
    camera = Node(name="camera", pubs=["camera.raw"], tick=camera_tick, rate=30)
    detect = Node(name="detector", subs=["camera.raw"], pubs=["detections"], tick=detector_tick, rate=30)
    scheduler = Scheduler()
    scheduler.add(camera, order=0)
    scheduler.add(detect, order=1)  # runs after camera each cycle
    scheduler.run()

if __name__ == "__main__":
    main()
See Also
- Python Bindings - Core Python API
- AI Integration - Rust-native ML inference