image-captioning-api / tests /unit /test_beam_decoder.py
apoorvrajdev's picture
feat(evaluation): add beam search, metrics pipeline, and stabilized training workflow
91a1214
"""Beam-search decoder unit tests.
The full TF decoder forward path is exercised by the parity audit and the
smoke test in ``scripts/predict.py``. Here we test the *algorithmic* pieces
of beam search in isolation:
* Length penalty correctly rescales scores.
* Repetition penalty downweights seen tokens.
* n-gram blocker forbids exact-repeat n-grams.
* Detokeniser strips ``[start]`` / ``[end]`` and stops at ``[end]``.
A small fake model is used to verify end-to-end search behaviour without
loading TensorFlow weights.
"""
from __future__ import annotations
from unittest.mock import MagicMock
import numpy as np
import pytest
from captioning.inference.beam import (
_apply_repetition_penalty,
_Beam,
_blocks_repeat_ngram,
_detokenize,
_length_normalised,
generate_caption_beam,
)
def test_length_penalty_zero_returns_raw_score() -> None:
b = _Beam(token_ids=[1, 2, 3], score=-5.0)
assert _length_normalised(b, 0.0) == -5.0
def test_length_penalty_one_divides_by_length() -> None:
b = _Beam(token_ids=[1, 2, 3, 4], score=-6.0) # length=3
assert _length_normalised(b, 1.0) == pytest.approx(-2.0)
def test_repetition_penalty_downweights_seen_tokens() -> None:
log_probs = np.array([-1.0, -2.0, -3.0, -4.0])
out = _apply_repetition_penalty(log_probs.copy(), history_ids={1, 3}, penalty=2.0)
# Penalty subtracts log(2) ~ 0.693 from seen-token log-probs.
assert out[0] == pytest.approx(-1.0)
assert out[1] == pytest.approx(-2.0 - np.log(2.0))
assert out[2] == pytest.approx(-3.0)
assert out[3] == pytest.approx(-4.0 - np.log(2.0))
def test_repetition_penalty_one_is_noop() -> None:
log_probs = np.array([-1.0, -2.0, -3.0])
out = _apply_repetition_penalty(log_probs.copy(), history_ids={0, 1}, penalty=1.0)
np.testing.assert_array_equal(out, log_probs)
def test_blocks_repeat_ngram_detects_repeat() -> None:
# seq ends with [4, 5]; appending 6 forms trigram [4, 5, 6] not present.
assert not _blocks_repeat_ngram([1, 2, 3, 4, 5], 6, n=3)
# Now seq contains [4, 5, 6]; appending 6 still wouldn't form a repeat.
assert not _blocks_repeat_ngram([4, 5, 6, 4, 5], 7, n=3)
# seq has [4, 5, 6] AND ends with [4, 5]; appending 6 repeats [4, 5, 6].
assert _blocks_repeat_ngram([4, 5, 6, 1, 4, 5], 6, n=3)
def test_blocks_repeat_ngram_zero_size_disables() -> None:
assert not _blocks_repeat_ngram([1, 1, 1], 1, n=0)
def test_detokenize_stops_at_end_and_skips_special_tokens() -> None:
tokenizer = MagicMock()
# ids: [start]=1, "a"=2, "man"=3, [end]=4
table = {1: "[start]", 2: "a", 3: "man", 4: "[end]"}
tokenizer.decode_id = lambda i: table[i]
out = _detokenize([1, 2, 3, 4, 99], tokenizer, end_id=4)
assert out == "a man"
# ---- End-to-end beam search with a fake model -----------------------------
class _FakeModel:
"""Decoder fixture that always assigns the highest probability to ``best_id``.
The decoder output is the only piece beam search cares about; we stub the
CNN / encoder to identity-like behaviour so the whole inference pass runs
without TF being loaded.
"""
def __init__(self, vocab_size: int, best_id: int) -> None:
self.vocab_size = vocab_size
self.best_id = best_id
self.cnn_model = MagicMock(side_effect=self._identity_image)
self.encoder = MagicMock(side_effect=self._identity_encoder)
self.decoder = MagicMock(side_effect=self._decoder_step)
def _identity_image(self, img):
return img
def _identity_encoder(self, x, training):
return x
def _decoder_step(self, tokens, encoded, training, mask):
import tensorflow as tf
batch = int(tf.shape(tokens)[0])
seq_len = int(tf.shape(tokens)[1])
probs = np.full((batch, seq_len, self.vocab_size), 1e-3, dtype=np.float32)
probs[:, :, self.best_id] = 0.999
# Normalise so each row over vocab sums to ~1.
probs /= probs.sum(axis=-1, keepdims=True)
return tf.convert_to_tensor(probs)
def test_beam_search_emits_caption_when_model_prefers_end_token() -> None:
import tensorflow as tf
tokenizer = MagicMock()
# vocab: 0=pad, 1=[start], 2=[end], 3="dog"
word_to_id_table = {"[start]": 1, "[end]": 2}
decode_id_table = {0: "", 1: "[start]", 2: "[end]", 3: "dog"}
tokenizer.word_to_id = lambda w: word_to_id_table[w]
tokenizer.decode_id = lambda i: decode_id_table[i]
# Model always predicts "dog" (id=3).
model = _FakeModel(vocab_size=4, best_id=3)
image = tf.zeros((299, 299, 3), dtype=tf.float32)
caption = generate_caption_beam(
model,
tokenizer,
image,
max_length=6,
beam_width=2,
length_penalty=0.0,
)
# With no length penalty and no repetition penalty, the greedy-ish path
# outputs repeated "dog" until max_length. We just assert it produced
# *something* and didn't crash.
assert caption.startswith("dog")
def test_beam_search_terminates_on_eos() -> None:
"""Beam search must produce a clean caption when the model emits [end]."""
import tensorflow as tf
tokenizer = MagicMock()
word_to_id_table = {"[start]": 1, "[end]": 2}
decode_id_table = {0: "", 1: "[start]", 2: "[end]", 3: "dog"}
tokenizer.word_to_id = lambda w: word_to_id_table[w]
tokenizer.decode_id = lambda i: decode_id_table[i]
# Step 0: prefer "dog"; step 1+: prefer [end].
class _EosFakeModel(_FakeModel):
def _decoder_step(self, tokens, encoded, training, mask):
batch = int(tf.shape(tokens)[0])
seq_len = int(tf.shape(tokens)[1])
probs = np.full((batch, seq_len, self.vocab_size), 1e-3, dtype=np.float32)
probs[:, 0, 3] = 0.99 # at position 0 prefer "dog"
for pos in range(1, seq_len):
probs[:, pos, 2] = 0.99 # afterwards prefer [end]
probs /= probs.sum(axis=-1, keepdims=True)
return tf.convert_to_tensor(probs)
model = _EosFakeModel(vocab_size=4, best_id=3)
caption = generate_caption_beam(
model,
tokenizer,
tf.zeros((299, 299, 3), dtype=tf.float32),
max_length=6,
beam_width=2,
)
assert caption == "dog"