Input & Audio Messages

Input messages handle human interaction — gamepad teleoperation, keyboard shortcuts, and audio capture. These are the human interface layer of your robot.

# simplified
from horus import JoystickInput, KeyboardInput, AudioFrame

JoystickInput

Gamepad/joystick events with typed factories for each event kind. The factory methods create properly-typed events without you needing to remember raw integer constants.

Fields

FieldTypeDescription
joystick_idintController identifier (for multi-controller setups)
event_typeintEvent type (0=button, 1=axis, 2=hat, 3=connection)
element_idintButton/axis/hat ID within the controller
element_namestrHuman-readable name (e.g., "A", "left_stick_x")
valuefloatAnalog value (-1.0 to 1.0 for sticks, 0.0/1.0 for buttons)
pressedboolButton state (True = pressed, False = released)
timestamp_nsintEvent timestamp in nanoseconds

Factory Methods

# simplified
# Button press
btn = JoystickInput.new_button(joystick_id=0, button_id=1, name="A", pressed=True)

# Analog axis (stick/trigger) — value range: -1.0 to 1.0 for sticks, 0.0 to 1.0 for triggers
axis = JoystickInput.new_axis(joystick_id=0, axis_id=0, name="left_stick_x", value=0.75)

# D-pad/hat switch
hat = JoystickInput.new_hat(joystick_id=0, hat_id=0, name="dpad", value=1.0)

# Controller connected/disconnected
conn = JoystickInput.new_connection(joystick_id=0, connected=True)

Event Type Queries — Dispatch Pattern

The standard way to handle mixed joystick events is to check the type and dispatch:

# simplified
def handle_input(joy):
    if joy.is_button():
        if joy.pressed and joy.element_name == "A":
            trigger_action()
        elif joy.element_name == "B":
            cancel_action()

    elif joy.is_axis():
        if "stick_x" in joy.element_name:
            steer(joy.value)
        elif "trigger" in joy.element_name:
            throttle(joy.value)

    elif joy.is_hat():
        handle_dpad(joy.value)

    elif joy.is_connection_event():
        if joy.is_connected():
            print("Controller connected")
        else:
            print("Controller disconnected — engaging e-stop!")

.is_button() — Is This a Button Event?

# simplified
if joy.is_button():
    print(f"Button {joy.element_name}: {'pressed' if joy.pressed else 'released'}")

.is_axis() — Is This an Axis Event?

# simplified
if joy.is_axis():
    print(f"Axis {joy.element_name}: {joy.value:.2f}")

Axis values are typically -1.0 to 1.0 for sticks (center = 0.0) and 0.0 to 1.0 for triggers (released = 0.0).

.is_hat() — Is This a Hat/D-pad Event?

# simplified
if joy.is_hat():
    print(f"D-pad direction: {joy.value}")

.is_connection_event() — Is This a Hotplug Event?

# simplified
if joy.is_connection_event():
    connected = joy.is_connected()

Handle controller hotplug — if the operator's gamepad disconnects mid-mission, you should trigger an emergency stop.

.is_connected() — Is the Controller Plugged In?

# simplified
if joy.is_connection_event() and not joy.is_connected():
    estop_topic.send(EmergencyStop.engage("Controller disconnected"))

Example — Gamepad Teleoperation:

# simplified
from horus import Node, run, JoystickInput, CmdVel, EmergencyStop, Topic

joy_topic = Topic(JoystickInput)
cmd_topic = Topic(CmdVel)
estop_topic = Topic(EmergencyStop)

speed_scale = 1.0
linear = 0.0
angular = 0.0

def teleop(node):
    global speed_scale, linear, angular
    joy = joy_topic.recv(node)
    if joy is None:
        return

    if joy.is_axis():
        name = joy.element_name
        if "left_stick_y" in name:
            linear = -joy.value * speed_scale    # Forward/backward
        elif "left_stick_x" in name:
            angular = -joy.value * speed_scale   # Turn left/right
        elif "right_trigger" in name:
            speed_scale = 0.5 + joy.value * 1.5  # Speed boost

    elif joy.is_button() and joy.pressed:
        if joy.element_name == "A":
            estop_topic.send(EmergencyStop.engage("Operator e-stop"))
            linear = angular = 0.0

    elif joy.is_connection_event() and not joy.is_connected():
        estop_topic.send(EmergencyStop.engage("Controller disconnected"))
        linear = angular = 0.0
        return

    cmd_topic.send(CmdVel(linear=linear, angular=angular), node)

run(Node(tick=teleop, rate=50, pubs=["cmd_vel", "estop"], subs=["joystick"]))

KeyboardInput

Keyboard events with modifier key detection.

Constructor

# simplified
key = KeyboardInput(key_name="A", code=65, pressed=True, modifiers=0)

Fields

FieldTypeDescription
key_namestrKey label (e.g., "A", "space", "escape")
codeintPlatform-specific key code
pressedboolTrue = key down, False = key up
modifiersintModifier bit flags (Ctrl=1, Shift=2, Alt=4)

.is_ctrl() / .is_shift() / .is_alt() — Modifier Checks

# simplified
if key.is_ctrl() and key.key_name == "S":
    save_map()
elif key.is_ctrl() and key.key_name == "Q":
    shutdown()
elif key.key_name == "space" and key.pressed:
    toggle_estop()

These check the modifier bit flags. Use them for keyboard shortcuts in operator consoles and development tools.

.pressed — Key State

# simplified
if key.pressed:
    print(f"Key down: {key.key_name}")
else:
    print(f"Key up: {key.key_name}")

Example — Keyboard Shortcuts:

# simplified
from horus import KeyboardInput, Topic

key_topic = Topic(KeyboardInput)

def handle_keys(node):
    key = key_topic.recv(node)
    if key is None or not key.pressed:
        return
    if key.key_name == "space":
        toggle_pause()
    elif key.is_ctrl() and key.key_name == "Z":
        undo()
    elif key.key_name == "escape":
        emergency_stop()

AudioFrame

Audio data from microphones or audio sources. Factory methods handle channel layout — use mono() for single-mic, stereo() for stereo pair, multi_channel() for microphone arrays.

Fields

FieldTypeDescription
sample_rateintSample rate in Hz (e.g., 16000, 44100, 48000)
channelsintNumber of audio channels
sampleslist[float]Interleaved audio samples (-1.0 to 1.0)
timestamp_nsintCapture timestamp in nanoseconds

.mono(sample_rate, samples) — Single Channel

# simplified
frame = AudioFrame.mono(sample_rate=16000, samples=[0.1, 0.2, -0.1, 0.0])

16kHz is standard for speech recognition. 44.1kHz or 48kHz for music-quality audio.

.stereo(sample_rate, samples) — Two Channels

# simplified
# Interleaved L/R samples: [L0, R0, L1, R1, ...]
frame = AudioFrame.stereo(sample_rate=48000, samples=interleaved_data)

.multi_channel(sample_rate, channels, samples) — Microphone Array

# simplified
# 4-channel microphone array at 16kHz
frame = AudioFrame.multi_channel(sample_rate=16000, channels=4, samples=array_data)

Used for sound source localization (beamforming) — determining which direction a sound comes from using multiple microphones.

.duration_ms() — Audio Duration

# simplified
print(f"Frame duration: {frame.duration_ms():.1f} ms")

Computed from sample count and sample rate. Typical frame durations: 10-50ms for real-time processing, 100-500ms for batch processing.

.frame_count() — Number of Samples per Channel

# simplified
print(f"Samples per channel: {frame.frame_count()}")

Example — Audio Recording Node:

# simplified
from horus import Node, run, AudioFrame, Topic

audio_topic = Topic(AudioFrame)
recorded_samples = []

def record_audio(node):
    frame = audio_topic.recv(node)
    if frame is not None:
        recorded_samples.extend(frame.samples)
        if frame.duration_ms() > 0:
            total_seconds = len(recorded_samples) / frame.sample_rate
            if total_seconds > 10:
                print(f"Recorded {total_seconds:.1f}s of audio")

Design Decisions

Why factory methods (new_button(), new_axis()) instead of raw event types? Joystick events carry different data depending on type: buttons have a pressed/released boolean, axes have a floating-point value, hat switches have a direction value. The factories set the correct event type flag and validate the data for that type, preventing a button event from carrying an axis value or vice versa.

Why does JoystickInput include connection events? Controller disconnection during teleoperation is a critical safety event. If the gamepad disconnects mid-mission, the robot receives no more velocity commands and coasts at its last speed. By including connection events in the same message stream, the handler can immediately detect disconnection and trigger an e-stop without needing a separate watchdog.

Why AudioFrame factory methods for channel layout? Mono, stereo, and multi-channel audio have different interleaving patterns. Mono is just a flat array. Stereo is interleaved L/R. Multi-channel is interleaved across N channels. The factories set the channel count and validate that the sample array length is a multiple of the channel count, catching a common class of audio processing bugs.

Why KeyboardInput uses modifier bit flags instead of separate booleans? Modifier keys can be combined (Ctrl+Shift+A). Bit flags allow efficient combination checking (is_ctrl() and is_shift()) without wasting space. The convenience methods (is_ctrl(), is_shift(), is_alt()) hide the bit manipulation, so you never need to work with raw flags.


See Also