Async Nodes
HORUS provides native Python async/await support through AsyncNode and AsyncTopic classes. These enable non-blocking I/O operations, making it easy to integrate with async libraries, HTTP APIs, databases, and other async-native Python code.
Overview
| Class | Description |
|---|---|
AsyncNode | Base class for async nodes with async def async_tick() |
AsyncTopic | Async wrapper for Topic with await send() and await recv() |
AsyncNode
The AsyncNode class lets you use Python's native async/await syntax:
import horus
import asyncio
class MyAsyncNode(horus.AsyncNode):
async def async_init(self):
"""Called once at startup"""
self.topic = horus.AsyncTopic(dict)
self.api_url = "https://api.example.com/data"
async def async_tick(self):
"""Called each scheduler cycle - use await freely!"""
# Fetch from HTTP API (non-blocking)
async with aiohttp.ClientSession() as session:
async with session.get(self.api_url) as response:
data = await response.json()
# Publish to HORUS topic
await self.topic.send(data)
async def async_shutdown(self):
"""Called once at shutdown"""
print("Node shutting down")
Lifecycle Methods
| Method | Description |
|---|---|
async def async_init() | One-time initialization before first tick |
async def async_tick() | Called each scheduler cycle |
async def async_shutdown() | Cleanup when node stops |
Running an AsyncNode
import horus
# AsyncNode accepts the same kwargs as Node
node = MyAsyncNode(
name="async_processor",
subs=["input"],
pubs=["output"],
rate=30,
)
# Via scheduler
scheduler = horus.Scheduler()
scheduler.add(node, order=0)
scheduler.run()
# Or via horus.run()
horus.run(node)
AsyncTopic
AsyncTopic provides async send/receive operations:
import horus
from horus import AsyncTopic, CmdVel
# Create async topic (same constructor as Topic — pass msg_type)
topic = AsyncTopic(CmdVel)
# With custom capacity
topic = AsyncTopic(dict, capacity=2048)
# Async send
await topic.send(CmdVel(linear=1.0, angular=0.5))
# Async receive (waits for message with exponential backoff)
msg = await topic.recv()
# Async receive with timeout (raises asyncio.TimeoutError)
msg = await topic.recv(timeout=5.0)
# Try receive (returns None immediately if no message)
msg = await topic.try_recv()
API Reference
class AsyncTopic:
def __init__(self, msg_type, capacity: int = 1024):
"""Create async topic wrapping a Topic(msg_type, capacity)"""
async def send(self, msg: Any) -> None:
"""Send message asynchronously (runs in executor)"""
async def recv(self, timeout: Optional[float] = None) -> Any:
"""Receive message, waiting with exponential backoff.
Raises asyncio.TimeoutError if timeout expires."""
async def try_recv(self) -> Optional[Any]:
"""Try to receive, returns None immediately if no message"""
Utility Functions
Use Python's standard asyncio utilities alongside async nodes:
import asyncio
# Non-blocking sleep
await asyncio.sleep(0.1)
# Run multiple async operations concurrently
results = await asyncio.gather(
fetch_sensor_a(),
fetch_sensor_b(),
fetch_sensor_c()
)
# Wait with timeout
try:
result = await asyncio.wait_for(slow_operation(), timeout=5.0)
except asyncio.TimeoutError:
print("Operation timed out")
Complete Example: Async HTTP API Node
import horus
import aiohttp
import asyncio
class WeatherNode(horus.AsyncNode):
"""Fetches weather data from API and publishes to HORUS"""
async def async_init(self):
self.weather_pub = horus.AsyncTopic(dict)
self.location = "San Francisco"
self.api_key = "your-api-key"
self.session = aiohttp.ClientSession()
async def async_tick(self):
url = f"https://api.weather.com/v1/{self.location}?key={self.api_key}"
try:
async with self.session.get(url) as response:
if response.status == 200:
data = await response.json()
await self.weather_pub.send({
"temperature": data["temp"],
"humidity": data["humidity"],
"timestamp": data["timestamp"]
})
except aiohttp.ClientError as e:
print(f"API error: {e}")
async def async_shutdown(self):
await self.session.close()
class WeatherConsumer(horus.AsyncNode):
"""Consumes weather data and logs to database"""
async def async_init(self):
self.weather_sub = horus.AsyncTopic(dict)
# Setup async database connection
self.db = await asyncpg.connect("postgresql://localhost/robotics")
async def async_tick(self):
weather = await self.weather_sub.try_recv()
if weather:
await self.db.execute(
"INSERT INTO weather_log (temp, humidity, ts) VALUES ($1, $2, $3)",
weather["temperature"],
weather["humidity"],
weather["timestamp"]
)
async def async_shutdown(self):
await self.db.close()
def main():
scheduler = horus.Scheduler()
scheduler.add(WeatherNode(), order=0)
scheduler.add(WeatherConsumer(), order=1)
scheduler.run()
if __name__ == "__main__":
main()
When to Use AsyncNode
Good use cases:
- HTTP/REST API integration
- Database operations (asyncpg, aioredis)
- WebSocket connections
- File I/O operations
- Any I/O-bound operations
Not ideal for:
- CPU-bound computations (use regular Node with multiprocessing)
- Real-time control loops requiring deterministic timing
- Operations requiring <1ms latency
Mixing Sync and Async
You can mix regular Node and AsyncNode in the same scheduler:
import horus
# Regular sync node uses callback pattern
def read_sensor(node):
"""Fast sensor reading - blocking is OK"""
reading = node.get("sensor.raw")
if reading:
node.send("sensor.processed", reading * 2)
sync_node = horus.Node(
name="sensor_reader",
subs=["sensor.raw"],
pubs=["sensor.processed"],
tick=read_sensor,
rate=100
)
# Async node uses class inheritance
class AsyncCloudNode(horus.AsyncNode):
"""Async node for cloud upload"""
async def async_init(self):
self.topic = horus.AsyncTopic(dict)
async def async_tick(self):
msg = await self.topic.try_recv()
if msg:
await self.upload_to_cloud(msg)
# Both work together
scheduler = horus.Scheduler()
scheduler.add(sync_node, order=0) # Runs first
scheduler.add(AsyncCloudNode(), order=10) # Runs later
scheduler.run()
See Also
- Python Bindings - Core Python API
- ML Utilities - ML inference helpers
- Examples - More Python examples