Services API

HORUS services provide synchronous request/response communication between nodes. Define a service with the service! macro, run a server with ServiceServerBuilder, and call it with ServiceClient or AsyncServiceClient.

Defining a Service

Use the service! macro to define request and response types:

use horus::prelude::*;

service! {
    /// Add two integers together.
    AddTwoInts {
        request {
            a: i64,
            b: i64,
        }
        response {
            sum: i64,
        }
    }
}

This generates three types:

  • AddTwoIntsRequest — request struct with pub a: i64, pub b: i64
  • AddTwoIntsResponse — response struct with pub sum: i64
  • AddTwoInts — zero-sized marker implementing the Service trait

The service name is auto-derived as snake_case from the CamelCase name (add_two_ints).

service! {
    /// Look up a robot's current pose by name.
    GetRobotPose {
        request {
            robot_name: String,
        }
        response {
            x: f64,
            y: f64,
            theta: f64,
            timestamp_ns: u64,
        }
    }
}

Service Trait

All services implement the Service trait:

MethodReturnsDescription
name()&'static strService name (used as topic prefix)
request_topic()StringRequest channel name ("{name}.request")
response_topic()StringResponse channel name ("{name}.response")
request_type_name()&'static strHuman-readable request type name
response_type_name()&'static strHuman-readable response type name

ServiceClient (Blocking)

Synchronous client that blocks until a response arrives or the timeout elapses.

Constructor

MethodReturnsDescription
ServiceClient::<S>::new()HorusResult<Self>Create a client with default 1ms poll interval
ServiceClient::<S>::with_poll_interval(interval)HorusResult<Self>Create a client with custom poll interval

Calling

MethodReturnsDescription
call(request, timeout)ServiceResult<S::Response>Block until response or timeout
call_resilient(request, timeout)ServiceResult<S::Response>Auto-retry on transient errors (3 retries, 10ms backoff, 2x multiplier)
call_resilient_with(request, timeout, config)ServiceResult<S::Response>Auto-retry with custom RetryConfig
call_optional(request, timeout)ServiceResult<Option<S::Response>>Returns Ok(None) on timeout instead of Err

Only transient errors (Timeout, Transport) are retried. Permanent errors (ServiceFailed, NoServer) propagate immediately.

Example

use horus::prelude::*;

service! {
    AddTwoInts {
        request { a: i64, b: i64 }
        response { sum: i64 }
    }
}

// Create client
let mut client = ServiceClient::<AddTwoInts>::new()?;

// Blocking call with 1-second timeout
let response = client.call(
    AddTwoIntsRequest { a: 3, b: 5 },
    1_u64.secs(),
)?;
println!("Sum: {}", response.sum); // 8

// Resilient call — retries on transient failures
let response = client.call_resilient(
    AddTwoIntsRequest { a: 10, b: 20 },
    2_u64.secs(),
)?;

// Optional call — Ok(None) on timeout
match client.call_optional(AddTwoIntsRequest { a: 1, b: 2 }, 100_u64.ms())? {
    Some(res) => println!("Got: {}", res.sum),
    None => println!("Server not responding"),
}

AsyncServiceClient (Non-Blocking)

Non-blocking client that returns a PendingServiceCall handle. Check the handle each tick without blocking the scheduler.

Constructor

MethodReturnsDescription
AsyncServiceClient::<S>::new()HorusResult<Self>Create with default 1ms poll interval
AsyncServiceClient::<S>::with_poll_interval(interval)HorusResult<Self>Create with custom poll interval

Calling

MethodReturnsDescription
call_async(request, timeout)PendingServiceCall<S::Response>Send request, return pending handle immediately

PendingServiceCall

MethodReturnsDescription
check()ServiceResult<Option<Res>>Non-blocking check: Ok(Some(res)) if ready, Ok(None) if waiting, Err on timeout/failure
wait()ServiceResult<Res>Block until response arrives or timeout
is_expired()boolWhether the deadline has passed

Example

use horus::prelude::*;

service! {
    GetRobotPose {
        request { robot_name: String }
        response { x: f64, y: f64, theta: f64, timestamp_ns: u64 }
    }
}

struct PlannerNode {
    client: AsyncServiceClient<GetRobotPose>,
    pending: Option<PendingServiceCall<GetRobotPoseResponse>>,
}

impl Node for PlannerNode {
    fn name(&self) -> &str { "Planner" }

    fn tick(&mut self) {
        // Send request if none pending
        if self.pending.is_none() {
            self.pending = Some(self.client.call_async(
                GetRobotPoseRequest { robot_name: "arm_0".into() },
                500_u64.ms(),
            ));
        }

        // Check for response (non-blocking)
        if let Some(ref mut call) = self.pending {
            match call.check() {
                Ok(Some(pose)) => {
                    hlog!(info, "Robot at ({:.2}, {:.2})", pose.x, pose.y);
                    self.pending = None;
                }
                Ok(None) => {} // Still waiting
                Err(e) => {
                    hlog!(warn, "Service call failed: {}", e);
                    self.pending = None;
                }
            }
        }
    }
}

ServiceServerBuilder

Fluent builder for creating a service server.

MethodReturnsDescription
ServiceServerBuilder::<S>::new()SelfCreate a new builder
on_request(handler)SelfRegister the request handler (Fn(Req) -> Result<Res, String>)
poll_interval(interval)SelfOverride poll interval (default: 5ms)
build()HorusResult<ServiceServer<S>>Build and start the server (spawns background thread)

The handler receives the request payload and returns either a response (Ok) or an error message (Err).

ServiceServer

MethodReturnsDescription
stop()()Stop the server (also happens automatically on drop)

The server runs in a background thread. Dropping the ServiceServer handle shuts it down.

Example

use horus::prelude::*;

service! {
    AddTwoInts {
        request { a: i64, b: i64 }
        response { sum: i64 }
    }
}

// Build and start server
let server = ServiceServerBuilder::<AddTwoInts>::new()
    .on_request(|req| {
        Ok(AddTwoIntsResponse { sum: req.a + req.b })
    })
    .poll_interval(1_u64.ms())
    .build()?;

// Server runs in background thread until `server` is dropped
// ... do other work ...

// Explicit stop (optional — happens on drop)
server.stop();

ServiceRequest / ServiceResponse

Wrapper types that flow over the wire:

ServiceRequest<Req>

FieldTypeDescription
request_idu64Unique correlation ID (auto-assigned by client)
payloadReqThe actual request data

ServiceResponse<Res>

FieldTypeDescription
request_idu64Echoes the request's correlation ID
okbooltrue if handled successfully
payloadOption<Res>Response data (Some when ok == true)
errorOption<String>Error message (Some when ok == false)
MethodReturnsDescription
ServiceResponse::success(request_id, payload)SelfCreate a successful response
ServiceResponse::failure(request_id, error)SelfCreate an error response

ServiceError

VariantDescriptionTransient?
TimeoutCall timed out waiting for responseYes
ServiceFailed(String)Server returned an errorNo
NoServerNo server registered for this serviceNo
Transport(String)Topic I/O errorYes
MethodReturnsDescription
is_transient()boolWhether a retry may succeed (Timeout and Transport are transient)

ServiceInfo

Metadata returned by horus service list:

FieldTypeDescription
nameStringService name
request_typeStringRust type name of request
response_typeStringRust type name of response
serversusizeActive server count (typically 0 or 1)
clientsusizeKnown client count

Complete Example

use horus::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Define a key-value store service
service! {
    /// Get or set values in a shared store.
    KeyValueStore {
        request {
            key: String,
            value: Option<String>,  // None = get, Some = set
        }
        response {
            value: Option<String>,
            found: bool,
        }
    }
}

fn main() -> HorusResult<()> {
    let store = Arc::new(Mutex::new(HashMap::<String, String>::new()));
    let store_clone = store.clone();

    // Start server
    let _server = ServiceServerBuilder::<KeyValueStore>::new()
        .on_request(move |req| {
            let mut map = store_clone.lock().unwrap();
            match req.value {
                Some(val) => {
                    map.insert(req.key, val.clone());
                    Ok(KeyValueStoreResponse { value: Some(val), found: true })
                }
                None => {
                    let val = map.get(&req.key).cloned();
                    let found = val.is_some();
                    Ok(KeyValueStoreResponse { value: val, found })
                }
            }
        })
        .build()?;

    // Client: set a value
    let mut client = ServiceClient::<KeyValueStore>::new()?;
    client.call(
        KeyValueStoreRequest { key: "robot_id".into(), value: Some("arm_01".into()) },
        1_u64.secs(),
    )?;

    // Client: get it back
    let res = client.call(
        KeyValueStoreRequest { key: "robot_id".into(), value: None },
        1_u64.secs(),
    )?;
    assert_eq!(res.value, Some("arm_01".into()));
    assert!(res.found);

    Ok(())
}

See Also