# Uploaded with huggingface_hub by kavinrajkrupsurge (commit 85c5c21, verified).
from typing import Iterator, Tuple, Any, Iterable, Optional, Union, List
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_hub as hub
import pandas as pd
import json
import os
import glob
import cv2
# NOTE: This builder is patterned after `keyboard_controlled_dataset/keyboard_controlled_dataset.py` and follows the
# same episode/step structure and synchronization method for the Lamp_Search dataset.
class LampeSearchDatasetConfig(tfds.core.BuilderConfig):
    """Builder config that selects samples by integer index (lampe_search_XXX).

    Attributes:
        sample_start: First sample index to include (inclusive), or None for no lower bound.
        sample_end: Last sample index to include (inclusive), or None for no upper bound.
    """

    def __init__(self, *, sample_start: Optional[int] = None,
                 sample_end: Optional[int] = None, **kwargs):
        super().__init__(**kwargs)
        self.sample_start, self.sample_end = sample_start, sample_end
def _sample_index_from_json_path(json_path: str) -> Optional[int]:
"""Extract integer index from .../lampe_search_XXX/metadata.json."""
sample_dir = os.path.basename(os.path.dirname(json_path))
if not sample_dir.startswith("lampe_search_"):
return None
try:
return int(sample_dir.split("_", 2)[2])
except Exception:
return None
# Dataset Builder Class for lampe_search dataset
class LampeSearchDataset(tfds.core.GeneratorBasedBuilder):
    """DatasetBuilder for the lampe_search dataset.

    Each episode pairs RGB video frames (``frames/frame_*.jpg``) with a 4-DoF
    joint trajectory (``joint_trajectory.csv``) and a natural-language
    instruction taken from the sample's ``metadata.json``.
    """

    VERSION = tfds.core.Version('1.0.0')
    RELEASE_NOTES = {
        '1.0.0': 'Initial release.',
    }

    BUILDER_CONFIGS = [
        LampeSearchDatasetConfig(
            name="all",
            description="All samples (lampe_search_001 ... lampe_search_050, if present).",
            version=VERSION,
            sample_start=None,
            sample_end=None,
        ),
        LampeSearchDatasetConfig(
            name="samples_001_025",
            description="Samples 001-025 only.",
            version=VERSION,
            sample_start=1,
            sample_end=25,
        ),
        LampeSearchDatasetConfig(
            name="samples_026_050",
            description="Samples 026-050 only.",
            version=VERSION,
            sample_start=26,
            sample_end=50,
        ),
    ]
    DEFAULT_CONFIG_NAME = "all"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The Universal Sentence Encoder is loaded lazily (see `_embed`) so
        # that metadata-only operations do not trigger a TF-Hub model download.
        self._use_model = None

    @property
    def _embed(self):
        """Universal Sentence Encoder callable (512-dim output), loaded on first use.

        Backward-compatible with the previous eager `self._embed` attribute:
        call sites still use `self._embed([text])`.
        """
        if self._use_model is None:
            self._use_model = hub.load(
                "https://tfhub.dev/google/universal-sentence-encoder-large/5")
        return self._use_model

    def _info(self) -> tfds.core.DatasetInfo:
        """Dataset metadata (homepage, citation, feature spec)."""
        return self.dataset_info_from_configs(
            description="""
        Lamp Search Robot Manipulation Dataset
        This dataset contains robot demonstrations for a 4-degree-of-freedom (4-DoF) robotic system
        performing lamp search tasks. Each episode consists of:
        - **Robot Actions**: Joint trajectories recorded during lamp search demonstrations
        - **Observations**: RGB camera frames synchronized with joint state trajectories
        - **Language Instructions**: Natural language commands (default: "search for lamp" if empty)
        - **Trajectories**: Joint positions recorded at ~21-22 Hz (Base, Joint2, Joint3, Joint4)
        **Data Processing:**
        - Videos were recorded at 30 FPS
        - Frames are extracted from frames/ directories
        - Joint trajectories are synchronized with video frames
        - Language instructions are embedded using Universal Sentence Encoder (512-dim)
        **Dataset Statistics:**
        - Total Episodes: Multiple samples (lampe_search_001 to lampe_search_050)
        - Control Method: Simulation Replay
        - Video Duration: ~24 seconds per episode
        - Trajectory Rate: ~21-22 Hz
        """,
            homepage='https://github.com/your-repo/lampe-search-dataset',
            citation="""
        @misc{lampe_search_dataset,
        title = {Lamp Search Robot Manipulation Dataset},
        author = {Dataset Creator},
        year = {2025},
        note = {Dataset for fine-tuning vision-language-action models}
        }
        """,
            features=tfds.features.FeaturesDict({
                'steps': tfds.features.Dataset({
                    'observation': tfds.features.FeaturesDict({
                        'image': tfds.features.Image(
                            shape=(None, None, 3),  # We accept any size here; we resize in transform.py later
                            dtype=np.uint8,
                            encoding_format='jpeg',  # Your frames are .jpg
                            doc='Main camera RGB observation.',
                        ),
                        'state': tfds.features.Tensor(
                            shape=(4,),  # STRICTLY 4-DoF: Base, J2, J3, J4
                            dtype=np.float32,
                            doc='Robot state: [Base, Joint2, Joint3, Joint4]',
                        )
                    }),
                    'action': tfds.features.Tensor(
                        shape=(4,),  # STRICTLY 4-DoF
                        dtype=np.float32,
                        doc='Robot action: [Base, Joint2, Joint3, Joint4]',
                    ),
                    'discount': tfds.features.Scalar(
                        dtype=np.float32,
                        doc='Discount if provided, default to 1.'
                    ),
                    'reward': tfds.features.Scalar(
                        dtype=np.float32,
                        doc='Reward if provided, 1 on final step for demos.'
                    ),
                    'is_first': tfds.features.Scalar(
                        dtype=np.bool_,
                        doc='True on first step of the episode.'
                    ),
                    'is_last': tfds.features.Scalar(
                        dtype=np.bool_,
                        doc='True on last step of the episode.'
                    ),
                    'is_terminal': tfds.features.Scalar(
                        dtype=np.bool_,
                        doc='True on last step of the episode if it is a terminal step, True for demos.'
                    ),
                    'language_instruction': tfds.features.Text(
                        doc='Language Instruction.'
                    ),
                    'language_embedding': tfds.features.Tensor(
                        shape=(512,),
                        dtype=np.float32,
                        doc='Universal Sentence Encoder embedding.'
                    ),
                }),
                'episode_metadata': tfds.features.FeaturesDict({
                    'file_path': tfds.features.Text(
                        doc='Path to the original data file.'
                    ),
                    'sample_id': tfds.features.Text(
                        doc='Unique ID for the sample.'
                    ),
                }),
            }))

    def _split_generators(self, dl_manager: tfds.download.DownloadManager):
        """Define data splits (a single 'train' split).

        The dataset root can be overridden with the ``LAMPE_SEARCH_DATASET_ROOT``
        environment variable; otherwise ``/workspace/Lamp_Search`` is used.
        """
        dataset_root = os.environ.get("LAMPE_SEARCH_DATASET_ROOT", "/workspace/Lamp_Search")
        # Find all metadata.json files, then optionally filter by sample index range.
        all_json_paths = sorted(glob.glob(os.path.join(dataset_root, "lampe_search_*", "metadata.json")))

        cfg = self.builder_config
        # Only filter when the config pins BOTH ends of the index range.
        if cfg is not None and getattr(cfg, "sample_start", None) is not None and getattr(cfg, "sample_end", None) is not None:
            start = int(cfg.sample_start)
            end = int(cfg.sample_end)
            json_paths: List[str] = []
            for p in all_json_paths:
                idx = _sample_index_from_json_path(p)
                # Directories that don't follow lampe_search_XXX are skipped.
                if idx is not None and start <= idx <= end:
                    json_paths.append(p)
        else:
            json_paths = all_json_paths

        return {
            'train': self._generate_examples(path=json_paths),
        }

    @staticmethod
    def _load_trajectory(csv_path, sample_id):
        """Load and normalize a joint-trajectory CSV; return None if unusable.

        Renames the on-disk header names to the short internal names used by
        the synchronization code, validates the required columns, and sorts
        rows by timestamp.
        """
        try:
            traj_df = pd.read_csv(csv_path)
            # MAPPING: CSV header -> internal name
            traj_df = traj_df.rename(columns={
                'base_joint': 'base',
                'joint2': 'j2',
                'joint3': 'j3',
                'joint4': 'j4'
            })
            required_cols = ['timestamp', 'base', 'j2', 'j3', 'j4']
            if not all(col in traj_df.columns for col in required_cols):
                print(f"Skipping {sample_id}: CSV missing required columns. Found: {traj_df.columns}")
                return None
        except Exception as e:
            print(f"Skipping {sample_id}: Error reading CSV. {e}")
            return None
        if traj_df.empty:
            # Guard: an empty trajectory would make .iloc[0] raise downstream.
            print(f"Skipping {sample_id}: trajectory CSV has no rows.")
            return None
        # Sort by timestamp so the first row is the true recording start.
        return traj_df.sort_values('timestamp')

    @staticmethod
    def _joint_state_at(traj_df, t):
        """Return the [Base, J2, J3, J4] vector whose timestamp is nearest to ``t``."""
        nearest = int(np.abs(traj_df['timestamp'].to_numpy() - t).argmin())
        row = traj_df.iloc[nearest]
        return np.array([row['base'], row['j2'], row['j3'], row['j4']], dtype=np.float32)

    def _generate_examples(self, path: Union[str, Iterable[str]]) -> Iterator[Tuple[str, Any]]:
        """Yield (sample_id, episode_dict) for each valid sample.

        Args:
            path: Either a glob pattern matching ``metadata.json`` files, or an
                explicit iterable of such paths.
        """
        # Resolve the input into a sorted, deterministic list of paths.
        if isinstance(path, str):
            json_paths = sorted(glob.glob(path))
        else:
            json_paths = sorted(path)
        if not json_paths:
            print(f"WARNING: No JSON files found at {path}. Check your path!")

        for json_path in json_paths:
            base_dir = os.path.dirname(json_path)

            # --- metadata ---
            try:
                with open(json_path, 'r') as f:
                    meta = json.load(f)
            except Exception as e:
                print(f"Skipping {json_path}: Could not load JSON. Error: {e}")
                continue

            sample_id = meta.get('sample_id', 'unknown_id')

            # Accept either 'episode_instruction' or 'instruction' for
            # compatibility; fall back to a default command when blank.
            instruction = meta.get('episode_instruction', '') or meta.get('instruction', '')
            if not instruction or instruction.strip() == '':
                instruction = 'search for lamp'
            # Embed once per episode; every step reuses this vector.
            lang_embed = self._embed([instruction])[0].numpy()

            # lampe_search samples use a 'frames/' directory by default.
            frames_dir = os.path.join(base_dir, meta.get('frames_dir', 'frames'))
            csv_path = os.path.join(base_dir, meta.get('joint_trajectory_path', 'joint_trajectory.csv'))
            if not os.path.exists(frames_dir) or not os.path.exists(csv_path):
                print(f"Skipping {sample_id}: Missing frames or CSV.")
                continue

            traj_df = self._load_trajectory(csv_path, sample_id)
            if traj_df is None:
                continue

            # --- synchronization ---
            # Frame i is assumed to occur at recording start + i / fps; each
            # step's state/action comes from the trajectory row nearest that time.
            recording_start_time = traj_df['timestamp'].iloc[0]
            start_offset = 0.0  # hook for a camera/robot clock offset, currently zero
            robot_action_start_time = recording_start_time + start_offset

            frame_files = sorted(glob.glob(os.path.join(frames_dir, "frame_*.jpg")))
            if len(frame_files) == 0:
                print(f"Skipping {sample_id}: No frames found in {frames_dir}")
                continue

            fps = meta.get('fps', 30.0)
            episode_steps = []
            for i, frame_file in enumerate(frame_files):
                current_frame_time = robot_action_start_time + (i / fps)
                # Observation: nearest joint state at the frame's timestamp.
                state = self._joint_state_at(traj_df, current_frame_time)
                # Action: target joint state one frame period ahead (where the
                # robot SHOULD be at t+1).
                action = self._joint_state_at(traj_df, current_frame_time + (1.0 / fps))

                with open(frame_file, 'rb') as f_img:
                    image_bytes = f_img.read()
                image_np = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)
                if image_np is None:
                    # Corrupt/truncated JPEG: cv2.imdecode returns None, which
                    # would crash cvtColor below. Drop the frame instead.
                    print(f"Skipping frame {frame_file} in {sample_id}: could not decode image.")
                    continue
                image_rgb = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)

                episode_steps.append({
                    'observation': {
                        'image': image_rgb,
                        'state': state,
                    },
                    'action': action,
                    'discount': 1.0,
                    # Reward/terminal flags are fixed up after the loop so they
                    # stay correct even when trailing frames were dropped.
                    'reward': 0.0,
                    'is_first': len(episode_steps) == 0,
                    'is_last': False,
                    'is_terminal': False,
                    'language_instruction': instruction,
                    'language_embedding': lang_embed,
                })

            # --- yield episode ---
            if len(episode_steps) > 0:
                # Demo convention: reward 1 and terminal=True on the final step.
                episode_steps[-1]['reward'] = 1.0
                episode_steps[-1]['is_last'] = True
                episode_steps[-1]['is_terminal'] = True
                yield sample_id, {
                    'steps': episode_steps,
                    'episode_metadata': {
                        'file_path': json_path,
                        'sample_id': sample_id
                    }
                }