LeRobot Processors

Action and observation processor steps, default pipelines, and the LeRobot env wrapper.

The so101_nexus.processors subpackage exposes the LeRobot-compatible building blocks SO101-Nexus uses for teleoperation, dataset recording, and observation-shape conversion. Every step subclasses LeRobot's typed processor bases and is registered with ProcessorStepRegistry, so pipelines can be saved and loaded via save_pretrained and from_pretrained.

Optional Dependency

Importing the processors subpackage requires lerobot. Install the optional extra:

pip install "so101-nexus[teleop]"

or with uv:

uv sync --extra teleop

Environment Wrappers

make_lerobot_env

def make_lerobot_env(
    env_id: str,
    *,
    pipeline: DataProcessorPipeline | None = None,
    device: str | torch.device | None = None,
    add_batch_dim: bool = False,
    **make_kwargs: Any,
) -> gym.Env

Build a LeRobotEnvWrapper around a registered SO101-Nexus env id. Extra keyword arguments are forwarded to gymnasium.make.

The wrapped env must have a Dict observation space, which means at least one camera component (WristCamera or OverheadCamera) or another non-default observation must be configured. Wrapping the default state-only env raises TypeError.

import so101_nexus.mujoco  # noqa: F401
from so101_nexus import JointPositions, PickConfig, WristCamera
from so101_nexus.processors import make_lerobot_env

config = PickConfig(
    obs_mode="visual",
    observations=[JointPositions(), WristCamera(width=224, height=224)],
)
env = make_lerobot_env("MuJoCoPickLift-v1", config=config, render_mode="rgb_array")
obs, _ = env.reset()
# obs is now {"observation.state": ..., "observation.images.wrist": ..., ...}

LeRobotEnvWrapper

class LeRobotEnvWrapper(gym.ObservationWrapper):
    def __init__(
        self,
        env: gym.Env,
        pipeline: DataProcessorPipeline | None = None,
        *,
        device: str | torch.device | None = None,
        add_batch_dim: bool = False,
    ): ...

Wrap an existing env so its observations match LeRobot conventions (observation.state, observation.images.<name>). When pipeline is None, a default pipeline is built from make_default_env_observation_pipeline and observation_space is updated to reflect renamed keys and CHW float32 image shapes. When you pass a custom pipeline, you are responsible for setting observation_space if you need it to match the transformed shape.

Default Pipeline Factories

make_default_env_observation_pipeline

def make_default_env_observation_pipeline(
    observation_space: gym.spaces.Dict,
    *,
    device: str | torch.device | None = None,
    add_batch_dim: bool = False,
) -> DataProcessorPipeline

Build the default env-observation pipeline used by LeRobotEnvWrapper. The steps are:

  1. Rename state to observation.state and <name>_camera to observation.images.<name>.
  2. Convert HWC uint8 images to CHW float32 tensors in [0, 1].
  3. (Optional) Add a leading batch dimension.
  4. (Optional) Move tensors to device.
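The renaming rule in step 1 and the optional batching in step 3 can be sketched in plain NumPy. This is an illustration only, not the library's implementation: build_rename_map and rename_and_batch are hypothetical helpers, the 6-element state vector is an assumed shape, and the image stays HWC because step 2 is left out here.

```python
import numpy as np


def build_rename_map(keys):
    """Hypothetical helper: derive the step-1 rename map from raw env keys."""
    rename = {}
    for key in keys:
        if key == "state":
            rename[key] = "observation.state"
        elif key.endswith("_camera"):
            # "<name>_camera" becomes "observation.images.<name>"
            name = key[: -len("_camera")]
            rename[key] = f"observation.images.{name}"
    return rename


def rename_and_batch(obs, add_batch_dim=False):
    """Rename keys, then optionally prepend a batch axis of size 1."""
    rename = build_rename_map(obs.keys())
    out = {rename.get(key, key): value for key, value in obs.items()}
    if add_batch_dim:
        out = {key: np.asarray(value)[None, ...] for key, value in out.items()}
    return out


# Assumed raw observation: a 6-dim state and one 224x224 wrist image.
raw = {
    "state": np.zeros(6, dtype=np.float32),
    "wrist_camera": np.zeros((224, 224, 3), dtype=np.uint8),
}
result = rename_and_batch(raw, add_batch_dim=True)
```

Keys that match neither pattern are passed through unchanged in this sketch; whether the real pipeline does the same for such keys is not stated above.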

make_default_leader_action_pipeline

def make_default_leader_action_pipeline(
    joint_names: tuple[str, ...] = SO101_JOINT_NAMES,
    wrist_roll_offset_deg: float = -90.0,
) -> DataProcessorPipeline

Build the default leader-arm action pipeline used by the teleop recorder. It accepts {"action": leader_dict} and returns a NumPy array of shape (len(joint_names),) in radians, with the wrist-roll calibration shift applied.

joint_names must include "wrist_roll"; the offset is applied at that index.
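As a rough picture of what this pipeline computes, the sketch below reproduces the described behaviour with NumPy alone. It is not the library code: leader_action_to_radians is a hypothetical function, ASSUMED_JOINT_NAMES is a guess at the contents of SO101_JOINT_NAMES, and applying the offset after the degree-to-radian conversion is an assumption based on the step signatures listed on this page.

```python
import numpy as np

# Assumption: stands in for SO101_JOINT_NAMES, whose contents are not shown here.
ASSUMED_JOINT_NAMES = (
    "shoulder_pan",
    "shoulder_lift",
    "elbow_flex",
    "wrist_flex",
    "wrist_roll",
    "gripper",
)


def leader_action_to_radians(
    leader, joint_names=ASSUMED_JOINT_NAMES, wrist_roll_offset_deg=-90.0
):
    """Hypothetical equivalent of the three default action steps."""
    # 1. Order the "{joint}.pos" entries (degrees) by joint_names.
    degrees = np.array(
        [leader[f"{name}.pos"] for name in joint_names], dtype=np.float64
    )
    # 2. Convert the whole vector to radians.
    radians = np.deg2rad(degrees)
    # 3. Shift only the wrist-roll entry by the calibration offset.
    radians[joint_names.index("wrist_roll")] += np.deg2rad(wrist_roll_offset_deg)
    return radians


leader = {f"{name}.pos": 0.0 for name in ASSUMED_JOINT_NAMES}
leader["elbow_flex.pos"] = 180.0
leader["wrist_roll.pos"] = 90.0
action = leader_action_to_radians(leader)
```

With the default offset of -90 degrees, a leader wrist-roll reading of 90 degrees maps to 0 radians, while the other joints are converted without any shift. The call to joint_names.index raises ValueError when "wrist_roll" is absent, which mirrors the requirement stated above.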

Action Processor Steps

LeaderActionToJointArrayStep

LeaderActionToJointArrayStep(joint_names: tuple[str, ...] = SO101_JOINT_NAMES)

Convert a leader-arm dict ({joint}.pos floats in degrees) to an ordered NumPy array. Output unit matches the input (degrees in, degrees out); use DegreesToRadiansActionStep to convert.

DegreesToRadiansActionStep

DegreesToRadiansActionStep()

Convert an action vector from degrees to radians. Operates on a NumPy array.

JointOffsetActionStep

JointOffsetActionStep(joint_index: int = 0, offset_rad: float = 0.0)

Add a constant offset (in radians) to a single index of the action vector. Generic over the target joint, so it can be reused for any per-joint calibration shift.

Observation Processor Steps

Hwc2ChwImageObservationStep

Hwc2ChwImageObservationStep(image_keys: tuple[str, ...] = ())

Convert HWC uint8 images at the listed keys to CHW torch.float32 tensors in [0, 1]. Other observation entries pass through unchanged. The step is image-key-generic so it works for any SO101-Nexus camera (current backends ship wrist and overhead).
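The conversion itself amounts to a transpose and a rescale. The sketch below shows it with NumPy arrays standing in for the torch tensors the real step returns; hwc_uint8_to_chw_float is a hypothetical function written for illustration, not part of so101_nexus.

```python
import numpy as np


def hwc_uint8_to_chw_float(obs, image_keys=()):
    """Hypothetical NumPy stand-in for the HWC-to-CHW conversion."""
    out = dict(obs)  # entries not listed in image_keys pass through unchanged
    for key in image_keys:
        image = np.asarray(obs[key])
        # (H, W, C) -> (C, H, W), then scale 0..255 to 0..1 as float32
        out[key] = np.transpose(image, (2, 0, 1)).astype(np.float32) / 255.0
    return out


obs = {
    "observation.state": np.zeros(6, dtype=np.float32),
    "observation.images.wrist": np.full((2, 3, 3), 255, dtype=np.uint8),
}
converted = hwc_uint8_to_chw_float(obs, image_keys=("observation.images.wrist",))
```

Here the 2x3 RGB image becomes a (3, 2, 3) float32 array whose values are all 1.0, and the state entry is returned as-is.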

Building a Custom Pipeline

For a custom env-observation pipeline, build a DataProcessorPipeline and pass it to LeRobotEnvWrapper:

import gymnasium as gym
import so101_nexus.mujoco  # noqa: F401  (imported for its side effect, as in the make_lerobot_env example)
from lerobot.processor import DataProcessorPipeline, RenameObservationsProcessorStep
from so101_nexus import JointPositions, PickConfig, WristCamera
from so101_nexus.processors import Hwc2ChwImageObservationStep, LeRobotEnvWrapper

config = PickConfig(
    obs_mode="visual",
    observations=[JointPositions(), WristCamera(width=224, height=224)],
)
pipeline = DataProcessorPipeline(
    steps=[
        RenameObservationsProcessorStep(
            rename_map={
                "state": "observation.state",
                "wrist_camera": "observation.images.wrist",
            }
        ),
        Hwc2ChwImageObservationStep(image_keys=("observation.images.wrist",)),
    ]
)
env = LeRobotEnvWrapper(gym.make("MuJoCoPickLift-v1", config=config), pipeline=pipeline)
