Clock API

Framework-aware clock that respects deterministic mode. Use these instead of time.time() for reproducible behavior.

import horus

dt = horus.dt()          # Timestep for this tick
remaining = horus.budget_remaining()  # Budget left

Functions

FunctionReturnsDescription
horus.now()floatCurrent time in seconds (wall clock or SimClock)
horus.dt()floatTimestep for this tick (actual elapsed or fixed 1/rate)
horus.elapsed()floatSeconds since scheduler start
horus.tick()intCurrent tick number
horus.budget_remaining()floatSeconds of budget left this tick (inf if no budget)
horus.rng_float()floatRandom [0.0, 1.0) (system entropy or tick-seeded in deterministic)
horus.timestamp_ns()intNanosecond timestamp for TransformFrame queries

Unit Constants

ConstantValueUsage
horus.us1e-6Microseconds: 300 * horus.us = 300μs
horus.ms1e-3Milliseconds: 1 * horus.ms = 1ms
# Budget and deadline use seconds — multiply with constants
node = horus.Node(
    budget=300 * horus.us,     # 300 microseconds
    deadline=900 * horus.us,   # 900 microseconds
    ...
)

Wall Clock vs SimClock

FunctionNormal ModeDeterministic Mode
now()Wall clock (time.monotonic())SimClock (advances by exactly 1/rate per tick)
dt()Actual elapsed since last tickFixed 1/rate (e.g., 0.01 for 100 Hz)
elapsed()Real wall time since startSimulated time since start
tick()Monotonic tick counterSame
rng_float()System entropy (non-deterministic)Tick-seeded (deterministic, reproducible)
budget_remaining()Real remaining budgetSame (budget enforcement still real-time)

Enable deterministic mode:

horus.run(my_node, deterministic=True)
# or
sched = horus.Scheduler(deterministic=True)

Adaptive Quality with budget_remaining()

Use budget_remaining() to do more work when time permits:

import horus

def planner_tick(node):
    # Always compute basic path
    path = compute_basic_path()

    # Only optimize if budget allows
    if horus.budget_remaining() > 2 * horus.ms:
        path = optimize_path(path)

    # Only smooth if still have time
    if horus.budget_remaining() > 1 * horus.ms:
        path = smooth_path(path)

    node.send("path", path)

planner = horus.Node(
    name="planner",
    tick=planner_tick,
    rate=10,
    budget=20 * horus.ms,    # 20ms budget
    on_miss="warn",
    pubs=["path"],
)

Physics Integration with dt()

Use dt() for frame-rate-independent physics. In deterministic mode, dt() returns a fixed value making simulations reproducible:

import horus

velocity = [0.0, 0.0]
position = [0.0, 0.0]

def physics_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd:
        velocity[0] = cmd.linear
        velocity[1] = cmd.angular

    # dt() adapts to actual tick rate; fixed in deterministic mode
    dt = horus.dt()
    position[0] += velocity[0] * dt
    position[1] += velocity[1] * dt

    node.send("odom", {"x": position[0], "y": position[1]})

Reproducible Random Numbers

rng_float() returns system entropy normally, but tick-seeded deterministic values when deterministic=True. Use it instead of random.random() for reproducible behavior in replay:

import horus

def sensor_tick(node):
    # Noise is reproducible across runs in deterministic mode
    noise = horus.rng_float() * 0.01
    reading = 9.81 + noise
    node.send("accel_z", {"value": reading})

See Also