Async Nodes

HORUS automatically detects async def tick functions and runs them on the async I/O thread pool. No special classes or imports needed — just pass an async function to Node().

Basic Usage

# simplified
import horus
import aiohttp

async def fetch_weather(node):
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.weather.com/data") as resp:
            data = await resp.json()
            node.send("weather", data)

node = horus.Node(
    name="weather",
    tick=fetch_weather,
    rate=1,
    pubs=["weather"],
)
horus.run(node)

That's it. async def is auto-detected — the scheduler runs this node on the async I/O thread pool (matching Rust's .async_io() execution class), so it doesn't block other nodes.

How It Works

When you pass an async def to Node(tick=...):

  1. Node() detects that tick is an async function
  2. The scheduler automatically applies the async I/O execution class (matching Rust .async_io())
  3. The Rust scheduler runs this node on a separate thread pool, not the main tick thread

Other (sync) nodes continue ticking while async nodes await.
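
A minimal sketch of the detection step, using only the standard library (the Quick Reference table notes that detection relies on inspect.iscoroutinefunction):

```python
import inspect

async def async_tick(node):
    pass

def sync_tick(node):
    pass

# An async def function is a coroutine function; a plain def is not.
# This is the distinction the scheduler uses to pick an execution class.
print(inspect.iscoroutinefunction(async_tick))  # True  -> AsyncIo
print(inspect.iscoroutinefunction(sync_tick))   # False -> default class
```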

Async Node Detection

Signal                       Result
async def tick(node):        Auto-classified as AsyncIo
async def init(node):        Init runs on the async runtime
async def shutdown(node):    Shutdown runs on the async runtime
Regular def tick(node):      Stays in the default execution class

Async Init and Shutdown

init and shutdown callbacks can also be async:

# simplified
import horus
import asyncpg

async def setup(node):
    node.db = await asyncpg.connect("postgresql://localhost/robotics")

async def process(node):
    if node.has_msg("data"):
        data = node.recv("data")
        await node.db.execute("INSERT INTO logs (value) VALUES ($1)", data)

async def cleanup(node):
    await node.db.close()

node = horus.Node(
    name="db_logger",
    tick=process,
    init=setup,
    shutdown=cleanup,
    rate=10,
    subs=["data"],
)
horus.run(node)

Complete Example: HTTP API + Database

# simplified
import horus
import aiohttp
import asyncpg

async def fetch(node):
    """Fetch sensor data from HTTP API"""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/sensor") as resp:
            if resp.status == 200:
                data = await resp.json()
                node.send("sensor.data", data)

async def store_init(node):
    node.db = await asyncpg.connect("postgresql://localhost/robotics")

async def store(node):
    """Store received data in database"""
    if node.has_msg("sensor.data"):
        data = node.recv("sensor.data")
        await node.db.execute(
            "INSERT INTO sensor_log (temp, humidity) VALUES ($1, $2)",
            data["temperature"], data["humidity"]
        )

async def store_shutdown(node):
    await node.db.close()

horus.run(
    horus.Node(name="fetcher", tick=fetch, rate=1, pubs=["sensor.data"], order=0),
    horus.Node(name="storer", tick=store, init=store_init, shutdown=store_shutdown,
               rate=10, subs=["sensor.data"], order=1),
)

Mixing Sync and Async

Sync and async nodes work together in the same scheduler. Sync nodes run on the main tick thread, async nodes run on the I/O thread pool:

# simplified
import horus

def read_sensor(node):
    """Fast sync sensor read"""
    node.send("raw", get_lidar_data())

async def upload(node):
    """Slow async cloud upload"""
    if node.has_msg("raw"):
        data = node.recv("raw")
        await cloud_client.upload(data)

horus.run(
    horus.Node(name="sensor", tick=read_sensor, rate=100, order=0, pubs=["raw"]),
    horus.Node(name="upload", tick=upload, rate=1, order=1, subs=["raw"]),
)

No special handling — the scheduler detects which is async and routes accordingly.

Quick Reference

Feature            Sync Node                            Async Node
Tick function      def tick(node):                      async def tick(node):
Execution class    Default / Compute / Rt               AsyncIo (auto)
Thread pool        Main tick thread                     Separate I/O thread pool
Detection          Automatic                            Automatic (inspect.iscoroutinefunction)
Latency overhead   ~0                                   ~1ms (event loop scheduling)
Best for           Sensor reads, control, ML inference  HTTP, database, WebSocket, file I/O

When to Use Async

Good use cases:

  • HTTP/REST API integration (aiohttp, httpx)
  • Database operations (asyncpg, aioredis, motor)
  • WebSocket connections
  • File I/O operations
  • Any I/O-bound work that benefits from await

Not ideal for:

  • CPU-bound computation -- use compute=True instead
  • Real-time control loops -- use sync tick with budget and deadline
  • Operations requiring sub-millisecond latency -- async overhead is ~1ms

Design Decisions

Why auto-detect instead of explicit async=True parameter? Python already distinguishes def from async def at the language level. Auto-detection means zero boilerplate -- just write async def tick(node): and the scheduler does the right thing. This matches Python's "explicit is better than implicit" principle: the async keyword is the explicit signal.

Why a separate I/O thread pool instead of running async on the main tick thread? The main tick thread runs all sync nodes with deterministic timing. If an async node's await blocked the main thread, it would delay every other node. Running async nodes on a dedicated thread pool isolates I/O latency from control loop timing.
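
The isolation argument can be demonstrated with plain asyncio (a sketch of the general pattern, not HORUS internals): a dedicated thread owns an event loop, and handing it a slow coroutine returns immediately, so the caller's timing is unaffected by the await.

```python
import asyncio
import threading
import time

# A dedicated thread runs the event loop, standing in for the I/O pool.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def slow_io():
    await asyncio.sleep(0.2)   # simulated network latency
    return "done"

start = time.monotonic()
# Submitting the coroutine to the other thread does not block the caller,
# so a "main tick thread" could keep ticking during the 200 ms await.
future = asyncio.run_coroutine_threadsafe(slow_io(), loop)
assert time.monotonic() - start < 0.05

assert future.result(timeout=1) == "done"
loop.call_soon_threadsafe(loop.stop)
```

asyncio.run_coroutine_threadsafe is the standard-library way to hand a coroutine to a loop on another thread; the section above describes HORUS's Rust scheduler doing the analogous routing for AsyncIo nodes.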

Why no await for node.send() and node.recv()? Topic operations use lock-free shared memory and complete in microseconds. Making them async would add event loop overhead for no benefit. Keep topic I/O synchronous even inside async tick functions.
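
An illustrative micro-benchmark (not HORUS code) of the overhead in question: each round trip through an event loop costs orders of magnitude more than a plain function call, which is exactly what keeping topic I/O synchronous avoids. The stand-in functions below are hypothetical.

```python
import asyncio
import time

def sync_op():
    return 42  # stand-in for a microsecond-scale shared-memory write

async def async_op():
    return 42

loop = asyncio.new_event_loop()
n = 10_000

t0 = time.perf_counter()
for _ in range(n):
    sync_op()
sync_total = time.perf_counter() - t0

t0 = time.perf_counter()
for _ in range(n):
    loop.run_until_complete(async_op())  # full event-loop round trip per call
async_total = time.perf_counter() - t0
loop.close()

# The event-loop round trip dwarfs the plain call
assert async_total > sync_total * 10
```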

Mixing sync and async: The scheduler treats async nodes as AsyncIo execution class, which runs on the I/O thread pool alongside Rust .async_io() nodes. Sync and async nodes communicate through the same topics with no special handling -- shared memory IPC is thread-safe by design.

Error Handling

When an async tick raises an exception, it flows through the same FailurePolicy as sync nodes -- there is no separate error path for async. KeyboardInterrupt is special-cased: it sets the scheduler's stop flag instead of propagating, so Ctrl+C triggers a clean shutdown rather than crashing.

Use try/except inside your async tick to handle expected failures gracefully:

# simplified
import horus
import aiohttp

async def resilient_tick(node):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get("http://api.example.com/data") as resp:
                data = await resp.json()
                node.send("api_data", data)
    except aiohttp.ClientError as e:
        node.log_error(f"HTTP request failed: {e}")
    except Exception as e:
        node.log_error(f"Unexpected error: {e}")

node = horus.Node(
    name="resilient_fetcher",
    tick=resilient_tick,
    rate=1,
    pubs=["api_data"],
)
horus.run(node)

Cancellation Behavior

When the scheduler stops (Ctrl+C, duration expires), pending awaits complete before shutdown runs -- they are not cancelled. This means a hanging await will block shutdown indefinitely.

Always use timeouts on network requests and any other awaits that could hang. If your async tick makes an HTTP call without a timeout and the server never responds, the entire scheduler will hang on shutdown waiting for that tick to finish.

Timeout Pattern

Wrap network calls with asyncio.wait_for() to guarantee bounded execution time:

# simplified
import asyncio
import aiohttp

async def fetch_data(session):
    # Keep the body read inside the timeout, and release the response
    async with session.get("http://api.example.com/data") as resp:
        return await resp.json()

async def safe_tick(node):
    try:
        async with aiohttp.ClientSession() as session:
            # wait_for bounds the whole request, headers and body alike
            data = await asyncio.wait_for(fetch_data(session), timeout=2.0)
            node.send("api_data", data)
    except asyncio.TimeoutError:
        node.log_warning("API timeout — skipping this tick")
    except Exception as e:
        node.log_error(f"API error: {e}")

Async File I/O

Use aiofiles for non-blocking log writing:

# simplified
import horus
import aiofiles

async def logger_init(node):
    # Store the file handle on the node, matching the node.db pattern above
    node.log_file = await aiofiles.open("sensor_log.csv", "w")
    await node.log_file.write("timestamp,value\n")

async def logger_tick(node):
    msg = node.recv("sensor")
    if msg:
        await node.log_file.write(f"{horus.timestamp_ns()},{msg.get('value', 0)}\n")

async def logger_shutdown(node):
    await node.log_file.close()
    node.log_info("Log file closed")

logger = horus.Node(
    name="logger",
    init=logger_init,
    tick=logger_tick,
    shutdown=logger_shutdown,
    rate=100,
    subs=["sensor"],
)

Testing Async Nodes

tick_once() works with async nodes — the scheduler runs the async event loop internally:

# simplified
sched = horus.Scheduler(tick_rate=10, deterministic=True)
sched.add(horus.Node(name="fetcher", tick=async_tick, rate=1, pubs=["data"]))

# tick_once() handles async nodes transparently
for _ in range(5):
    sched.tick_once()

Common Errors

Error                         Cause                  Fix
Node hangs on shutdown        await without timeout  Use asyncio.wait_for()
compute=True with async def   Mutually exclusive     Remove compute=True or make tick synchronous
on="topic" with async def     Mutually exclusive     Remove on= or make tick synchronous
Low throughput                GIL + await overhead   Reduce rate, use batch processing

See Also