Input & Audio Messages
Input messages handle human interaction — gamepad teleoperation, keyboard shortcuts, and audio capture. These are the human interface layer of your robot.
from horus import JoystickInput, KeyboardInput, AudioFrame
JoystickInput
Gamepad/joystick events with typed factories for each event kind. The factory methods create properly-typed events without you needing to remember raw integer constants.
Fields
| Field | Type | Description |
|---|---|---|
joystick_id | int | Controller identifier (for multi-controller setups) |
event_type | int | Event type (0=button, 1=axis, 2=hat, 3=connection) |
element_id | int | Button/axis/hat ID within the controller |
element_name | str | Human-readable name (e.g., "A", "left_stick_x") |
value | float | Analog value (-1.0 to 1.0 for sticks, 0.0/1.0 for buttons) |
pressed | bool | Button state (True = pressed, False = released) |
timestamp_ns | int | Event timestamp in nanoseconds |
Factory Methods
# Button press
btn = JoystickInput.new_button(joystick_id=0, button_id=1, name="A", pressed=True)
# Analog axis (stick/trigger) — value range: -1.0 to 1.0 for sticks, 0.0 to 1.0 for triggers
axis = JoystickInput.new_axis(joystick_id=0, axis_id=0, name="left_stick_x", value=0.75)
# D-pad/hat switch
hat = JoystickInput.new_hat(joystick_id=0, hat_id=0, name="dpad", value=1.0)
# Controller connected/disconnected
conn = JoystickInput.new_connection(joystick_id=0, connected=True)
Event Type Queries — Dispatch Pattern
The standard way to handle mixed joystick events is to check the type and dispatch:
def handle_input(joy):
if joy.is_button():
if joy.pressed and joy.element_name == "A":
trigger_action()
elif joy.element_name == "B":
cancel_action()
elif joy.is_axis():
if "stick_x" in joy.element_name:
steer(joy.value)
elif "trigger" in joy.element_name:
throttle(joy.value)
elif joy.is_hat():
handle_dpad(joy.value)
elif joy.is_connection_event():
if joy.is_connected():
print("Controller connected")
else:
print("Controller disconnected — engaging e-stop!")
.is_button() — Is This a Button Event?
if joy.is_button():
print(f"Button {joy.element_name}: {'pressed' if joy.pressed else 'released'}")
.is_axis() — Is This an Axis Event?
if joy.is_axis():
print(f"Axis {joy.element_name}: {joy.value:.2f}")
Axis values are typically -1.0 to 1.0 for sticks (center = 0.0) and 0.0 to 1.0 for triggers (released = 0.0).
.is_hat() — Is This a Hat/D-pad Event?
if joy.is_hat():
print(f"D-pad direction: {joy.value}")
.is_connection_event() — Is This a Hotplug Event?
if joy.is_connection_event():
connected = joy.is_connected()
Handle controller hotplug — if the operator's gamepad disconnects mid-mission, you should trigger an emergency stop.
.is_connected() — Is the Controller Plugged In?
if joy.is_connection_event() and not joy.is_connected():
estop_topic.send(EmergencyStop.engage("Controller disconnected"))
Example — Gamepad Teleoperation:
from horus import Node, run, JoystickInput, CmdVel, EmergencyStop, Topic
joy_topic = Topic(JoystickInput)
cmd_topic = Topic(CmdVel)
estop_topic = Topic(EmergencyStop)
speed_scale = 1.0
linear = 0.0
angular = 0.0
def teleop(node):
global speed_scale, linear, angular
joy = joy_topic.recv(node)
if joy is None:
return
if joy.is_axis():
name = joy.element_name
if "left_stick_y" in name:
linear = -joy.value * speed_scale # Forward/backward
elif "left_stick_x" in name:
angular = -joy.value * speed_scale # Turn left/right
elif "right_trigger" in name:
speed_scale = 0.5 + joy.value * 1.5 # Speed boost
elif joy.is_button() and joy.pressed:
if joy.element_name == "A":
estop_topic.send(EmergencyStop.engage("Operator e-stop"))
linear = angular = 0.0
elif joy.is_connection_event() and not joy.is_connected():
estop_topic.send(EmergencyStop.engage("Controller disconnected"))
linear = angular = 0.0
return
cmd_topic.send(CmdVel(linear=linear, angular=angular), node)
run(Node(tick=teleop, rate=50, pubs=["cmd_vel", "estop"], subs=["joystick"]))
KeyboardInput
Keyboard events with modifier key detection.
Constructor
key = KeyboardInput(key_name="A", code=65, pressed=True, modifiers=0)
Fields
| Field | Type | Description |
|---|---|---|
key_name | str | Key label (e.g., "A", "space", "escape") |
code | int | Platform-specific key code |
pressed | bool | True = key down, False = key up |
modifiers | int | Modifier bit flags (Ctrl=1, Shift=2, Alt=4) |
.is_ctrl() / .is_shift() / .is_alt() — Modifier Checks
if key.is_ctrl() and key.key_name == "S":
save_map()
elif key.is_ctrl() and key.key_name == "Q":
shutdown()
elif key.key_name == "space" and key.pressed:
toggle_estop()
These check the modifier bit flags. Use them for keyboard shortcuts in operator consoles and development tools.
.pressed — Key State
if key.pressed:
print(f"Key down: {key.key_name}")
else:
print(f"Key up: {key.key_name}")
Example — Keyboard Shortcuts:
from horus import KeyboardInput, Topic
key_topic = Topic(KeyboardInput)
def handle_keys(node):
key = key_topic.recv(node)
if key is None or not key.pressed:
return
if key.key_name == "space":
toggle_pause()
elif key.is_ctrl() and key.key_name == "Z":
undo()
elif key.key_name == "escape":
emergency_stop()
AudioFrame
Audio data from microphones or audio sources. Factory methods handle channel layout — use mono() for single-mic, stereo() for stereo pair, multi_channel() for microphone arrays.
Fields
| Field | Type | Description |
|---|---|---|
sample_rate | int | Sample rate in Hz (e.g., 16000, 44100, 48000) |
channels | int | Number of audio channels |
samples | list[float] | Interleaved audio samples (-1.0 to 1.0) |
timestamp_ns | int | Capture timestamp in nanoseconds |
.mono(sample_rate, samples) — Single Channel
frame = AudioFrame.mono(sample_rate=16000, samples=[0.1, 0.2, -0.1, 0.0])
16kHz is standard for speech recognition. 44.1kHz or 48kHz for music-quality audio.
.stereo(sample_rate, samples) — Two Channels
# Interleaved L/R samples: [L0, R0, L1, R1, ...]
frame = AudioFrame.stereo(sample_rate=48000, samples=interleaved_data)
.multi_channel(sample_rate, channels, samples) — Microphone Array
# 4-channel microphone array at 16kHz
frame = AudioFrame.multi_channel(sample_rate=16000, channels=4, samples=array_data)
Used for sound source localization (beamforming) — determining which direction a sound comes from using multiple microphones.
.duration_ms() — Audio Duration
print(f"Frame duration: {frame.duration_ms():.1f} ms")
Computed from sample count and sample rate. Typical frame durations: 10-50ms for real-time processing, 100-500ms for batch processing.
.frame_count() — Number of Samples per Channel
print(f"Samples per channel: {frame.frame_count()}")
Example — Audio Recording Node:
from horus import Node, run, AudioFrame, Topic
audio_topic = Topic(AudioFrame)
recorded_samples = []
def record_audio(node):
frame = audio_topic.recv(node)
if frame is not None:
recorded_samples.extend(frame.samples)
if frame.duration_ms() > 0:
total_seconds = len(recorded_samples) / frame.sample_rate
if total_seconds > 10:
print(f"Recorded {total_seconds:.1f}s of audio")
Design Decisions
Why factory methods (new_button(), new_axis()) instead of raw event types? Joystick events carry different data depending on type: buttons have a pressed/released boolean, axes have a floating-point value, hat switches have a direction value. The factories set the correct event type flag and validate the data for that type, preventing a button event from carrying an axis value or vice versa.
Why does JoystickInput include connection events? Controller disconnection during teleoperation is a critical safety event. If the gamepad disconnects mid-mission, the robot receives no more velocity commands and coasts at its last speed. By including connection events in the same message stream, the handler can immediately detect disconnection and trigger an e-stop without needing a separate watchdog.
Why AudioFrame factory methods for channel layout? Mono, stereo, and multi-channel audio have different interleaving patterns. Mono is just a flat array. Stereo is interleaved L/R. Multi-channel is interleaved across N channels. The factories set the channel count and validate that the sample array length is a multiple of the channel count, catching a common class of audio processing bugs.
Why KeyboardInput uses modifier bit flags instead of separate booleans? Modifier keys can be combined (Ctrl+Shift+A). Bit flags allow efficient combination checking (is_ctrl() and is_shift()) without wasting space. The convenience methods (is_ctrl(), is_shift(), is_alt()) hide the bit manipulation, so you never need to work with raw flags.
See Also
- Control Messages — CmdVel for translating joystick input to velocity
- Diagnostics Messages — EmergencyStop for safety
- Geometry Messages — Twist for 6-DOF velocity commands from joysticks
- Python Message Library — All 55+ message types overview