"""Dataset loading and splitting utilities for binary image classification.

Loads a self-labelled "golden" dataset described by ``data/labels.csv``,
maps the original three-way labels to a binary target, and produces
stratified train/test/val splits as a HuggingFace ``DatasetDict``.  Also
provides a PyTorch ``Dataset`` wrapper that runs images through a CLIP
processor.
"""

from pathlib import Path

import numpy as np
import pandas as pd
import torch
from datasets import Dataset, DatasetDict
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset as TorchDataset
from transformers import CLIPProcessor

# Data lives in <repo_root>/data, resolved relative to this file.
DATA_PATH = Path(__file__).parent.parent / "data"
IMAGES_PATH = DATA_PATH / "imgs"
LABELS_CSV = DATA_PATH / "labels.csv"


def load_golden_dataset() -> pd.DataFrame:
    """Load the labelled CSV, resolve image paths, and add a binary label.

    Returns:
        DataFrame with an ``image`` column of absolute file paths (rows
        whose image file is missing on disk are dropped) and a ``label``
        column: 0 = FAMILY_SAFE/UNCERTAIN, 1 = SUGGESTIVE.
    """
    df = pd.read_csv(LABELS_CSV)

    # Convert image paths from /data/imgs/... to actual file paths
    df["image"] = df["image"].apply(
        lambda x: str(IMAGES_PATH / Path(x).name)
    )

    # Verify files exist; drop rows whose image is missing on disk rather
    # than failing later when the image is opened.
    existing = df["image"].apply(lambda x: Path(x).exists())
    missing_count = (~existing).sum()
    if missing_count > 0:
        print(f"Warning: {missing_count} image files not found")
        df = df[existing].copy()

    # Preprocess labels: combine UNCERTAIN with FAMILY_SAFE (0), SUGGESTIVE (1)
    # 0 = FAMILY_SAFE/UNCERTAIN (closer to 0 means FAMILY_SAFE/UNCERTAIN)
    # 1 = SUGGESTIVE (closer to 1 means SUGGESTIVE)
    df["label"] = df["choice"].apply(
        lambda x: 0 if x in ["FAMILY_SAFE", "UNCERTAIN"] else 1
    )
    return df


def create_dataset_splits(
    train_size: float = 0.7,
    test_size: float = 0.15,
    val_size: float = 0.15,
    random_state: int = 42,
) -> DatasetDict:
    """Split the golden dataset into stratified train/test/val subsets.

    Args:
        train_size: Fraction of rows assigned to the training split.
        test_size: Fraction of rows assigned to the test split.
        val_size: Fraction of rows assigned to the validation split.
        random_state: Seed forwarded to both ``train_test_split`` calls
            for reproducibility.

    Returns:
        ``DatasetDict`` with ``"train"``, ``"test"`` and ``"val"`` splits.

    Raises:
        ValueError: If the three fractions do not sum to 1.0.
    """
    # Validate split sizes with an explicit raise rather than `assert`,
    # which is silently stripped when Python runs with -O.
    if abs(train_size + test_size + val_size - 1.0) >= 1e-6:
        raise ValueError("Split sizes must sum to 1.0")

    # Load data
    df = load_golden_dataset()
    print(f"Loaded {len(df)} golden self-labelled images")
    print("Original label distribution:")
    print(df["choice"].value_counts())
    print("\nBinary label distribution (after preprocessing):")
    print(df["label"].value_counts())
    print(" (0 = FAMILY_SAFE/UNCERTAIN, 1 = SUGGESTIVE)")

    # First split: train vs (test + val).
    # Stratify by binary label to maintain the class distribution in
    # every subset.
    train_df, temp_df = train_test_split(
        df,
        test_size=(test_size + val_size),
        stratify=df["label"],
        random_state=random_state,
    )

    # Second split: test vs val.
    # Rescale test_size to be a proportion of the remaining data.
    test_proportion = test_size / (test_size + val_size)
    test_df, val_df = train_test_split(
        temp_df,
        test_size=(1 - test_proportion),
        stratify=temp_df["label"],
        random_state=random_state,
    )

    print("\nSplit sizes:")
    print(f" Train: {len(train_df)} ({len(train_df)/len(df)*100:.1f}%)")
    print(f" Test: {len(test_df)} ({len(test_df)/len(df)*100:.1f}%)")
    print(f" val: {len(val_df)} ({len(val_df)/len(df)*100:.1f}%)")

    # Convert to HuggingFace Datasets.
    # NOTE(review): from_pandas keeps the pandas index of the filtered/split
    # frames as an extra "__index_level_0__" column; pass
    # preserve_index=False if that column is unwanted downstream — confirm
    # against consumers before changing.
    train_ds = Dataset.from_pandas(train_df)
    test_ds = Dataset.from_pandas(test_df)
    val_ds = Dataset.from_pandas(val_df)

    # Create DatasetDict
    dataset_dict = DatasetDict({
        "train": train_ds,
        "test": test_ds,
        "val": val_ds,
    })
    return dataset_dict


def get_dataset(
    train_size: float = 0.7,
    test_size: float = 0.15,
    val_size: float = 0.15,
    random_state: int = 42,
) -> DatasetDict:
    """Convenience wrapper around :func:`create_dataset_splits`."""
    return create_dataset_splits(
        train_size=train_size,
        test_size=test_size,
        val_size=val_size,
        random_state=random_state,
    )


class ImageDataset(TorchDataset):
    """PyTorch Dataset for image classification.

    Wraps a list of image file paths and integer labels; each item is the
    image preprocessed by a CLIP processor plus its label tensor.
    """

    def __init__(
        self,
        image_paths: list[str],
        labels: np.ndarray,
        processor: CLIPProcessor,
    ):
        """Store paths, convert labels to a long tensor, keep the processor.

        Args:
            image_paths: Filesystem paths of the images, index-aligned
                with ``labels``.
            labels: Integer class labels (converted to ``torch.long``).
            processor: CLIP processor used to preprocess each image.
        """
        self.image_paths = image_paths
        self.labels = torch.tensor(labels, dtype=torch.long)
        self.processor = processor

    def __len__(self) -> int:
        """Return the number of samples."""
        return len(self.image_paths)

    def __getitem__(self, idx: int) -> tuple[dict, torch.Tensor]:
        """Load, preprocess, and return one (inputs, label) pair.

        Single item fetching — the PyTorch DataLoader handles batching
        automatically.
        """
        img_path = self.image_paths[idx]
        # convert("RGB") guards against grayscale/RGBA files that would
        # otherwise have the wrong channel count for the processor.
        image = Image.open(img_path).convert("RGB")

        # Process image with CLIP processor
        inputs = self.processor(images=image, return_tensors="pt")
        # Remove batch dimension from processor output
        pixel_values = inputs["pixel_values"].squeeze(0)

        label = self.labels[idx]
        return {"pixel_values": pixel_values}, label