Skip to main content
The Neuronav SDK is designed to be easily extensible. You can add support for any depth camera by implementing the SensorBase interface. This guide shows you how to integrate new sensors in under 20 minutes.

Quick Start Template

Here’s a complete template for adding a new sensor:
from neuronav.sensors.base import SensorBase, SensorConfig
from typing import Dict, Optional
import subprocess
import time

class MyCustomSensor(SensorBase):
    """Custom sensor implementation for [Your Camera Name]"""

    def __init__(self, config: Optional[SensorConfig] = None):
        self.config = config or SensorConfig()
        self.process = None

    def configure(self, config: SensorConfig):
        """Store configuration for later use"""
        self.config = config

    def start(self):
        """Start the sensor driver and begin publishing data"""
        # Launch your camera's ROS2 driver
        cmd = [
            "ros2", "run", "your_camera_package", "your_camera_node",
            "--ros-args",
            "-p", f"width:={self.config.rgb_width}",
            "-p", f"height:={self.config.rgb_height}",
            "-p", f"fps:={self.config.fps}"
        ]

        # Add device-specific parameters
        if self.config.device_id:
            cmd.extend(["-p", f"device_id:={self.config.device_id}"])

        # Launch the process
        self.process = subprocess.Popen(cmd)
        time.sleep(2)  # Wait for initialization

    def stop(self):
        """Stop the sensor and clean up resources"""
        if self.process:
            self.process.terminate()
            self.process.wait(timeout=5)
            self.process = None

    def get_sensor_name(self) -> str:
        """Return the name of your sensor"""
        return "MyCustomSensor"

    def get_ros_topics(self) -> Dict[str, str]:
        """Return the mapping of data types to ROS2 topics"""
        return {
            "rgb": "/camera/color/image_raw",
            "depth": "/camera/depth/image_raw",
            "camera_info": "/camera/color/camera_info",
            "imu": "/imu/data"  # Optional
        }

    # Context manager support
    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

Step-by-Step Guide

Step 1: Understand Your Camera

Before implementing, gather information about your camera:

Step 2: Create Sensor Class

Create a new file neuronav/sensors/your_camera.py:
from neuronav.sensors.base import SensorBase, SensorConfig
from typing import Dict, Optional
import subprocess
import os
import time

class YourCameraSensor(SensorBase):
    """
    Support for [Your Camera Name]

    This sensor provides RGB-D data from [manufacturer] cameras.
    Requires [dependencies] to be installed.
    """

    def __init__(self, config: Optional[SensorConfig] = None):
        """
        Initialize the sensor with optional configuration

        Args:
            config: SensorConfig object with camera parameters
        """
        self.config = config or SensorConfig()
        self.processes = []  # List to track multiple processes
        self._validate_config()

    def _validate_config(self):
        """Validate and set default configuration"""
        # Set camera-specific defaults
        if not self.config.custom_params:
            self.config.custom_params = {}

        # Example: Set default exposure if not specified
        if "exposure" not in self.config.custom_params:
            self.config.custom_params["exposure"] = "auto"

Step 3: Implement Core Methods

Configure Method

def configure(self, config: SensorConfig):
    """
    Configure the sensor with new settings

    Args:
        config: New configuration to apply
    """
    self.config = config
    self._validate_config()

    # If sensor is running, restart with new config
    if self.processes:
        self.stop()
        self.start()

Start Method

def start(self):
    """Start the sensor driver and related processes"""

    # 1. Launch the main camera driver
    driver_cmd = self._build_driver_command()
    driver_process = subprocess.Popen(
        driver_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    self.processes.append(driver_process)

    # 2. Launch additional nodes if needed (e.g., IMU filter)
    if self.config.enable_imu:
        imu_cmd = self._build_imu_command()
        imu_process = subprocess.Popen(imu_cmd)
        self.processes.append(imu_process)

    # 3. Wait for initialization
    time.sleep(3)

    # 4. Verify the sensor started correctly
    if not self._verify_topics():
        raise RuntimeError(f"{self.get_sensor_name()} failed to start")

def _build_driver_command(self):
    """Build the command to launch the camera driver"""
    cmd = [
        "ros2", "launch", "your_camera_package", "camera.launch.py"
    ]

    # Add parameters
    params = [
        f"rgb_width:={self.config.rgb_width}",
        f"rgb_height:={self.config.rgb_height}",
        f"depth_width:={self.config.depth_width}",
        f"depth_height:={self.config.depth_height}",
        f"fps:={self.config.fps}"
    ]

    # Add custom parameters
    for key, value in self.config.custom_params.items():
        params.append(f"{key}:={value}")

    cmd.extend(params)
    return cmd

def _verify_topics(self, timeout=5):
    """Verify that expected topics are being published"""
    import time
    start_time = time.time()

    while time.time() - start_time < timeout:
        # Check if topics exist (implement actual check)
        # This is a simplified example
        result = subprocess.run(
            ["ros2", "topic", "list"],
            capture_output=True,
            text=True
        )

        topics = self.get_ros_topics()
        if topics["rgb"] in result.stdout:
            return True

        time.sleep(0.5)

    return False

Stop Method

def stop(self):
    """Stop all sensor processes gracefully"""
    for process in self.processes:
        if process.poll() is None:  # Process is still running
            process.terminate()

            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()  # Force kill if needed
                process.wait()

    self.processes.clear()

Get Methods

def get_sensor_name(self) -> str:
    """Return human-readable sensor name"""
    return "Your Camera Name"

def get_ros_topics(self) -> Dict[str, str]:
    """
    Return the mapping of data types to ROS2 topics

    Returns:
        Dictionary mapping data types to topic names
    """
    # Adjust these to match your camera's actual topics
    base_namespace = "/your_camera"

    topics = {
        "rgb": f"{base_namespace}/color/image_raw",
        "depth": f"{base_namespace}/depth/image_raw",
        "camera_info": f"{base_namespace}/color/camera_info"
    }

    # Add IMU topic if enabled
    if self.config.enable_imu:
        topics["imu"] = f"{base_namespace}/imu/data"

    return topics

Step 4: Handle Special Cases

Multiple Processes

Some cameras require multiple nodes:
def start(self):
    """Start multiple processes for the sensor"""

    # 1. Start the camera driver
    driver_process = self._start_driver()
    self.processes.append(driver_process)

    # 2. Start depth processing node
    depth_process = self._start_depth_processor()
    self.processes.append(depth_process)

    # 3. Start synchronization node
    sync_process = self._start_synchronizer()
    self.processes.append(sync_process)

    # 4. Start IMU filter if needed
    if self.config.enable_imu:
        imu_process = self._start_imu_filter()
        self.processes.append(imu_process)

def _start_driver(self):
    """Start the main camera driver"""
    cmd = ["ros2", "run", "camera_pkg", "camera_node"]
    return subprocess.Popen(cmd)

def _start_depth_processor(self):
    """Start depth processing node"""
    cmd = ["ros2", "run", "depth_pkg", "depth_node"]
    return subprocess.Popen(cmd)

def _start_synchronizer(self):
    """Start RGB-D synchronization"""
    cmd = [
        "ros2", "run", "rtabmap_sync", "rgbd_sync",
        "--ros-args",
        "--remap", "rgb/image:=/camera/color/image",
        "--remap", "depth/image:=/camera/depth/image",
        "--remap", "rgb/camera_info:=/camera/color/info"
    ]
    return subprocess.Popen(cmd)

Topic Remapping

If your camera uses different topic names:
def get_ros_topics(self) -> Dict[str, str]:
    """Handle non-standard topic names"""

    # Your camera's actual topics
    camera_topics = {
        "rgb": "/my_camera/rgb/image",
        "depth": "/my_camera/depth_registered/image",
        "camera_info": "/my_camera/rgb/camera_info",
        "imu": "/my_camera/imu/raw"
    }

    # The SDK will handle remapping to standard names
    return camera_topics

Network Cameras

For IP/network cameras:
class NetworkCameraSensor(SensorBase):
    def __init__(self, config: Optional[SensorConfig] = None):
        super().__init__(config)
        self.camera_ip = config.custom_params.get("ip_address", "192.168.1.1")
        self.camera_port = config.custom_params.get("port", 8080)

    def start(self):
        """Start network camera stream"""
        cmd = [
            "ros2", "run", "network_camera", "stream_node",
            "--ros-args",
            "-p", f"camera_ip:={self.camera_ip}",
            "-p", f"camera_port:={self.camera_port}",
            "-p", f"username:={self.config.custom_params.get('username', '')}",
            "-p", f"password:={self.config.custom_params.get('password', '')}"
        ]

        self.process = subprocess.Popen(cmd)

Step 5: Add to Package

Update neuronav/sensors/__init__.py:
from .base import SensorBase, SensorConfig
from .realsense import RealSenseSensor
from .oakd import OAKDSensor
from .your_camera import YourCameraSensor  # Add your sensor

__all__ = [
    'SensorBase',
    'SensorConfig',
    'RealSenseSensor',
    'OAKDSensor',
    'YourCameraSensor'  # Export it
]
Update main neuronav/__init__.py:
from .sensors import (
    SensorBase,
    SensorConfig,
    RealSenseSensor,
    OAKDSensor,
    YourCameraSensor  # Add here too
)

Step 6: Test Your Sensor

Create a test script:
#!/usr/bin/env python3
"""Test script for custom sensor integration"""

from neuronav import YourCameraSensor, run_slam, RTABMapSLAM
from neuronav.sensors import SensorConfig
import time

def test_basic():
    """Test basic sensor functionality"""
    print("Testing basic sensor startup...")

    sensor = YourCameraSensor()
    sensor.start()

    print(f"Sensor name: {sensor.get_sensor_name()}")
    print(f"Topics: {sensor.get_ros_topics()}")

    time.sleep(5)
    sensor.stop()
    print("✓ Basic test passed")

def test_with_config():
    """Test sensor with custom configuration"""
    print("\nTesting with configuration...")

    config = SensorConfig(
        rgb_width=1280,
        rgb_height=720,
        fps=30,
        enable_imu=True,
        custom_params={
            "exposure": "8000",
            "gain": "100"
        }
    )

    sensor = YourCameraSensor(config)
    sensor.start()
    time.sleep(5)
    sensor.stop()
    print("✓ Configuration test passed")

def test_with_slam():
    """Test sensor with SLAM"""
    print("\nTesting with SLAM...")

    sensor = YourCameraSensor()
    slam = RTABMapSLAM()

    # Run for 10 seconds
    run_slam(sensor, slam, duration=10)
    print("✓ SLAM test passed")

if __name__ == "__main__":
    test_basic()
    test_with_config()
    test_with_slam()
    print("\n✅ All tests passed!")

Testing Checklist

Before considering your sensor integration complete:

Next Steps

Contributing Your Sensor

If you’ve successfully integrated a new sensor, consider contributing it back to the community:
  1. Fork the Neuronav SDK repository
  2. Add your sensor implementation
  3. Include documentation and examples
  4. Submit a pull request
We’d love to expand our sensor support!