LeRobot Processors
Action and observation processor steps, default pipelines, and the LeRobot env wrapper.
The so101_nexus.processors subpackage exposes the LeRobot-compatible building blocks SO101-Nexus uses for teleoperation, dataset recording, and observation-shape conversion. Every step subclasses LeRobot's typed processor bases and is registered with ProcessorStepRegistry, so pipelines can be saved and loaded via save_pretrained and from_pretrained.
Optional Dependency
Importing the processors subpackage requires lerobot. Install the optional extra:
pip install "so101-nexus[teleop]"
or with uv:
uv sync --extra teleop
Environment Wrappers
make_lerobot_env
def make_lerobot_env(
    env_id: str,
    *,
    pipeline: DataProcessorPipeline | None = None,
    device: str | torch.device | None = None,
    add_batch_dim: bool = False,
    **make_kwargs: Any,
) -> gym.Env
Build a LeRobotEnvWrapper around a registered SO101-Nexus env id. Extra keyword arguments are forwarded to gymnasium.make.
The wrapped env must have a Dict observation space, which means at least one camera component (WristCamera or OverheadCamera) or another non-default observation must be configured. Wrapping the default state-only env raises TypeError.
import so101_nexus.mujoco  # noqa: F401
from so101_nexus import JointPositions, PickConfig, WristCamera
from so101_nexus.processors import make_lerobot_env

config = PickConfig(
    obs_mode="visual",
    observations=[JointPositions(), WristCamera(width=224, height=224)],
)
env = make_lerobot_env("MuJoCoPickLift-v1", config=config, render_mode="rgb_array")
obs, _ = env.reset()
# obs is now {"observation.state": ..., "observation.images.wrist": ..., ...}
LeRobotEnvWrapper
class LeRobotEnvWrapper(gym.ObservationWrapper):
    def __init__(
        self,
        env: gym.Env,
        pipeline: DataProcessorPipeline | None = None,
        *,
        device: str | torch.device | None = None,
        add_batch_dim: bool = False,
    ): ...
Wrap an existing env so its observations match LeRobot conventions (observation.state, observation.images.<name>). When pipeline is None, a default pipeline is built from make_default_env_observation_pipeline and observation_space is updated to reflect renamed keys and CHW float32 image shapes. When you pass a custom pipeline, you are responsible for setting observation_space if you need it to match the transformed shape.
Default Pipeline Factories
make_default_env_observation_pipeline
def make_default_env_observation_pipeline(
    observation_space: gym.spaces.Dict,
    *,
    device: str | torch.device | None = None,
    add_batch_dim: bool = False,
) -> DataProcessorPipeline
Build the default env-observation pipeline used by LeRobotEnvWrapper. The steps are:
- Rename state to observation.state and <name>_camera to observation.images.<name>.
- Convert HWC uint8 images to CHW float32 tensors in [0, 1].
- (Optional) Add a leading batch dimension.
- (Optional) Move tensors to device.
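The renaming, image conversion, and batch-dimension steps can be illustrated with a small NumPy sketch. This is not the library's implementation: the real pipeline produces torch tensors and can move them to a device, which NumPy cannot model, so that step is left out. The helper names rename_key and default_observation_sketch are hypothetical, and the six-element state vector is only an illustrative size.

```python
import numpy as np


def rename_key(key: str) -> str:
    """Map an env observation key to its LeRobot-style name."""
    if key == "state":
        return "observation.state"
    if key.endswith("_camera"):
        return "observation.images." + key[: -len("_camera")]
    return key  # assumption: any other key passes through unchanged


def default_observation_sketch(obs: dict, add_batch_dim: bool = False) -> dict:
    """NumPy stand-in for the rename, HWC-to-CHW, and batch-dimension steps."""
    out = {}
    for key, value in obs.items():
        new_key = rename_key(key)
        value = np.asarray(value)
        if new_key.startswith("observation.images."):
            # HWC uint8 in [0, 255] -> CHW float32 in [0, 1]
            value = value.transpose(2, 0, 1).astype(np.float32) / 255.0
        if add_batch_dim:
            value = value[np.newaxis, ...]  # prepend a batch axis of size 1
        out[new_key] = value
    return out


raw = {
    "state": np.zeros(6, dtype=np.float32),  # illustrative state size
    "wrist_camera": np.full((224, 224, 3), 255, dtype=np.uint8),
}
converted = default_observation_sketch(raw, add_batch_dim=True)
print({k: (v.shape, v.dtype) for k, v in converted.items()})
```

With add_batch_dim=True, the 224x224 wrist image ends up with shape (1, 3, 224, 224), the layout most image policies expect.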
make_default_leader_action_pipeline
def make_default_leader_action_pipeline(
    joint_names: tuple[str, ...] = SO101_JOINT_NAMES,
    wrist_roll_offset_deg: float = -90.0,
) -> DataProcessorPipeline
Build the default leader-arm action pipeline used by the teleop recorder. It accepts {"action": leader_dict} and returns a NumPy array of shape (len(joint_names),) in radians, with the wrist-roll calibration shift applied.
joint_names must include "wrist_roll"; the offset is applied at that index.
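As a rough illustration of the arithmetic this pipeline performs, the sketch below reproduces it in plain NumPy. It rests on several assumptions: the joint ordering in JOINT_NAMES is a guess at SO101_JOINT_NAMES, the offset is assumed to be added after the conversion to radians, and the float64 output dtype is arbitrary. The function leader_action_sketch is hypothetical and does not exist in the package.

```python
import numpy as np

# Assumed joint ordering; the real default comes from SO101_JOINT_NAMES.
JOINT_NAMES = (
    "shoulder_pan",
    "shoulder_lift",
    "elbow_flex",
    "wrist_flex",
    "wrist_roll",
    "gripper",
)


def leader_action_sketch(transition, joint_names=JOINT_NAMES, wrist_roll_offset_deg=-90.0):
    """Turn {"action": leader_dict} into an ordered array of radians."""
    leader = transition["action"]
    # Order the "{joint}.pos" readings (in degrees) by joint_names.
    degrees = np.array([leader[f"{name}.pos"] for name in joint_names], dtype=np.float64)
    radians = np.deg2rad(degrees)
    # Apply the calibration shift at the wrist_roll index.
    # tuple.index raises ValueError if "wrist_roll" is missing.
    index = joint_names.index("wrist_roll")
    radians[index] += np.deg2rad(wrist_roll_offset_deg)
    return radians


leader = {f"{name}.pos": 0.0 for name in JOINT_NAMES}
leader["shoulder_pan.pos"] = 180.0
leader["wrist_roll.pos"] = 90.0
action = leader_action_sketch({"action": leader})
print(action)
```

Here a leader wrist-roll reading of 90 degrees is cancelled by the default -90 degree offset, so the resulting wrist-roll command is 0 rad.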
Action Processor Steps
LeaderActionToJointArrayStep
LeaderActionToJointArrayStep(joint_names: tuple[str, ...] = SO101_JOINT_NAMES)
Convert a leader-arm dict ({joint}.pos floats in degrees) to an ordered NumPy array. Output unit matches the input (degrees in, degrees out); use DegreesToRadiansActionStep to convert.
DegreesToRadiansActionStep
DegreesToRadiansActionStep()
Convert an action vector from degrees to radians. Operates on a NumPy array.
JointOffsetActionStep
JointOffsetActionStep(joint_index: int = 0, offset_rad: float = 0.0)
Add a constant offset (in radians) to a single index of the action vector. Generic over the target joint, so it can be reused for any per-joint calibration shift.
Observation Processor Steps
Hwc2ChwImageObservationStep
Hwc2ChwImageObservationStep(image_keys: tuple[str, ...] = ())
Convert HWC uint8 images at the listed keys to CHW torch.float32 tensors in [0, 1]. Other observation entries pass through unchanged. The step is image-key-generic so it works for any SO101-Nexus camera (current backends ship wrist and overhead).
Building a Custom Pipeline
For a custom env-observation pipeline, build a DataProcessorPipeline and pass it to LeRobotEnvWrapper:
import gymnasium as gym
from lerobot.processor import DataProcessorPipeline, RenameObservationsProcessorStep

import so101_nexus.mujoco  # noqa: F401  (registers the MuJoCo env ids)
from so101_nexus import JointPositions, PickConfig, WristCamera
from so101_nexus.processors import Hwc2ChwImageObservationStep, LeRobotEnvWrapper

config = PickConfig(
    obs_mode="visual",
    observations=[JointPositions(), WristCamera(width=224, height=224)],
)
# Rename keys first, then convert the image stored under its new key.
pipeline = DataProcessorPipeline(
    steps=[
        RenameObservationsProcessorStep(
            rename_map={
                "state": "observation.state",
                "wrist_camera": "observation.images.wrist",
            }
        ),
        Hwc2ChwImageObservationStep(image_keys=("observation.images.wrist",)),
    ]
)
env = LeRobotEnvWrapper(gym.make("MuJoCoPickLift-v1", config=config), pipeline=pipeline)