Clock API
A framework-aware clock that respects deterministic mode. Use the functions below instead of time.time() when behavior must be reproducible.
```python
import horus

dt = horus.dt()                       # Timestep for this tick
remaining = horus.budget_remaining()  # Budget left
```
Functions
| Function | Returns | Description |
|---|---|---|
| horus.now() | float | Current time in seconds (wall clock or SimClock) |
| horus.dt() | float | Timestep for this tick (actual elapsed or fixed 1/rate) |
| horus.elapsed() | float | Seconds since scheduler start |
| horus.tick() | int | Current tick number |
| horus.budget_remaining() | float | Seconds of budget left this tick (inf if no budget) |
| horus.rng_float() | float | Random float in [0.0, 1.0) (system entropy, or tick-seeded in deterministic mode) |
| horus.timestamp_ns() | int | Nanosecond timestamp for TransformFrame queries |
Unit Constants
| Constant | Value | Usage |
|---|---|---|
| horus.us | 1e-6 | Microseconds: 300 * horus.us = 300μs |
| horus.ms | 1e-3 | Milliseconds: 1 * horus.ms = 1ms |
```python
# Budget and deadline are given in seconds; multiply by the unit constants
node = horus.Node(
    budget=300 * horus.us,    # 300 microseconds
    deadline=900 * horus.us,  # 900 microseconds
    ...
)
```
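The arithmetic behind the unit constants can be checked without the framework. The sketch below uses local stand-ins US and MS that mirror the values in the table; they are assumptions for illustration and are not imported from horus.

```python
import math

# Local stand-ins mirroring the table values (assumed, not imported from horus).
US = 1e-6  # seconds per microsecond
MS = 1e-3  # seconds per millisecond

budget_s = 300 * US    # 300 microseconds expressed in seconds
deadline_s = 900 * US  # 900 microseconds expressed in seconds

# Converting back (for logging, say) divides by the same constant.
budget_us = budget_s / US

# Floating-point products are not always exact, so compare with a tolerance.
assert math.isclose(budget_s, 0.0003)
assert math.isclose(budget_us, 300.0)
assert budget_s < deadline_s < 1 * MS
```

Because the values are plain floats in seconds, comparing them with math.isclose rather than == avoids surprises from rounding.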
Wall Clock vs SimClock
| Function | Normal Mode | Deterministic Mode |
|---|---|---|
| now() | Wall clock (time.monotonic()) | SimClock (advances by exactly 1/rate per tick) |
| dt() | Actual elapsed since last tick | Fixed 1/rate (e.g., 0.01 for 100 Hz) |
| elapsed() | Real wall time since start | Simulated time since start |
| tick() | Monotonic tick counter | Same |
| rng_float() | System entropy (non-deterministic) | Tick-seeded (deterministic, reproducible) |
| budget_remaining() | Real remaining budget | Same (budget enforcement still real-time) |
Enable deterministic mode:
```python
horus.run(my_node, deterministic=True)
# or
sched = horus.Scheduler(deterministic=True)
```
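To picture the difference between the two modes, here is a small stand-alone sketch. The SketchClock class is hypothetical and is not how horus is implemented; it assumes only what the table states: normal mode reads time.monotonic(), while deterministic mode advances by exactly 1/rate per tick.

```python
import time


class SketchClock:
    """Hypothetical stand-in for a framework clock; not the horus internals."""

    def __init__(self, rate_hz, deterministic=False):
        self.rate_hz = rate_hz
        self.deterministic = deterministic
        self.tick_count = 0
        self._start = time.monotonic()
        self._last = self._start
        self._dt = 1.0 / rate_hz  # nominal timestep until a real one is measured

    def advance(self):
        """Called once per tick by an imagined scheduler."""
        self.tick_count += 1
        if not self.deterministic:
            current = time.monotonic()
            self._dt = current - self._last  # measured wall-clock timestep
            self._last = current

    def dt(self):
        # In deterministic mode this never changes from 1/rate.
        return self._dt

    def elapsed(self):
        if self.deterministic:
            # Derive simulated time from the tick count to avoid summation drift.
            return self.tick_count / self.rate_hz
        return time.monotonic() - self._start


clock = SketchClock(rate_hz=100, deterministic=True)
for _ in range(250):
    clock.advance()
print(clock.dt(), clock.elapsed())  # prints: 0.01 2.5
```

Deriving simulated time from the tick count, rather than summing dt on every tick, keeps the result exact no matter how long the run is. That is a design choice of this sketch, not a documented property of horus.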
Adaptive Quality with budget_remaining()
Use budget_remaining() to do more work when time permits:
```python
import horus

def planner_tick(node):
    # Always compute basic path
    path = compute_basic_path()

    # Only optimize if budget allows
    if horus.budget_remaining() > 2 * horus.ms:
        path = optimize_path(path)

    # Only smooth if still have time
    if horus.budget_remaining() > 1 * horus.ms:
        path = smooth_path(path)

    node.send("path", path)

planner = horus.Node(
    name="planner",
    tick=planner_tick,
    rate=10,
    budget=20 * horus.ms,  # 20ms budget
    on_miss="warn",
    pubs=["path"],
)
```
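The gating pattern above can be tried outside the framework. The sketch below is a rough model, not the horus implementation: TickBudget and run_stages are hypothetical names, and the clock is injectable so the example runs with a fake clock instead of real time.

```python
import time


class TickBudget:
    """Hypothetical per-tick budget tracker; a sketch, not the horus internals."""

    def __init__(self, budget_s, clock=time.monotonic):
        self._budget_s = budget_s
        self._clock = clock
        self._start = clock()

    def remaining(self):
        # Mirrors the documented behavior: infinite when no budget is set.
        if self._budget_s is None:
            return float("inf")
        return self._budget_s - (self._clock() - self._start)


def run_stages(budget, stages):
    """Run each (name, min_remaining_s, fn) stage only if enough budget is left."""
    done = []
    for name, min_remaining_s, fn in stages:
        if budget.remaining() > min_remaining_s:
            fn()
            done.append(name)
    return done


# Fake clock so the example is repeatable: each stage "spends" a fixed time.
fake_now = [0.0]

def fake_clock():
    return fake_now[0]

def spend(seconds):
    def work():
        fake_now[0] += seconds
    return work

budget = TickBudget(0.020, clock=fake_clock)
stages = [
    ("basic", 0.0, spend(0.015)),      # always affordable at tick start
    ("optimize", 0.002, spend(0.0045)),
    ("smooth", 0.001, spend(0.001)),   # skipped: only about 0.5 ms is left
]
print(run_stages(budget, stages))  # prints: ['basic', 'optimize']
```

Each threshold should be at least the expected cost of its stage. Otherwise an optional stage can start with too little time left and cause the very overrun it was meant to avoid.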
Physics Integration with dt()
Use dt() for frame-rate-independent physics. In deterministic mode, dt() returns a fixed value, which makes simulations reproducible:
```python
import horus

velocity = [0.0, 0.0]
position = [0.0, 0.0]

def physics_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd:
        velocity[0] = cmd.linear
        velocity[1] = cmd.angular

    # dt() adapts to actual tick rate; fixed in deterministic mode
    dt = horus.dt()
    position[0] += velocity[0] * dt
    position[1] += velocity[1] * dt

    node.send("odom", {"x": position[0], "y": position[1]})
```
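The effect of scaling by dt can be checked with a stand-alone sketch. The integrate helper is hypothetical and the timestep lists are made-up values. It uses the same explicit Euler update as the example above and shows that a fixed dt gives bit-identical runs, while a different tick rate or a jittered dt still lands on the same position up to rounding.

```python
import math


def integrate(velocity, timesteps):
    """Explicit Euler: accumulate velocity * dt over a sequence of timesteps."""
    position = 0.0
    for dt in timesteps:
        position += velocity * dt
    return position


# Deterministic mode as described above: every dt is exactly 1/rate.
fixed_100hz = [1.0 / 100] * 100  # one simulated second at 100 Hz
run_a = integrate(2.0, fixed_100hz)
run_b = integrate(2.0, fixed_100hz)
assert run_a == run_b  # identical inputs give bit-identical results

# Scaling by dt makes the result independent of tick rate (up to rounding).
fixed_50hz = [1.0 / 50] * 50  # the same simulated second at 50 Hz
assert math.isclose(integrate(2.0, fixed_50hz), run_a)

# Normal mode: measured dt jitters from tick to tick (made-up values).
jittered = [0.0101, 0.0099] * 50
assert math.isclose(integrate(2.0, jittered), run_a)
```

With a constant velocity, explicit Euler is exact apart from rounding. With changing velocities the result also depends on when each command arrives, which is one reason a fixed dt matters for replay.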
Reproducible Random Numbers
rng_float() draws from system entropy in normal mode and returns tick-seeded, reproducible values when deterministic=True. Use it instead of random.random() when behavior must be reproducible in replay:
```python
import horus

def sensor_tick(node):
    # Noise is reproducible across runs in deterministic mode
    noise = horus.rng_float() * 0.01
    reading = 9.81 + noise
    node.send("accel_z", {"value": reading})
```
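One way to picture tick-seeded randomness is to derive each value from a base seed and the tick number alone. The sketch below is an assumption for illustration: tick_seeded_float and its seed-mixing formula are hypothetical, and horus may seed its generator differently.

```python
import random


def tick_seeded_float(base_seed, tick):
    """Hypothetical: return a float in [0.0, 1.0) that depends only on (base_seed, tick)."""
    # Mix seed and tick into one integer; the multiplier is an arbitrary choice.
    rng = random.Random(base_seed * 1_000_003 + tick)
    return rng.random()


# Two "runs" over the same ticks produce exactly the same noise sequence.
run_1 = [tick_seeded_float(42, tick) for tick in range(5)]
run_2 = [tick_seeded_float(42, tick) for tick in range(5)]
assert run_1 == run_2
assert all(0.0 <= value < 1.0 for value in run_1)
```

Because each value depends only on the seed and the tick number, replaying from any tick reproduces the same noise without replaying the random calls that came before it.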
See Also
- Scheduler API — Deterministic mode, tick_rate
- Node API — Budget, deadline, on_miss kwargs
- Deterministic Mode — Full deterministic mode guide
- Rust Duration/Frequency API — Rust equivalent (100_u64.hz(), 200_u64.us())