| from typing import Iterator, Tuple, Any, Iterable, Optional, Union, List |
|
|
| import numpy as np |
| import tensorflow as tf |
| import tensorflow_datasets as tfds |
| import tensorflow_hub as hub |
| import pandas as pd |
| import json |
| import os |
| import glob |
| import cv2 |
|
|
| |
| |
|
|
|
|
class LampeSearchDatasetConfig(tfds.core.BuilderConfig):
    """BuilderConfig selecting a contiguous range of ``lampe_search_XXX`` samples.

    Attributes:
        sample_start: Lowest sample index to include, or ``None`` for no bound.
        sample_end: Highest sample index to include, or ``None`` for no bound.
    """

    def __init__(self,
                 *,
                 sample_start: Optional[int] = None,
                 sample_end: Optional[int] = None,
                 **kwargs):
        super().__init__(**kwargs)
        self.sample_start, self.sample_end = sample_start, sample_end
|
|
|
|
| def _sample_index_from_json_path(json_path: str) -> Optional[int]: |
| """Extract integer index from .../lampe_search_XXX/metadata.json.""" |
| sample_dir = os.path.basename(os.path.dirname(json_path)) |
| if not sample_dir.startswith("lampe_search_"): |
| return None |
| try: |
| return int(sample_dir.split("_", 2)[2]) |
| except Exception: |
| return None |
|
|
|
|
| |
class LampeSearchDataset(tfds.core.GeneratorBasedBuilder):
    """DatasetBuilder for lampe_search dataset.

    Each episode pairs extracted RGB video frames with the joint state that is
    nearest in time, a one-frame-ahead joint target as the action, and a
    language instruction plus its Universal Sentence Encoder embedding.
    """

    VERSION = tfds.core.Version('1.0.0')
    RELEASE_NOTES = {
        '1.0.0': 'Initial release.',
    }

    BUILDER_CONFIGS = [
        LampeSearchDatasetConfig(
            name="all",
            description="All samples (lampe_search_001 ... lampe_search_050, if present).",
            version=VERSION,
            sample_start=None,
            sample_end=None,
        ),
        LampeSearchDatasetConfig(
            name="samples_001_025",
            description="Samples 001-025 only.",
            version=VERSION,
            sample_start=1,
            sample_end=25,
        ),
        LampeSearchDatasetConfig(
            name="samples_026_050",
            description="Samples 026-050 only.",
            version=VERSION,
            sample_start=26,
            sample_end=50,
        ),
    ]

    DEFAULT_CONFIG_NAME = "all"

    # Cache for the lazily loaded sentence encoder (see the _embed property).
    _embed_model = None

    @property
    def _embed(self):
        """Universal Sentence Encoder module, loaded lazily on first use.

        The original code eagerly called ``hub.load`` in ``__init__``, which
        forced a large TF-Hub download (and a network dependency) even when
        the builder was constructed only to inspect dataset info. Loading on
        first use keeps construction cheap; call sites (``self._embed([...])``)
        are unchanged.
        """
        if self._embed_model is None:
            self._embed_model = hub.load(
                "https://tfhub.dev/google/universal-sentence-encoder-large/5")
        return self._embed_model

    def _info(self) -> tfds.core.DatasetInfo:
        """Dataset metadata (homepage, citation, features)."""
        return self.dataset_info_from_configs(
            description="""
        Lamp Search Robot Manipulation Dataset

        This dataset contains robot demonstrations for a 4-degree-of-freedom (4-DoF) robotic system
        performing lamp search tasks. Each episode consists of:
        - **Robot Actions**: Joint trajectories recorded during lamp search demonstrations
        - **Observations**: RGB camera frames synchronized with joint state trajectories
        - **Language Instructions**: Natural language commands (default: "search for lamp" if empty)
        - **Trajectories**: Joint positions recorded at ~21-22 Hz (Base, Joint2, Joint3, Joint4)

        **Data Processing:**
        - Videos were recorded at 30 FPS
        - Frames are extracted from frames/ directories
        - Joint trajectories are synchronized with video frames
        - Language instructions are embedded using Universal Sentence Encoder (512-dim)

        **Dataset Statistics:**
        - Total Episodes: Multiple samples (lampe_search_001 to lampe_search_050)
        - Control Method: Simulation Replay
        - Video Duration: ~24 seconds per episode
        - Trajectory Rate: ~21-22 Hz
        """,
            homepage='https://github.com/your-repo/lampe-search-dataset',
            citation="""
        @misc{lampe_search_dataset,
            title = {Lamp Search Robot Manipulation Dataset},
            author = {Dataset Creator},
            year = {2025},
            note = {Dataset for fine-tuning vision-language-action models}
        }
        """,
            features=tfds.features.FeaturesDict({
                'steps': tfds.features.Dataset({
                    'observation': tfds.features.FeaturesDict({
                        'image': tfds.features.Image(
                            shape=(None, None, 3),
                            dtype=np.uint8,
                            encoding_format='jpeg',
                            doc='Main camera RGB observation.',
                        ),
                        'state': tfds.features.Tensor(
                            shape=(4,),
                            dtype=np.float32,
                            doc='Robot state: [Base, Joint2, Joint3, Joint4]',
                        )
                    }),
                    'action': tfds.features.Tensor(
                        shape=(4,),
                        dtype=np.float32,
                        doc='Robot action: [Base, Joint2, Joint3, Joint4]',
                    ),
                    'discount': tfds.features.Scalar(
                        dtype=np.float32,
                        doc='Discount if provided, default to 1.'
                    ),
                    'reward': tfds.features.Scalar(
                        dtype=np.float32,
                        doc='Reward if provided, 1 on final step for demos.'
                    ),
                    'is_first': tfds.features.Scalar(
                        dtype=np.bool_,
                        doc='True on first step of the episode.'
                    ),
                    'is_last': tfds.features.Scalar(
                        dtype=np.bool_,
                        doc='True on last step of the episode.'
                    ),
                    'is_terminal': tfds.features.Scalar(
                        dtype=np.bool_,
                        doc='True on last step of the episode if it is a terminal step, True for demos.'
                    ),
                    'language_instruction': tfds.features.Text(
                        doc='Language Instruction.'
                    ),
                    'language_embedding': tfds.features.Tensor(
                        shape=(512,),
                        dtype=np.float32,
                        doc='Universal Sentence Encoder embedding.'
                    ),
                }),
                'episode_metadata': tfds.features.FeaturesDict({
                    'file_path': tfds.features.Text(
                        doc='Path to the original data file.'
                    ),
                    'sample_id': tfds.features.Text(
                        doc='Unique ID for the sample.'
                    ),
                }),
            }))

    def _split_generators(self, dl_manager: tfds.download.DownloadManager):
        """Define data splits, restricted to the config's sample-index range."""
        # Root is overridable via env var so the builder is not tied to one machine.
        dataset_root = os.environ.get("LAMPE_SEARCH_DATASET_ROOT", "/workspace/Lamp_Search")

        all_json_paths = sorted(
            glob.glob(os.path.join(dataset_root, "lampe_search_*", "metadata.json")))

        cfg = self.builder_config
        if (cfg is not None
                and getattr(cfg, "sample_start", None) is not None
                and getattr(cfg, "sample_end", None) is not None):
            start = int(cfg.sample_start)
            end = int(cfg.sample_end)
            json_paths: List[str] = []
            for p in all_json_paths:
                idx = _sample_index_from_json_path(p)
                # Directories that do not match lampe_search_<int> are ignored.
                if idx is not None and start <= idx <= end:
                    json_paths.append(p)
        else:
            json_paths = all_json_paths

        return {
            'train': self._generate_examples(path=json_paths),
        }

    def _generate_examples(self, path: Union[str, Iterable[str]]) -> Iterator[Tuple[str, Any]]:
        """Yield (sample_id, episode) pairs for every metadata.json in *path*.

        Args:
            path: Either a glob pattern (str) or an iterable of concrete
                metadata.json paths.
        """
        if isinstance(path, str):
            json_paths = sorted(glob.glob(path))
        else:
            json_paths = sorted(list(path))

        if not json_paths:
            print(f"WARNING: No JSON files found at {path}. Check your path!")

        for json_path in json_paths:
            example = self._build_episode(json_path)
            if example is not None:
                yield example

    def _build_episode(self, json_path: str) -> Optional[Tuple[str, Any]]:
        """Assemble one episode from a metadata.json; None if the sample is unusable."""
        base_dir = os.path.dirname(json_path)

        try:
            with open(json_path, 'r') as f:
                meta = json.load(f)
        except Exception as e:
            # Best-effort ingestion: a malformed sample is reported and skipped.
            print(f"Skipping {json_path}: Could not load JSON. Error: {e}")
            return None

        sample_id = meta.get('sample_id', 'unknown_id')

        # Fall back to a default instruction when metadata has none.
        instruction = meta.get('episode_instruction', '') or meta.get('instruction', '')
        if not instruction or instruction.strip() == '':
            instruction = 'search for lamp'

        frames_dir = os.path.join(base_dir, meta.get('frames_dir', 'frames'))
        csv_path = os.path.join(base_dir, meta.get('joint_trajectory_path', 'joint_trajectory.csv'))

        if not os.path.exists(frames_dir) or not os.path.exists(csv_path):
            print(f"Skipping {sample_id}: Missing frames or CSV.")
            return None

        traj_df = self._load_trajectory(csv_path, sample_id)
        if traj_df is None:
            return None

        frame_files = sorted(glob.glob(os.path.join(frames_dir, "frame_*.jpg")))
        if len(frame_files) == 0:
            print(f"Skipping {sample_id}: No frames found in {frames_dir}")
            return None

        # Embed after all cheap validity checks, so the (lazily loaded)
        # encoder is only invoked for samples that will actually be emitted.
        lang_embed = self._embed([instruction])[0].numpy()

        episode_steps = self._build_steps(frame_files, traj_df, meta, instruction, lang_embed)
        if not episode_steps:
            return None

        return sample_id, {
            'steps': episode_steps,
            'episode_metadata': {
                'file_path': json_path,
                'sample_id': sample_id,
            },
        }

    @staticmethod
    def _load_trajectory(csv_path: str, sample_id: str):
        """Load, normalize and time-sort the joint CSV; None on any failure."""
        try:
            traj_df = pd.read_csv(csv_path)
            # Map raw CSV headers to the canonical short names used below.
            traj_df = traj_df.rename(columns={
                'base_joint': 'base',
                'joint2': 'j2',
                'joint3': 'j3',
                'joint4': 'j4'
            })
            required_cols = ['timestamp', 'base', 'j2', 'j3', 'j4']
            if not all(col in traj_df.columns for col in required_cols):
                print(f"Skipping {sample_id}: CSV missing required columns. Found: {traj_df.columns}")
                return None
        except Exception as e:
            print(f"Skipping {sample_id}: Error reading CSV. {e}")
            return None
        return traj_df.sort_values('timestamp')

    @staticmethod
    def _decode_frame(frame_file: str):
        """Read a JPEG from disk; return an RGB uint8 array, or None if decoding fails."""
        with open(frame_file, 'rb') as f_img:
            image_bytes = f_img.read()
        image_bgr = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)
        if image_bgr is None:
            # cv2.imdecode signals corrupt/unsupported data by returning None.
            return None
        # OpenCV decodes to BGR; the TFDS Image feature stores RGB.
        return cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

    @staticmethod
    def _joints_at(traj_df, timestamps: np.ndarray, t: float) -> np.ndarray:
        """Return [base, j2, j3, j4] from the trajectory row nearest in time to t."""
        nearest_idx = int(np.abs(timestamps - t).argmin())
        row = traj_df.iloc[nearest_idx]
        return np.array([row['base'], row['j2'], row['j3'], row['j4']], dtype=np.float32)

    def _build_steps(self, frame_files, traj_df, meta, instruction, lang_embed):
        """Build per-frame step dicts for one episode.

        Frame i is timestamped at trajectory_start + i / fps (frames are
        assumed to begin exactly when the trajectory recording starts —
        TODO confirm there is no capture offset). The state is the joint row
        nearest that time; the action is the joint row nearest the *next*
        frame's time (a one-step-ahead target).
        """
        traj_start = float(traj_df['timestamp'].iloc[0])
        fps = float(meta.get('fps', 30.0))
        # Hoist the Series -> ndarray conversion out of the per-frame loop.
        timestamps = traj_df['timestamp'].to_numpy()

        steps = []
        for i, frame_file in enumerate(frame_files):
            image_rgb = self._decode_frame(frame_file)
            if image_rgb is None:
                # Previously a corrupt frame crashed the whole build inside
                # cvtColor(None, ...); now the bad frame is dropped.
                print(f"WARNING: could not decode {frame_file}; skipping frame.")
                continue

            frame_time = traj_start + (i / fps)
            state = self._joints_at(traj_df, timestamps, frame_time)
            action = self._joints_at(traj_df, timestamps, frame_time + 1.0 / fps)

            steps.append({
                'observation': {
                    'image': image_rgb,
                    'state': state,
                },
                'action': action,
                'discount': 1.0,
                # Episode-boundary fields are finalized below, once the number
                # of successfully decoded frames is known.
                'reward': 0.0,
                'is_first': False,
                'is_last': False,
                'is_terminal': False,
                'language_instruction': instruction,
                'language_embedding': lang_embed,
            })

        if steps:
            steps[0]['is_first'] = True
            steps[-1]['is_last'] = True
            steps[-1]['is_terminal'] = True
            steps[-1]['reward'] = 1.0  # demos: reward of 1 on the final step
        return steps
|
|
| |
|
|
|
|