Services

Beta: The Services API is functional in Rust but still maturing. Python bindings are not yet available. The API may change in future releases.

Services provide synchronous request/response communication between nodes. A client sends a request and blocks (or polls) until the server responds.

Use services when:

  • You need a response before continuing (parameter queries, sensor calibration)
  • The operation completes quickly (within milliseconds)
  • You need guaranteed delivery with error reporting

Use topics instead for continuous data streams. Use actions instead for long-running tasks with feedback.

Defining a Service

Use the service! macro to define Request and Response types:

use horus::prelude::*;

service! {
    AddTwoInts {
        request {
            a: i64,
            b: i64,
        }
        response {
            sum: i64,
        }
    }
}

This generates:

  • AddTwoIntsRequest struct with fields a and b
  • AddTwoIntsResponse struct with field sum
  • AddTwoInts marker type implementing the Service trait

More complex example:

service! {
    GetMapRegion {
        request {
            x_min: f64,
            y_min: f64,
            x_max: f64,
            y_max: f64,
            resolution: f64,
        }
        response {
            width: u32,
            height: u32,
            data: Vec<u8>,
            timestamp: u64,
        }
    }
}

Service Server

The server listens for requests and returns responses. It runs in a background thread.

let server = ServiceServerBuilder::<AddTwoInts>::new()
    .on_request(|req| {
        Ok(AddTwoIntsResponse { sum: req.a + req.b })
    })
    .build()?;

// Server runs in a background thread — it's active until dropped
// Add it to your scheduler or keep it alive in main()

Returning Errors

Return Err(String) to signal failure to the client:

let server = ServiceServerBuilder::<GetMapRegion>::new()
    .on_request(|req| {
        if req.resolution <= 0.0 {
            return Err("Resolution must be positive".into());
        }
        if req.x_max <= req.x_min || req.y_max <= req.y_min {
            return Err("Invalid region bounds".into());
        }

        let map = generate_map(req.x_min, req.y_min, req.x_max, req.y_max, req.resolution);
        Ok(GetMapRegionResponse {
            width: map.width,
            height: map.height,
            data: map.data,
            timestamp: horus::timestamp_now(),
        })
    })
    .build()?;

Server Configuration

let server = ServiceServerBuilder::<AddTwoInts>::new()
    .on_request(|req| Ok(AddTwoIntsResponse { sum: req.a + req.b }))
    .poll_interval(Duration::from_millis(1))  // How often to check for requests (default: 5ms)
    .build()?;

Stopping a Server

The server stops when dropped, or explicitly:

server.stop();

Service Client

Blocking Client

The simplest way to call a service:

let mut client = ServiceClient::<AddTwoInts>::new()?;

let response = client.call(
    AddTwoIntsRequest { a: 3, b: 5 },
    Duration::from_secs(1),  // Timeout
)?;

println!("3 + 5 = {}", response.sum);

Resilient Calls (Auto-Retry)

Use call_resilient for production code that needs automatic retries on transient failures:

// Auto-retry with default settings (3 retries, exponential backoff from 10ms)
let response = client.call_resilient(
    AddTwoIntsRequest { a: 3, b: 5 },
    Duration::from_secs(5),
)?;

// Custom retry configuration
use horus::prelude::RetryConfig;

let response = client.call_resilient_with(
    AddTwoIntsRequest { a: 3, b: 5 },
    Duration::from_secs(5),
    RetryConfig::new(5, Duration::from_millis(20)),  // 5 retries, 20ms initial backoff
)?;

call_resilient retries on Timeout and Transport errors. ServiceFailed and NoServer errors are not retried since they indicate permanent failures.

Optional Response

Use call_optional when the server may not be running:

match client.call_optional(request, Duration::from_millis(100))? {
    Some(response) => println!("Got response: {}", response.sum),
    None => println!("No server available"),
}

Async Client

For non-blocking calls, use AsyncServiceClient:

let mut client = AsyncServiceClient::<AddTwoInts>::new()?;

// Start the call (non-blocking)
let mut pending = client.call_async(
    AddTwoIntsRequest { a: 3, b: 5 },
    Duration::from_secs(1),
);

// Do other work...

// Check if response is ready (non-blocking)
match pending.check()? {
    Some(response) => println!("Sum: {}", response.sum),
    None => println!("Still waiting..."),
}

// Check if the call has timed out
if pending.is_expired() {
    println!("Service call timed out");
}

// Or block until done
let response = pending.wait()?;

Client Configuration

// Custom poll interval for faster response detection
let mut client = ServiceClient::<AddTwoInts>::with_poll_interval(
    Duration::from_micros(500),  // Default: 1ms
)?;

Architecture

Services use two internal topics for bidirectional communication:

Client                          Server
  |                                |
  |-- {name}.request ---------->   |  ServiceRequest<Req>
  |                                |  handler processes request
  |<---------- {name}.response --  |  ServiceResponse<Res>
  |                                |
  • Requests include a monotonically-increasing request_id for correlation
  • Multiple clients can call the same server concurrently
  • Each client filters responses by matching its request_id
  • Communication uses shared memory (zero-copy IPC)

Error Handling

match client.call(request, Duration::from_secs(1)) {
    Ok(response) => { /* success */ }
    Err(ServiceError::Timeout) => { /* server didn't respond in time */ }
    Err(ServiceError::ServiceFailed(msg)) => { /* handler returned Err */ }
    Err(ServiceError::NoServer) => { /* no server found */ }
    Err(ServiceError::Transport(msg)) => { /* IPC error */ }
}
ErrorCause
TimeoutServer didn't respond within the timeout duration
ServiceFailed(msg)Server handler returned Err(msg)
NoServerNo service server is running
Transport(msg)IPC/shared memory communication failure

Complete Example

A service that looks up robot joint limits by name:

use horus::prelude::*;
use std::collections::HashMap;

// Define the service
service! {
    GetJointLimits {
        request {
            joint_name: String,
        }
        response {
            min_position: f64,
            max_position: f64,
            max_velocity: f64,
            max_effort: f64,
        }
    }
}

fn main() -> Result<()> {
    // Joint limits database
    let limits: HashMap<String, (f64, f64, f64, f64)> = HashMap::from([
        ("shoulder".into(), (-3.14, 3.14, 2.0, 100.0)),
        ("elbow".into(), (0.0, 2.61, 2.0, 80.0)),
        ("wrist".into(), (-1.57, 1.57, 3.0, 40.0)),
    ]);

    // Start server
    let _server = ServiceServerBuilder::<GetJointLimits>::new()
        .on_request(move |req| {
            match limits.get(&req.joint_name) {
                Some(&(min, max, vel, effort)) => Ok(GetJointLimitsResponse {
                    min_position: min,
                    max_position: max,
                    max_velocity: vel,
                    max_effort: effort,
                }),
                None => Err(format!("Unknown joint: {}", req.joint_name)),
            }
        })
        .build()?;

    // Client usage
    let mut client = ServiceClient::<GetJointLimits>::new()?;

    let resp = client.call(
        GetJointLimitsRequest { joint_name: "elbow".into() },
        Duration::from_secs(1),
    )?;

    println!("Elbow limits: [{:.2}, {:.2}] rad", resp.min_position, resp.max_position);

    Ok(())
}

CLI Commands

# List active services
horus service list

Next Steps