Spaces:
Sleeping
Sleeping
| """ | |
| CaptionIQ — Caption Inference | |
| Greedy search and beam search decoding for generating captions from images. | |
| """ | |
| import os | |
| import sys | |
| import argparse | |
| import numpy as np | |
| from tensorflow.keras.models import load_model | |
| from tensorflow.keras.preprocessing.sequence import pad_sequences | |
| from tensorflow.keras.preprocessing.image import load_img, img_to_array | |
| from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input as vgg16_preprocess | |
| from tensorflow.keras.applications.vgg19 import VGG19, preprocess_input as vgg19_preprocess | |
| from tensorflow.keras.models import Model as KerasModel | |
# ── Keras compatibility patch ──────────────────────────────────────
# Models saved with newer Keras include 'quantization_config' in every
# layer config. Older Keras versions reject the unknown kwarg.
# The Embedding layer's mask_zero=True also serialises a NotEqual op
# that the legacy h5 loader can't resolve by name.
import keras.src.ops.operation as _keras_op
from keras.src.ops.numpy import NotEqual as _NotEqual

# Put the project root on sys.path *before* the first ``src.*`` import below;
# otherwise the bootstrap cannot help when this file is run as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# ``from_config`` is a classmethod; ``__func__`` yields the underlying function
# so it can be invoked with an explicit ``cls``.
_orig_from_config = _keras_op.Operation.from_config.__func__  # type: ignore[misc]


def _patched_from_config(cls, config):  # noqa: N805
    """Drop the unsupported 'quantization_config' key, then defer to Keras.

    Works on a shallow copy so the caller's config dict is left untouched.
    """
    config = dict(config)
    config.pop("quantization_config", None)
    return _orig_from_config(cls, config)


# Re-wrap as a classmethod. Assigning the bare function would make
# ``SomeLayer.from_config(cfg)`` bind ``cfg`` to ``cls`` and fail with a
# missing-argument TypeError.
_keras_op.Operation.from_config = classmethod(_patched_from_config)

from src.model import BahdanauAttention  # noqa: E402

_CUSTOM_OBJECTS = {"NotEqual": _NotEqual, "BahdanauAttention": BahdanauAttention}
# ───────────────────────────────────────────────────────────────────

from src.config import (  # noqa: E402
    IMAGE_SIZE, BEAM_WIDTH,
    VGG16_MODEL_FILE, VGG19_MODEL_FILE,
    TOKENIZER_FILE, START_TOKEN, END_TOKEN,
)
from src.utils import load_tokenizer, word_for_id  # noqa: E402
| def _softmax(values): | |
| """Stable softmax for confidence normalization.""" | |
| if not values: | |
| return [] | |
| arr = np.array(values, dtype=np.float64) | |
| arr = arr - np.max(arr) | |
| exp_arr = np.exp(arr) | |
| denom = np.sum(exp_arr) | |
| if denom <= 0: | |
| return [0.0] * len(values) | |
| return (exp_arr / denom).tolist() | |
| def _clean_caption_tokens(words): | |
| """ | |
| Remove degenerate repetition patterns from generated token list. | |
| Keeps semantics while avoiding outputs like: | |
| "dog and dog and dog and ..." | |
| """ | |
| if not words: | |
| return [] | |
| # 1) Remove long runs of the same token. | |
| collapsed = [words[0]] | |
| for w in words[1:]: | |
| if len(collapsed) >= 2 and collapsed[-1] == w and collapsed[-2] == w: | |
| continue | |
| collapsed.append(w) | |
| # 2) Break repeated bigram loops. | |
| cleaned = [] | |
| for w in collapsed: | |
| cleaned.append(w) | |
| if len(cleaned) >= 6: | |
| b1 = tuple(cleaned[-2:]) | |
| b2 = tuple(cleaned[-4:-2]) | |
| b3 = tuple(cleaned[-6:-4]) | |
| if b1 == b2 == b3: | |
| cleaned = cleaned[:-2] | |
| break | |
| return cleaned | |
| def _ending_quality_penalty(words): | |
| """ | |
| Penalize incomplete or low-quality endings so beam search prefers | |
| syntactically complete captions. | |
| """ | |
| if not words: | |
| return 2.0 | |
| penalty = 0.0 | |
| weak_endings = { | |
| "a", "an", "the", "in", "on", "at", "of", "to", "for", | |
| "with", "by", "from", "and", "or", "but", "as", | |
| } | |
| if words[-1] in weak_endings: | |
| penalty += 0.9 | |
| if len(words) < 5: | |
| penalty += 0.35 | |
| # Penalize excessive duplicate words. | |
| unique_ratio = len(set(words)) / max(len(words), 1) | |
| if unique_ratio < 0.6: | |
| penalty += 0.25 | |
| return penalty | |
def greedy_search(model, tokenizer, feature: np.ndarray, max_length: int) -> str:
    """
    Generate a caption using greedy search.

    At each step the vocabulary is ranked by predicted probability and the
    best-ranked word is taken, skipping ids with no word, the start token,
    and an immediate repeat of the previously emitted word. Decoding stops at
    the end token, when no admissible word exists, or after ``max_length``
    steps.

    Args:
        model: Trained captioning model, called as
            ``model.predict([feature_batch, padded_sequence])``.
        tokenizer: Fitted Keras tokenizer
        feature: Features of a single image without the batch axis, in the
            shape the model's image input expects (for example the
            (locations, channels) grid returned by
            ``extract_single_image_feature``). The batch axis is added here.
        max_length: Maximum caption length; also the padded input length.

    Returns:
        Generated caption string (without start/end tokens), with degenerate
        repetition removed by ``_clean_caption_tokens``.
    """
    # The image batch never changes between decoding steps.
    feature_batch = np.expand_dims(feature, 0)

    # Emitted words are kept in a list so the start token never has to be
    # stripped back out of a string; a textual replace would also delete it
    # from inside any generated word that contains it as a substring.
    generated = []
    previous_word = None
    for _ in range(max_length):
        in_text = " ".join([START_TOKEN, *generated])
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        sequence = pad_sequences([sequence], maxlen=max_length, padding="post")

        # Predict next word probabilities
        yhat = model.predict([feature_batch, sequence], verbose=0)[0]

        word = None
        for idx in np.argsort(yhat)[::-1]:
            candidate = word_for_id(int(idx), tokenizer)
            if candidate in (None, START_TOKEN):
                continue
            # Avoid immediate repeated tokens when alternatives exist.
            if previous_word is not None and candidate == previous_word:
                continue
            word = candidate
            break

        if word is None or word == END_TOKEN:
            break
        previous_word = word
        generated.append(word)

    return " ".join(_clean_caption_tokens(generated))
def _beam_search_core(predict_next_probs, tokenizer, max_length: int, beam_width: int = 3) -> list:
    """
    Shared beam-search logic.

    Beams are expanded for at most ``max_length`` steps. Finished beams (those
    ending in the end token) are set aside, the rest are extended with their
    most probable next tokens, and the best ``beam_width`` extensions survive.
    The final captions are cleaned, re-scored with a length penalty and an
    ending-quality penalty, de-duplicated, and given softmax confidences.

    Args:
        predict_next_probs: Callable taking padded sequence and returning
            probability vector over vocab.
        tokenizer: Fitted Keras tokenizer
        max_length: Maximum caption length
        beam_width: Number of beams to maintain

    Returns:
        List of ``(caption, confidence)`` tuples, best first, at most
        ``beam_width`` entries. ``confidence`` is the softmax of the
        normalised scores over the returned captions only. Returns ``[]``
        when the tokenizer produces no id for the start token.
    """
    # Each beam: (token_list, cumulative_log_prob)
    start_seq = tokenizer.texts_to_sequences([START_TOKEN])[0]
    if not start_seq:
        return []
    start_id = start_seq[0]
    beams = [(start_seq, 0.0)]
    completed = []
    # Number of top-probability tokens considered per beam and step.
    candidate_pool = max(beam_width * 5, beam_width)
    alpha = 0.7  # Length penalty strength.
    # The end token is rejected until this many tokens follow the start token.
    min_words_before_end = 4
    for _ in range(max_length):
        all_candidates = []
        for seq, score in beams:
            # If this sequence already ended, skip
            if len(seq) > 1:
                last_word = word_for_id(seq[-1], tokenizer)
                if last_word == END_TOKEN:
                    completed.append((seq, score))
                    continue
            # Pad and predict
            padded = pad_sequences([seq], maxlen=max_length, padding="post")
            yhat = predict_next_probs(padded)
            # Expand each beam by more than beam_width to keep diverse options.
            top_indices = np.argsort(yhat)[-candidate_pool:]
            for idx in top_indices:
                idx = int(idx)
                word = word_for_id(idx, tokenizer)
                if word is None:
                    continue
                # Block invalid or degenerate transitions.
                if idx == 0 or idx == start_id:
                    continue
                if len(seq) > 1 and idx == seq[-1]:
                    continue
                if word == END_TOKEN and (len(seq) - 1) < min_words_before_end:
                    continue
                # Penalize overusing the same word in one caption.
                repeat_count = sum(1 for token_id in seq if token_id == idx)
                repeat_penalty = 0.15 * repeat_count
                candidate_seq = seq + [idx]
                # The small epsilon keeps log() finite for zero probabilities.
                candidate_score = score + np.log(float(yhat[idx]) + 1e-10) - repeat_penalty
                all_candidates.append((candidate_seq, candidate_score))
        # Nothing left to extend (e.g. every beam has finished).
        if not all_candidates:
            break
        # Keep top beam_width candidates
        all_candidates.sort(key=lambda x: x[1], reverse=True)
        beams = all_candidates[:beam_width]
    # Add remaining beams to completed
    # NOTE: if the loop stopped because every beam had finished, these beams
    # are already in `completed`; the caption-level de-duplication below
    # removes the resulting duplicates.
    completed.extend(beams)
    # Convert sequences to text
    results = []
    for seq, score in completed:
        words = []
        for idx in seq:
            word = word_for_id(idx, tokenizer)
            if word and word != START_TOKEN and word != END_TOKEN:
                words.append(word)
        caption = " ".join(words)
        if caption:
            cleaned_words = _clean_caption_tokens(words)
            if not cleaned_words:
                continue
            # Length penalty similar to GNMT decoding.
            lp = ((5 + len(cleaned_words)) ** alpha) / ((5 + 1) ** alpha)
            norm_score = score / lp
            norm_score -= _ending_quality_penalty(cleaned_words)
            results.append((" ".join(cleaned_words), norm_score))
    # Sort by score (best first) and deduplicate
    results.sort(key=lambda x: x[1], reverse=True)
    seen = set()
    unique_results = []
    for caption, score in results:
        if caption not in seen:
            seen.add(caption)
            unique_results.append((caption, score))
    unique_results = unique_results[:beam_width]
    # Confidences are relative to the returned captions only.
    confidences = _softmax([score for _, score in unique_results])
    return [
        (caption, confidence)
        for (caption, _), confidence in zip(unique_results, confidences)
    ]
def beam_search(model, tokenizer, feature: np.ndarray,
                max_length: int, beam_width: int = 3) -> list:
    """
    Run beam-search decoding with one captioning model.

    Args:
        model: Trained captioning model, called as
            ``model.predict([feature_batch, padded_sequence])``.
        tokenizer: Fitted Keras tokenizer.
        feature: Features of a single image without the batch axis.
        max_length: Maximum caption length.
        beam_width: Number of beams to maintain.

    Returns:
        Whatever ``_beam_search_core`` returns for this model's predictions.
    """
    def next_token_probs(padded_seq):
        # Add the batch axis to the image features, then unwrap the
        # single-row prediction batch.
        image_batch = np.expand_dims(feature, 0)
        batch_output = model.predict([image_batch, padded_seq], verbose=0)
        return batch_output[0]

    return _beam_search_core(
        predict_next_probs=next_token_probs,
        tokenizer=tokenizer,
        max_length=max_length,
        beam_width=beam_width,
    )
def beam_search_with_attention(
    model, tokenizer, feature: np.ndarray,
    max_length: int, beam_width: int = 3
) -> tuple:
    """
    Beam search plus a gradient-based saliency map for each generated word.

    Captions come from the regular ``beam_search``. The maps come from a
    separate greedy (argmax) decode of the same image: for every emitted word,
    the gradient of that word's output score with respect to the image
    features is taken, its absolute value is averaged over the channel axis,
    and the result is min-max normalised. This is a saliency proxy, not the
    model's internal attention weights. Because the greedy decode is
    independent of the beam search, its words are not guaranteed to match the
    top beam caption.

    Args:
        model: Trained captioning model taking ``[features, padded_sequence]``.
        tokenizer: Fitted Keras tokenizer.
        feature: Features of a single image without the batch axis. The maps
            are reshaped to 7x7, so 49 spatial locations are expected.
        max_length: Maximum caption length.
        beam_width: Number of beams for the caption search.

    Returns:
        ``(captions, attention_maps)``. ``captions`` is the ``beam_search``
        result. ``attention_maps`` is a list of ``(word, 7x7 array)`` tuples
        from the greedy decode. It is empty when no caption was produced or
        when saliency extraction fails (the failure is logged).
    """
    # First run normal beam search to get captions
    candidates = beam_search(model, tokenizer, feature, max_length, beam_width)
    if not candidates:
        return candidates, []

    try:
        import tensorflow as tf

        attention_maps = []
        in_text = START_TOKEN
        feat_expanded = np.expand_dims(feature, 0)  # (1, 49, 512)
        # The image tensor is identical at every step, so build it once.
        feat_tensor = tf.constant(feat_expanded, dtype=tf.float32)

        for _ in range(max_length):
            seq = tokenizer.texts_to_sequences([in_text])[0]
            padded = pad_sequences([seq], maxlen=max_length, padding="post")

            pred = model.predict([feat_expanded, padded], verbose=0)[0]
            word_idx = int(np.argmax(pred))
            word = word_for_id(word_idx, tokenizer)
            if word is None or word == END_TOKEN:
                break
            if word == START_TOKEN:
                in_text += " " + word
                continue

            # Saliency: gradient of the chosen word's score w.r.t. the image
            # features.
            pad_tensor = tf.constant(padded, dtype=tf.float32)
            with tf.GradientTape() as tape:
                tape.watch(feat_tensor)
                output = model([feat_tensor, pad_tensor], training=False)
                score = output[0, word_idx]
            grads = tape.gradient(score, feat_tensor)  # (1, 49, 512)

            if grads is not None:
                # Pool over feature dim -> per-location importance (49,)
                attn_weights = tf.reduce_mean(tf.abs(grads[0]), axis=-1).numpy()
                # Normalize to [0, 1]; a constant map is left as it is.
                attn_min, attn_max = attn_weights.min(), attn_weights.max()
                if attn_max > attn_min:
                    attn_weights = (attn_weights - attn_min) / (attn_max - attn_min)
                attention_maps.append((word, attn_weights.reshape(7, 7)))
            else:
                # No gradient path to the features: use a uniform map.
                attention_maps.append((word, np.ones((7, 7)) / 49.0))

            in_text += " " + word

        return candidates, attention_maps
    except Exception:
        # Best-effort: the captions are still useful without saliency maps,
        # but record why the maps are missing instead of hiding the failure.
        import logging

        logging.getLogger(__name__).warning(
            "Attention-map extraction failed; returning captions without maps.",
            exc_info=True,
        )
        return candidates, []
def beam_search_ensemble_with_attention(
    models: list, tokenizer, features: list,
    max_length: int, beam_width: int = 5
) -> tuple:
    """
    Ensemble beam search combined with gradient-based attention maps.

    Captions are produced by ``beam_search_ensemble``. The attention maps are
    taken from ``beam_search_with_attention`` run on the first model and the
    first feature only, with a beam width of 1.

    Returns:
        ``(captions, attention_maps)``; ``attention_maps`` is ``[]`` when the
        ensemble yields no captions or ``models`` is empty.
    """
    captions = beam_search_ensemble(models, tokenizer, features, max_length, beam_width)

    if captions and models:
        # Only the first ensemble member is used for visualisation.
        primary_model = models[0]
        primary_feature = features[0]
        single_model_result = beam_search_with_attention(
            primary_model, tokenizer, primary_feature, max_length, beam_width=1
        )
        return captions, single_model_result[1]

    return captions, []
def beam_search_ensemble(models: list, tokenizer, features: list,
                         max_length: int, beam_width: int = 5) -> list:
    """
    Beam search over an ensemble: next-token probabilities are the
    element-wise mean of every model's prediction.

    Args:
        models: List of trained captioning models.
        tokenizer: Fitted tokenizer.
        features: List of feature tensors, one per model and in the same order.
        max_length: Maximum caption length.
        beam_width: Beam size.

    Returns:
        The result of ``_beam_search_core`` on the averaged predictions, or
        ``[]`` when ``models`` is empty or its length differs from
        ``features``.
    """
    inputs_are_paired = bool(models) and len(models) == len(features)
    if not inputs_are_paired:
        return []

    def averaged_probs(padded_seq):
        per_model = [
            member.predict([np.expand_dims(member_feature, 0), padded_seq], verbose=0)[0]
            for member, member_feature in zip(models, features)
        ]
        stacked = np.stack(per_model, axis=0)
        return stacked.mean(axis=0)

    return _beam_search_core(averaged_probs, tokenizer, max_length, beam_width)
def extract_single_image_feature(image_path: str, backbone: str = "vgg16") -> np.ndarray:
    """
    Extract the spatial feature grid of one image from a VGG network.

    The image is resized to ``IMAGE_SIZE`` x ``IMAGE_SIZE``, preprocessed for
    the chosen network and passed through an ImageNet-weighted VGG up to its
    ``block5_pool`` layer.

    Args:
        image_path: Path to the image file
        backbone: "vgg16" selects VGG16; any other value selects VGG19.

    Returns:
        2-D array of shape (height * width, channels) obtained by flattening
        the spatial axes of the pool output, i.e. (49, 512) for the
        (7, 7, 512) output noted in the code.
    """
    if backbone == "vgg16":
        network_cls, preprocess_fn = VGG16, vgg16_preprocess
    else:
        network_cls, preprocess_fn = VGG19, vgg19_preprocess

    base_model = network_cls(weights="imagenet")
    pool_output = base_model.get_layer("block5_pool").output
    extractor = KerasModel(inputs=base_model.input, outputs=pool_output)

    pixels = img_to_array(load_img(image_path, target_size=(IMAGE_SIZE, IMAGE_SIZE)))
    batch = preprocess_fn(np.expand_dims(pixels, axis=0))

    feature = extractor.predict(batch, verbose=0)[0]  # (7, 7, 512)
    rows, cols, channels = feature.shape
    return feature.reshape(rows * cols, channels)  # (49, 512)
def generate_caption(image_path: str, backbone: str = "vgg16",
                     use_beam: bool = True) -> list:
    """
    End-to-end captioning: load tokenizer and model(s), extract image
    features, decode a caption.

    Args:
        image_path: Path to input image
        backbone: "vgg16", "vgg19", or "ensemble". Any value other than
            "vgg16" and "ensemble" loads the VGG19 model file.
        use_beam: If True, use beam search; otherwise greedy

    Returns:
        List of (caption, score) tuples. Beam search returns the decoder's
        own list. Greedy search returns a single tuple whose score is 1.0
        for a non-empty caption and 0.0 otherwise. In greedy "ensemble" mode
        only the VGG19 model is used for decoding.
    """
    tokenizer = load_tokenizer(TOKENIZER_FILE)

    def as_single_result(text):
        return [(text, 1.0 if text else 0.0)]

    if backbone != "ensemble":
        if backbone == "vgg16":
            model_file = VGG16_MODEL_FILE
        else:
            model_file = VGG19_MODEL_FILE
        captioner = load_model(model_file, custom_objects=_CUSTOM_OBJECTS)
        max_length = captioner.input_shape[1][1]
        feature = extract_single_image_feature(image_path, backbone)
        if not use_beam:
            return as_single_result(greedy_search(captioner, tokenizer, feature, max_length))
        return beam_search(captioner, tokenizer, feature, max_length, BEAM_WIDTH)

    # Ensemble: both models are loaded and the shorter sequence length wins.
    model16 = load_model(VGG16_MODEL_FILE, custom_objects=_CUSTOM_OBJECTS)
    model19 = load_model(VGG19_MODEL_FILE, custom_objects=_CUSTOM_OBJECTS)
    max_length = min(model16.input_shape[1][1], model19.input_shape[1][1])
    feature16 = extract_single_image_feature(image_path, "vgg16")
    feature19 = extract_single_image_feature(image_path, "vgg19")
    if not use_beam:
        return as_single_result(greedy_search(model19, tokenizer, feature19, max_length))
    return beam_search_ensemble(
        [model16, model19], tokenizer, [feature16, feature19], max_length, BEAM_WIDTH
    )
def main():
    """
    Command-line entry point: caption one image and print the results.

    Options:
        --image     Path to the input image (required).
        --backbone  "vgg16" (default), "vgg19" or "ensemble".
        --greedy    Use greedy search instead of beam search.

    Returns:
        Process exit status: 0 on success, 1 when the image path is not an
        existing file or no caption was produced. Failures are reported on
        stderr.
    """
    parser = argparse.ArgumentParser(description="Generate captions for images")
    parser.add_argument("--image", type=str, required=True, help="Path to input image")
    parser.add_argument(
        "--backbone", type=str, default="vgg16",
        choices=["vgg16", "vgg19", "ensemble"],
        help="CNN backbone (default: vgg16)"
    )
    parser.add_argument(
        "--greedy", action="store_true",
        help="Use greedy search instead of beam search"
    )
    args = parser.parse_args()

    # isfile (not exists): a directory path would otherwise pass this check
    # and only fail later inside the image loader.
    if not os.path.isfile(args.image):
        print(f"Error: Image not found: {args.image}", file=sys.stderr)
        return 1

    print(f"\nGenerating captions for: {args.image}")
    print(f"Backbone: {args.backbone.upper()}")
    print(f"Method: {'Greedy' if args.greedy else f'Beam (width={BEAM_WIDTH})'}")
    print("-" * 40)

    results = generate_caption(args.image, args.backbone, not args.greedy)
    if not results:
        print("Error: No caption could be generated.", file=sys.stderr)
        return 1

    for i, (caption, score) in enumerate(results, 1):
        print(f" #{i}: {caption}")
        if not args.greedy:
            print(f" (score: {score:.4f})")
    print()
    return 0


if __name__ == "__main__":
    sys.exit(main())