Sensor Messages
Sensor messages carry raw and processed data from your robot's sensors — LiDAR scans, IMU readings, GPS positions, battery levels, and more. These are the inputs to every robotics pipeline: perception, navigation, monitoring.
When you need these: Every robot has sensors. If it moves, you need Odometry. If it has a lidar, you need LaserScan. If it has an IMU, you need Imu. If it runs on battery, you need BatteryState.
from horus import (
LaserScan, Imu, Odometry, JointState,
BatteryState, NavSatFix, RangeSensor,
MagneticField, Temperature, FluidPressure, Illuminance,
Clock, TimeReference,
)
LaserScan
2D LiDAR scan — an array of range readings at known angles. The most common sensor for obstacle avoidance in mobile robotics.
Constructor
import math
scan = LaserScan(
angle_min=-math.pi, # Start angle (radians)
angle_max=math.pi, # End angle (radians)
range_min=0.1, # Minimum valid range (meters)
range_max=10.0, # Maximum valid range (meters)
ranges=[1.0, 1.5, 2.0], # Range readings
)
.angle_at(index) — Angle for a Range Reading
angle = scan.angle_at(0) # Angle of the first reading
angle = scan.angle_at(100) # Angle of the 101st reading
Computes the angle (in radians) for a given index in the ranges array. The formula is: angle_min + index * angle_increment. You need this to convert polar (angle, range) readings to cartesian (x, y) coordinates:
import math
for i in range(len(scan.ranges)):
if scan.is_range_valid(i):
angle = scan.angle_at(i)
r = scan.ranges[i]
x = r * math.cos(angle) # In robot's frame
y = r * math.sin(angle)
.is_range_valid(index) — Filter Bad Readings
if scan.is_range_valid(5):
print(f"Valid: {scan.ranges[5]:.2f} m")
else:
print("Invalid: too close, too far, or NaN")
Returns True if the range at the given index falls within [range_min, range_max]. LiDAR sensors return invalid readings for many reasons — reflective surfaces, transparent objects, direct sunlight, readings below minimum range (too close to the sensor), or at maximum range (nothing detected). Always filter with this before using a range value.
Common mistake: Iterating
scan.rangeswithout checking validity. Invalid readings are often 0.0 orinf, which will corrupt your obstacle map if not filtered.
.min_range() — Closest Valid Object
closest = scan.min_range()
if closest is not None and closest < 0.5:
print("DANGER: object at {:.2f} m!".format(closest))
Returns the minimum valid range reading, or None if all readings are invalid. This is the fastest way to check for close obstacles — one call instead of iterating the entire array.
len(scan) — Valid Reading Count
print(f"{len(scan)} valid readings out of {len(scan.ranges)} total")
Example — LiDAR Obstacle Detection:
from horus import Node, run, LaserScan, CmdVel, Topic
import math
scan_topic = Topic(LaserScan)
cmd_topic = Topic(CmdVel)
def avoid_obstacles(node):
scan = scan_topic.recv(node)
if scan is None:
return
closest = scan.min_range()
if closest is not None and closest < 0.5:
# Too close — stop and turn
cmd_topic.send(CmdVel(linear=0.0, angular=0.5), node)
else:
# Clear path — drive forward
cmd_topic.send(CmdVel(linear=0.3, angular=0.0), node)
run(Node(tick=avoid_obstacles, rate=10, pubs=["cmd_vel"], subs=["scan"]))
Imu
Inertial Measurement Unit — accelerometer (linear acceleration) + gyroscope (angular velocity) + optional orientation quaternion.
Constructor
imu = Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81,
gyro_x=0.0, gyro_y=0.0, gyro_z=0.1)
.set_orientation_from_euler(roll, pitch, yaw) — Set Orientation
import math
imu.set_orientation_from_euler(roll=0.0, pitch=0.05, yaw=math.pi/4)
Converts Euler angles (radians) to the internal quaternion representation and stores it. Use this when you have orientation estimates from a sensor fusion algorithm (complementary filter, Madgwick, EKF) and want to publish them with the IMU data.
- roll: tilt left/right (positive = right side down)
- pitch: tilt forward/backward (positive = nose down)
- yaw: compass heading (positive = counterclockwise from above)
Common mistake: Publishing orientation without actually computing it. If your IMU only has a 6-axis accelerometer+gyroscope (no magnetometer or fusion), don't set orientation — call
has_orientation()to check, and let downstream nodes know the orientation field is empty.
.has_orientation() — Is Orientation Data Present?
if imu.has_orientation():
# Safe to use orientation fields
print("Orientation available")
else:
# Only acceleration and angular velocity are valid
print("Raw IMU — no orientation estimate")
Not all IMUs provide orientation. A raw 6-axis chip gives only acceleration and angular velocity — orientation must be computed by a sensor fusion node. This method checks whether the orientation quaternion has been set (is non-identity).
.is_valid() — Validation
if not imu.is_valid():
print("Bad IMU reading — NaN or inf detected")
.angular_velocity_vec() — Gyroscope as Vector3
gyro = imu.angular_velocity_vec() # Returns Vector3
print(f"Rotation rate: {gyro.magnitude():.3f} rad/s")
if gyro.magnitude() > 2.0:
print("Spinning too fast!")
Returns the gyroscope reading as a Vector3, giving you access to magnitude(), dot(), cross(), and other vector operations. Much more useful than raw gyro_x, gyro_y, gyro_z fields when you need to compute rotation rates or compare angular velocities.
.linear_acceleration_vec() — Accelerometer as Vector3
accel = imu.linear_acceleration_vec()
gravity_removed = accel.magnitude() - 9.81
print(f"Dynamic acceleration: {gravity_removed:.2f} m/s²")
Same idea — returns a Vector3 for the accelerometer. On a stationary robot, the magnitude should be ≈ 9.81 m/s² (gravity). Deviations indicate dynamic acceleration (movement, vibration, impact).
Example — Tilt Detection:
from horus import Imu, Topic
import math
imu_topic = Topic(Imu)
def check_tilt(node):
imu = imu_topic.recv(node)
if imu is None or not imu.is_valid():
return
accel = imu.linear_acceleration_vec()
# Pitch angle from accelerometer (simplified, assumes no dynamic motion)
pitch = math.atan2(accel.x, math.sqrt(accel.y**2 + accel.z**2))
if abs(pitch) > math.radians(15):
print(f"WARNING: Robot tilted {math.degrees(pitch):.1f}° — risk of tipping!")
Odometry
Robot odometry — 2D pose + velocity, typically published by wheel encoders or visual odometry systems.
Constructor
odom = Odometry(x=1.0, y=2.0, theta=0.5,
linear_velocity=0.5, angular_velocity=0.1)
.set_frames(frame, child_frame) — Set Coordinate Frames
odom.set_frames("odom", "base_link")
Sets the parent and child frame names. By convention:
frame= "odom" (the fixed odometry frame)child_frame= "base_link" (the robot's body frame)
These names are used by the TransformFrame system to build the transform tree.
.update(pose, twist) — Update from New Measurements
from horus import Pose2D, Twist
new_pose = Pose2D(x=1.1, y=2.05, theta=0.52)
new_twist = Twist.new_2d(linear_x=0.5, angular_z=0.1)
odom.update(new_pose, new_twist)
Replaces the pose and twist with new measurements. Use this in a sensor fusion node where you compute updated estimates each tick.
.is_valid() — Validation
if not odom.is_valid():
print("Odometry contains NaN — encoder fault?")
JointState
Multi-joint state for robot arms and legged robots. Stores up to 16 joint names, positions, velocities, and efforts. The key feature is by-name lookup — query individual joints without iterating arrays.
Constructor
js = JointState(
names=["shoulder", "elbow", "wrist"],
positions=[1.57, 0.785, 0.0],
velocities=[0.0, 0.1, 0.0],
efforts=[5.0, 2.0, 0.5],
)
.position(name) — Look Up Joint Position
shoulder_angle = js.position("shoulder") # 1.57
gripper_angle = js.position("gripper") # None (not found)
Returns the position of the named joint, or None if that joint doesn't exist in this message. Positions are typically in radians for revolute joints or meters for prismatic joints.
.velocity(name) — Look Up Joint Velocity
elbow_speed = js.velocity("elbow") # 0.1 rad/s
.effort(name) — Look Up Joint Effort/Torque
wrist_torque = js.effort("wrist") # 0.5 Nm
Common mistake: Expecting the same joint names in
JointStateandJointCommand. If the publisher uses "joint_1" but the subscriber expects "shoulder", the by-name lookup returnsNone. Standardize joint names across your system.
Example — Arm Joint Monitor:
from horus import JointState, Topic
js_topic = Topic(JointState)
def monitor_arm(node):
js = js_topic.recv(node)
if js is None:
return
for name in js.names:
pos = js.position(name)
eff = js.effort(name)
if eff is not None and abs(eff) > 10.0:
print(f"WARNING: {name} effort {eff:.1f} Nm exceeds limit!")
BatteryState
Battery monitoring with threshold checks and time estimation.
Constructor
battery = BatteryState(voltage=11.1, percentage=15.0, current=-2.5)
.is_critical() — Below 10%
if battery.is_critical():
print("CRITICAL — land immediately!")
Hardcoded threshold at 10%. Intended for "land now" or "return to dock immediately" decisions.
.is_low(threshold) — Below Custom Threshold
if battery.is_low(20.0):
print("Low battery — start heading home")
Check against your own threshold. Typical values: 20-30% for "start returning", 10% for critical.
.time_remaining() — Estimated Remaining Time
remaining = battery.time_remaining()
if remaining is not None:
print(f"~{remaining/60:.0f} minutes remaining")
else:
print("Not discharging (charging or current=0)")
Returns estimated seconds of battery life based on current draw, or None if the battery isn't discharging (current ≥ 0 or current = 0). This is a simple linear estimate — actual runtime depends on load variations.
Common mistake: Treating
Noneas "infinite battery".Nonemeans the estimate isn't available — the battery might not be discharging, or the current sensor might not be connected.
NavSatFix
GPS/GNSS position with fix status and distance calculation.
.from_coordinates(lat, lon, alt) — From GPS Coordinates
pos = NavSatFix.from_coordinates(lat=37.7749, lon=-122.4194, alt=10.0)
Factory method that creates a fix with STATUS_FIX and sets the coordinates. Simpler than setting fields individually.
.has_fix() — Is GPS Valid?
if not gps.has_fix():
print("No GPS fix — using dead reckoning only")
Returns True when the receiver has a valid position fix. Without a fix, latitude/longitude are meaningless.
.distance_to(other) — Great-Circle Distance
home = NavSatFix.from_coordinates(lat=37.7750, lon=-122.4195, alt=10.0)
current = NavSatFix.from_coordinates(lat=37.7760, lon=-122.4180, alt=10.0)
dist = current.distance_to(home)
print(f"Distance to home: {dist:.1f} m")
Computes the great-circle distance between two GPS positions using the Haversine formula. Returns meters. This accounts for Earth's curvature — accurate for any distance from centimeters to thousands of kilometers.
Common mistake: Using Euclidean distance on latitude/longitude values.
distance = sqrt((lat1-lat2)² + (lon1-lon2)²)is wrong — it doesn't account for Earth's curvature or the fact that longitude degrees get shorter near the poles. Always usedistance_to().
.horizontal_accuracy() / .is_valid()
print(f"Accuracy: ±{gps.horizontal_accuracy():.1f} m")
print(gps.is_valid())
Clock
Time source for wall-clock, simulation, and replay modes. The factory methods create the right clock type for your use case.
.wall_clock() — Real Time
clk = Clock.wall_clock()
Production time — actual wall-clock nanoseconds. Use this for logging, timestamps, and real-world operation.
.sim_time(sim_ns, speed) — Simulation Time
clk = Clock.sim_time(sim_ns=0, speed=2.0) # Start at t=0, 2x speed
Controlled time for simulation. The scheduler advances sim_time deterministically, so replaying the same sequence produces identical results. Use this for testing and development.
Common mistake: Using
wall_clock()during simulation. Wall time advances independently of the simulation — your node will process data at the wrong rate. Always usesim_time()in simulation mode.
.replay_time(replay_ns, speed) — Replay Time
clk = Clock.replay_time(replay_ns=1000000000, speed=1.0)
Playback from a recorded session. Time advances at speed multiplier (1.0 = real-time, 0.5 = half-speed).
.elapsed_since(earlier) — Time Delta
start = Clock.wall_clock()
# ... do work ...
end = Clock.wall_clock()
elapsed_ns = end.elapsed_since(start)
print(f"Took {elapsed_ns / 1e6:.1f} ms")
Nanoseconds elapsed since another clock reading. Use this for profiling and timing measurements.
.is_paused() — Check Pause State
if clk.is_paused():
print("Simulation paused")
TimeReference
External time source for synchronization (GPS time, NTP offset).
.correct_timestamp(local_ns) — Apply Time Offset
tref = TimeReference(time_ref_ns=1000000000, source="gps", offset_ns=-500)
corrected = tref.correct_timestamp(local_ns=1000000500)
print(tref.source) # "gps"
Applies the offset correction to a local timestamp: corrected = local_ns + offset_ns. Use this when your system clock differs from an external reference (GPS, NTP server).
RangeSensor
Single-point distance measurement from an ultrasonic, infrared, or single-beam LiDAR sensor.
Constructor
range_s = RangeSensor(range=2.5, min_range=0.02, max_range=10.0,
field_of_view=0.1, radiation_type=0)
Fields
| Field | Type | Description |
|---|---|---|
range | float | Measured distance in meters |
min_range | float | Minimum detectable range (meters) |
max_range | float | Maximum detectable range (meters) |
field_of_view | float | Sensor beam width in radians |
radiation_type | int | 0 = ultrasonic, 1 = infrared |
Use case — Cliff detection for mobile robots:
from horus import RangeSensor, CmdVel, Topic
cliff_topic = Topic(RangeSensor)
cmd_topic = Topic(CmdVel)
def cliff_check(node):
cliff = cliff_topic.recv(node)
if cliff is None:
return
if cliff.range > 0.15: # Ground dropped away — cliff!
cmd_topic.send(CmdVel(linear=-0.2, angular=0.0), node) # Back up
MagneticField
Magnetometer reading for compass heading and magnetic anomaly detection.
Constructor
mag = MagneticField(x=0.25, y=-0.1, z=0.45) # Tesla
Fields
| Field | Type | Description |
|---|---|---|
x | float | Magnetic field strength along X axis (Tesla) |
y | float | Magnetic field strength along Y axis (Tesla) |
z | float | Magnetic field strength along Z axis (Tesla) |
Earth's magnetic field is approximately 25-65 microtesla (0.000025-0.000065 T). Values significantly larger indicate nearby ferromagnetic objects or electromagnetic interference.
import math
heading = math.atan2(mag.y, mag.x) # Compass heading (radians)
Temperature
Temperature measurement from a thermocouple, thermistor, or integrated sensor.
Constructor
temp = Temperature(temperature=72.5, variance=0.1)
Fields
| Field | Type | Description |
|---|---|---|
temperature | float | Temperature in degrees Celsius |
variance | float | Measurement variance (uncertainty squared) |
Use case — Motor thermal protection:
if temp.temperature > 80.0:
print("Motor overheating — reduce duty cycle!")
FluidPressure
Atmospheric or fluid pressure measurement from a barometer or pressure transducer.
Constructor
pressure = FluidPressure(pressure=101325.0, variance=10.0)
Fields
| Field | Type | Description |
|---|---|---|
pressure | float | Pressure in Pascals (Pa) |
variance | float | Measurement variance |
Standard atmospheric pressure at sea level is 101,325 Pa. Barometric altitude estimation uses the pressure-altitude relationship (~12 Pa per meter near sea level).
Illuminance
Light level measurement from an ambient light sensor.
Constructor
lux = Illuminance(illuminance=500.0, variance=5.0)
Fields
| Field | Type | Description |
|---|---|---|
illuminance | float | Light intensity in lux |
variance | float | Measurement variance |
Reference values: 0.001 lux (moonless night), 50 lux (living room), 500 lux (office), 10,000-100,000 lux (direct sunlight). Use for adaptive camera exposure or deciding when to activate headlights.
Design Decisions
Why separate LaserScan and RangeSensor? LaserScan represents a full sweep of range readings at known angles — it is a scan, not a single measurement. RangeSensor is a single distance reading from an ultrasonic, infrared, or single-beam sensor. Different use cases: LaserScan drives mapping and SLAM; RangeSensor is for simple distance checks (cliff detection, fill-level measurement).
Why does Imu have optional orientation? Raw IMU chips (6-axis accelerometer+gyroscope) cannot determine orientation without a fusion algorithm. Publishing zeroed orientation would mislead downstream nodes into thinking the robot is level when it might not be. The has_orientation() check makes the contract explicit: if it returns False, only raw accel/gyro are valid.
Why does BatteryState use a hardcoded 10% threshold for is_critical()? Below 10%, most LiPo batteries risk permanent damage and voltage sag can cause brownouts. The threshold is conservative-by-design — if 10% is wrong for your battery chemistry, use is_low(your_threshold) instead.
Why nanosecond timestamp_ns on every message? Sensor fusion requires sub-millisecond time correlation. An IMU at 1kHz produces readings every 1ms, so millisecond timestamps lose ordering information. Nanosecond precision is free (it is just a u64) and ensures any fusion algorithm has sufficient temporal resolution.
See Also
- Geometry Messages — Pose2D, Vector3, Quaternion
- Navigation Messages — NavGoal, OccupancyGrid
- Diagnostics Messages — BatteryState monitoring patterns
- Control Messages — MotorCommand, JointCommand for actuators
- Python Message Library — All 55+ message types overview