Async Nodes

HORUS provides native Python async/await support through AsyncNode and AsyncTopic classes. These enable non-blocking I/O operations, making it easy to integrate with async libraries, HTTP APIs, databases, and other async-native Python code.

Overview

ClassDescription
AsyncNodeBase class for async nodes with async def async_tick()
AsyncTopicAsync wrapper for Topic with await send() and await recv()

AsyncNode

The AsyncNode class lets you use Python's native async/await syntax:

import horus
import asyncio

class MyAsyncNode(horus.AsyncNode):
    async def async_init(self):
        """Called once at startup"""
        self.topic = horus.AsyncTopic(dict)
        self.api_url = "https://api.example.com/data"

    async def async_tick(self):
        """Called each scheduler cycle - use await freely!"""
        # Fetch from HTTP API (non-blocking)
        async with aiohttp.ClientSession() as session:
            async with session.get(self.api_url) as response:
                data = await response.json()

        # Publish to HORUS topic
        await self.topic.send(data)

    async def async_shutdown(self):
        """Called once at shutdown"""
        print("Node shutting down")

Lifecycle Methods

MethodDescription
async def async_init()One-time initialization before first tick
async def async_tick()Called each scheduler cycle
async def async_shutdown()Cleanup when node stops

Running an AsyncNode

import horus

# AsyncNode accepts the same kwargs as Node
node = MyAsyncNode(
    name="async_processor",
    subs=["input"],
    pubs=["output"],
    rate=30,
)

# Via scheduler
scheduler = horus.Scheduler()
scheduler.add(node, order=0)
scheduler.run()

# Or via horus.run()
horus.run(node)

AsyncTopic

AsyncTopic provides async send/receive operations:

import horus
from horus import AsyncTopic, CmdVel

# Create async topic (same constructor as Topic — pass msg_type)
topic = AsyncTopic(CmdVel)

# With custom capacity
topic = AsyncTopic(dict, capacity=2048)

# Async send
await topic.send(CmdVel(linear=1.0, angular=0.5))

# Async receive (waits for message with exponential backoff)
msg = await topic.recv()

# Async receive with timeout (raises asyncio.TimeoutError)
msg = await topic.recv(timeout=5.0)

# Try receive (returns None immediately if no message)
msg = await topic.try_recv()

API Reference

class AsyncTopic:
    def __init__(self, msg_type, capacity: int = 1024):
        """Create async topic wrapping a Topic(msg_type, capacity)"""

    async def send(self, msg: Any) -> None:
        """Send message asynchronously (runs in executor)"""

    async def recv(self, timeout: Optional[float] = None) -> Any:
        """Receive message, waiting with exponential backoff.
        Raises asyncio.TimeoutError if timeout expires."""

    async def try_recv(self) -> Optional[Any]:
        """Try to receive, returns None immediately if no message"""

Utility Functions

Use Python's standard asyncio utilities alongside async nodes:

import asyncio

# Non-blocking sleep
await asyncio.sleep(0.1)

# Run multiple async operations concurrently
results = await asyncio.gather(
    fetch_sensor_a(),
    fetch_sensor_b(),
    fetch_sensor_c()
)

# Wait with timeout
try:
    result = await asyncio.wait_for(slow_operation(), timeout=5.0)
except asyncio.TimeoutError:
    print("Operation timed out")

Complete Example: Async HTTP API Node

import horus
import aiohttp
import asyncio

class WeatherNode(horus.AsyncNode):
    """Fetches weather data from API and publishes to HORUS"""

    async def async_init(self):
        self.weather_pub = horus.AsyncTopic(dict)
        self.location = "San Francisco"
        self.api_key = "your-api-key"
        self.session = aiohttp.ClientSession()

    async def async_tick(self):
        url = f"https://api.weather.com/v1/{self.location}?key={self.api_key}"

        try:
            async with self.session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    await self.weather_pub.send({
                        "temperature": data["temp"],
                        "humidity": data["humidity"],
                        "timestamp": data["timestamp"]
                    })
        except aiohttp.ClientError as e:
            print(f"API error: {e}")

    async def async_shutdown(self):
        await self.session.close()


class WeatherConsumer(horus.AsyncNode):
    """Consumes weather data and logs to database"""

    async def async_init(self):
        self.weather_sub = horus.AsyncTopic(dict)
        # Setup async database connection
        self.db = await asyncpg.connect("postgresql://localhost/robotics")

    async def async_tick(self):
        weather = await self.weather_sub.try_recv()
        if weather:
            await self.db.execute(
                "INSERT INTO weather_log (temp, humidity, ts) VALUES ($1, $2, $3)",
                weather["temperature"],
                weather["humidity"],
                weather["timestamp"]
            )

    async def async_shutdown(self):
        await self.db.close()


def main():
    scheduler = horus.Scheduler()
    scheduler.add(WeatherNode(), order=0)
    scheduler.add(WeatherConsumer(), order=1)
    scheduler.run()

if __name__ == "__main__":
    main()

When to Use AsyncNode

Good use cases:

  • HTTP/REST API integration
  • Database operations (asyncpg, aioredis)
  • WebSocket connections
  • File I/O operations
  • Any I/O-bound operations

Not ideal for:

  • CPU-bound computations (use regular Node with multiprocessing)
  • Real-time control loops requiring deterministic timing
  • Operations requiring <1ms latency

Mixing Sync and Async

You can mix regular Node and AsyncNode in the same scheduler:

import horus

# Regular sync node uses callback pattern
def read_sensor(node):
    """Fast sensor reading - blocking is OK"""
    reading = node.get("sensor.raw")
    if reading:
        node.send("sensor.processed", reading * 2)

sync_node = horus.Node(
    name="sensor_reader",
    subs=["sensor.raw"],
    pubs=["sensor.processed"],
    tick=read_sensor,
    rate=100
)

# Async node uses class inheritance
class AsyncCloudNode(horus.AsyncNode):
    """Async node for cloud upload"""
    async def async_init(self):
        self.topic = horus.AsyncTopic(dict)

    async def async_tick(self):
        msg = await self.topic.try_recv()
        if msg:
            await self.upload_to_cloud(msg)

# Both work together
scheduler = horus.Scheduler()
scheduler.add(sync_node, order=0)           # Runs first
scheduler.add(AsyncCloudNode(), order=10)   # Runs later
scheduler.run()

See Also