| """ |
| test_model.py |
| ============= |
| Validation test suite for the MiniLM project. |
| |
| Run this script before publishing to HuggingFace to confirm |
| that all components are working correctly end-to-end. |
| |
Author : André Costa
| License : MIT |
| |
| Usage: |
| # Run all tests |
| python test_model.py |
| |
| # Run a specific test group only |
| python test_model.py --only tokenizer |
| python test_model.py --only corpus |
| python test_model.py --only model |
| python test_model.py --only generate |
| python test_model.py --only export |
| """ |
|
|
| import os |
| import sys |
| import math |
| import argparse |
| import traceback |
|
|
| import torch |
|
|
| |
| |
| |
|
|
| PASS = " [PASS]" |
| FAIL = " [FAIL]" |
| SKIP = " [SKIP]" |
| SEP = "β" * 55 |
|
|
| results = [] |
|
|
|
|
def section(title: str) -> None:
    """Print a framed header announcing one group of tests."""
    header = "\n".join(["", SEP, f" {title}", SEP])
    print(header)
|
|
|
|
def check(name: str, condition: bool, detail: str = "") -> bool:
    """Print a PASS/FAIL line, record it in `results`, and return `condition`.

    `detail` (optional) is printed on a continuation line under the result.
    """
    label = PASS if condition else FAIL
    parts = [f"{label} {name}"]
    if detail:
        parts.append(f" {detail}")
    print("\n".join(parts))
    results.append((name, condition))
    return condition
|
|
|
|
def skip(name: str, reason: str) -> None:
    """Record a test as skipped (outcome None) and print its [SKIP] line."""
    results.append((name, None))
    print(f"{SKIP} {name} ({reason})")
|
|
|
def summary() -> None:
    """Print aggregate pass/fail/skip counts; exit with status 1 on any failure."""
    print(f"\n{SEP}")
    print(" Summary")
    print(SEP)
    # Tally outcomes recorded by check()/skip(); skips don't count toward total.
    passed = sum(r is True for _, r in results)
    failed = sum(r is False for _, r in results)
    skipped = sum(r is None for _, r in results)
    total = passed + failed
    print(f" Passed : {passed}/{total}")
    print(f" Failed : {failed}/{total}")
    if skipped:
        print(f" Skipped : {skipped}")
    print(SEP)
    if failed:
        print("\n Fix the failed tests before publishing.\n")
        sys.exit(1)
    print("\n All tests passed. Ready to export and publish.\n")
|
|
|
|
| |
| |
| |
|
|
def test_tokenizer() -> None:
    """Validate the BPE tokenizer: files exist, it loads, and text round-trips."""
    section("1 β BPE Tokenizer")

    # Tokenizer artefacts must exist before anything else can be tested.
    tok_ok = check(
        "Tokenizer files exist (./tokenizer/)",
        os.path.isfile("./tokenizer/tokenizer.json") and
        os.path.isfile("./tokenizer/vocab.json"),
        "Run 'python bpe_tokenizer.py' first."
    )
    if not tok_ok:
        skip("Tokenizer load", "tokenizer files missing")
        skip("Encode / decode", "tokenizer files missing")
        skip("Vocab size", "tokenizer files missing")
        skip("No UNK tokens", "tokenizer files missing")
        return

    try:
        from bpe_tokenizer import BPETokenizer
        tokenizer = BPETokenizer.load("./tokenizer")
        check("Tokenizer loads without errors", True)
    except Exception as e:
        check("Tokenizer loads without errors", False, str(e))
        skip("Encode / decode", "load failed")
        skip("Vocab size", "load failed")
        skip("No UNK tokens", "load failed")
        return

    check(
        "Vocab size == 16384",
        tokenizer.vocab_size == 16384,
        f"Got vocab_size={tokenizer.vocab_size}"
    )

    # Round-trip: decode(encode(x)) must reproduce x exactly.
    # The accented Portuguese strings were mojibake in the original source
    # (UTF-8 decoded as ISO-8859-7) and are restored here.
    test_strings = [
        "Hello, world!",
        "Once upon a time there was a little girl.",
        "Olá mundo! Aprendizado de máquina.",
        "The quick brown fox jumps over the lazy dog.",
        "Redes neurais aprendem padrões complexos.",
    ]
    all_ok = True
    for text in test_strings:
        ids = tokenizer.encode(text)
        decoded = tokenizer.decode(ids)
        if decoded != text:
            all_ok = False
            check(f"Encode/decode: {repr(text)}", False,
                  f"Expected {repr(text)}, got {repr(decoded)}")
    check("Encode/decode round-trip (5 strings)", all_ok)

    # Non-Latin / emoji input must not raise.  The original literal was
    # corrupted by a mojibake NEL byte inside the Arabic word, which split
    # the source line; restored as Japanese + emoji + Arabic greetings.
    # (Exact original emoji unrecoverable — any astral-plane char suffices.)
    exotic = "こんにちは 🌍 مرحبا"
    try:
        tokenizer.decode(tokenizer.encode(exotic))
        check("Encodes non-Latin text without errors", True)
    except Exception as e:
        check("Encodes non-Latin text without errors", False, str(e))
|
|
|
|
def test_corpus() -> None:
    """Verify corpus splits exist, the dataset loads, and chunks are valid."""
    section("2 β Corpus")

    # Each split must be a non-empty directory produced by data_pipeline.py.
    for split in ("train", "val", "test"):
        split_dir = f"./corpus/{split}"
        present = os.path.isdir(split_dir) and len(os.listdir(split_dir)) > 0
        hint = "" if present else "Run 'python data_pipeline.py' first."
        check(f"Corpus split exists: {split}", present, hint)

    if not os.path.isdir("./corpus/train"):
        skip("Corpus loads via CorpusDataset", "corpus missing")
        skip("Corpus chunk shape", "corpus missing")
        skip("Corpus token range", "corpus missing")
        return

    try:
        from data_pipeline import CorpusDataset
        dataset = CorpusDataset("./corpus/train")
        check(
            "CorpusDataset loads without errors",
            len(dataset) > 0,
            f"Chunks: {len(dataset):,}"
        )
    except Exception as e:
        check("CorpusDataset loads without errors", False, str(e))
        skip("Corpus chunk shape", "load failed")
        skip("Corpus token range", "load failed")
        return

    # One chunk should be a flat sequence of exactly 512 token IDs.
    sample = dataset[0]
    check(
        "Chunk shape == (512,)",
        sample.shape == (512,),
        f"Got shape {sample.shape}"
    )

    # Every token ID must fall inside [0, vocab_size).
    from bpe_tokenizer import BPETokenizer
    vocab_size = BPETokenizer.load("./tokenizer").vocab_size

    negatives = (sample < 0).sum().item()
    overflow = (sample >= vocab_size).sum().item()
    check(
        "All token IDs within vocab range",
        negatives == 0 and overflow == 0,
        f"{negatives} negative, {overflow} out-of-range IDs found"
    )
|
|
|
|
def test_model() -> None:
    """Smoke-test the transformer: import, instantiate, forward and backward."""
    section("3 β Model (forward pass)")

    try:
        from transformer import MiniLM, ModelConfig
    except Exception as e:
        check("transformer.py imports", False, str(e))
        return

    check("transformer.py imports", True)

    try:
        config = ModelConfig()
        model = MiniLM(config)
        check(
            "Model instantiates",
            True,
            f"{config.n_params / 1e6:.1f}M parameters"
        )
    except Exception as e:
        check("Model instantiates", False, str(e))
        skip("Forward pass", "instantiation failed")
        skip("Loss ~ log(vocab)", "instantiation failed")
        skip("Loss decreases", "instantiation failed")
        return

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    # Forward pass on a random batch; logits must be (B, T, vocab_size).
    try:
        batch, seq = 2, 64
        sequence = torch.randint(0, config.vocab_size, (batch, seq + 1)).to(device)
        inputs = sequence[:, :-1].contiguous()
        targets = sequence[:, 1:].contiguous()
        with torch.no_grad():
            logits, loss = model(inputs, targets)
        check(
            "Forward pass runs without errors",
            logits.shape == (batch, seq, config.vocab_size),
            f"logits shape: {logits.shape}"
        )
    except Exception as e:
        check("Forward pass runs without errors", False, str(e))
        skip("Loss ~ log(vocab)", "forward pass failed")
        skip("Loss decreases", "forward pass failed")
        return

    # A freshly initialised LM should score near uniform: loss ~= ln(vocab).
    expected_loss = math.log(config.vocab_size)
    actual_loss = loss.item()
    check(
        f"Initial loss near log(vocab_size) = {expected_loss:.2f}",
        abs(actual_loss - expected_loss) < expected_loss * 0.5,
        f"Got loss={actual_loss:.4f}, expected ~{expected_loss:.4f}"
    )

    # A few optimizer steps prove gradients flow end to end.
    try:
        model.train()
        optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
        for _ in range(3):
            optimizer.zero_grad()
            batch_ids = torch.randint(0, config.vocab_size, (2, 65)).to(device)
            _, loss = model(batch_ids[:, :-1].contiguous(),
                            batch_ids[:, 1:].contiguous())
            loss.backward()
            optimizer.step()
        check(
            "Backward pass runs without errors",
            True,
            f"Final loss: {loss.item():.4f}"
        )
    except Exception as e:
        check("Backward pass runs without errors", False, str(e))
|
|
|
def test_generate() -> None:
    """Load the best checkpoint and confirm sampling yields the requested tokens."""
    section("4 β Text Generation")

    ckpt_path = "./checkpoints/best_model.pt"
    if not os.path.isfile(ckpt_path):
        skip("Load checkpoint", "best_model.pt not found β train first")
        skip("Generate tokens", "checkpoint missing")
        skip("Output length correct", "checkpoint missing")
        return

    try:
        from transformer import MiniLM, ModelConfig
        from bpe_tokenizer import BPETokenizer

        ckpt = torch.load(ckpt_path, map_location="cpu", weights_only=True)
        cfg_dict = ckpt["model_config"]
        # d_head is a derived field; drop it so ModelConfig recomputes it.
        cfg_dict.pop("d_head", None)
        config = ModelConfig(**cfg_dict)
        model = MiniLM(config)

        # Strip the "_orig_mod." prefix added when the model was wrapped
        # (presumably by torch.compile) before the checkpoint was saved.
        state_dict = ckpt["model_state"]
        if any(k.startswith("_orig_mod.") for k in state_dict):
            state_dict = {k.replace("_orig_mod.", ""): v
                          for k, v in state_dict.items()}
        model.load_state_dict(state_dict)
        model.eval()
        check("Checkpoint loads without errors", True)
    except Exception as e:
        check("Checkpoint loads without errors", False, str(e))
        skip("Generate tokens", "load failed")
        skip("Output length correct", "load failed")
        return

    try:
        tokenizer = BPETokenizer.load("./tokenizer")
        n_new = 20

        # Sample from two prompts; each must grow by exactly n_new tokens.
        for prompt in ("Once upon a time", "The model learned"):
            input_ids = torch.tensor([tokenizer.encode(prompt)])
            with torch.no_grad():
                output = model.generate(
                    input_ids,
                    max_new_tokens=n_new,
                    temperature=0.8,
                    top_k=50,
                )
            generated_text = tokenizer.decode(output[0].tolist())
            n_generated = output.shape[1] - input_ids.shape[1]

            check(
                f"Generates {n_new} tokens from: {repr(prompt)}",
                n_generated == n_new,
                f"Output: {repr(generated_text)}"
            )
    except Exception as e:
        check("Generate tokens", False, str(e))
|
|
|
|
def test_export() -> None:
    """Check that the HuggingFace export directory is complete and well-formed."""
    section("5 β HuggingFace Export")

    export_dir = "./hf_export"

    if not os.path.isdir(export_dir):
        skip("Export files exist", "hf_export/ not found β run --mode export first")
        skip("config.json valid", "hf_export/ not found")
        skip("Weights file exists", "hf_export/ not found")
        skip("Model card exists", "hf_export/ not found")
        skip("Tokenizer files", "hf_export/ not found")
        return

    # Every file the HF Hub expects must be present.
    for fname in ("config.json", "README.md", "tokenizer.json", "vocab.json"):
        target = os.path.join(export_dir, fname)
        check(f"Export file exists: {fname}", os.path.isfile(target))

    # Either weights format is acceptable.
    has_weights = any(
        os.path.isfile(os.path.join(export_dir, name))
        for name in ("model.safetensors", "pytorch_model.bin")
    )
    check("Model weights file exists (safetensors or .bin)", has_weights)

    # config.json must parse and carry the fields loaders rely on.
    try:
        import json
        with open(os.path.join(export_dir, "config.json")) as f:
            cfg = json.load(f)
        required_keys = (
            "vocab_size", "hidden_size", "num_hidden_layers",
            "num_attention_heads", "intermediate_size",
        )
        missing = [k for k in required_keys if k not in cfg]
        check(
            "config.json contains required fields",
            not missing,
            f"Missing: {missing}" if missing else ""
        )
    except Exception as e:
        check("config.json is valid", False, str(e))
|
|
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| parser = argparse.ArgumentParser(description="MiniLM β pre-publication test suite") |
| parser.add_argument( |
| "--only", |
| choices=["tokenizer", "corpus", "model", "generate", "export"], |
| default=None, |
| help="Run only a specific test group" |
| ) |
| args = parser.parse_args() |
|
|
| print("=" * 55) |
| print(" MiniLM β Pre-publication Test Suite") |
| print("=" * 55) |
|
|
| groups = { |
| "tokenizer": test_tokenizer, |
| "corpus": test_corpus, |
| "model": test_model, |
| "generate": test_generate, |
| "export": test_export, |
| } |
|
|
| if args.only: |
| groups[args.only]() |
| else: |
| for fn in groups.values(): |
| try: |
| fn() |
| except Exception as e: |
| print(f"\n [ERROR] Unexpected error in {fn.__name__}:") |
| traceback.print_exc() |
|
|
| summary() |
|
|