Tutorial: Sensor Fusion Sandbox - Camera, LiDAR, and IMU Streams
Categories: Sensors & Fusion
Overview
This tutorial walks through a minimal sensor–fusion sandbox built on top of DSG‑JIT’s sensors.* layer.
The goal is to show how to:
- Define simple synthetic sensor streams for a camera, LiDAR, and IMU.
- Wrap them in FunctionStream objects so they look like real hardware.
- Register each stream with a SensorFusionManager.
- Attach a callback (ToyImuIntegrator) that consumes IMU measurements and produces a toy 1D fused pose.
- Inspect the fused pose and visualize the integrated trajectory.
This experiment focuses purely on the sensor layer; it does not yet create SLAM factors or update a WorldModel / SceneGraphWorld. Think of it as a clean sandbox for getting comfortable with DSG‑JIT’s sensor APIs before wiring them into the rest of the stack.
1. Synthetic Sensor “Hardware”
Instead of reading from real devices or log files, the experiment defines three generator-style read functions:
- make_camera_read_fn(landmark_ids)
- make_lidar_read_fn()
- make_imu_read_fn()
Each one returns a read() callable that behaves like a simple hardware driver:
- Every call to read() returns a raw sample dictionary.
- When the underlying sequence is exhausted (as with the camera here), it returns None to signal end-of-stream.
# Shared imports used by the snippets in this tutorial.
import math
from typing import Any, Callable, Dict, List

import jax.numpy as jnp


def make_camera_read_fn(landmark_ids):
    """
    Return a generator-style read() function that yields synthetic
    bearing measurements compatible with raw_sample_to_camera_measurement.
    """
    samples = [
        {
            "t": 0.0,
            "frame_id": 0,
            "bearings": jnp.array(
                [
                    [1.0, 0.0, 0.0],  # bearing to landmark 0
                    [1.0, 0.1, 0.0],  # bearing to landmark 1
                ],
                dtype=jnp.float32,
            ),
            "landmark_ids": landmark_ids,
            "sensor_pose": jnp.array([0, 0, 0, 0, 0, 0], dtype=jnp.float32),
        },
        {
            "t": 1.0,
            "frame_id": 1,
            "bearings": jnp.array(
                [
                    [1.0, 0.05, 0.0],
                    [1.0, 0.15, 0.0],
                ],
                dtype=jnp.float32,
            ),
            "landmark_ids": landmark_ids,
            "sensor_pose": jnp.array([0.5, 0.0, 0, 0, 0, 0], dtype=jnp.float32),
        },
    ]
    it = iter(samples)

    def read():
        try:
            return next(it)
        except StopIteration:
            return None  # signals end of stream

    return read
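Because read() is just a closure over a finite sample list, you can exercise it directly before involving any DSG‑JIT machinery. A quick sanity loop:

# Drain the synthetic camera "driver" by hand.
read = make_camera_read_fn(landmark_ids=[0, 1])
while (raw := read()) is not None:
    print(f"t={raw['t']:.1f} frame={raw['frame_id']} bearings shape={raw['bearings'].shape}")
# Prints two lines (t=0.0 and t=1.0), then the loop exits on None.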
Key idea: the returned dictionaries are raw samples, not CameraMeasurement objects yet. They are shaped so that they can be handed to the camera converter:
from dsg_jit.sensors.conversion import raw_sample_to_camera_measurement
The LiDAR and IMU streams follow the same pattern:
- LiDAR: emits a scan at 10 Hz (t += 0.1) with fixed ranges ≈ 5 m and beam angles from –45 to +45 degrees.
- IMU: emits at 20 Hz (t += 0.05) with:
  - constant acceleration a = [0.5, 0, 0] m/s²,
  - zero angular velocity,
  - and a scalar dt = 0.05.
def make_lidar_read_fn() -> Callable[[], Dict[str, Any]]:
    t = 0.0

    def read() -> Dict[str, Any]:
        nonlocal t
        t += 0.1  # 10 Hz
        num_beams = 16
        angles = jnp.linspace(-math.pi / 4, math.pi / 4, num_beams)
        ranges = 5.0 * jnp.ones_like(angles, dtype=jnp.float32)
        return {
            "t": t,
            "frame_id": "lidar0",
            "angles": angles,
            "ranges": ranges,
            "rays": jnp.stack([angles, ranges], axis=1),
        }

    return read


def make_imu_read_fn() -> Callable[[], Dict[str, Any]]:
    t = 0.0

    def read() -> Dict[str, Any]:
        nonlocal t
        t += 0.05  # 20 Hz
        dt = 0.05
        accel = jnp.array([0.5, 0.0, 0.0], dtype=jnp.float32)
        gyro = jnp.array([0.0, 0.0, 0.0], dtype=jnp.float32)
        return {
            "t": t,
            "dt": dt,
            "frame_id": "imu0",
            "accel": accel,
            "gyro": gyro,
        }

    return read
These functions are deliberately simple: they isolate the stream interface (a read() function) from the rest of the system.
2. Wrapping Streams with FunctionStream
DSG‑JIT represents a sensor stream as an object that it can poll for new samples. For synthetic streams, the FunctionStream wrapper is perfect:
from dsg_jit.sensors.streams import FunctionStream
landmark_ids = [0, 1]
cam_stream = FunctionStream(make_camera_read_fn(landmark_ids=landmark_ids))
lidar_stream = FunctionStream(make_lidar_read_fn())
imu_stream = FunctionStream(make_imu_read_fn())
Each FunctionStream holds onto the underlying read() function and exposes a standard interface that SensorFusionManager knows how to call.
If you later want to replace synthetic data with recorded logs, you can swap FunctionStream for something like FileRangeStream without touching the rest of the experiment.
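If you want a mental model of what the wrapper does, a minimal stand-in could look like the sketch below. This is an illustration of the interface, not DSG‑JIT's actual FunctionStream implementation, and the poll method name is hypothetical; the only assumption is that the manager asks each stream for at most one sample per poll and treats None as "nothing new".

class MinimalFunctionStream:
    """Toy stand-in for FunctionStream, for intuition only."""

    def __init__(self, read_fn):
        self._read_fn = read_fn

    def poll(self):
        # Ask the underlying "driver" for at most one raw sample;
        # None means end-of-stream (or nothing new this tick).
        return self._read_fn()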
3. From Raw Samples to Measurements
The raw dictionaries produced by the read() functions are converted to typed measurements via functions in sensors.conversion:
- raw_sample_to_camera_measurement
- raw_sample_to_lidar_measurement
- raw_sample_to_imu_measurement

These converters know how to interpret keys like "t", "frame_id", "bearings", "ranges", "accel", "gyro", and "dt", and they produce instances of:

- CameraMeasurement
- LidarMeasurement
- IMUMeasurement
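Before registering anything, you can run one raw sample through a converter by hand to see the typed result. This is a sketch: the timestamp attribute is taken from the integrator code later in this tutorial, but other field names on the measurement objects may differ in your version of dsg_jit.

from dsg_jit.sensors.conversion import raw_sample_to_camera_measurement

read = make_camera_read_fn(landmark_ids=[0, 1])
raw = read()                                  # plain dict from the driver
meas = raw_sample_to_camera_measurement(raw)  # typed measurement object
print(type(meas).__name__, meas.timestamp)    # e.g. "CameraMeasurement 0.0"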
The fusion manager takes a stream + converter pair for each sensor:
from dsg_jit.sensors.fusion import SensorFusionManager
from dsg_jit.sensors.conversion import (
raw_sample_to_camera_measurement,
raw_sample_to_lidar_measurement,
raw_sample_to_imu_measurement,
)
fusion = SensorFusionManager()
fusion.register_sensor(
name="cam0",
modality="camera",
stream=cam_stream,
converter=raw_sample_to_camera_measurement,
)
fusion.register_sensor(
name="lidar0",
modality="lidar",
stream=lidar_stream,
converter=raw_sample_to_lidar_measurement,
)
fusion.register_sensor(
name="imu0",
modality="imu",
stream=imu_stream,
converter=raw_sample_to_imu_measurement,
)
Why this step? It separates I/O concerns (reading dictionaries from a file or device) from measurement semantics (what fields are present and how to interpret them for SLAM, fusion, or learning).
4. SensorFusionManager and Callbacks
Once streams and converters are registered, the fusion manager becomes the hub for polling measurements and dispatching them to callbacks.
# Pseudo-code shape:
class SensorFusionManager:
    def register_sensor(self, name, modality, stream, converter):
        ...

    def register_callback(self, callback):
        ...

    def poll_once(self) -> int:
        # 1) poll each stream for at most one sample
        # 2) convert raw samples to measurements
        # 3) invoke each callback(meas) for every new measurement
        # 4) return the total number of new measurements
        ...
In this experiment, we attach a single callback: ToyImuIntegrator.
integrator = ToyImuIntegrator(fusion)
fusion.register_callback(integrator)
Every time poll_once() retrieves a new IMUMeasurement, the integrator is called and updates a toy 1D pose estimate.
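A callback is just a callable that takes a single measurement, so you can attach lightweight observers alongside the integrator. For example (print_timestamps is our own helper, not part of dsg_jit):

def print_timestamps(meas):
    # Called once per new measurement, regardless of modality.
    print(f"[{type(meas).__name__}] t = {meas.timestamp}")

fusion.register_callback(print_timestamps)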
5. ToyImuIntegrator: A Minimal 1D IMU Filter
The ToyImuIntegrator class demonstrates how to consume IMU measurements, maintain internal state, and optionally publish a fused pose back into the fusion manager.
class ToyImuIntegrator:
"""
Very simple 1D integrator using IMU measurements.
This is *not* a real inertial filter; it is just a toy example.
"""
def __init__(self, fusion: SensorFusionManager) -> None:
self.fusion = fusion
self.x = 0.0 # position along x
self.vx = 0.0 # velocity along x
self.history_t: List[float] = []
self.history_x: List[float] = []
def __call__(self, meas: BaseMeasurement) -> None:
if isinstance(meas, IMUMeasurement):
a_x = float(meas.accel[0])
dt = float(meas.dt)
# Basic integration: v += a*dt, x += v*dt
self.vx += a_x * dt
self.x += self.vx * dt
self.history_t.append(float(meas.timestamp))
self.history_x.append(self.x)
pose_se3 = jnp.array(
[self.x, 0.0, 0.0, 0.0, 0.0, 0.0],
dtype=jnp.float32,
)
self.fusion.record_fused_pose(
t=meas.timestamp,
pose_se3=pose_se3,
covariance=None,
source_counts={"imu": 1},
)
elif isinstance(meas, CameraMeasurement):
# Log camera info (timestamp + image/bearing shape)
...
elif isinstance(meas, LidarMeasurement):
# Log LiDAR mean range
...
A few details to note:
- The callback is polymorphic: it inspects the type of meas and decides what to do.
- For IMU data, it performs a simplified double integration along the x-axis and appends the result to a history buffer.
- It uses fusion.record_fused_pose(...) to publish a fused pose so it can be retrieved later via fusion.get_latest_pose().
For camera and LiDAR measurements, this demo just prints a brief summary:
- Camera: timestamp and image (or bearing) shape.
- LiDAR: timestamp and mean range.
In a more advanced setup, this is exactly where you would add DSG factors (e.g., range factors, bearing factors, occupancy updates) or log them for later learning.
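As a flavor of what such an extension could look like, here is a sketch of a standalone LiDAR callback that logs the mean range. It assumes LidarMeasurement is in scope (as in the integrator above) and that it exposes ranges and timestamp attributes; check those names against your version of dsg_jit.

def lidar_mean_range(meas):
    # Sketch only: .ranges and .timestamp are assumed field names.
    if isinstance(meas, LidarMeasurement):
        mean_r = float(meas.ranges.mean())
        print(f"lidar t={meas.timestamp:.2f}s mean range={mean_r:.2f} m")

fusion.register_callback(lidar_mean_range)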
6. Running the Simulation Loop
The main function wires everything together and runs a short simulation:
def main() -> None:
# 1) Build streams
landmark_ids = [0, 1]
cam_stream = FunctionStream(make_camera_read_fn(landmark_ids=landmark_ids))
lidar_stream = FunctionStream(make_lidar_read_fn())
imu_stream = FunctionStream(make_imu_read_fn())
# 2) Construct fusion manager and register sensors
fusion = SensorFusionManager()
fusion.register_sensor("cam0", "camera", cam_stream, raw_sample_to_camera_measurement)
fusion.register_sensor("lidar0", "lidar", lidar_stream, raw_sample_to_lidar_measurement)
fusion.register_sensor("imu0", "imu", imu_stream, raw_sample_to_imu_measurement)
# 3) Attach toy integrator
integrator = ToyImuIntegrator(fusion)
fusion.register_callback(integrator)
# 4) Run a short simulation loop
num_steps = 100
for step in range(num_steps):
n_meas = fusion.poll_once()
if n_meas == 0:
# In a real app, you might break when all streams are exhausted.
pass
# 5) Inspect latest fused pose
fused = fusion.get_latest_pose()
if fused is not None:
print("\n=== Latest fused pose (toy IMU integrator) ===")
print(f"t = {fused.t}")
print(f"pose_se3 = {fused.pose_se3}")
print(f"source_counts = {fused.source_counts}")
else:
print("No fused pose produced.")
The key call here is:
n_meas = fusion.poll_once()
On each iteration, SensorFusionManager:
- Polls each registered stream at most once.
- Converts any new raw samples into measurement objects via the provided converters.
- Calls all registered callbacks with each measurement.
You can control the simulated duration either by num_steps or by breaking when n_meas == 0 consistently.
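For example, one simple way to implement the "stop when exhausted" variant is to break after several consecutive empty polls (the threshold of 3 here is arbitrary):

# Alternative loop: stop once all streams appear exhausted.
empty_polls = 0
while empty_polls < 3:  # 3 consecutive empty polls = assume done
    if fusion.poll_once() == 0:
        empty_polls += 1
    else:
        empty_polls = 0

Note that with the synthetic LiDAR and IMU drivers above, poll_once() never returns 0, because those streams produce samples forever; this pattern only terminates once every stream is finite.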
7. Visualizing the 1D Trajectory
The integrator stores time–position pairs inside history_t and history_x. At the end of the run, the experiment uses Matplotlib to plot the estimated trajectory:
if integrator.history_t:
import matplotlib.pyplot as plt
ts = jnp.array(integrator.history_t)
xs = jnp.array(integrator.history_x)
plt.figure()
plt.plot(ts, xs, marker="o")
plt.xlabel("time [s]")
plt.ylabel("x position [m]")
plt.title("Toy IMU-integrated 1D trajectory")
plt.grid(True)
plt.show()
Because the acceleration is constant and the integrator starts from rest, the true position is x(t) = 0.5·a·t², a parabola in time. The simple integrator tracks this shape closely, giving you immediate visual feedback that the IMU callback is wired correctly.
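You can even check the numbers in closed form. The integrator updates velocity first and then position (semi-implicit Euler), so after N steps of size dt it produces x_N = a·dt²·N(N+1)/2, which approaches 0.5·a·t² as dt shrinks. Assuming each of the 100 polls delivers exactly one IMU sample (so N = 100, t = 5 s):

a, dt, N = 0.5, 0.05, 100
t = N * dt
x_integrated = a * dt**2 * N * (N + 1) / 2  # semi-implicit Euler result
x_true = 0.5 * a * t**2                     # continuous-time ground truth
print(x_integrated, x_true)                 # 6.3125 vs 6.25

The small overshoot is the expected discretization bias of the toy integrator, not a bug.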
8. Where to Go Next
This sensor fusion sandbox is intentionally minimal. Once you are comfortable with:
- constructing streams,
- converting raw samples into measurements,
- using SensorFusionManager,
- and writing callbacks like ToyImuIntegrator,
you are ready to:
- Attach measurement factors into a WorldModel (e.g., range priors, bearing factors).
- Use fused poses as the backbone for SceneGraphWorld or DynamicSceneGraph trajectories.
- Swap synthetic streams with real datasets or ROS2 bridges.
- Extend the callback to maintain a full state estimator instead of a toy 1D integrator.
This experiment is meant to be your “hello world” for DSG‑JIT’s sensor fusion layer—simple enough to hack on quickly, but structured in the same way you would handle real sensor data in more advanced SLAM and learning setups.