SensorBase interface. This guide shows you how to integrate new sensors in under 20 minutes.
Quick Start Template
Here’s a complete template for adding a new sensor:Copy
Ask AI
from neuronav.sensors.base import SensorBase, SensorConfig
from typing import Dict, Optional
import subprocess
import time
class MyCustomSensor(SensorBase):
"""Custom sensor implementation for [Your Camera Name]"""
def __init__(self, config: Optional[SensorConfig] = None):
self.config = config or SensorConfig()
self.process = None
def configure(self, config: SensorConfig):
"""Store configuration for later use"""
self.config = config
def start(self):
"""Start the sensor driver and begin publishing data"""
# Launch your camera's ROS2 driver
cmd = [
"ros2", "run", "your_camera_package", "your_camera_node",
"--ros-args",
"-p", f"width:={self.config.rgb_width}",
"-p", f"height:={self.config.rgb_height}",
"-p", f"fps:={self.config.fps}"
]
# Add device-specific parameters
if self.config.device_id:
cmd.extend(["-p", f"device_id:={self.config.device_id}"])
# Launch the process
self.process = subprocess.Popen(cmd)
time.sleep(2) # Wait for initialization
def stop(self):
"""Stop the sensor and clean up resources"""
if self.process:
self.process.terminate()
self.process.wait(timeout=5)
self.process = None
def get_sensor_name(self) -> str:
"""Return the name of your sensor"""
return "MyCustomSensor"
def get_ros_topics(self) -> Dict[str, str]:
"""Return the mapping of data types to ROS2 topics"""
return {
"rgb": "/camera/color/image_raw",
"depth": "/camera/depth/image_raw",
"camera_info": "/camera/color/camera_info",
"imu": "/imu/data" # Optional
}
# Context manager support
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
Step-by-Step Guide
Step 1: Understand Your Camera
Before implementing, gather information about your camera:Step 2: Create Sensor Class
Create a new fileneuronav/sensors/your_camera.py:
Copy
Ask AI
from neuronav.sensors.base import SensorBase, SensorConfig
from typing import Dict, Optional
import subprocess
import os
import time
class YourCameraSensor(SensorBase):
"""
Support for [Your Camera Name]
This sensor provides RGB-D data from [manufacturer] cameras.
Requires [dependencies] to be installed.
"""
def __init__(self, config: Optional[SensorConfig] = None):
"""
Initialize the sensor with optional configuration
Args:
config: SensorConfig object with camera parameters
"""
self.config = config or SensorConfig()
self.processes = [] # List to track multiple processes
self._validate_config()
def _validate_config(self):
"""Validate and set default configuration"""
# Set camera-specific defaults
if not self.config.custom_params:
self.config.custom_params = {}
# Example: Set default exposure if not specified
if "exposure" not in self.config.custom_params:
self.config.custom_params["exposure"] = "auto"
Step 3: Implement Core Methods
Configure Method
Copy
Ask AI
def configure(self, config: SensorConfig):
"""
Configure the sensor with new settings
Args:
config: New configuration to apply
"""
self.config = config
self._validate_config()
# If sensor is running, restart with new config
if self.processes:
self.stop()
self.start()
Start Method
Copy
Ask AI
def start(self):
"""Start the sensor driver and related processes"""
# 1. Launch the main camera driver
driver_cmd = self._build_driver_command()
driver_process = subprocess.Popen(
driver_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self.processes.append(driver_process)
# 2. Launch additional nodes if needed (e.g., IMU filter)
if self.config.enable_imu:
imu_cmd = self._build_imu_command()
imu_process = subprocess.Popen(imu_cmd)
self.processes.append(imu_process)
# 3. Wait for initialization
time.sleep(3)
# 4. Verify the sensor started correctly
if not self._verify_topics():
raise RuntimeError(f"{self.get_sensor_name()} failed to start")
def _build_driver_command(self):
"""Build the command to launch the camera driver"""
cmd = [
"ros2", "launch", "your_camera_package", "camera.launch.py"
]
# Add parameters
params = [
f"rgb_width:={self.config.rgb_width}",
f"rgb_height:={self.config.rgb_height}",
f"depth_width:={self.config.depth_width}",
f"depth_height:={self.config.depth_height}",
f"fps:={self.config.fps}"
]
# Add custom parameters
for key, value in self.config.custom_params.items():
params.append(f"{key}:={value}")
cmd.extend(params)
return cmd
def _verify_topics(self, timeout=5):
"""Verify that expected topics are being published"""
import time
start_time = time.time()
while time.time() - start_time < timeout:
# Check if topics exist (implement actual check)
# This is a simplified example
result = subprocess.run(
["ros2", "topic", "list"],
capture_output=True,
text=True
)
topics = self.get_ros_topics()
if topics["rgb"] in result.stdout:
return True
time.sleep(0.5)
return False
Stop Method
Copy
Ask AI
def stop(self):
"""Stop all sensor processes gracefully"""
for process in self.processes:
if process.poll() is None: # Process is still running
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill() # Force kill if needed
process.wait()
self.processes.clear()
Get Methods
Copy
Ask AI
def get_sensor_name(self) -> str:
"""Return human-readable sensor name"""
return "Your Camera Name"
def get_ros_topics(self) -> Dict[str, str]:
"""
Return the mapping of data types to ROS2 topics
Returns:
Dictionary mapping data types to topic names
"""
# Adjust these to match your camera's actual topics
base_namespace = "/your_camera"
topics = {
"rgb": f"{base_namespace}/color/image_raw",
"depth": f"{base_namespace}/depth/image_raw",
"camera_info": f"{base_namespace}/color/camera_info"
}
# Add IMU topic if enabled
if self.config.enable_imu:
topics["imu"] = f"{base_namespace}/imu/data"
return topics
Step 4: Handle Special Cases
Multiple Processes
Some cameras require multiple nodes:Copy
Ask AI
def start(self):
"""Start multiple processes for the sensor"""
# 1. Start the camera driver
driver_process = self._start_driver()
self.processes.append(driver_process)
# 2. Start depth processing node
depth_process = self._start_depth_processor()
self.processes.append(depth_process)
# 3. Start synchronization node
sync_process = self._start_synchronizer()
self.processes.append(sync_process)
# 4. Start IMU filter if needed
if self.config.enable_imu:
imu_process = self._start_imu_filter()
self.processes.append(imu_process)
def _start_driver(self):
"""Start the main camera driver"""
cmd = ["ros2", "run", "camera_pkg", "camera_node"]
return subprocess.Popen(cmd)
def _start_depth_processor(self):
"""Start depth processing node"""
cmd = ["ros2", "run", "depth_pkg", "depth_node"]
return subprocess.Popen(cmd)
def _start_synchronizer(self):
"""Start RGB-D synchronization"""
cmd = [
"ros2", "run", "rtabmap_sync", "rgbd_sync",
"--ros-args",
"--remap", "rgb/image:=/camera/color/image",
"--remap", "depth/image:=/camera/depth/image",
"--remap", "rgb/camera_info:=/camera/color/info"
]
return subprocess.Popen(cmd)
Topic Remapping
If your camera uses different topic names:Copy
Ask AI
def get_ros_topics(self) -> Dict[str, str]:
"""Handle non-standard topic names"""
# Your camera's actual topics
camera_topics = {
"rgb": "/my_camera/rgb/image",
"depth": "/my_camera/depth_registered/image",
"camera_info": "/my_camera/rgb/camera_info",
"imu": "/my_camera/imu/raw"
}
# The SDK will handle remapping to standard names
return camera_topics
Network Cameras
For IP/network cameras:Copy
Ask AI
class NetworkCameraSensor(SensorBase):
def __init__(self, config: Optional[SensorConfig] = None):
super().__init__(config)
self.camera_ip = config.custom_params.get("ip_address", "192.168.1.1")
self.camera_port = config.custom_params.get("port", 8080)
def start(self):
"""Start network camera stream"""
cmd = [
"ros2", "run", "network_camera", "stream_node",
"--ros-args",
"-p", f"camera_ip:={self.camera_ip}",
"-p", f"camera_port:={self.camera_port}",
"-p", f"username:={self.config.custom_params.get('username', '')}",
"-p", f"password:={self.config.custom_params.get('password', '')}"
]
self.process = subprocess.Popen(cmd)
Step 5: Add to Package
Updateneuronav/sensors/__init__.py:
Copy
Ask AI
from .base import SensorBase, SensorConfig
from .realsense import RealSenseSensor
from .oakd import OAKDSensor
from .your_camera import YourCameraSensor # Add your sensor
__all__ = [
'SensorBase',
'SensorConfig',
'RealSenseSensor',
'OAKDSensor',
'YourCameraSensor' # Export it
]
neuronav/__init__.py:
Copy
Ask AI
from .sensors import (
SensorBase,
SensorConfig,
RealSenseSensor,
OAKDSensor,
YourCameraSensor # Add here too
)
Step 6: Test Your Sensor
Create a test script:Copy
Ask AI
#!/usr/bin/env python3
"""Test script for custom sensor integration"""
from neuronav import YourCameraSensor, run_slam, RTABMapSLAM
from neuronav.sensors import SensorConfig
import time
def test_basic():
"""Test basic sensor functionality"""
print("Testing basic sensor startup...")
sensor = YourCameraSensor()
sensor.start()
print(f"Sensor name: {sensor.get_sensor_name()}")
print(f"Topics: {sensor.get_ros_topics()}")
time.sleep(5)
sensor.stop()
print("✓ Basic test passed")
def test_with_config():
"""Test sensor with custom configuration"""
print("\nTesting with configuration...")
config = SensorConfig(
rgb_width=1280,
rgb_height=720,
fps=30,
enable_imu=True,
custom_params={
"exposure": "8000",
"gain": "100"
}
)
sensor = YourCameraSensor(config)
sensor.start()
time.sleep(5)
sensor.stop()
print("✓ Configuration test passed")
def test_with_slam():
"""Test sensor with SLAM"""
print("\nTesting with SLAM...")
sensor = YourCameraSensor()
slam = RTABMapSLAM()
# Run for 10 seconds
run_slam(sensor, slam, duration=10)
print("✓ SLAM test passed")
if __name__ == "__main__":
test_basic()
test_with_config()
test_with_slam()
print("\n✅ All tests passed!")
Testing Checklist
Before considering your sensor integration complete:Next Steps
Test with SLAM
Run SLAM with your new sensor
Share Your Integration
Contribute your sensor to the SDK
Configuration Guide
Learn about advanced configuration
Troubleshooting
Debug common integration issues
Contributing Your Sensor
If you’ve successfully integrated a new sensor, consider contributing it back to the community:- Fork the Neuronav SDK repository
- Add your sensor implementation
- Include documentation and examples
- Submit a pull request