Spaces:
Sleeping
Sleeping
| """ | |
| IPAD Dataset Loader for HuggingFace Infrastructure | |
| Loads data from HF Hub and provides PyTorch DataLoader compatible interface | |
| """ | |
| import torch | |
| from torch.utils.data import Dataset, DataLoader | |
| import cv2 | |
| import numpy as np | |
| from pathlib import Path | |
| import zipfile | |
| from huggingface_hub import hf_hub_download | |
| import os | |
| from typing import List, Tuple, Optional | |
| import random | |
| class IPADVideoDataset(Dataset): | |
| """ | |
| IPAD Video Anomaly Detection Dataset | |
| Args: | |
| root_dir: Path to extracted dataset | |
| device_name: Device ID (e.g., "S01", "S02", ..., "S12") | |
| split: "train" or "test" | |
| clip_length: Number of frames per clip (default: 16) | |
| frame_size: Tuple of (height, width) for resizing (default: (256, 256)) | |
| stride: Frame sampling stride (default: 1) | |
| normalize: Whether to normalize frames to [-1, 1] | |
| """ | |
| def __init__( | |
| self, | |
| root_dir: str, | |
| device_name: str = "S01", | |
| split: str = "train", | |
| clip_length: int = 16, | |
| frame_size: Tuple[int, int] = (256, 256), | |
| stride: int = 1, | |
| normalize: bool = True | |
| ): | |
| self.root_dir = Path(root_dir) | |
| self.device_name = device_name | |
| self.split = split | |
| self.clip_length = clip_length | |
| self.frame_size = frame_size | |
| self.stride = stride | |
| self.normalize = normalize | |
| # Construct path to device frames | |
| # Note: The dataset uses "training" and "testing", not "train" and "test" | |
| split_folder = "training" if split == "train" else "testing" | |
| self.device_path = self.root_dir / device_name / split_folder / "frames" | |
| if not self.device_path.exists(): | |
| raise ValueError(f"Dataset path not found: {self.device_path}") | |
| # Get all video directories | |
| self.video_dirs = sorted([d for d in self.device_path.iterdir() if d.is_dir()]) | |
| # Build index of all valid clips | |
| self.clips = [] | |
| for video_dir in self.video_dirs: | |
| frames = sorted(list(video_dir.glob("*.jpg")) + list(video_dir.glob("*.png"))) | |
| num_frames = len(frames) | |
| # Create clips with stride | |
| for start_idx in range(0, num_frames - clip_length + 1, stride): | |
| self.clips.append({ | |
| 'video_dir': video_dir, | |
| 'start_idx': start_idx, | |
| 'frames': frames[start_idx:start_idx + clip_length] | |
| }) | |
| print(f"Loaded {len(self.clips)} clips from {device_name}/{split}") | |
| def __len__(self) -> int: | |
| return len(self.clips) | |
| def __getitem__(self, idx: int) -> torch.Tensor: | |
| clip_info = self.clips[idx] | |
| frames = [] | |
| # Load and process each frame | |
| for frame_path in clip_info['frames']: | |
| frame = cv2.imread(str(frame_path)) | |
| frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| frame = cv2.resize(frame, self.frame_size) | |
| # Normalize to [0, 1] | |
| frame = frame.astype(np.float32) / 255.0 | |
| # Normalize to [-1, 1] if requested | |
| if self.normalize: | |
| frame = (frame - 0.5) / 0.5 | |
| frames.append(frame) | |
| # Convert to tensor: [T, H, W, C] -> [C, T, H, W] | |
| frames = np.stack(frames, axis=0) # [T, H, W, C] | |
| frames = torch.from_numpy(frames).permute(3, 0, 1, 2) # [C, T, H, W] | |
| return frames | |
| def download_and_extract_dataset(cache_dir: str = "./cache") -> Path: | |
| """ | |
| Download IPAD dataset from HF Hub and extract it | |
| The zip contains: IPAD_dataset/S01/training/frames/... | |
| We return the path to IPAD_dataset directory | |
| Returns: | |
| Path to extracted dataset directory (IPAD_dataset) | |
| """ | |
| cache_dir = Path(cache_dir) | |
| cache_dir.mkdir(exist_ok=True, parents=True) | |
| extracted_path = cache_dir / "IPAD_dataset" | |
| # Check if already extracted | |
| if extracted_path.exists() and (extracted_path / "S01" / "training" / "frames").exists(): | |
| print(f"✅ Dataset already extracted at {extracted_path}") | |
| return extracted_path | |
| print("📥 Downloading dataset from HF Hub...") | |
| zip_path = hf_hub_download( | |
| repo_id="MSherbinii/ipad-industrial-anomaly", | |
| filename="ipad_dataset.zip", | |
| repo_type="dataset", | |
| cache_dir=str(cache_dir) | |
| ) | |
| print(f"📦 Extracting dataset to {cache_dir}...") | |
| with zipfile.ZipFile(zip_path, 'r') as zip_ref: | |
| zip_ref.extractall(cache_dir) | |
| # Verify extraction | |
| if not extracted_path.exists(): | |
| raise ValueError(f"Expected {extracted_path} after extraction, but not found") | |
| if not (extracted_path / "S01" / "training" / "frames").exists(): | |
| raise ValueError(f"Dataset structure incorrect. Missing S01/training/frames in {extracted_path}") | |
| print(f"✅ Dataset extracted to {extracted_path}") | |
| return extracted_path | |
| def create_dataloaders( | |
| dataset_path: str, | |
| device_name: str = "S01", | |
| batch_size: int = 4, | |
| num_workers: int = 4, | |
| clip_length: int = 16, | |
| frame_size: Tuple[int, int] = (256, 256) | |
| ) -> Tuple[DataLoader, DataLoader]: | |
| """ | |
| Create train and test DataLoaders for a specific device | |
| Args: | |
| dataset_path: Path to extracted IPAD dataset | |
| device_name: Device ID (e.g., "S01") | |
| batch_size: Batch size for DataLoader | |
| num_workers: Number of worker processes | |
| clip_length: Frames per clip | |
| frame_size: Frame dimensions | |
| Returns: | |
| Tuple of (train_loader, test_loader) | |
| """ | |
| train_dataset = IPADVideoDataset( | |
| root_dir=dataset_path, | |
| device_name=device_name, | |
| split="train", | |
| clip_length=clip_length, | |
| frame_size=frame_size, | |
| stride=clip_length // 2 # 50% overlap for training | |
| ) | |
| test_dataset = IPADVideoDataset( | |
| root_dir=dataset_path, | |
| device_name=device_name, | |
| split="test", | |
| clip_length=clip_length, | |
| frame_size=frame_size, | |
| stride=clip_length # No overlap for testing | |
| ) | |
| train_loader = DataLoader( | |
| train_dataset, | |
| batch_size=batch_size, | |
| shuffle=True, | |
| num_workers=num_workers, | |
| pin_memory=True, | |
| drop_last=True | |
| ) | |
| test_loader = DataLoader( | |
| test_dataset, | |
| batch_size=batch_size, | |
| shuffle=False, | |
| num_workers=num_workers, | |
| pin_memory=True, | |
| drop_last=False | |
| ) | |
| return train_loader, test_loader | |
| # Device name mappings | |
| DEVICE_NAMES = [ | |
| "S01", "S02", "S03", "S04", "S05", "S06", | |
| "S07", "S08", "S09", "S10", "S11", "S12", | |
| "R01", "R02", "R03", "R04" | |
| ] | |
| SYNTHETIC_DEVICES = [f"S{i:02d}" for i in range(1, 13)] | |
| REAL_DEVICES = [f"R{i:02d}" for i in range(1, 5)] | |