Input & Audio Messages

Input messages handle human interaction — gamepad teleoperation, keyboard shortcuts, and audio capture. These are the human interface layer of your robot.

from horus import JoystickInput, KeyboardInput, AudioFrame

JoystickInput

Gamepad/joystick events with typed factories for each event kind. The factory methods create properly-typed events without you needing to remember raw integer constants.

Fields

| Field | Type | Description |
|-------|------|-------------|
| joystick_id | int | Controller identifier (for multi-controller setups) |
| event_type | int | Event type (0=button, 1=axis, 2=hat, 3=connection) |
| element_id | int | Button/axis/hat ID within the controller |
| element_name | str | Human-readable name (e.g., "A", "left_stick_x") |
| value | float | Analog value (-1.0 to 1.0 for sticks, 0.0/1.0 for buttons) |
| pressed | bool | Button state (True = pressed, False = released) |
| timestamp_ns | int | Event timestamp in nanoseconds |

Factory Methods

# Button press
btn = JoystickInput.new_button(joystick_id=0, button_id=1, name="A", pressed=True)

# Analog axis (stick/trigger) — value range: -1.0 to 1.0 for sticks, 0.0 to 1.0 for triggers
axis = JoystickInput.new_axis(joystick_id=0, axis_id=0, name="left_stick_x", value=0.75)

# D-pad/hat switch
hat = JoystickInput.new_hat(joystick_id=0, hat_id=0, name="dpad", value=1.0)

# Controller connected/disconnected
conn = JoystickInput.new_connection(joystick_id=0, connected=True)

Event Type Queries — Dispatch Pattern

The standard way to handle mixed joystick events is to check the type and dispatch:

def handle_input(joy):
    if joy.is_button():
        if joy.pressed:  # act on press only, not on release
            if joy.element_name == "A":
                trigger_action()
            elif joy.element_name == "B":
                cancel_action()

    elif joy.is_axis():
        if "stick_x" in joy.element_name:
            steer(joy.value)
        elif "trigger" in joy.element_name:
            throttle(joy.value)

    elif joy.is_hat():
        handle_dpad(joy.value)

    elif joy.is_connection_event():
        if joy.is_connected():
            print("Controller connected")
        else:
            print("Controller disconnected — engaging e-stop!")

.is_button() — Is This a Button Event?

if joy.is_button():
    print(f"Button {joy.element_name}: {'pressed' if joy.pressed else 'released'}")

.is_axis() — Is This an Axis Event?

if joy.is_axis():
    print(f"Axis {joy.element_name}: {joy.value:.2f}")

Axis values are typically -1.0 to 1.0 for sticks (center = 0.0) and 0.0 to 1.0 for triggers (released = 0.0).
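
Physical sticks rarely rest at exactly 0.0, so teleoperation code often applies a deadzone before using an axis value. The sketch below is plain Python and not part of the horus API; `apply_deadzone` and the 0.1 default threshold are assumptions chosen for illustration.

```python
def apply_deadzone(value: float, deadzone: float = 0.1) -> float:
    """Zero out small stick values and rescale the rest to span -1.0..1.0.

    Hypothetical helper, not a horus function.
    """
    if not 0.0 <= deadzone < 1.0:
        raise ValueError("deadzone must be in [0.0, 1.0)")
    magnitude = abs(value)
    if magnitude <= deadzone:
        return 0.0
    # Ramp from 0.0 at the deadzone edge up to 1.0 at full deflection.
    scaled = (min(magnitude, 1.0) - deadzone) / (1.0 - deadzone)
    return scaled if value > 0 else -scaled
```

A handler could pass `joy.value` through such a function before steering, so that a stick resting slightly off center does not make the robot creep.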

.is_hat() — Is This a Hat/D-pad Event?

if joy.is_hat():
    print(f"D-pad direction: {joy.value}")

.is_connection_event() — Is This a Hotplug Event?

if joy.is_connection_event():
    connected = joy.is_connected()

Handle controller hotplug — if the operator's gamepad disconnects mid-mission, you should trigger an emergency stop.

.is_connected() — Is the Controller Plugged In?

if joy.is_connection_event() and not joy.is_connected():
    estop_topic.send(EmergencyStop.engage("Controller disconnected"))

Example — Gamepad Teleoperation:

from horus import Node, run, JoystickInput, CmdVel, EmergencyStop, Topic

joy_topic = Topic(JoystickInput)
cmd_topic = Topic(CmdVel)
estop_topic = Topic(EmergencyStop)

speed_scale = 1.0
linear = 0.0
angular = 0.0

def teleop(node):
    global speed_scale, linear, angular
    joy = joy_topic.recv(node)
    if joy is None:
        return

    if joy.is_axis():
        name = joy.element_name
        if "left_stick_y" in name:
            linear = -joy.value * speed_scale    # Forward/backward
        elif "left_stick_x" in name:
            angular = -joy.value * speed_scale   # Turn left/right
        elif "right_trigger" in name:
            speed_scale = 0.5 + joy.value * 1.5  # Speed boost

    elif joy.is_button() and joy.pressed:
        if joy.element_name == "A":
            estop_topic.send(EmergencyStop.engage("Operator e-stop"))
            linear = angular = 0.0

    elif joy.is_connection_event() and not joy.is_connected():
        estop_topic.send(EmergencyStop.engage("Controller disconnected"))
        linear = angular = 0.0
        return

    cmd_topic.send(CmdVel(linear=linear, angular=angular), node)

run(Node(tick=teleop, rate=50, pubs=["cmd_vel", "estop"], subs=["joystick"]))

KeyboardInput

Keyboard events with modifier key detection.

Constructor

key = KeyboardInput(key_name="A", code=65, pressed=True, modifiers=0)

Fields

| Field | Type | Description |
|-------|------|-------------|
| key_name | str | Key label (e.g., "A", "space", "escape") |
| code | int | Platform-specific key code |
| pressed | bool | True = key down, False = key up |
| modifiers | int | Modifier bit flags (Ctrl=1, Shift=2, Alt=4) |

.is_ctrl() / .is_shift() / .is_alt() — Modifier Checks

if key.pressed:  # react to key-down only, so shortcuts do not fire again on release
    if key.is_ctrl() and key.key_name == "S":
        save_map()
    elif key.is_ctrl() and key.key_name == "Q":
        shutdown()
    elif key.key_name == "space":
        toggle_estop()

These check the modifier bit flags. Use them for keyboard shortcuts in operator consoles and development tools.
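
To make the flag arithmetic concrete, here is a minimal sketch of how the modifier bits combine and how a check such as is_ctrl() could work internally. Only the bit values (Ctrl=1, Shift=2, Alt=4) come from the Fields table; the constant names and the `has_modifier` helper are hypothetical.

```python
# Bit values from the Fields table; the names are illustrative, not horus constants.
MOD_CTRL = 1
MOD_SHIFT = 2
MOD_ALT = 4

def has_modifier(modifiers: int, flag: int) -> bool:
    """Return True if the given modifier bit is set."""
    return (modifiers & flag) != 0

# Ctrl+Shift held together: OR the individual bits.
ctrl_shift = MOD_CTRL | MOD_SHIFT

ctrl_held = has_modifier(ctrl_shift, MOD_CTRL)
alt_held = has_modifier(ctrl_shift, MOD_ALT)
```

Under this scheme a `modifiers` value of 3 means Ctrl and Shift are both held, and 0 means no modifier is active.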

.pressed — Key State

if key.pressed:
    print(f"Key down: {key.key_name}")
else:
    print(f"Key up: {key.key_name}")

Example — Keyboard Shortcuts:

from horus import KeyboardInput, Topic

key_topic = Topic(KeyboardInput)

def handle_keys(node):
    key = key_topic.recv(node)
    if key is None or not key.pressed:
        return
    if key.key_name == "space":
        toggle_pause()
    elif key.is_ctrl() and key.key_name == "Z":
        undo()
    elif key.key_name == "escape":
        emergency_stop()

AudioFrame

Audio data from microphones or audio sources. Factory methods handle channel layout — use mono() for single-mic, stereo() for stereo pair, multi_channel() for microphone arrays.

Fields

| Field | Type | Description |
|-------|------|-------------|
| sample_rate | int | Sample rate in Hz (e.g., 16000, 44100, 48000) |
| channels | int | Number of audio channels |
| samples | list[float] | Interleaved audio samples (-1.0 to 1.0) |
| timestamp_ns | int | Capture timestamp in nanoseconds |

.mono(sample_rate, samples) — Single Channel

frame = AudioFrame.mono(sample_rate=16000, samples=[0.1, 0.2, -0.1, 0.0])

16 kHz is a common sample rate for speech recognition; 44.1 kHz or 48 kHz is typical for music-quality audio.
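
When no microphone is available, a synthetic tone can stand in for captured audio during testing. The sketch below builds a list of samples in the -1.0 to 1.0 range using only the standard library; `sine_tone` is a hypothetical helper, not a horus function.

```python
import math

def sine_tone(freq_hz: float, sample_rate: int, duration_s: float,
              amplitude: float = 0.5) -> list[float]:
    """Generate mono sine-wave samples; amplitude should stay within 0.0..1.0."""
    n = round(sample_rate * duration_s)
    step = 2.0 * math.pi * freq_hz / sample_rate  # phase advance per sample
    return [amplitude * math.sin(step * i) for i in range(n)]

# 20 ms of a 440 Hz tone at 16 kHz yields 320 samples.
tone = sine_tone(440.0, 16000, 0.02)
```

The resulting list has the same shape as the `samples` argument shown for `AudioFrame.mono()` above.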

.stereo(sample_rate, samples) — Two Channels

# Interleaved L/R samples: [L0, R0, L1, R1, ...]
frame = AudioFrame.stereo(sample_rate=48000, samples=interleaved_data)

.multi_channel(sample_rate, channels, samples) — Microphone Array

# 4-channel microphone array at 16kHz
frame = AudioFrame.multi_channel(sample_rate=16000, channels=4, samples=array_data)

Used for sound source localization (beamforming) — determining which direction a sound comes from using multiple microphones.
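
Because samples are interleaved, array processing usually starts by splitting them back into one list per channel. A minimal sketch in plain Python, assuming the same frame-by-frame layout as the stereo case extended to N channels; `deinterleave` is a hypothetical helper, not a horus method.

```python
def deinterleave(samples: list[float], channels: int) -> list[list[float]]:
    """Split interleaved samples into one list per channel."""
    if channels <= 0:
        raise ValueError("channels must be positive")
    if len(samples) % channels != 0:
        raise ValueError("sample count is not a multiple of the channel count")
    # Channel c owns every `channels`-th sample, starting at index c.
    return [samples[c::channels] for c in range(channels)]

# Two frames from a 4-microphone array: [m0, m1, m2, m3, m0, m1, m2, m3]
per_mic = deinterleave([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8], channels=4)
```

Here `per_mic[0]` holds the first microphone's samples, `[0.1, 0.5]`, and `per_mic[3]` holds the fourth microphone's, `[0.4, 0.8]`.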

.duration_ms() — Audio Duration

print(f"Frame duration: {frame.duration_ms():.1f} ms")

Computed from sample count and sample rate. Typical frame durations: 10-50ms for real-time processing, 100-500ms for batch processing.

.frame_count() — Number of Samples per Channel

print(f"Samples per channel: {frame.frame_count()}")
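
The relationship between these two methods can be sketched in plain Python. This assumes frame_count() is the total sample count divided by the channel count and that duration_ms() is derived from it; the free functions below are stand-ins, and the actual horus implementation may differ.

```python
def frame_count(num_samples: int, channels: int) -> int:
    """Samples per channel for an interleaved buffer."""
    return num_samples // channels

def duration_ms(num_samples: int, channels: int, sample_rate: int) -> float:
    """Duration of the buffer in milliseconds."""
    return frame_count(num_samples, channels) * 1000.0 / sample_rate

# A 20 ms stereo frame at 48 kHz: 960 samples per channel, 1920 in total.
frames = frame_count(1920, channels=2)
ms = duration_ms(1920, channels=2, sample_rate=48000)
```

Note that the duration depends on samples per channel, not on the raw length of the interleaved list.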

Example — Audio Recording Node:

from horus import Node, run, AudioFrame, Topic

audio_topic = Topic(AudioFrame)
recorded_samples = []

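def record_audio(node):
    frame = audio_topic.recv(node)
    if frame is None:
        return
    recorded_samples.extend(frame.samples)
    # Samples are interleaved, so divide by the channel count to get elapsed time.
    # Assumes every frame shares this frame's sample rate and channel count.
    total_seconds = len(recorded_samples) / (frame.sample_rate * frame.channels)
    if total_seconds > 10:
        print(f"Recorded {total_seconds:.1f}s of audio")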

Design Decisions

Why factory methods (new_button(), new_axis()) instead of raw event types? Joystick events carry different data depending on type: buttons have a pressed/released boolean, axes have a floating-point value, hat switches have a direction value. The factories set the correct event type flag and validate the data for that type, preventing a button event from carrying an axis value or vice versa.
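
The idea can be illustrated with a small stand-in class. This is not the horus implementation: `JoyEvent` and its internals are hypothetical, and only the event type numbers (0=button, 1=axis) and the field names come from the Fields table.

```python
from dataclasses import dataclass

EVENT_BUTTON = 0  # values from the Fields table; constant names are illustrative
EVENT_AXIS = 1

@dataclass(frozen=True)
class JoyEvent:
    """Hypothetical stand-in for a typed joystick event."""
    event_type: int
    element_id: int
    element_name: str
    value: float = 0.0
    pressed: bool = False

    @classmethod
    def new_button(cls, button_id: int, name: str, pressed: bool) -> "JoyEvent":
        # Buttons carry a boolean; value mirrors it as 0.0 or 1.0.
        return cls(EVENT_BUTTON, button_id, name, 1.0 if pressed else 0.0, pressed)

    @classmethod
    def new_axis(cls, axis_id: int, name: str, value: float) -> "JoyEvent":
        # Axes carry an analog value and never set `pressed`.
        if not -1.0 <= value <= 1.0:
            raise ValueError("axis value must be within -1.0..1.0")
        return cls(EVENT_AXIS, axis_id, name, value, False)

btn = JoyEvent.new_button(1, "A", pressed=True)
axis = JoyEvent.new_axis(0, "left_stick_x", 0.75)
```

Because callers never pass `event_type` themselves, a mismatched combination such as an axis event with `pressed=True` cannot be constructed through the factories.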

Why does JoystickInput include connection events? Controller disconnection during teleoperation is a critical safety event. If the gamepad disconnects mid-mission, the robot receives no more velocity commands and coasts at its last speed. By including connection events in the same message stream, the handler can immediately detect disconnection and trigger an e-stop without needing a separate watchdog.

Why AudioFrame factory methods for channel layout? Mono, stereo, and multi-channel audio have different interleaving patterns. Mono is just a flat array. Stereo is interleaved L/R. Multi-channel is interleaved across N channels. The factories set the channel count and validate that the sample array length is a multiple of the channel count, catching a common class of audio processing bugs.

Why does KeyboardInput use modifier bit flags instead of separate booleans? Modifier keys can be combined (Ctrl+Shift+A). Bit flags pack every modifier into a single integer field, and a combination is checked by calling the helpers together (is_ctrl() and is_shift()). The convenience methods (is_ctrl(), is_shift(), is_alt()) hide the bit manipulation, so you never need to work with raw flags.


See Also