Python ML Utilities

HORUS provides Python ML utility classes in horus.ml_utils for easy integration with popular machine learning frameworks. These helper classes handle model loading, preprocessing, inference, and performance tracking.

Overview

The ml_utils module provides:

| Class | Framework | Use Case |
|-------|-----------|----------|
| MLNodeBase | Any | Base class for ML inference nodes |
| PyTorchInferenceNode | PyTorch | TorchScript and checkpoint models |
| TensorFlowInferenceNode | TensorFlow/Keras | SavedModel and .h5 files |
| ONNXInferenceNode | ONNX Runtime | Cross-platform ONNX models |
| PerformanceMonitor | Any | Track inference latency and throughput |

Base Class: MLNodeBase

All ML inference nodes inherit from MLNodeBase:

from horus.ml_utils import MLNodeBase

class MLNodeBase:
    def __init__(self, model_path: str, input_topic: str, output_topic: str):
        """
        Args:
            model_path: Path to the model file
            input_topic: Topic to subscribe to for inputs
            output_topic: Topic to publish predictions to
        """
        ...

    def load_model(self):
        """Override to load your model"""
        raise NotImplementedError()

    def preprocess(self, data):
        """Override to preprocess input data"""
        return data

    def infer(self, data):
        """Override to run inference"""
        raise NotImplementedError()

    def postprocess(self, output):
        """Override to postprocess model output"""
        return output

    def run_inference(self, data) -> tuple[Any, float]:
        """Run full pipeline, returns (result, latency_ms)"""
        ...

    def get_stats(self) -> dict:
        """Get inference statistics"""
        ...

PyTorch Integration

PyTorchInferenceNode

from horus import Node, Topic
from horus.ml_utils import PyTorchInferenceNode

class ObjectDetector(PyTorchInferenceNode):
    def __init__(self):
        super().__init__(
            model_path="models/yolov8n.pt",
            input_topic="camera.raw",
            output_topic="detections",
            device="cuda:0"  # or "cpu"
        )

    def load_model(self):
        import torch
        self.model = torch.jit.load(self.model_path, map_location=self.device)
        self.model.eval()

    def preprocess(self, image_data):
        import torch
        # Convert numpy to tensor, normalize
        tensor = torch.from_numpy(image_data).float() / 255.0
        tensor = tensor.permute(2, 0, 1).unsqueeze(0)  # HWC -> NCHW
        return tensor.to(self.device)

    def infer(self, tensor):
        import torch
        with torch.no_grad():
            return self.model(tensor)

    def postprocess(self, output):
        # Convert detections to list
        return output[0].cpu().numpy().tolist()

Usage with HORUS Node

from horus import Node, Scheduler
from horus.ml_utils import PyTorchInferenceNode

def vision_tick(node):
    if node.has_msg("camera.raw"):
        frame = node.get("camera.raw")
        result, latency_ms = detector.run_inference(frame)
        node.send("detections", result)
        print(f"Inference latency: {latency_ms:.2f}ms")

detector = ObjectDetector()
detector.load_model()

vision_node = Node(
    name="vision",
    subs=["camera.raw"],
    pubs=["detections"],
    tick=vision_tick,
    rate=30,
)

# Run
scheduler = Scheduler()
scheduler.add(vision_node, order=0)
scheduler.run()

TensorFlow Integration

TensorFlowInferenceNode

from horus.ml_utils import TensorFlowInferenceNode

class ImageClassifier(TensorFlowInferenceNode):
    def __init__(self):
        super().__init__(
            model_path="models/mobilenet_v2",
            input_topic="camera.raw",
            output_topic="classification",
            use_gpu=True
        )

    def load_model(self):
        import tensorflow as tf
        # Handles both SavedModel and .h5 formats
        if self.model_path.endswith(".h5"):
            self.model = tf.keras.models.load_model(self.model_path)
        else:
            self.model = tf.saved_model.load(self.model_path)

    def preprocess(self, image_data):
        import tensorflow as tf
        img = tf.image.resize(image_data, [224, 224])
        img = img / 255.0
        return tf.expand_dims(img, 0)

ONNX Integration

ONNXInferenceNode

ONNX Runtime provides cross-platform inference for models exported from PyTorch, TensorFlow, and other frameworks.

from horus.ml_utils import ONNXInferenceNode
import numpy as np

class YOLODetector(ONNXInferenceNode):
    def __init__(self):
        super().__init__(
            model_path="models/yolov8n.onnx",
            input_topic="camera.raw",
            output_topic="detections",
            use_gpu=True  # Uses CUDAExecutionProvider if available
        )

    def load_model(self):
        import onnxruntime as ort
        providers = (
            ["CUDAExecutionProvider", "CPUExecutionProvider"]
            if self.use_gpu else ["CPUExecutionProvider"]
        )
        self.session = ort.InferenceSession(self.model_path, providers=providers)
        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]

    def preprocess(self, image_data):
        # Resize to 640x640, normalize, NCHW format
        img = np.array(image_data).astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))  # HWC -> CHW
        return np.expand_dims(img, 0)

    def infer(self, data):
        outputs = self.session.run(self.output_names, {self.input_name: data})
        return outputs[0] if len(outputs) == 1 else outputs

Preprocessing Utilities

The module includes common preprocessing functions:

ImageNet Preprocessing

from horus.ml_utils import preprocess_image_imagenet

# Standard ImageNet preprocessing for classification models
preprocessed = preprocess_image_imagenet(
    image,
    target_size=(224, 224),
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225]
)
# Returns (1, C, H, W) normalized tensor

YOLO Preprocessing

from horus.ml_utils import preprocess_image_yolo

# Letterbox preprocessing for YOLO models
preprocessed, scale, pad_w, pad_h = preprocess_image_yolo(
    image,
    target_size=640
)
# Returns preprocessed image and transform params for postprocessing

Non-Maximum Suppression

from horus.ml_utils import nms

# Filter overlapping detections
keep_indices = nms(
    boxes,      # (N, 4) as [x, y, w, h]
    scores,     # (N,) confidence scores
    iou_threshold=0.45
)

Performance Monitoring

Track inference performance in real-time:

from horus.ml_utils import PerformanceMonitor

monitor = PerformanceMonitor(window_size=100)

# In your inference loop
for frame in frames:
    result, latency_ms = model.run_inference(frame)
    monitor.record(latency_ms)

# Get statistics
stats = monitor.get_stats()
print(f"Avg latency: {stats['avg_latency_ms']:.2f}ms")
print(f"P95 latency: {stats['p95_latency_ms']:.2f}ms")
print(f"FPS: {stats['fps']:.1f}")

# Or print summary
monitor.print_stats()

Available Statistics

| Metric | Description |
|--------|-------------|
| avg_latency_ms | Average inference time |
| min_latency_ms | Fastest inference |
| max_latency_ms | Slowest inference |
| std_latency_ms | Standard deviation |
| p50_latency_ms | Median latency |
| p95_latency_ms | 95th percentile |
| p99_latency_ms | 99th percentile |
| fps | Frames per second |

Zero-Copy Tensor Integration

Note: TensorPool and TensorHandle are not directly exposed in Python. They are used internally by the Image, PointCloud, and DepthImage domain types. Use these domain types for zero-copy data sharing.

Combine ML utilities with HORUS domain types for zero-copy data sharing:

import horus
import numpy as np

# Image supports zero-copy numpy access
img = horus.Image(height=480, width=640, encoding="rgb8")

# Zero-copy to numpy
np_array = img.numpy()  # No copy!

# Device info
print(f"Image is CPU-backed: {img.is_cpu()}")

Complete Example

from horus import Node, Scheduler
from horus.ml_utils import PyTorchInferenceNode, PerformanceMonitor
import numpy as np

def camera_tick(node):
    """Simulate camera frame"""
    frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    node.send("camera.raw", frame)

detector = ObjectDetector()
detector.load_model()
monitor = PerformanceMonitor()

def detector_tick(node):
    if node.has_msg("camera.raw"):
        frame = node.get("camera.raw")
        result, latency = detector.run_inference(frame)
        monitor.record(latency)
        node.send("detections", result)

        # Print stats every 100 frames
        if monitor.get_stats()["samples"] % 100 == 0:
            monitor.print_stats()


def main():
    camera = Node(name="camera", pubs=["camera.raw"], tick=camera_tick, rate=30)
    detect = Node(name="detector", subs=["camera.raw"], pubs=["detections"], tick=detector_tick, rate=30)

    scheduler = Scheduler()
    scheduler.add(camera, order=0)
    scheduler.add(detect, order=1)
    scheduler.run()

if __name__ == "__main__":
    main()

See Also