CaptionIQ / src /preprocess.py
pavanpraneeth's picture
Upload folder using huggingface_hub
290f366 verified
Raw
History Blame Contribute Delete
10.4 kB
"""
CaptionIQ — Caption Preprocessing
Downloads Flickr8K dataset, cleans captions, builds vocabulary,
creates train/val/test splits, and saves everything to disk.
"""
import os
import re
import string
import pickle
import kagglehub
import numpy as np
from collections import Counter
from tensorflow.keras.preprocessing.text import Tokenizer
from tqdm import tqdm
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.config import (
DATA_DIR, FLICKR_IMAGES_DIR, FLICKR_TEXT_DIR,
CAPTIONS_FILE, TOKENIZER_FILE,
TRAIN_IMAGES_FILE, VAL_IMAGES_FILE, TEST_IMAGES_FILE,
START_TOKEN, END_TOKEN,
TRAIN_SIZE, VAL_SIZE, TEST_SIZE,
MIN_WORD_FREQ,
)
def download_flickr8k():
"""
Download Flickr8K dataset using kagglehub.
Falls back to manual instructions if kagglehub fails.
"""
print("=" * 60)
print("Downloading Flickr8K dataset via kagglehub...")
print("=" * 60)
try:
path = kagglehub.dataset_download("adityajn105/flickr8k")
print(f"Dataset downloaded to: {path}")
return path
except Exception as e:
print(f"\nkagglehub download failed: {e}")
print("\nPlease download the Flickr8K dataset manually:")
print(" 1. Go to https://www.kaggle.com/datasets/adityajn105/flickr8k")
print(" 2. Download and extract to:")
print(f" Images → {FLICKR_IMAGES_DIR}")
print(f" Captions → {FLICKR_TEXT_DIR}")
return None
def setup_dataset_dirs(kaggle_path: str = None):
"""
Ensure dataset directories exist and link downloaded data.
"""
os.makedirs(FLICKR_IMAGES_DIR, exist_ok=True)
os.makedirs(FLICKR_TEXT_DIR, exist_ok=True)
if kaggle_path:
# kagglehub downloads to a cache dir; copy/link files
import shutil
# Find images directory
for root, dirs, files in os.walk(kaggle_path):
for f in files:
src = os.path.join(root, f)
if f.endswith(".jpg") or f.endswith(".png"):
dst = os.path.join(FLICKR_IMAGES_DIR, f)
if not os.path.exists(dst):
shutil.copy2(src, dst)
elif f.endswith(".txt") or f.endswith(".csv"):
dst = os.path.join(FLICKR_TEXT_DIR, f)
if not os.path.exists(dst):
shutil.copy2(src, dst)
print(f"Images copied to: {FLICKR_IMAGES_DIR}")
print(f"Text files copied to: {FLICKR_TEXT_DIR}")
def load_raw_captions() -> dict:
"""
Parse raw captions file into {image_id: [caption1, caption2, ...]}.
Supports both Flickr8k.token.txt format and captions.txt CSV format.
"""
captions = {}
# Try Flickr8k.token.txt format first
token_file = os.path.join(FLICKR_TEXT_DIR, "Flickr8k.token.txt")
if os.path.exists(token_file):
print(f"Loading captions from: {token_file}")
with open(token_file, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
# Format: image_name#caption_idx\tcaption_text
parts = line.split("\t", 1)
if len(parts) != 2:
continue
img_caption_id, caption = parts
image_id = img_caption_id.split("#")[0]
if image_id not in captions:
captions[image_id] = []
captions[image_id].append(caption)
return captions
# Try captions.txt CSV format (kaggle version)
captions_csv = os.path.join(FLICKR_TEXT_DIR, "captions.txt")
if os.path.exists(captions_csv):
print(f"Loading captions from: {captions_csv}")
with open(captions_csv, "r") as f:
header = f.readline() # skip header
for line in f:
line = line.strip()
if not line:
continue
# Format: image,caption
parts = line.split(",", 1)
if len(parts) != 2:
continue
image_id, caption = parts
if image_id not in captions:
captions[image_id] = []
captions[image_id].append(caption)
return captions
raise FileNotFoundError(
f"No captions file found. Expected one of:\n"
f" {token_file}\n"
f" {captions_csv}\n"
f"Please download the Flickr8K dataset first."
)
def clean_caption(caption: str) -> str:
"""
Clean a single caption:
- Lowercase
- Remove punctuation / digits / special chars
- Remove single-character words (except 'a')
- Strip extra whitespace
"""
caption = caption.lower()
# Remove digits
caption = re.sub(r"\d+", "", caption)
# Remove punctuation
caption = caption.translate(str.maketrans("", "", string.punctuation))
# Remove single characters except 'a'
caption = " ".join(w for w in caption.split() if len(w) > 1 or w == "a")
# Strip extra whitespace
caption = caption.strip()
return caption
def clean_all_captions(captions: dict) -> dict:
"""
Clean all captions and add start/end tokens.
"""
cleaned = {}
for image_id, caption_list in tqdm(captions.items(), desc="Cleaning captions"):
cleaned[image_id] = []
for cap in caption_list:
cap = clean_caption(cap)
cap = f"{START_TOKEN} {cap} {END_TOKEN}"
cleaned[image_id].append(cap)
return cleaned
def save_captions(captions: dict, filepath: str):
"""
Save cleaned captions to file (image_id<tab>caption per line).
"""
with open(filepath, "w") as f:
for image_id, caption_list in captions.items():
for cap in caption_list:
f.write(f"{image_id}\t{cap}\n")
print(f"Saved cleaned captions to: {filepath}")
def create_splits(captions: dict):
"""
Split image IDs into train / val / test sets.
Uses Flickr8K official split files if available, otherwise random split.
"""
all_images = sorted(captions.keys())
# Try official split files
train_file = os.path.join(FLICKR_TEXT_DIR, "Flickr_8k.trainImages.txt")
val_file = os.path.join(FLICKR_TEXT_DIR, "Flickr_8k.devImages.txt")
test_file = os.path.join(FLICKR_TEXT_DIR, "Flickr_8k.testImages.txt")
if all(os.path.exists(f) for f in [train_file, val_file, test_file]):
print("Using official Flickr8K split files...")
train = _load_split_file(train_file, all_images)
val = _load_split_file(val_file, all_images)
test = _load_split_file(test_file, all_images)
else:
print("Official splits not found — creating random splits...")
np.random.seed(42)
np.random.shuffle(all_images)
train = all_images[:TRAIN_SIZE]
val = all_images[TRAIN_SIZE:TRAIN_SIZE + VAL_SIZE]
test = all_images[TRAIN_SIZE + VAL_SIZE:TRAIN_SIZE + VAL_SIZE + TEST_SIZE]
# Save split files
_save_split(train, TRAIN_IMAGES_FILE, "Train")
_save_split(val, VAL_IMAGES_FILE, "Val")
_save_split(test, TEST_IMAGES_FILE, "Test")
return train, val, test
def _load_split_file(filepath: str, valid_images: list) -> list:
"""Load image IDs from an official split file."""
valid_set = set(valid_images)
with open(filepath, "r") as f:
return [line.strip() for line in f if line.strip() in valid_set]
def _save_split(image_ids: list, filepath: str, name: str):
"""Save a list of image IDs to file."""
with open(filepath, "w") as f:
for img_id in image_ids:
f.write(f"{img_id}\n")
print(f" {name}: {len(image_ids)} images → {filepath}")
def build_tokenizer(captions: dict, train_images: list) -> Tokenizer:
"""
Fit a Keras Tokenizer on training captions only.
Filters vocabulary to keep only words with frequency >= MIN_WORD_FREQ.
Save to disk as pickle.
"""
# Collect all training captions
train_captions = []
train_set = set(train_images)
for img_id in train_set:
if img_id in captions:
train_captions.extend(captions[img_id])
# Fit tokenizer (word_index is ordered by frequency)
tokenizer = Tokenizer()
tokenizer.fit_on_texts(train_captions)
total_words = len(tokenizer.word_index)
# Filter: keep only words with frequency >= MIN_WORD_FREQ
freq_words = sum(
1 for c in tokenizer.word_counts.values()
if c >= MIN_WORD_FREQ
)
print(f" Total unique words: {total_words}")
print(f" Words with freq >= {MIN_WORD_FREQ}: {freq_words}")
print(f" Filtered out: {total_words - freq_words} rare words")
# num_words keeps the top (num_words - 1) words by frequency
tokenizer.num_words = freq_words + 1 # +1 for padding index 0
vocab_size = tokenizer.num_words
print(f"Vocabulary size: {vocab_size}")
# Compute max caption length
max_length = max(len(cap.split()) for cap in train_captions)
print(f"Max caption length: {max_length}")
# Save tokenizer
with open(TOKENIZER_FILE, "wb") as f:
pickle.dump(tokenizer, f)
print(f"Tokenizer saved to: {TOKENIZER_FILE}")
return tokenizer
def main():
"""Run the full preprocessing pipeline."""
print("=" * 60)
print(" CaptionIQ — Preprocessing Pipeline")
print("=" * 60)
# Step 1: Download dataset
kaggle_path = download_flickr8k()
setup_dataset_dirs(kaggle_path)
# Step 2: Load and clean captions
raw_captions = load_raw_captions()
print(f"Loaded captions for {len(raw_captions)} images")
cleaned_captions = clean_all_captions(raw_captions)
save_captions(cleaned_captions, CAPTIONS_FILE)
# Step 3: Create train/val/test splits
train, val, test = create_splits(cleaned_captions)
# Step 4: Build tokenizer on training data
tokenizer = build_tokenizer(cleaned_captions, train)
print("\n" + "=" * 60)
print(" Preprocessing complete!")
print("=" * 60)
print(f" Cleaned captions: {CAPTIONS_FILE}")
print(f" Tokenizer: {TOKENIZER_FILE}")
print(f" Train split: {len(train)} images")
print(f" Val split: {len(val)} images")
print(f" Test split: {len(test)} images")
if __name__ == "__main__":
main()