Services API

HORUS services provide synchronous request/response communication between nodes. Define a service with the service! macro, run a server with ServiceServerBuilder, and call it with ServiceClient or AsyncServiceClient.

Python: Services are Rust-only. Python bindings are not yet available.

Defining a Service

Use the service! macro to define request and response types:

// simplified
use horus::prelude::*;

service! {
    /// Look up a robot's current pose by name.
    GetRobotPose {
        request {
            robot_name: String,
        }
        response {
            x: f64,
            y: f64,
            theta: f64,
            timestamp_ns: u64,
        }
    }
}

Service Trait

All services implement the Service trait:

Method | Returns | Description
name() | &'static str | Service name (used as topic prefix)
request_topic() | String | Request channel name ("{name}.request")
response_topic() | String | Response channel name ("{name}.response")
request_type_name() | &'static str | Human-readable request type name
response_type_name() | &'static str | Human-readable response type name
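The channel names in the table follow a fixed "{name}.request" / "{name}.response" pattern. The sketch below models only that naming rule with a hypothetical trait and marker type. It assumes name() is an associated function that returns the identifier written in service! unchanged (here "GetRobotPose"), which the table does not state. In HORUS the implementation comes from the macro; this is not that code.

```rust
// Hypothetical stand-in for the HORUS Service trait: only the topic-naming
// methods are modeled, and name() is assumed to be an associated function.
trait Service {
    fn name() -> &'static str;

    // Request channel name: "{name}.request"
    fn request_topic() -> String {
        format!("{}.request", Self::name())
    }

    // Response channel name: "{name}.response"
    fn response_topic() -> String {
        format!("{}.response", Self::name())
    }
}

// Hypothetical marker type; in HORUS the service! macro generates the real one.
struct GetRobotPose;

impl Service for GetRobotPose {
    // Assumption: the name is the identifier written in service!.
    fn name() -> &'static str {
        "GetRobotPose"
    }
}

fn main() {
    println!("{}", GetRobotPose::request_topic());
    println!("{}", GetRobotPose::response_topic());
}
```

Under that assumption, a client and a server agree on where to meet purely from the service type, with no extra configuration.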

ServiceClient (Blocking)

Synchronous client that blocks until a response arrives or the timeout elapses.

Constructor

Method | Returns | Description
ServiceClient::<S>::new() | Result<Self> | Create a client with the default 1ms poll interval
ServiceClient::<S>::with_poll_interval(interval) | Result<Self> | Create a client with a custom poll interval

Calling

Method | Returns | Description
call(request, timeout) | ServiceResult<S::Response> | Block until a response arrives or the timeout elapses
call_resilient(request, timeout) | ServiceResult<S::Response> | Auto-retry on transient errors (3 retries, 10ms initial backoff, 2x multiplier)
call_resilient_with(request, timeout, config) | ServiceResult<S::Response> | Auto-retry with a custom RetryConfig
call_optional(request, timeout) | ServiceResult<Option<S::Response>> | Return Ok(None) on timeout instead of Err

Only transient errors (Timeout, Transport) are retried. Permanent errors (ServiceFailed, NoServer) propagate immediately.

Detailed Method Reference

call

// simplified
pub fn call(&self, request: S::Request, timeout: Duration) -> ServiceResult<S::Response>

Send a request and block until a response arrives or the timeout elapses.

Parameters

Name | Type | Required | Description
request | S::Request | yes | The typed request message. S is the service type defined with the service! macro.
timeout | Duration | yes | Maximum time to wait for a response. Create with .ms() or .secs(), e.g. 100_u64.ms().

Returns: ServiceResult<S::Response>, either Ok(response) or Err(ServiceError).

Errors

Error | Condition
ServiceError::Timeout | No response within timeout
ServiceError::NoServer | No server is registered for this service
ServiceError::ServiceFailed(msg) | Server handler returned Err(msg)
ServiceError::Transport(msg) | IPC communication failure

When to use

  • One-shot queries: "what is the robot's current pose?"
  • Parameter lookups from a configuration server
  • Any request that should complete in milliseconds

When NOT to use

  • Long-running tasks: use Actions instead
  • Continuous data streams: use Topics instead
  • Non-critical lookups where a timeout is acceptable: use call_optional() instead

Example:

// simplified
let response = client.call(GetRobotPoseRequest { robot_name: "arm_1".into() }, 100_u64.ms())?;
println!("x={}, y={}", response.x, response.y);

call_resilient

Sends a request with automatic retry on transient errors.

Signature

// simplified
pub fn call_resilient(&mut self, request: S::Request, timeout: Duration) -> ServiceResult<S::Response>

Parameters

Name | Type | Required | Description
request | S::Request | yes | The typed request message.
timeout | Duration | yes | Timeout per attempt. Total time may exceed this due to retries.

Returns

ServiceResult<S::Response>: Ok(response) after a successful attempt; Err if all retries are exhausted or a permanent error occurs.

Behavior

  • Default retry config: 3 retries, 10ms initial backoff, 2x multiplier
  • Only transient errors (Timeout, Transport) trigger retries
  • Permanent errors (NoServer, ServiceFailed) propagate immediately — no retry
  • Total wall time can exceed timeout: each attempt gets a fresh timeout, and backoff delays are added between attempts
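To make the retry rule concrete, here is a std-only sketch of a resilient call loop. It assumes "3 retries" means three attempts after the first one, each preceded by a backoff sleep; the ServiceError enum and the call_with_retry function are hypothetical stand-ins, not the HORUS implementation. Under that reading, the default config allows up to 4 attempts and 10 + 20 + 40 = 70 ms of backoff, so the worst case is roughly 4 × timeout + 70 ms.

```rust
use std::thread;
use std::time::Duration;

// Hypothetical mirror of ServiceError (not the HORUS type).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum ServiceError {
    Timeout,
    ServiceFailed(String),
    NoServer,
    Transport(String),
}

impl ServiceError {
    // Timeout and Transport are transient; everything else is permanent.
    fn is_transient(&self) -> bool {
        matches!(self, ServiceError::Timeout | ServiceError::Transport(_))
    }
}

/// Run `attempt`, retrying transient failures up to `max_retries` times.
/// The sleep before retry n (counting from 0) is initial_backoff * multiplier^n.
fn call_with_retry<T>(
    mut attempt: impl FnMut() -> Result<T, ServiceError>,
    max_retries: u32,
    initial_backoff: Duration,
    multiplier: f64,
) -> Result<T, ServiceError> {
    let mut backoff = initial_backoff;
    let mut retries = 0;
    loop {
        match attempt() {
            Ok(response) => return Ok(response),
            // Permanent error, or transient error with no retries left: give up.
            Err(e) if !e.is_transient() || retries == max_retries => return Err(e),
            // Transient error: wait, grow the backoff, try again.
            Err(_) => {
                thread::sleep(backoff);
                backoff = backoff.mul_f64(multiplier);
                retries += 1;
            }
        }
    }
}

fn main() {
    let mut attempts = 0;
    let result = call_with_retry(
        || {
            attempts += 1;
            // Simulate two timeouts followed by a successful reply.
            if attempts < 3 { Err(ServiceError::Timeout) } else { Ok(42) }
        },
        3,
        Duration::from_millis(10),
        2.0,
    );
    println!("{:?} after {} attempts", result, attempts);
}
```

Here the closure fails twice with Timeout, so the call succeeds on the third attempt after sleeping 10 ms and then 20 ms. A NoServer error in the same loop would return after the first attempt.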

When to use: Network-adjacent calls where transient failures are expected (sensor servers, remote nodes).

When NOT to use: Latency-critical paths where retry delay is unacceptable — use call() with your own retry logic.

call_resilient_with

Sends a request with automatic retry using a custom retry configuration.

Signature

// simplified
pub fn call_resilient_with(&mut self, request: S::Request, timeout: Duration, config: RetryConfig) -> ServiceResult<S::Response>

Parameters

Name | Type | Required | Description
request | S::Request | yes | The typed request message.
timeout | Duration | yes | Timeout per attempt.
config | RetryConfig | yes | Custom retry settings: RetryConfig::new(max_retries, initial_backoff), optionally chained with .multiplier() and .max_backoff().

Returns

ServiceResult<S::Response> — same as call_resilient().

Example

// simplified
use horus::prelude::*;

let config = RetryConfig::new(5, 50_u64.ms())
    .multiplier(3.0)
    .max_backoff(1_u64.secs());

let response = client.call_resilient_with(request, 2_u64.secs(), config)?;
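The custom configuration above produces the delay schedule below if each backoff is the previous one times the multiplier, capped at max_backoff. That capping rule is an assumption about RetryConfig, not something this page specifies; the struct here is a hypothetical mirror that stores durations as whole milliseconds so the arithmetic stays exact.

```rust
// Hypothetical mirror of the RetryConfig settings, in whole milliseconds.
struct RetryConfig {
    max_retries: u32,
    initial_backoff_ms: u64,
    multiplier: f64,
    max_backoff_ms: u64,
}

impl RetryConfig {
    // Delay before retry n: initial * multiplier^n, capped at max_backoff.
    fn schedule_ms(&self) -> Vec<u64> {
        let mut delays = Vec::with_capacity(self.max_retries as usize);
        let mut next = self.initial_backoff_ms as f64;
        for _ in 0..self.max_retries {
            delays.push((next.round() as u64).min(self.max_backoff_ms));
            next *= self.multiplier;
        }
        delays
    }
}

fn main() {
    // Same numbers as the call_resilient_with example: 5 retries, 50ms, 3x, max 1s.
    let custom = RetryConfig { max_retries: 5, initial_backoff_ms: 50, multiplier: 3.0, max_backoff_ms: 1_000 };
    let total: u64 = custom.schedule_ms().iter().sum();
    println!("{:?} (total {} ms)", custom.schedule_ms(), total);

    // The documented default (3 retries, 10ms, 2x), treated as uncapped here.
    let default = RetryConfig { max_retries: 3, initial_backoff_ms: 10, multiplier: 2.0, max_backoff_ms: u64::MAX };
    println!("{:?}", default.schedule_ms());
}
```

With those settings the sketch prints [50, 150, 450, 1000, 1000] (total 2650 ms), so under the same assumptions six failed attempts at a 2-second timeout could take about 6 × 2 s + 2.65 s before the error surfaces.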

call_optional

Sends a request, returning Ok(None) on timeout instead of an error.

Signature

// simplified
pub fn call_optional(&mut self, request: S::Request, timeout: Duration) -> ServiceResult<Option<S::Response>>

Parameters

Name | Type | Required | Description
request | S::Request | yes | The typed request message.
timeout | Duration | yes | Maximum time to wait.

Returns

  • Ok(Some(response)) — server responded successfully
  • Ok(None) — timeout elapsed, no response (not an error)
  • Err(ServiceError) — non-timeout error (NoServer, ServiceFailed, Transport)
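In other words, call_optional() behaves as if a timeout were folded into the Ok branch while every other error still propagates. A minimal sketch of that mapping, using a hypothetical ServiceError enum and into_optional helper (neither is part of the HORUS API):

```rust
// Hypothetical mirror of ServiceError (not the HORUS type).
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum ServiceError {
    Timeout,
    ServiceFailed(String),
    NoServer,
    Transport(String),
}

// Convert a blocking call's result into the call_optional shape:
// a timeout becomes Ok(None); any other error is passed through.
fn into_optional<T>(result: Result<T, ServiceError>) -> Result<Option<T>, ServiceError> {
    match result {
        Ok(response) => Ok(Some(response)),
        Err(ServiceError::Timeout) => Ok(None),
        Err(other) => Err(other),
    }
}

fn main() {
    println!("{:?}", into_optional(Ok::<_, ServiceError>(1.5_f64)));
    println!("{:?}", into_optional::<f64>(Err(ServiceError::Timeout)));
    println!("{:?}", into_optional::<f64>(Err(ServiceError::NoServer)));
}
```

This is why a missing server still fails loudly with call_optional(): only a slow or silent server is treated as "no data".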

When to use: Non-critical lookups where missing data is acceptable — e.g., checking if a sensor is online before planning.

Example

// simplified
use horus::prelude::*;

// Create client for the GetRobotPose service
let mut client = ServiceClient::<GetRobotPose>::new()?;

// Blocking call with 1-second timeout
let response = client.call(
    GetRobotPoseRequest { robot_name: "arm_1".into() },
    1_u64.secs(),
)?;
println!("Robot at ({:.2}, {:.2})", response.x, response.y);

// Resilient call — retries on transient failures
let response = client.call_resilient(
    GetRobotPoseRequest { robot_name: "arm_1".into() },
    2_u64.secs(),
)?;

// Optional call — Ok(None) on timeout
match client.call_optional(
    GetRobotPoseRequest { robot_name: "arm_1".into() },
    100_u64.ms(),
)? {
    Some(res) => println!("Pose: ({:.2}, {:.2})", res.x, res.y),
    None => println!("Pose server not responding"),
}

AsyncServiceClient (Non-Blocking)

Non-blocking client that returns a PendingServiceCall handle. Check the handle each tick without blocking the scheduler.

Constructor

Method | Returns | Description
AsyncServiceClient::<S>::new() | Result<Self> | Create with the default 1ms poll interval
AsyncServiceClient::<S>::with_poll_interval(interval) | Result<Self> | Create with a custom poll interval

Calling

Method | Returns | Description
call_async(request, timeout) | PendingServiceCall<S::Response> | Send the request and return a pending handle immediately

PendingServiceCall

Method | Returns | Description
check() | ServiceResult<Option<Res>> | Non-blocking check: Ok(Some(res)) if ready, Ok(None) if still waiting, Err on timeout or failure
wait() | ServiceResult<Res> | Block until the response arrives or the timeout elapses
is_expired() | bool | Whether the deadline has passed
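The check/wait/is_expired contract can be modeled with a std channel and a deadline. This is a sketch under stated assumptions: PendingCall, CallError, and the simulated call_async (which replies from a spawned thread) are hypothetical, whereas this page describes HORUS exchanging messages over request/response topics rather than an mpsc channel.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, TryRecvError};
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical error type covering the two outcomes this sketch can produce.
#[derive(Debug, PartialEq)]
enum CallError {
    Timeout,
    Transport(String),
}

// Hypothetical stand-in for PendingServiceCall: a reply channel plus a deadline.
struct PendingCall<Res> {
    rx: Receiver<Res>,
    deadline: Instant,
}

impl<Res> PendingCall<Res> {
    fn is_expired(&self) -> bool {
        Instant::now() >= self.deadline
    }

    // Non-blocking: Ok(Some) if a reply is buffered, Ok(None) while waiting,
    // Err(Timeout) once the deadline has passed with no reply.
    fn check(&mut self) -> Result<Option<Res>, CallError> {
        match self.rx.try_recv() {
            Ok(res) => Ok(Some(res)),
            Err(TryRecvError::Empty) if self.is_expired() => Err(CallError::Timeout),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Err(CallError::Transport("reply channel closed".into())),
        }
    }

    // Blocking: consumes the handle and waits at most until the deadline.
    fn wait(self) -> Result<Res, CallError> {
        let remaining = self.deadline.saturating_duration_since(Instant::now());
        self.rx.recv_timeout(remaining).map_err(|e| match e {
            RecvTimeoutError::Timeout => CallError::Timeout,
            RecvTimeoutError::Disconnected => CallError::Transport("reply channel closed".into()),
        })
    }
}

// Hypothetical call_async: a spawned thread plays the server and replies after `delay`.
fn call_async(delay: Duration, timeout: Duration) -> PendingCall<f64> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(delay);
        let _ = tx.send(1.5); // ignore the error if the caller already gave up
    });
    PendingCall { rx, deadline: Instant::now() + timeout }
}

fn main() {
    let mut pending = call_async(Duration::from_millis(20), Duration::from_millis(500));
    // Simulated scheduler: one non-blocking check per 5 ms tick.
    loop {
        match pending.check() {
            Ok(Some(x)) => { println!("response: {x}"); break; }
            Ok(None) => thread::sleep(Duration::from_millis(5)),
            Err(e) => { println!("failed: {e:?}"); break; }
        }
    }
}
```

The key design point is that check() never sleeps: the loop (or, in HORUS, the scheduler calling tick()) decides how often to poll, while wait() hands that control to the handle.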

check

Polls for a response without blocking.

Signature

// simplified
pub fn check(&mut self) -> ServiceResult<Option<Res>>

Returns

  • Ok(Some(response)) — response arrived
  • Ok(None) — still waiting, call again next tick
  • Err(ServiceError::Timeout) — deadline passed
  • Err(other) — transport or server error

When to use: In tick() — check every cycle without blocking the scheduler.

wait

Blocks until the response arrives or the deadline passes.

Signature

// simplified
pub fn wait(self) -> ServiceResult<Res>

Returns

ServiceResult<Res>. Consumes the handle; use check() for non-blocking polling.

When NOT to use: Inside tick() of RT nodes — blocking causes deadline misses.

is_expired

Checks whether the deadline has passed.

Signature

// simplified
pub fn is_expired(&self) -> bool

Returns

true if the timeout has elapsed. After this, check() will return Err(Timeout).

Example

// simplified
use horus::prelude::*;

service! {
    GetRobotPose {
        request { robot_name: String }
        response { x: f64, y: f64, theta: f64, timestamp_ns: u64 }
    }
}

struct PlannerNode {
    client: AsyncServiceClient<GetRobotPose>,
    pending: Option<PendingServiceCall<GetRobotPoseResponse>>,
}

impl Node for PlannerNode {
    fn name(&self) -> &str { "Planner" }

    fn tick(&mut self) {
        // Send request if none pending
        if self.pending.is_none() {
            self.pending = Some(self.client.call_async(
                GetRobotPoseRequest { robot_name: "arm_0".into() },
                500_u64.ms(),
            ));
        }

        // Check for response (non-blocking)
        if let Some(ref mut call) = self.pending {
            match call.check() {
                Ok(Some(pose)) => {
                    hlog!(info, "Robot at ({:.2}, {:.2})", pose.x, pose.y);
                    self.pending = None;
                }
                Ok(None) => {} // Still waiting
                Err(e) => {
                    hlog!(warn, "Service call failed: {}", e);
                    self.pending = None;
                }
            }
        }
    }
}

ServiceServerBuilder

Fluent builder for creating a service server.

Method | Returns | Description
ServiceServerBuilder::<S>::new() | Self | Create a new builder
on_request(handler) | Self | Register the request handler (Fn(Req) -> Result<Res, String>)
poll_interval(interval) | Self | Override the poll interval (default: 5ms)
build() | Result<ServiceServer<S>> | Build and start the server (spawns a background thread)

The handler receives the request payload and returns either a response (Ok) or an error message (Err).

The handler's type is:

// simplified
type RequestHandler<Req, Res> = Box<dyn Fn(Req) -> Result<Res, String> + Send + Sync + 'static>;
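Because the handler type requires Send + Sync + 'static, a closure that owns its captured data (via move) satisfies it, while one borrowing a local variable would not. A small sketch reusing the same alias, with hypothetical PoseRequest/PoseResponse structs standing in for the macro-generated types:

```rust
use std::collections::HashMap;

// Same shape as the RequestHandler alias above.
type RequestHandler<Req, Res> = Box<dyn Fn(Req) -> Result<Res, String> + Send + Sync + 'static>;

// Hypothetical request/response structs standing in for service!-generated types.
struct PoseRequest {
    robot_name: String,
}

#[derive(Debug, PartialEq)]
struct PoseResponse {
    x: f64,
    y: f64,
}

fn make_handler() -> RequestHandler<PoseRequest, PoseResponse> {
    let poses: HashMap<String, (f64, f64)> = HashMap::from([("arm_1".to_string(), (1.5, 2.0))]);
    // `move` transfers ownership of the map into the closure, so it is 'static.
    Box::new(move |req: PoseRequest| match poses.get(&req.robot_name) {
        Some(&(x, y)) => Ok(PoseResponse { x, y }),
        None => Err(format!("Unknown robot: {}", req.robot_name)),
    })
}

fn main() {
    let handler = make_handler();
    println!("{:?}", handler(PoseRequest { robot_name: "arm_1".into() }));
    println!("{:?}", handler(PoseRequest { robot_name: "arm_9".into() }));
}
```

The Err(String) branch is what a client would observe as ServiceError::ServiceFailed(msg).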

ServiceServer

Method | Returns | Description
stop() | () | Stop the server (also happens automatically on drop)

The server runs in a background thread. Dropping the ServiceServer handle shuts it down.
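Stop-on-drop is a common Rust pattern for background workers. The sketch below shows one std-only way to get that behavior: a polling thread, an AtomicBool stop flag, and a Drop impl that joins the thread. The Server type and its poll counter are hypothetical; ServiceServer is not necessarily built this way.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical stand-in for ServiceServer: a polling thread plus a stop flag.
struct Server {
    running: Arc<AtomicBool>,
    polls: Arc<AtomicUsize>, // how many poll cycles the worker has run
    worker: Option<JoinHandle<()>>,
}

impl Server {
    fn start(poll_interval: Duration) -> Self {
        let running = Arc::new(AtomicBool::new(true));
        let polls = Arc::new(AtomicUsize::new(0));
        let (flag, count) = (Arc::clone(&running), Arc::clone(&polls));
        let worker = thread::spawn(move || {
            while flag.load(Ordering::Acquire) {
                // A real server would read requests and publish responses here.
                count.fetch_add(1, Ordering::Relaxed);
                thread::sleep(poll_interval);
            }
        });
        Server { running, polls, worker: Some(worker) }
    }

    // Clear the flag and join the worker; safe to call more than once.
    fn stop(&mut self) {
        self.running.store(false, Ordering::Release);
        if let Some(handle) = self.worker.take() {
            let _ = handle.join();
        }
    }
}

impl Drop for Server {
    fn drop(&mut self) {
        self.stop();
    }
}

fn main() {
    let server = Server::start(Duration::from_millis(5));
    let polls = Arc::clone(&server.polls);
    thread::sleep(Duration::from_millis(50));
    drop(server); // stops and joins the worker thread
    let at_drop = polls.load(Ordering::Relaxed);
    thread::sleep(Duration::from_millis(20));
    println!("polls at drop: {at_drop}, 20 ms later: {}", polls.load(Ordering::Relaxed));
}
```

Because stop() takes the JoinHandle out of the Option, calling stop() explicitly and then letting the handle drop is harmless. It also explains why the server must be kept in a named binding: binding it to `_` would drop it, and stop it, immediately.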

Example

// simplified
use horus::prelude::*;
use std::collections::HashMap;

// Build and start server
let poses: HashMap<String, (f64, f64, f64)> = HashMap::from([
    ("arm_1".into(), (1.5, 2.0, 0.0)),
    ("arm_2".into(), (3.0, 1.0, 1.57)),
]);

let server = ServiceServerBuilder::<GetRobotPose>::new()
    .on_request(move |req| {
        match poses.get(&req.robot_name) {
            Some(&(x, y, theta)) => Ok(GetRobotPoseResponse {
                x, y, theta, timestamp_ns: horus::timestamp_now(),
            }),
            None => Err(format!("Unknown robot: {}", req.robot_name)),
        }
    })
    .poll_interval(1_u64.ms())
    .build()?;

// Server runs in background thread until dropped

ServiceRequest / ServiceResponse

Wrapper types that flow over the wire:

ServiceRequest<Req>

Field | Type | Description
request_id | u64 | Unique correlation ID (auto-assigned by the client)
payload | Req | The actual request data

ServiceResponse<Res>

Field | Type | Description
request_id | u64 | Echoes the request's correlation ID
ok | bool | true if handled successfully
payload | Option<Res> | Response data (Some when ok == true)
error | Option<String> | Error message (Some when ok == false)

Method | Returns | Description
ServiceResponse::success(request_id, payload) | Self | Create a successful response
ServiceResponse::failure(request_id, error) | Self | Create an error response
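The request_id echo is what lets a client pair a response with its outstanding call. Below is a sketch with hypothetical mirrors of the two wrappers. The into_result helper and the toy handle function are illustrative assumptions, and the real failure() may take a String rather than impl Into<String>.

```rust
// Hypothetical mirrors of the wire wrappers described above.
#[derive(Debug)]
struct ServiceRequest<Req> {
    request_id: u64,
    payload: Req,
}

#[derive(Debug, PartialEq)]
struct ServiceResponse<Res> {
    request_id: u64,
    ok: bool,
    payload: Option<Res>,
    error: Option<String>,
}

impl<Res> ServiceResponse<Res> {
    fn success(request_id: u64, payload: Res) -> Self {
        Self { request_id, ok: true, payload: Some(payload), error: None }
    }

    fn failure(request_id: u64, error: impl Into<String>) -> Self {
        Self { request_id, ok: false, payload: None, error: Some(error.into()) }
    }

    // Hypothetical helper: unwrap into the Result a client hands to its caller.
    fn into_result(self) -> Result<Res, String> {
        match (self.ok, self.payload, self.error) {
            (true, Some(p), _) => Ok(p),
            (_, _, Some(e)) => Err(e),
            _ => Err("malformed response".to_string()),
        }
    }
}

// Toy server logic: reply with the key length, echoing the correlation ID.
fn handle(req: ServiceRequest<String>) -> ServiceResponse<u32> {
    if req.payload.is_empty() {
        ServiceResponse::failure(req.request_id, "empty key")
    } else {
        ServiceResponse::success(req.request_id, req.payload.len() as u32)
    }
}

fn main() {
    let next_id = 7; // a client would assign a fresh ID per call
    let resp = handle(ServiceRequest { request_id: next_id, payload: "arm_1".into() });
    // Client side: a response whose ID does not match the outstanding call is ignored.
    assert_eq!(resp.request_id, next_id);
    println!("{:?}", resp.into_result());
}
```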

ServiceError

Variant | Description | Transient?
Timeout | Call timed out waiting for a response | Yes
ServiceFailed(String) | Server returned an error | No
NoServer | No server registered for this service | No
Transport(String) | Topic I/O error | Yes

Method | Returns | Description
is_transient() | bool | Whether a retry may succeed (Timeout and Transport are transient)

ServiceInfo

Metadata reported by the horus service list command:

Field | Type | Description
name | String | Service name
request_type | String | Rust type name of the request
response_type | String | Rust type name of the response
servers | usize | Active server count (typically 0 or 1)
clients | usize | Known client count

Complete Example

// simplified
use horus::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Define a key-value store service
service! {
    /// Get or set values in a shared store.
    KeyValueStore {
        request {
            key: String,
            value: Option<String>,  // None = get, Some = set
        }
        response {
            value: Option<String>,
            found: bool,
        }
    }
}

fn main() -> Result<()> {
    let store = Arc::new(Mutex::new(HashMap::<String, String>::new()));
    let store_clone = store.clone();

    // Start server
    let _server = ServiceServerBuilder::<KeyValueStore>::new()
        .on_request(move |req| {
            let mut map = store_clone.lock().unwrap();
            match req.value {
                Some(val) => {
                    map.insert(req.key, val.clone());
                    Ok(KeyValueStoreResponse { value: Some(val), found: true })
                }
                None => {
                    let val = map.get(&req.key).cloned();
                    let found = val.is_some();
                    Ok(KeyValueStoreResponse { value: val, found })
                }
            }
        })
        .build()?;

    // Client: set a value
    let mut client = ServiceClient::<KeyValueStore>::new()?;
    client.call(
        KeyValueStoreRequest { key: "robot_id".into(), value: Some("arm_01".into()) },
        1_u64.secs(),
    )?;

    // Client: get it back
    let res = client.call(
        KeyValueStoreRequest { key: "robot_id".into(), value: None },
        1_u64.secs(),
    )?;
    assert_eq!(res.value, Some("arm_01".into()));
    assert!(res.found);

    Ok(())
}

See Also