Services
Beta: The Services API is functional in Rust but still maturing. Python bindings are not yet available. The API may change in future releases.
Services provide synchronous request/response communication between nodes. A client sends a request and blocks (or polls) until the server responds.
Use services when:
- You need a response before continuing (parameter queries, sensor calibration)
- The operation completes quickly (within milliseconds)
- You need guaranteed delivery with error reporting
Use topics instead for continuous data streams. Use actions instead for long-running tasks with feedback.
Defining a Service
Use the service! macro to define Request and Response types:
use horus::prelude::*;
service! {
AddTwoInts {
request {
a: i64,
b: i64,
}
response {
sum: i64,
}
}
}
This generates:
AddTwoIntsRequeststruct with fieldsaandbAddTwoIntsResponsestruct with fieldsumAddTwoIntsmarker type implementing theServicetrait
More complex example:
service! {
GetMapRegion {
request {
x_min: f64,
y_min: f64,
x_max: f64,
y_max: f64,
resolution: f64,
}
response {
width: u32,
height: u32,
data: Vec<u8>,
timestamp: u64,
}
}
}
Service Server
The server listens for requests and returns responses. It runs in a background thread.
let server = ServiceServerBuilder::<AddTwoInts>::new()
.on_request(|req| {
Ok(AddTwoIntsResponse { sum: req.a + req.b })
})
.build()?;
// Server runs in a background thread — it's active until dropped
// Add it to your scheduler or keep it alive in main()
Returning Errors
Return Err(String) to signal failure to the client:
let server = ServiceServerBuilder::<GetMapRegion>::new()
.on_request(|req| {
if req.resolution <= 0.0 {
return Err("Resolution must be positive".into());
}
if req.x_max <= req.x_min || req.y_max <= req.y_min {
return Err("Invalid region bounds".into());
}
let map = generate_map(req.x_min, req.y_min, req.x_max, req.y_max, req.resolution);
Ok(GetMapRegionResponse {
width: map.width,
height: map.height,
data: map.data,
timestamp: horus::timestamp_now(),
})
})
.build()?;
Server Configuration
let server = ServiceServerBuilder::<AddTwoInts>::new()
.on_request(|req| Ok(AddTwoIntsResponse { sum: req.a + req.b }))
.poll_interval(Duration::from_millis(1)) // How often to check for requests (default: 5ms)
.build()?;
Stopping a Server
The server stops when dropped, or explicitly:
server.stop();
Service Client
Blocking Client
The simplest way to call a service:
let mut client = ServiceClient::<AddTwoInts>::new()?;
let response = client.call(
AddTwoIntsRequest { a: 3, b: 5 },
Duration::from_secs(1), // Timeout
)?;
println!("3 + 5 = {}", response.sum);
Resilient Calls (Auto-Retry)
Use call_resilient for production code that needs automatic retries on transient failures:
// Auto-retry with default settings (3 retries, exponential backoff from 10ms)
let response = client.call_resilient(
AddTwoIntsRequest { a: 3, b: 5 },
Duration::from_secs(5),
)?;
// Custom retry configuration
use horus::prelude::RetryConfig;
let response = client.call_resilient_with(
AddTwoIntsRequest { a: 3, b: 5 },
Duration::from_secs(5),
RetryConfig::new(5, Duration::from_millis(20)), // 5 retries, 20ms initial backoff
)?;
call_resilient retries on Timeout and Transport errors. ServiceFailed and NoServer errors are not retried since they indicate permanent failures.
Optional Response
Use call_optional when the server may not be running:
match client.call_optional(request, Duration::from_millis(100))? {
Some(response) => println!("Got response: {}", response.sum),
None => println!("No server available"),
}
Async Client
For non-blocking calls, use AsyncServiceClient:
let mut client = AsyncServiceClient::<AddTwoInts>::new()?;
// Start the call (non-blocking)
let mut pending = client.call_async(
AddTwoIntsRequest { a: 3, b: 5 },
Duration::from_secs(1),
);
// Do other work...
// Check if response is ready (non-blocking)
match pending.check()? {
Some(response) => println!("Sum: {}", response.sum),
None => println!("Still waiting..."),
}
// Check if the call has timed out
if pending.is_expired() {
println!("Service call timed out");
}
// Or block until done
let response = pending.wait()?;
Client Configuration
// Custom poll interval for faster response detection
let mut client = ServiceClient::<AddTwoInts>::with_poll_interval(
Duration::from_micros(500), // Default: 1ms
)?;
Architecture
Services use two internal topics for bidirectional communication:
Client Server
| |
|-- {name}.request ----------> | ServiceRequest<Req>
| | handler processes request
|<---------- {name}.response -- | ServiceResponse<Res>
| |
- Requests include a monotonically-increasing
request_idfor correlation - Multiple clients can call the same server concurrently
- Each client filters responses by matching its
request_id - Communication uses shared memory (zero-copy IPC)
Error Handling
match client.call(request, Duration::from_secs(1)) {
Ok(response) => { /* success */ }
Err(ServiceError::Timeout) => { /* server didn't respond in time */ }
Err(ServiceError::ServiceFailed(msg)) => { /* handler returned Err */ }
Err(ServiceError::NoServer) => { /* no server found */ }
Err(ServiceError::Transport(msg)) => { /* IPC error */ }
}
| Error | Cause |
|---|---|
Timeout | Server didn't respond within the timeout duration |
ServiceFailed(msg) | Server handler returned Err(msg) |
NoServer | No service server is running |
Transport(msg) | IPC/shared memory communication failure |
Complete Example
A service that looks up robot joint limits by name:
use horus::prelude::*;
use std::collections::HashMap;
// Define the service
service! {
GetJointLimits {
request {
joint_name: String,
}
response {
min_position: f64,
max_position: f64,
max_velocity: f64,
max_effort: f64,
}
}
}
fn main() -> Result<()> {
// Joint limits database
let limits: HashMap<String, (f64, f64, f64, f64)> = HashMap::from([
("shoulder".into(), (-3.14, 3.14, 2.0, 100.0)),
("elbow".into(), (0.0, 2.61, 2.0, 80.0)),
("wrist".into(), (-1.57, 1.57, 3.0, 40.0)),
]);
// Start server
let _server = ServiceServerBuilder::<GetJointLimits>::new()
.on_request(move |req| {
match limits.get(&req.joint_name) {
Some(&(min, max, vel, effort)) => Ok(GetJointLimitsResponse {
min_position: min,
max_position: max,
max_velocity: vel,
max_effort: effort,
}),
None => Err(format!("Unknown joint: {}", req.joint_name)),
}
})
.build()?;
// Client usage
let mut client = ServiceClient::<GetJointLimits>::new()?;
let resp = client.call(
GetJointLimitsRequest { joint_name: "elbow".into() },
Duration::from_secs(1),
)?;
println!("Elbow limits: [{:.2}, {:.2}] rad", resp.min_position, resp.max_position);
Ok(())
}
CLI Commands
# List active services
horus service list
Next Steps
- Services API Reference — Full method tables for ServiceClient, AsyncServiceClient, ServiceServerBuilder
- Communication Overview — When to use topics vs services vs actions
- Actions — Long-running tasks with feedback
- Topic — Pub/sub messaging