Scheduler API
The Scheduler is the central orchestrator in HORUS. It creates topics, registers nodes, manages their lifecycle, and drives the tick loop. Configuration uses a builder pattern -- chain methods to set tick rate, RT mode, watchdog, and networking, then call spin() to run.
Rust: See
Schedulerfor the Rust equivalent. Python: Seehorus.Schedulerfor the Python equivalent.
// simplified
#include <horus/scheduler.hpp>
using namespace horus::literals;
Quick Reference -- Scheduler Methods
| Method | Returns | Description |
|---|---|---|
Scheduler() | Scheduler | Construct a new scheduler |
.tick_rate(Frequency) | Scheduler& | Set the global tick rate |
.name(string_view) | Scheduler& | Set the scheduler name |
.prefer_rt() | Scheduler& | Prefer RT scheduling (graceful degradation) |
.require_rt() | Scheduler& | Require RT scheduling (fail if unavailable) |
.deterministic(bool) | Scheduler& | Enable deterministic mode (SimClock + seeded RNG) |
.verbose(bool) | Scheduler& | Enable verbose logging |
.watchdog(Duration) | Scheduler& | Set global watchdog timeout |
.blackbox(size_t) | Scheduler& | Set BlackBox flight recorder size in MB |
.enable_network() | Scheduler& | Enable LAN network replication |
.advertise<T>(string_view) | Publisher<T> | Create a publisher for a named topic |
.subscribe<T>(string_view) | Subscriber<T> | Create a subscriber for a named topic |
.add(string_view) | NodeBuilder | Add a lambda node by name |
.add(Node&) | NodeBuilder | Add a struct-based node |
.add(LambdaNode&) | NodeBuilder | Add a LambdaNode |
.spin() | void | Run the scheduler (blocks until stopped) |
.tick_once() | void | Execute a single tick of all nodes |
.stop() | void | Stop the scheduler (thread-safe) |
.is_running() | bool | Check if the scheduler is still running |
.get_name() | std::string | Get the scheduler name |
.status() | std::string | Get a human-readable status string |
.has_full_rt() | bool | Check if full RT capabilities are available |
.node_list() | std::vector<std::string> | Get list of registered node names |
Quick Reference -- NodeBuilder Methods
| Method | Returns | Description |
|---|---|---|
.rate(Frequency) | NodeBuilder& | Set tick rate for this node |
.budget(Duration) | NodeBuilder& | Set execution budget (auto-enables RT) |
.deadline(Duration) | NodeBuilder& | Set hard deadline (auto-enables RT) |
.on_miss(Miss) | NodeBuilder& | Set deadline miss policy |
.compute() | NodeBuilder& | Mark as compute-class (CPU-bound) |
.async_io() | NodeBuilder& | Mark as async I/O class |
.on(string_view) | NodeBuilder& | Trigger on topic message (event-driven) |
.order(uint32_t) | NodeBuilder& | Set execution order within tick |
.pin_core(size_t) | NodeBuilder& | Pin to a specific CPU core |
.priority(int32_t) | NodeBuilder& | Set thread priority |
.watchdog(Duration) | NodeBuilder& | Set per-node watchdog timeout |
.tick(function) | NodeBuilder& | Set the tick callback |
.init(function) | NodeBuilder& | Set the init callback (called once) |
.safe_state(function) | NodeBuilder& | Set the enter_safe_state callback |
.build() | void | Finalize and register the node |
Construction and Configuration
The Scheduler uses a builder pattern. All configuration methods return Scheduler& for chaining. Configuration is deferred -- nothing runs until spin() or tick_once().
#include <horus/scheduler.hpp>
using namespace horus::literals;
auto sched = horus::Scheduler()
.tick_rate(100_hz) // 100 Hz global tick rate
.name("arm_controller") // scheduler name (shown in logs)
.prefer_rt() // request SCHED_FIFO (fallback to SCHED_OTHER)
.watchdog(500_ms) // kill nodes that exceed 500ms
.blackbox(64) // 64 MB flight recorder
.verbose(true); // print scheduling decisions
RT Mode Selection
| Method | Behavior |
|---|---|
.prefer_rt() | Request real-time scheduling. Falls back gracefully if CAP_SYS_NICE is unavailable |
.require_rt() | Require real-time scheduling. Fails with an error if RT is unavailable |
| (neither) | Best-effort scheduling only |
Check RT availability at runtime:
if (sched.has_full_rt()) {
printf("Running with SCHED_FIFO\n");
} else {
printf("Falling back to SCHED_OTHER\n");
}
Deterministic Mode
Deterministic mode replaces wall clock with SimClock and seeds all RNG. Use for reproducible tests and replay:
auto sched = horus::Scheduler()
.tick_rate(100_hz)
.deterministic(true); // SimClock + seeded RNG
Creating Topics
Topics are created on the scheduler before nodes are added. The scheduler owns the underlying shared memory segments.
auto cmd_pub = sched.advertise<horus::msg::CmdVel>("motor.cmd");
auto scan_sub = sched.subscribe<horus::msg::LaserScan>("lidar.scan");
auto imu_sub = sched.subscribe<horus::msg::Imu>("imu.data");
auto odom_pub = sched.advertise<horus::msg::Odometry>("odom");
Topic names use dots (not slashes) as separators. This is required for macOS shm_open compatibility.
See Publisher and Subscriber API for the full messaging API.
Adding Nodes
The scheduler supports three node styles. All go through NodeBuilder for scheduling configuration.
Style 1: Lambda Node (Inline)
The simplest approach. Pass a name and a tick callback:
sched.add("obstacle_detector")
.rate(50_hz)
.budget(5_ms)
.on_miss(horus::Miss::Skip)
.tick([&] {
auto scan = scan_sub.recv();
if (!scan) return;
// process scan...
})
.build();
Style 2: Struct-Based Node
Subclass horus::Node for complex nodes with state (see Node API):
ArmController ctrl; // subclass of horus::Node
sched.add(ctrl).rate(100_hz).budget(2_ms).build();
Style 3: LambdaNode
Declarative node with builder pattern for pub/sub. Like Python's horus.Node():
auto nav = horus::LambdaNode("navigator")
.sub<horus::msg::Odometry>("odom")
.pub<horus::msg::CmdVel>("motor.cmd")
.on_tick([](horus::LambdaNode& self) {
auto odom = self.recv<horus::msg::Odometry>("odom");
if (!odom) return;
self.send("motor.cmd", horus::msg::CmdVel{0, 0.3f, 0.0f});
});
sched.add(nav)
.rate(20_hz)
.build();
See Node API for the full lifecycle and introspection API.
NodeBuilder Configuration
Every sched.add(...) call returns a NodeBuilder. Chain scheduling options before calling .build().
Execution Class Auto-Detection
The scheduler automatically assigns an execution class based on what you configure:
| Configuration | Detected Class | Thread |
|---|---|---|
.rate() + .budget() or .deadline() | Rt | Dedicated RT thread, SCHED_FIFO |
.rate() only | BestEffort | Shared thread pool |
.compute() | Compute | CPU-bound thread pool |
.async_io() | AsyncIo | I/O thread pool |
.on("topic") | Event | Wakes on message arrival |
// RT node: rate + budget auto-detects as Rt class
sched.add("safety_monitor")
.rate(1000_hz)
.budget(100_us)
.deadline(900_us)
.on_miss(horus::Miss::SafeMode)
.priority(90)
.pin_core(3)
.tick([&] { /* safety checks */ })
.build();
// Compute node: long-running CPU work
sched.add("path_planner")
.compute()
.tick([&] { /* A* search */ })
.build();
// Event-driven node: wakes on message
sched.add("logger")
.on("diagnostics.status")
.tick([&] { /* log message */ })
.build();
Deadline Miss Policies
| Policy | Behavior |
|---|---|
Miss::Warn | Log a warning, continue execution |
Miss::Skip | Skip the current tick, reset for next cycle |
Miss::SafeMode | Call enter_safe_state(), then continue |
Miss::Stop | Stop the node permanently |
Init and Safe State Callbacks
Lambda nodes can set lifecycle callbacks through the builder:
sched.add("motor_driver")
.rate(100_hz)
.budget(2_ms)
.init([&] {
printf("Motor driver initialized\n");
// one-time hardware setup
})
.safe_state([&] {
// send zero velocity on watchdog timeout
cmd_pub.send(horus::msg::CmdVel{0, 0.0f, 0.0f});
})
.tick([&] {
// normal motor control
})
.build();
Running the Scheduler
Blocking Spin
spin() blocks the calling thread until the scheduler is stopped (via Ctrl+C, SIGTERM, or .stop()):
sched.spin();
// execution resumes here after shutdown
Single Tick
tick_once() executes exactly one tick of all registered nodes. Useful for testing and stepped simulation:
for (int i = 0; i < 1000; ++i) {
sched.tick_once();
}
Stopping from Another Thread
stop() is thread-safe. Call it from a signal handler, another thread, or a node's tick callback:
// Stop programmatically after 10 seconds:
std::thread timer([&] {
std::this_thread::sleep_for(std::chrono::seconds(10));
sched.stop();
});
sched.spin();
timer.join();
Runtime Queries
Query the scheduler state at any time (all methods are thread-safe):
// Check if still running
if (sched.is_running()) { /* ... */ }
// Get the scheduler name
std::string name = sched.get_name();
// Get human-readable status
std::string info = sched.status();
// List all registered nodes
auto nodes = sched.node_list();
for (const auto& n : nodes) {
printf(" node: %s\n", n.c_str());
}
Common Patterns
Multi-Rate System
Different nodes run at different rates within the same scheduler:
auto sched = horus::Scheduler()
.tick_rate(1000_hz) // GCD of all node rates
.prefer_rt();
sched.add("safety")
.rate(1000_hz).budget(50_us).priority(99)
.tick([&] { /* fastest, highest priority */ }).build();
sched.add("controller")
.rate(100_hz).budget(2_ms)
.tick([&] { /* medium rate */ }).build();
sched.add("planner")
.rate(10_hz).compute()
.tick([&] { /* slow, CPU-heavy */ }).build();
sched.spin();
Test Harness with tick_once
Step through execution deterministically for unit tests:
auto sched = horus::Scheduler().tick_rate(100_hz).deterministic(true);
auto pub = sched.advertise<horus::msg::CmdVel>("cmd");
auto sub = sched.subscribe<horus::msg::CmdVel>("cmd");
int tick_count = 0;
sched.add("producer").rate(100_hz)
.tick([&] { pub.send(horus::msg::CmdVel{0, 1.0f, 0.0f}); tick_count++; })
.build();
sched.tick_once();
assert(tick_count == 1);
auto msg = sub.recv();
assert(msg.has_value());
Network-Enabled Scheduler
auto sched = horus::Scheduler().tick_rate(100_hz)
.enable_network().name("robot_01"); // topics visible on LAN
auto pub = sched.advertise<horus::msg::Odometry>("robot_01.odom");
Ownership
Scheduler is move-only. It owns the underlying Rust Box<FfiScheduler> and releases it in the destructor. Copy is deleted:
horus::Scheduler a;
// horus::Scheduler b = a; // COMPILE ERROR
horus::Scheduler b = std::move(a); // OK
See Also
- Node API -- Struct-based and lambda-based node lifecycle
- Publisher and Subscriber API -- Zero-copy messaging
- Services and Actions API -- RPC and long-running tasks
- C++ Real-Time Guide -- Budget, deadline, SCHED_FIFO, CPU pinning
- C++ API Overview -- All classes at a glance