Input & Audio Messages
Input messages handle human interaction — gamepad teleoperation, keyboard shortcuts, and audio capture. These are the human interface layer of your robot.
# simplified
from horus import JoystickInput, KeyboardInput, AudioFrame
JoystickInput
Gamepad/joystick events with typed factories for each event kind. The factory methods create properly-typed events without you needing to remember raw integer constants.
Fields
| Field | Type | Description |
|---|---|---|
| joystick_id | int | Controller identifier (for multi-controller setups) |
| event_type | int | Event type (0=button, 1=axis, 2=hat, 3=connection) |
| element_id | int | Button/axis/hat ID within the controller |
| element_name | str | Human-readable name (e.g., "A", "left_stick_x") |
| value | float | Analog value (-1.0 to 1.0 for sticks, 0.0/1.0 for buttons) |
| pressed | bool | Button state (True = pressed, False = released) |
| timestamp_ns | int | Event timestamp in nanoseconds |
Factory Methods
# simplified
# Button press
btn = JoystickInput.new_button(joystick_id=0, button_id=1, name="A", pressed=True)
# Analog axis (stick/trigger) — value range: -1.0 to 1.0 for sticks, 0.0 to 1.0 for triggers
axis = JoystickInput.new_axis(joystick_id=0, axis_id=0, name="left_stick_x", value=0.75)
# D-pad/hat switch
hat = JoystickInput.new_hat(joystick_id=0, hat_id=0, name="dpad", value=1.0)
# Controller connected/disconnected
conn = JoystickInput.new_connection(joystick_id=0, connected=True)
Event Type Queries — Dispatch Pattern
The standard way to handle mixed joystick events is to check the type and dispatch:
# simplified
def handle_input(joy):
    if joy.is_button():
        # React on press only, so a release does not fire the action again
        if joy.pressed and joy.element_name == "A":
            trigger_action()
        elif joy.pressed and joy.element_name == "B":
            cancel_action()
    elif joy.is_axis():
        if "stick_x" in joy.element_name:
            steer(joy.value)
        elif "trigger" in joy.element_name:
            throttle(joy.value)
    elif joy.is_hat():
        handle_dpad(joy.value)
    elif joy.is_connection_event():
        if joy.is_connected():
            print("Controller connected")
        else:
            print("Controller disconnected: engaging e-stop!")
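The same dispatch can also be written as a lookup on `event_type`, using the integer codes from the Fields table. This is a minimal sketch rather than the library's implementation: `FakeJoyEvent` is a hypothetical stand-in for `JoystickInput`, and the handler names are illustrative.

```python
from dataclasses import dataclass

# Event type codes as listed in the Fields table.
BUTTON, AXIS, HAT, CONNECTION = 0, 1, 2, 3


@dataclass
class FakeJoyEvent:
    """Hypothetical stand-in for JoystickInput (only the fields used here)."""
    event_type: int
    element_name: str = ""
    value: float = 0.0
    pressed: bool = False


def on_button(ev):
    return f"button {ev.element_name} {'down' if ev.pressed else 'up'}"


def on_axis(ev):
    return f"axis {ev.element_name} = {ev.value:.2f}"


def on_hat(ev):
    return f"hat {ev.element_name} = {ev.value:.1f}"


def on_connection(ev):
    return "connection change"


# One handler per event type; adding a new type means adding one entry.
HANDLERS = {BUTTON: on_button, AXIS: on_axis, HAT: on_hat, CONNECTION: on_connection}


def dispatch(ev):
    """Route an event to its handler; unknown types return None."""
    handler = HANDLERS.get(ev.event_type)
    return handler(ev) if handler else None
```

A table like this keeps each handler small and makes unknown event types an explicit case instead of a silent fall-through.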
.is_button() — Is This a Button Event?
# simplified
if joy.is_button():
    print(f"Button {joy.element_name}: {'pressed' if joy.pressed else 'released'}")
.is_axis() — Is This an Axis Event?
# simplified
if joy.is_axis():
    print(f"Axis {joy.element_name}: {joy.value:.2f}")
Axis values are typically -1.0 to 1.0 for sticks (center = 0.0) and 0.0 to 1.0 for triggers (released = 0.0).
.is_hat() — Is This a Hat/D-pad Event?
# simplified
if joy.is_hat():
    print(f"D-pad direction: {joy.value}")
.is_connection_event() — Is This a Hotplug Event?
# simplified
if joy.is_connection_event():
    connected = joy.is_connected()
Handle controller hotplug — if the operator's gamepad disconnects mid-mission, you should trigger an emergency stop.
.is_connected() — Is the Controller Plugged In?
# simplified
if joy.is_connection_event() and not joy.is_connected():
    estop_topic.send(EmergencyStop.engage("Controller disconnected"))
Example — Gamepad Teleoperation:
# simplified
from horus import Node, run, JoystickInput, CmdVel, EmergencyStop, Topic
joy_topic = Topic(JoystickInput)
cmd_topic = Topic(CmdVel)
estop_topic = Topic(EmergencyStop)
speed_scale = 1.0
linear = 0.0
angular = 0.0
def teleop(node):
    global speed_scale, linear, angular
    joy = joy_topic.recv(node)
    if joy is None:
        return
    if joy.is_axis():
        name = joy.element_name
        if "left_stick_y" in name:
            linear = -joy.value * speed_scale  # Forward/backward
        elif "left_stick_x" in name:
            angular = -joy.value * speed_scale  # Turn left/right
        elif "right_trigger" in name:
            speed_scale = 0.5 + joy.value * 1.5  # Speed boost
    elif joy.is_button() and joy.pressed:
        if joy.element_name == "A":
            estop_topic.send(EmergencyStop.engage("Operator e-stop"))
            linear = angular = 0.0
    elif joy.is_connection_event() and not joy.is_connected():
        estop_topic.send(EmergencyStop.engage("Controller disconnected"))
        linear = angular = 0.0
        return
    cmd_topic.send(CmdVel(linear=linear, angular=angular), node)
run(Node(tick=teleop, rate=50, pubs=["cmd_vel", "estop"], subs=["joystick"]))
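The axis-to-velocity mapping in this example can be pulled into a pure function so it can be checked without a controller attached. A minimal sketch, assuming the same axis names as the example; `TeleopState` and `apply_axis` are hypothetical helpers and not part of horus.

```python
from dataclasses import dataclass


@dataclass
class TeleopState:
    """Hypothetical container for the globals used in the teleop example."""
    speed_scale: float = 1.0
    linear: float = 0.0
    angular: float = 0.0


def apply_axis(state, name, value):
    """Update state from one axis event, mirroring the mapping in teleop()."""
    if "left_stick_y" in name:
        state.linear = -value * state.speed_scale   # sign flipped, as in the example
    elif "left_stick_x" in name:
        state.angular = -value * state.speed_scale  # sign flipped, as in the example
    elif "right_trigger" in name:
        # Trigger range 0.0..1.0 maps to a scale of 0.5..2.0
        state.speed_scale = 0.5 + value * 1.5
    return state
```

Note that with this mapping a trigger change only affects stick events that arrive afterwards; the stored `linear` and `angular` values are not rescaled until the stick moves again.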
KeyboardInput
Keyboard events with modifier key detection.
Constructor
# simplified
key = KeyboardInput(key_name="A", code=65, pressed=True, modifiers=0)
Fields
| Field | Type | Description |
|---|---|---|
| key_name | str | Key label (e.g., "A", "space", "escape") |
| code | int | Platform-specific key code |
| pressed | bool | True = key down, False = key up |
| modifiers | int | Modifier bit flags (Ctrl=1, Shift=2, Alt=4) |
.is_ctrl() / .is_shift() / .is_alt() — Modifier Checks
# simplified
if key.is_ctrl() and key.key_name == "S":
    save_map()
elif key.is_ctrl() and key.key_name == "Q":
    shutdown()
elif key.key_name == "space" and key.pressed:
    toggle_estop()
These check the modifier bit flags. Use them for keyboard shortcuts in operator consoles and development tools.
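For reference, checks of this kind amount to bitwise tests against the flag values listed in the Fields table (Ctrl=1, Shift=2, Alt=4). The sketch below shows the idea with plain integers; the constant and function names are illustrative and are not the library's internals.

```python
# Flag values as listed in the Fields table.
MOD_CTRL = 1
MOD_SHIFT = 2
MOD_ALT = 4


def has_ctrl(modifiers):
    """True if the Ctrl bit is set."""
    return bool(modifiers & MOD_CTRL)


def has_shift(modifiers):
    """True if the Shift bit is set."""
    return bool(modifiers & MOD_SHIFT)


def has_alt(modifiers):
    """True if the Alt bit is set."""
    return bool(modifiers & MOD_ALT)


# Ctrl and Shift held together: the flags are OR-ed into one integer.
ctrl_shift = MOD_CTRL | MOD_SHIFT
```

Because each modifier owns one bit, any combination fits in the single `modifiers` integer passed to the constructor.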
.pressed — Key State
# simplified
if key.pressed:
    print(f"Key down: {key.key_name}")
else:
    print(f"Key up: {key.key_name}")
Example — Keyboard Shortcuts:
# simplified
from horus import KeyboardInput, Topic
key_topic = Topic(KeyboardInput)
def handle_keys(node):
    key = key_topic.recv(node)
    if key is None or not key.pressed:
        return
    if key.key_name == "space":
        toggle_pause()
    elif key.is_ctrl() and key.key_name == "Z":
        undo()
    elif key.key_name == "escape":
        emergency_stop()
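As the number of shortcuts grows, the if/elif chain can be replaced by a lookup table keyed on modifier state and key name. A minimal sketch under assumptions: `FakeKey` is a hypothetical stand-in exposing the same `key_name`, `pressed`, and `modifiers` fields, and the action labels are placeholders for real callbacks.

```python
from dataclasses import dataclass

MOD_CTRL = 1  # Ctrl flag value from the Fields table


@dataclass
class FakeKey:
    """Hypothetical stand-in for KeyboardInput (only the fields used here)."""
    key_name: str
    pressed: bool = True
    modifiers: int = 0


# (ctrl_held, key_name) -> action label; labels stand in for callbacks.
SHORTCUTS = {
    (False, "space"): "toggle_pause",
    (True, "Z"): "undo",
    (False, "escape"): "emergency_stop",
}


def lookup_action(key):
    """Return the action for a key-down event, or None if nothing matches."""
    if not key.pressed:
        return None
    ctrl = bool(key.modifiers & MOD_CTRL)
    action = SHORTCUTS.get((ctrl, key.key_name))
    if action is None and ctrl:
        # Fall back to the unmodified binding so Ctrl+escape still stops the robot.
        action = SHORTCUTS.get((False, key.key_name))
    return action
```

The fallback keeps the behavior of the chain above, where "space" and "escape" fire regardless of modifier state.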
AudioFrame
Audio data from microphones or audio sources. Factory methods handle channel layout — use mono() for single-mic, stereo() for stereo pair, multi_channel() for microphone arrays.
Fields
| Field | Type | Description |
|---|---|---|
| sample_rate | int | Sample rate in Hz (e.g., 16000, 44100, 48000) |
| channels | int | Number of audio channels |
| samples | list[float] | Interleaved audio samples (-1.0 to 1.0) |
| timestamp_ns | int | Capture timestamp in nanoseconds |
.mono(sample_rate, samples) — Single Channel
# simplified
frame = AudioFrame.mono(sample_rate=16000, samples=[0.1, 0.2, -0.1, 0.0])
16 kHz is standard for speech recognition; use 44.1 kHz or 48 kHz for music-quality audio.
.stereo(sample_rate, samples) — Two Channels
# simplified
# Interleaved L/R samples: [L0, R0, L1, R1, ...]
frame = AudioFrame.stereo(sample_rate=48000, samples=interleaved_data)
.multi_channel(sample_rate, channels, samples) — Microphone Array
# simplified
# 4-channel microphone array at 16kHz
frame = AudioFrame.multi_channel(sample_rate=16000, channels=4, samples=array_data)
Used for sound source localization (beamforming) — determining which direction a sound comes from using multiple microphones.
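Per-microphone processing such as beamforming usually starts by splitting the interleaved buffer back into one list per channel. A minimal sketch, assuming the samples are interleaved frame by frame in the same way as the stereo `[L0, R0, L1, R1, ...]` layout; `deinterleave` is a hypothetical helper and not a horus function.

```python
def deinterleave(samples, channels):
    """Split interleaved samples into one list per channel."""
    if channels <= 0:
        raise ValueError("channels must be positive")
    if len(samples) % channels != 0:
        raise ValueError("sample count is not a multiple of the channel count")
    # Channel c holds every channels-th sample, starting at offset c.
    return [list(samples[c::channels]) for c in range(channels)]
```

For a 4-channel frame, `deinterleave(frame.samples, frame.channels)` would give four equally long lists, one per microphone.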
.duration_ms() — Audio Duration
# simplified
print(f"Frame duration: {frame.duration_ms():.1f} ms")
Computed from sample count and sample rate. Typical frame durations: 10-50ms for real-time processing, 100-500ms for batch processing.
.frame_count() — Number of Samples per Channel
# simplified
print(f"Samples per channel: {frame.frame_count()}")
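The relationship between the frame count and the duration can be written out directly. The sketch below assumes the frame count is the total sample count divided by the channel count, and that the duration follows from it and the sample rate, as the descriptions above suggest. The function names are hypothetical, and the library's own rounding may differ.

```python
def frames_per_channel(num_samples, channels):
    """Samples per channel in an interleaved buffer."""
    return num_samples // channels


def frame_duration_ms(num_samples, channels, sample_rate):
    """Duration of the buffer in milliseconds."""
    return 1000.0 * frames_per_channel(num_samples, channels) / sample_rate
```

For example, 640 interleaved samples from a 4-channel array at 16 kHz are 160 samples per channel, which is 10 ms of audio.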
Example — Audio Recording Node:
# simplified
from horus import Node, run, AudioFrame, Topic
audio_topic = Topic(AudioFrame)
recorded_samples = []
def record_audio(node):
    frame = audio_topic.recv(node)
    if frame is not None:
        recorded_samples.extend(frame.samples)
        if frame.duration_ms() > 0:
            # Samples are interleaved, so divide by the channel count as well
            total_seconds = len(recorded_samples) / (frame.sample_rate * frame.channels)
            if total_seconds > 10:
                print(f"Recorded {total_seconds:.1f}s of audio")
Design Decisions
Why factory methods (new_button(), new_axis()) instead of raw event types? Joystick events carry different data depending on type: buttons have a pressed/released boolean, axes have a floating-point value, hat switches have a direction value. The factories set the correct event type flag and validate the data for that type, preventing a button event from carrying an axis value or vice versa.
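To illustrate the idea, here is a minimal stand-in showing how typed factories can fix the event type and range-check the payload. `JoyEventSketch` is hypothetical and only mirrors the field names and event type codes documented above; the exact validation horus performs is an assumption here.

```python
from dataclasses import dataclass

# Event type codes from the JoystickInput Fields table.
BUTTON, AXIS = 0, 1


@dataclass(frozen=True)
class JoyEventSketch:
    """Hypothetical stand-in; not the horus JoystickInput class."""
    joystick_id: int
    event_type: int
    element_id: int
    element_name: str
    value: float
    pressed: bool

    @classmethod
    def new_button(cls, joystick_id, button_id, name, pressed):
        # value is derived from pressed, so the two fields cannot disagree.
        return cls(joystick_id, BUTTON, button_id, name, 1.0 if pressed else 0.0, pressed)

    @classmethod
    def new_axis(cls, joystick_id, axis_id, name, value):
        # Assumed check: reject values outside the documented stick range.
        if not -1.0 <= value <= 1.0:
            raise ValueError(f"axis value {value} outside [-1.0, 1.0]")
        return cls(joystick_id, AXIS, axis_id, name, float(value), False)
```

Callers never pass `event_type` themselves, which is what rules out a button event carrying an axis payload.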
Why does JoystickInput include connection events? Controller disconnection during teleoperation is a critical safety event. If the gamepad disconnects mid-mission, the robot receives no more velocity commands and coasts at its last speed. By including connection events in the same message stream, the handler can immediately detect disconnection and trigger an e-stop without needing a separate watchdog.
Why AudioFrame factory methods for channel layout? Mono, stereo, and multi-channel audio have different interleaving patterns. Mono is just a flat array. Stereo is interleaved L/R. Multi-channel is interleaved across N channels. The factories set the channel count and validate that the sample array length is a multiple of the channel count, catching a common class of audio processing bugs.
Why does KeyboardInput use modifier bit flags instead of separate booleans? Modifier keys can be combined (Ctrl+Shift+A). Bit flags store any combination in a single integer, and a combination check is simply is_ctrl() and is_shift(). The convenience methods (is_ctrl(), is_shift(), is_alt()) hide the bit manipulation, so you never need to work with raw flags.
See Also
- Control Messages — CmdVel for translating joystick input to velocity
- Diagnostics Messages — EmergencyStop for safety
- Geometry Messages — Twist for 6-DOF velocity commands from joysticks
- Python Message Library — All 55+ message types overview