|
|
from pathlib import Path |
|
|
import pandas as pd |
|
|
from datasets import Dataset, DatasetDict |
|
|
from sklearn.model_selection import train_test_split |
|
|
import torch |
|
|
from torch.utils.data import Dataset as TorchDataset |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
from transformers import CLIPProcessor |
|
|
|
|
|
|
|
|
# Repository-relative data locations: this file is assumed to live one level
# below the project root, with a sibling "data/" directory next to it.
DATA_PATH = Path(__file__).parent.parent / "data"


# Directory holding the image files referenced by the labels CSV.
IMAGES_PATH = DATA_PATH / "imgs"


# CSV with at least an "image" column (filenames) and a "choice" column
# (self-assigned moderation labels such as FAMILY_SAFE / UNCERTAIN / SUGGESTIVE).
LABELS_CSV = DATA_PATH / "labels.csv"
|
|
|
|
|
|
|
|
def load_golden_dataset() -> pd.DataFrame:
    """Load the golden self-labelled dataset as a DataFrame.

    Reads LABELS_CSV, rewrites the "image" column to absolute paths under
    IMAGES_PATH, drops rows whose image file is missing on disk, and adds a
    binary "label" column (0 = FAMILY_SAFE/UNCERTAIN, 1 = anything else,
    i.e. SUGGESTIVE).

    Returns:
        DataFrame with at least "image", "choice" and "label" columns.
    """
    frame = pd.read_csv(LABELS_CSV)

    # Rebase every image reference onto the local images directory, keeping
    # only the filename from whatever path the CSV recorded.
    frame["image"] = [str(IMAGES_PATH / Path(entry).name) for entry in frame["image"]]

    # Keep only rows whose image actually exists; warn about the rest.
    present = frame["image"].map(lambda p: Path(p).exists())
    n_missing = int((~present).sum())
    if n_missing > 0:
        print(f"Warning: {n_missing} image files not found")
    frame = frame[present].copy()

    # Binarise the free-form choice column. UNCERTAIN is deliberately folded
    # into the safe class (0); everything else is treated as positive (1).
    safe_choices = {"FAMILY_SAFE", "UNCERTAIN"}
    frame["label"] = [0 if choice in safe_choices else 1 for choice in frame["choice"]]

    return frame
|
|
|
|
|
|
|
|
def create_dataset_splits(
    train_size: float = 0.7,
    test_size: float = 0.15,
    val_size: float = 0.15,
    random_state: int = 42
) -> DatasetDict:
    """Build stratified train/test/val splits of the golden dataset.

    Loads the golden dataset, prints its label distributions, then performs
    two stratified `train_test_split` passes: first train vs. the rest, then
    the rest into test vs. val. Stratification is on the binary "label"
    column so class proportions are preserved in every split.

    Args:
        train_size: Fraction of rows assigned to the training split.
        test_size: Fraction of rows assigned to the test split.
        val_size: Fraction of rows assigned to the validation split.
        random_state: Seed used by both stratified splits, so splits are
            reproducible for a fixed seed.

    Returns:
        DatasetDict with "train", "test" and "val" Datasets.

    Raises:
        ValueError: If the three split fractions do not sum to 1.0.
    """
    # Explicit raise instead of `assert`: asserts are stripped under
    # `python -O`, which would silently accept inconsistent fractions.
    if abs(train_size + test_size + val_size - 1.0) >= 1e-6:
        raise ValueError("Split sizes must sum to 1.0")

    df = load_golden_dataset()

    print(f"Loaded {len(df)} golden self-labelled images")
    print("Original label distribution:")
    print(df["choice"].value_counts())
    print("\nBinary label distribution (after preprocessing):")
    print(df["label"].value_counts())
    print("  (0 = FAMILY_SAFE/UNCERTAIN, 1 = SUGGESTIVE)")

    # First pass: carve off the training split; the remainder holds both
    # test and val rows. Stratify so label proportions survive the split.
    train_df, temp_df = train_test_split(
        df,
        test_size=(test_size + val_size),
        stratify=df["label"],
        random_state=random_state
    )

    # Second pass: divide the remainder between test and val. The test
    # proportion is re-expressed relative to the remainder's size.
    test_proportion = test_size / (test_size + val_size)
    test_df, val_df = train_test_split(
        temp_df,
        test_size=(1 - test_proportion),
        stratify=temp_df["label"],
        random_state=random_state
    )

    print("\nSplit sizes:")
    print(f"  Train: {len(train_df)} ({len(train_df)/len(df)*100:.1f}%)")
    print(f"  Test: {len(test_df)} ({len(test_df)/len(df)*100:.1f}%)")
    # Capitalised "Val" for consistency with the Train/Test lines above.
    print(f"  Val: {len(val_df)} ({len(val_df)/len(df)*100:.1f}%)")

    train_ds = Dataset.from_pandas(train_df)
    test_ds = Dataset.from_pandas(test_df)
    val_ds = Dataset.from_pandas(val_df)

    dataset_dict = DatasetDict({
        "train": train_ds,
        "test": test_ds,
        "val": val_ds
    })

    return dataset_dict
|
|
|
|
|
|
|
|
def get_dataset(
    train_size: float = 0.7,
    test_size: float = 0.15,
    val_size: float = 0.15,
    random_state: int = 42
) -> DatasetDict:
    """Public entry point for the dataset splits.

    Thin wrapper around `create_dataset_splits`; all arguments are
    forwarded unchanged and the resulting DatasetDict is returned as-is.
    """
    return create_dataset_splits(train_size, test_size, val_size, random_state)
|
|
|
|
|
|
|
|
class ImageDataset(TorchDataset):
    """PyTorch Dataset pairing CLIP-preprocessed images with integer labels.

    Each item is a ``({"pixel_values": tensor}, label)`` pair, where the
    pixel tensor is produced on the fly by the supplied CLIP processor.
    """

    def __init__(self, image_paths: list[str], labels: np.ndarray, processor: CLIPProcessor):
        # Paths are kept as-is; images are loaded lazily in __getitem__.
        self.image_paths = image_paths
        # Labels become a long tensor so they can feed loss functions directly.
        self.labels = torch.tensor(labels, dtype=torch.long)
        self.processor = processor

    def __len__(self) -> int:
        return len(self.image_paths)

    def __getitem__(self, idx: int) -> tuple[dict, torch.Tensor]:
        # Normalise to 3-channel RGB so grayscale/RGBA files are handled too.
        rgb = Image.open(self.image_paths[idx]).convert("RGB")

        # The processor emits a batch axis of size 1; squeeze it away so the
        # DataLoader's default collation re-batches single samples correctly.
        encoded = self.processor(images=rgb, return_tensors="pt")
        pixels = encoded["pixel_values"].squeeze(0)

        return {"pixel_values": pixels}, self.labels[idx]
|
|
|