from __future__ import annotations

import logging
from pathlib import PosixPath
from typing import Any

import cv2
import numpy as np
import rerun as rr
import torch
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
from PIL import Image
from tqdm import tqdm

logger = logging.getLogger(__name__)


def get_frame(
    video_path: PosixPath,
    timestamp: float,
    video_cache: dict[PosixPath, tuple[list[np.ndarray], float]] | None = None,
) -> np.ndarray:
    """
    Extracts a specific frame from a video.

    `video_path`: path to the video.
    `timestamp`: timestamp of the wanted frame.
    `video_cache`: cache to prevent reading the same video file twice.
    """
    if video_cache is None:
        video_cache = {}
    if video_path not in video_cache:
        cap = cv2.VideoCapture(str(video_path))
        frames = []
        while cap.isOpened():
            success, frame = cap.read()
            if success:
                frames.append(frame)
            else:
                break
        frame_rate = cap.get(cv2.CAP_PROP_FPS)
        cap.release()
        video_cache[video_path] = (frames, frame_rate)

    frames, frame_rate = video_cache[video_path]
    return frames[int(timestamp * frame_rate)]
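
# Illustrative usage sketch (not part of the original module; the path below is
# hypothetical). Reusing one cache dict across calls avoids decoding the same video twice:
#
#   cache: dict[PosixPath, tuple[list[np.ndarray], float]] = {}
#   frame = get_frame(PosixPath("videos/episode_000000.mp4"), timestamp=1.5, video_cache=cache)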


def to_rerun(
    column_name: str,
    value: Any,
    video_cache: dict[PosixPath, tuple[list[np.ndarray], float]] | None = None,
    videos_dir: PosixPath | None = None,
) -> Any:
    """Do our best to interpret the value and convert it to a Rerun-compatible archetype."""
    if isinstance(value, Image.Image):
        if "depth" in column_name:
            return rr.DepthImage(value)
        else:
            return rr.Image(value)
    elif isinstance(value, np.ndarray):
        return rr.Tensor(value)
    elif isinstance(value, list):
        if isinstance(value[0], float):
            return rr.BarChart(value)
        else:
            return rr.TextDocument(str(value))  # Fall back to text
    elif isinstance(value, (float, int)):
        return rr.Scalar(value)
    elif isinstance(value, torch.Tensor):
        if value.dim() == 0:
            return rr.Scalar(value.item())
        elif value.dim() == 1:
            return rr.BarChart(value)
        elif value.dim() == 2 and "depth" in column_name:
            return rr.DepthImage(value)
        elif value.dim() == 2:
            return rr.Image(value)
        elif value.dim() == 3 and (value.shape[2] == 3 or value.shape[2] == 4):
            return rr.Image(value)  # Treat it as an RGB or RGBA image
        else:
            return rr.Tensor(value)
    elif isinstance(value, dict) and "path" in value and "timestamp" in value:
        # A video frame reference: load the frame at the given timestamp from the referenced file.
        path = (videos_dir or PosixPath("./")) / PosixPath(value["path"])
        timestamp = value["timestamp"]
        return rr.Image(get_frame(path, timestamp, video_cache=video_cache))
    else:
        return rr.TextDocument(str(value))  # Fall back to text
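
# Illustrative mapping examples (not part of the original module; entity paths are
# hypothetical): a 1-D tensor is logged as a bar chart, an HxWx3 tensor as an RGB image,
# and a plain float as a time-series scalar.
#
#   rr.log("observation/state", to_rerun("observation/state", torch.zeros(7)))
#   rr.log("observation/image", to_rerun("observation/image", torch.zeros(240, 320, 3)))
#   rr.log("reward", to_rerun("reward", 1.0))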


def log_lerobot_dataset_to_rerun(dataset: LeRobotDataset, episode_index: int) -> None:
    # Special time-like columns for LeRobot datasets (https://huggingface.co/lerobot/):
    TIME_LIKE = {"index", "frame_id", "timestamp"}

    # Ignore these columns (again, LeRobot-specific):
    IGNORE = {"episode_data_index_from", "episode_data_index_to", "episode_id"}

    hf_ds_subset = dataset.hf_dataset.filter(
        lambda frame: "episode_index" not in frame or frame["episode_index"] == episode_index
    )

    video_cache: dict[PosixPath, tuple[list[np.ndarray], float]] = {}

    for row in tqdm(hf_ds_subset):
        # Handle time-like columns first, since they set a state (time is an index in Rerun):
        for column_name in TIME_LIKE:
            if column_name in row:
                cell = row[column_name]
                if isinstance(cell, torch.Tensor) and cell.dim() == 0:
                    cell = cell.item()
                if isinstance(cell, int):
                    rr.set_time_sequence(column_name, cell)
                elif isinstance(cell, float):
                    rr.set_time_seconds(column_name, cell)  # assume seconds
                else:
                    print(f"Unknown time-like column {column_name} with value {cell}")

        # Now log actual data columns:
        for column_name, cell in row.items():
            if column_name in TIME_LIKE or column_name in IGNORE:
                continue
            rr.log(
                column_name,
                to_rerun(column_name, cell, video_cache=video_cache, videos_dir=dataset.videos_dir.parent),
            )


def log_dataset_to_rerun(dataset: Any) -> None:
    TIME_LIKE = {"index", "frame_id", "timestamp"}

    for row in tqdm(dataset):
        # Handle time-like columns first, since they set a state (time is an index in Rerun):
        for column_name in TIME_LIKE:
            if column_name in row:
                cell = row[column_name]
                if isinstance(cell, int):
                    rr.set_time_sequence(column_name, cell)
                elif isinstance(cell, float):
                    rr.set_time_seconds(column_name, cell)  # assume seconds
                else:
                    print(f"Unknown time-like column {column_name} with value {cell}")

        # Now log actual data columns:
        for column_name, cell in row.items():
            if column_name in TIME_LIKE:
                continue
            rr.log(column_name, to_rerun(column_name, cell))
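

# Minimal entry-point sketch (an assumption, not part of the original file): spawn the
# Rerun viewer and stream one episode of a LeRobot dataset. The repo id is only an example.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    rr.init("lerobot_dataset_viewer", spawn=True)  # hypothetical application id
    dataset = LeRobotDataset("lerobot/pusht")  # hypothetical dataset repo id
    log_lerobot_dataset_to_rerun(dataset, episode_index=0)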