"""Compute norm stats for the Kinova TeddyBear dataset without decoding videos. This matches the training-time state/action preprocessing for the custom `pi05_kinova_teddybear` config: - state stats are computed on raw `observation.state` - action stats are computed on 16-step action chunks after converting the first 6 dimensions from absolute targets to deltas relative to the current state while leaving the gripper dimension absolute """ from pathlib import Path import numpy as np import pandas as pd from openpi.shared import normalize from openpi.training import config as config_lib from openpi.training.kinova_lerobot_v3_dataset import locate_dataset_root def main() -> None: cfg = config_lib.get_config("pi05_kinova_teddybear") repo_id = "lsnu/TeddyBearKinovaTestSetLeRobot" root = locate_dataset_root() data_df = pd.concat( [pd.read_parquet(path) for path in sorted((root / "data").rglob("*.parquet"))], ignore_index=True, ).sort_values("index") state_stats = normalize.RunningStats() action_stats = normalize.RunningStats() horizon = cfg.model.action_horizon for _, ep_rows in data_df.groupby("episode_index", sort=True): states = np.stack(ep_rows["observation.state"].to_list()).astype(np.float32) actions = np.stack(ep_rows["action"].to_list()).astype(np.float32) length = len(states) state_stats.update(states) idx = np.arange(length)[:, None] + np.arange(horizon)[None, :] idx = np.clip(idx, 0, length - 1) action_chunks = actions[idx] action_chunks[..., :6] -= states[:, None, :6] action_stats.update(action_chunks) output_path = cfg.assets_dirs / repo_id norm_stats = { "state": state_stats.get_statistics(), "actions": action_stats.get_statistics(), } print(f"Writing stats to: {output_path}") normalize.save(output_path, norm_stats) if __name__ == "__main__": main()