| |
| |
| """ |
test_modele_nlp.py
==================
Test / inference script for the custom GPT model trained by train_nlp_ft_10epochs_8hours.py.

Features:
- automatically load the tokenizer, the config and the checkpoint
- generate text from a prompt
- run several prompts
- evaluate loss / perplexity on a local text file

Examples:
    python test_modele_nlp.py \
        --model-dir ./nlp_1b_h100_ft_nlp_mix_10ep_8h \
        --prompt "Bonjour, voici un résumé de l'actualité"

    python test_modele_nlp.py \
        --model-dir ./nlp_1b_h100_ft_nlp_mix_10ep_8h \
        --prompt-file prompts.txt \
        --max-new-tokens 200

    python test_modele_nlp.py \
        --model-dir ./nlp_1b_h100_ft_nlp_mix_10ep_8h \
        --eval-texts-file eval.txt \
        --eval-batch-size 4
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import math |
| import os |
| from collections import OrderedDict |
| from contextlib import nullcontext |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Optional |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from torch.nn.attention import SDPBackend, sdpa_kernel |
| from transformers import PreTrainedTokenizerFast |
|
|
|
|
| |
| |
| |
|
|
# Special-token literals. They are not referenced anywhere else in the visible
# part of this file; presumably they mirror the tokenizer's vocabulary.
PAD_TOKEN = "<pad>"
BOS_TOKEN = "<bos>"
EOS_TOKEN = "<eos>"
UNK_TOKEN = "<unk>"

# Maps each accepted --dtype choice to the torch dtype of the same name.
DTYPE_MAP = {
    dtype_name: getattr(torch, dtype_name)
    for dtype_name in ("float16", "bfloat16", "float32")
}
|
|
|
|
def normalize_state_dict_keys(sd: dict) -> OrderedDict:
    """Return a copy of ``sd`` whose keys have all wrapper prefixes removed.

    The prefixes ``module.`` and ``_orig_mod.`` are stripped repeatedly from
    the front of each key, in whatever order and however many times they are
    stacked (e.g. ``module._orig_mod.x`` and ``_orig_mod.module.x`` both
    become ``x``). These prefixes are what data-parallel wrappers and
    ``torch.compile`` typically prepend to parameter names.

    Args:
        sd: Mapping from parameter name to value (values are passed through
            untouched).

    Returns:
        A new ``OrderedDict`` with the same values and insertion order, keyed
        by the normalized names.
    """
    wrapper_prefixes = ("module.", "_orig_mod.")
    out = OrderedDict()
    for key, value in sd.items():
        # Keep peeling until a full pass removes nothing, so stacked wrappers
        # are handled regardless of nesting order.
        stripped_something = True
        while stripped_something:
            stripped_something = False
            for prefix in wrapper_prefixes:
                if key.startswith(prefix):
                    key = key[len(prefix):]
                    stripped_something = True
        out[key] = value
    return out
|
|
|
|
def get_device(device_arg: str) -> torch.device:
    """Turn the ``--device`` CLI value into a ``torch.device``.

    Args:
        device_arg: ``"auto"`` or any string accepted by ``torch.device``.

    Returns:
        For ``"auto"``: CUDA when ``torch.cuda.is_available()`` is true,
        otherwise CPU. For anything else: ``torch.device(device_arg)``.
    """
    if device_arg != "auto":
        return torch.device(device_arg)
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
|
|
def autocast_context(device: torch.device, dtype: torch.dtype):
    """Return the context manager to wrap forward passes in.

    Args:
        device: Device the model runs on.
        dtype: Requested compute dtype.

    Returns:
        ``torch.autocast`` for CUDA devices combined with float16/bfloat16;
        a no-op ``nullcontext()`` in every other case.
    """
    half_precision = (torch.float16, torch.bfloat16)
    if device.type != "cuda" or dtype not in half_precision:
        return nullcontext()
    return torch.autocast(device_type="cuda", dtype=dtype)
|
|
|
|
def resolve_checkpoint(model_dir: Path, checkpoint_name: Optional[str]) -> Path:
    """Locate the checkpoint file to load.

    Args:
        model_dir: Directory that holds the checkpoints.
        checkpoint_name: Explicit file name inside ``model_dir``; when falsy,
            the default names are probed in order of preference.

    Returns:
        Path of the explicit checkpoint, or of the first existing file among
        ``model_best.pt``, ``model.pt`` and ``train_state.pt``.

    Raises:
        FileNotFoundError: The explicit checkpoint does not exist, or none of
            the default names is present.
    """
    if checkpoint_name:
        explicit = model_dir / checkpoint_name
        if explicit.exists():
            return explicit
        raise FileNotFoundError(f"Checkpoint introuvable: {explicit}")

    default_names = ("model_best.pt", "model.pt", "train_state.pt")
    found = next(
        (model_dir / name for name in default_names if (model_dir / name).exists()),
        None,
    )
    if found is None:
        raise FileNotFoundError(
            f"Aucun checkpoint trouvé dans {model_dir}. Cherchés: model_best.pt, model.pt, train_state.pt"
        )
    return found
|
|
|
|
def resolve_tokenizer_dir(model_dir: Path, explicit_tokenizer_dir: Optional[str]) -> Path:
    """Locate the directory that contains the tokenizer files.

    Args:
        model_dir: Model output directory, used to derive the default
            candidate locations.
        explicit_tokenizer_dir: User-supplied directory; when given it is only
            checked for existence and returned as-is.

    Returns:
        The explicit directory, or the first default candidate that contains
        a ``tokenizer.json`` file.

    Raises:
        FileNotFoundError: The explicit directory is missing, or no default
            candidate holds a ``tokenizer.json``.
    """
    if explicit_tokenizer_dir:
        explicit = Path(explicit_tokenizer_dir)
        if explicit.exists():
            return explicit
        raise FileNotFoundError(f"Tokenizer dir introuvable: {explicit}")

    # Probed in order: inside the model dir, next to it, then relative to cwd.
    search_order = (
        model_dir / "tokenizer_32k",
        model_dir.parent / "nlp_1b_h100_opt" / "tokenizer_32k",
        Path("./nlp_1b_h100_opt/tokenizer_32k"),
    )
    for candidate in search_order:
        if candidate.joinpath("tokenizer.json").exists():
            return candidate
    raise FileNotFoundError(
        "Tokenizer introuvable. Passe --tokenizer-dir explicitement."
    )
|
|
|
|
| |
| |
| |
|
|
@dataclass
class GPTConfig:
    """Architecture hyper-parameters consumed by the model classes in this file."""

    # Number of rows of the token embedding / output projection.
    vocab_size: int = 32000
    # Longest context fed to the model during generation and evaluation.
    block_size: int = 1024
    # Width of the residual stream.
    d_model: int = 1536
    # Attention heads per layer; must divide d_model.
    n_heads: int = 24
    # Number of stacked transformer blocks.
    n_layers: int = 24
    # Hidden width of the gated feed-forward network.
    d_ff: int = 6144
    # Stored by the attention layer but not applied by this inference script.
    dropout: float = 0.0
    # Not read by the model code in this file; the loader forces it to False.
    use_checkpointing: bool = False
|
|
|
|
class RMSNorm(nn.Module):
    """Root-mean-square normalization over the last dimension with a learned gain."""

    def __init__(self, dim: int, eps: float = 1e-6):
        """Create the gain vector (initialized to ones) of size ``dim``.

        Args:
            dim: Size of the normalized (last) dimension.
            eps: Constant added to the mean square before the inverse root.
        """
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Scale ``x`` by ``weight / sqrt(mean(x**2) + eps)`` along the last axis."""
        mean_square = x.pow(2).mean(-1, keepdim=True)
        inv_rms = torch.rsqrt(mean_square + self.eps)
        return self.weight * x * inv_rms
|
|
|
|
class RotaryEmbedding(nn.Module):
    """Precomputed cosine/sine tables for rotary position embeddings."""

    def __init__(self, dim: int, base: int = 10_000, max_seq: int = 16_384):
        """Build the ``(max_seq, dim)`` tables once, as non-persistent buffers.

        Args:
            dim: Per-head feature size; one frequency is generated per pair
                of features.
            base: Base of the geometric frequency progression.
            max_seq: Number of positions to precompute.
        """
        super().__init__()
        exponents = torch.arange(0, dim, 2).float() / dim
        inv_freq = 1.0 / (base ** exponents)
        positions = torch.arange(max_seq).float()
        angles = torch.outer(positions, inv_freq)
        # Each frequency is duplicated so entries 2i and 2i+1 share an angle.
        # persistent=False keeps both tables out of the state dict.
        for buffer_name, table in (("cos_cache", angles.cos()), ("sin_cache", angles.sin())):
            self.register_buffer(
                buffer_name,
                torch.repeat_interleave(table, 2, dim=-1),
                persistent=False,
            )

    def forward(self, seq_len: int, dtype: torch.dtype, device: torch.device):
        """Return ``(cos, sin)`` for the first ``seq_len`` positions.

        Both tensors are moved/cast to ``device`` and ``dtype``.
        """
        cos = self.cos_cache[:seq_len]
        sin = self.sin_cache[:seq_len]
        return cos.to(device=device, dtype=dtype), sin.to(device=device, dtype=dtype)
|
|
|
|
def rotate_half(x: torch.Tensor) -> torch.Tensor:
    """Map every adjacent feature pair ``(a, b)`` to ``(-b, a)``.

    The last dimension is read as interleaved pairs, so
    ``[a0, b0, a1, b1, ...]`` becomes ``[-b0, a0, -b1, a1, ...]``.
    """
    even, odd = x[..., 0::2], x[..., 1::2]
    rotated_pairs = torch.stack((-odd, even), dim=-1)
    # Merge the trailing (pairs, 2) axes back into one feature axis.
    return rotated_pairs.flatten(-2)
|
|
|
|
def apply_rope(x: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor) -> torch.Tensor:
    """Apply the rotary position rotation to ``x``.

    Args:
        x: Tensor whose last two axes line up with ``cos``/``sin``; the
            attention layer in this file passes ``(batch, heads, seq, head_dim)``.
        cos: Cosine table; two leading singleton axes are added for broadcasting.
        sin: Sine table, same shape as ``cos``.

    Returns:
        ``x * cos + rotate_half(x) * sin``, same shape as ``x``.
    """
    cos_b = cos[None, None]
    sin_b = sin[None, None]
    return x * cos_b + rotate_half(x) * sin_b
|
|
|
|
class CausalSelfAttention(nn.Module):
    """Multi-head causal self-attention with rotary position embeddings."""

    def __init__(self, cfg: GPTConfig):
        """Create the fused QKV projection, the output projection and the RoPE tables.

        Args:
            cfg: Model configuration; ``d_model`` must be divisible by ``n_heads``.
        """
        super().__init__()
        assert cfg.d_model % cfg.n_heads == 0
        self.n_heads = cfg.n_heads
        self.head_dim = cfg.d_model // cfg.n_heads
        self.qkv = nn.Linear(cfg.d_model, 3 * cfg.d_model, bias=False)
        self.proj = nn.Linear(cfg.d_model, cfg.d_model, bias=False)
        # Kept as an attribute only; forward() always runs with dropout_p=0.0.
        self.dropout_p = cfg.dropout
        self.rope = RotaryEmbedding(self.head_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Attend over ``x`` of shape ``(batch, seq, d_model)``; same shape out."""
        b, t, c = x.shape
        per_head_shape = (b, t, self.n_heads, self.head_dim)
        # One matmul yields q, k and v; each becomes (batch, heads, seq, head_dim).
        q, k, v = (
            part.view(per_head_shape).transpose(1, 2)
            for part in self.qkv(x).split(c, dim=-1)
        )

        cos, sin = self.rope(t, x.dtype, x.device)
        q = apply_rope(q, cos, sin)
        k = apply_rope(k, cos, sin)

        allowed_backends = [
            SDPBackend.FLASH_ATTENTION,
            SDPBackend.EFFICIENT_ATTENTION,
            SDPBackend.MATH,
        ]
        with sdpa_kernel(allowed_backends):
            attended = F.scaled_dot_product_attention(
                q, k, v, dropout_p=0.0, is_causal=True
            )
        # Fold the heads back into the model dimension before projecting out.
        merged = attended.transpose(1, 2).contiguous().view(b, t, c)
        return self.proj(merged)
|
|
|
|
class SwiGLU(nn.Module):
    """Gated feed-forward network: ``w3(silu(w1(x)) * w2(x))``, all bias-free."""

    def __init__(self, cfg: GPTConfig):
        """Create the two up-projections (``w1``, ``w2``) and the down-projection (``w3``).

        Args:
            cfg: Configuration providing ``d_model`` and ``d_ff``.
        """
        super().__init__()
        d_model, d_ff = cfg.d_model, cfg.d_ff
        self.w1 = nn.Linear(d_model, d_ff, bias=False)
        self.w2 = nn.Linear(d_model, d_ff, bias=False)
        self.w3 = nn.Linear(d_ff, d_model, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Project up through the SiLU gate and the linear branch, then back down."""
        gate = F.silu(self.w1(x))
        value = self.w2(x)
        return self.w3(gate * value)
|
|
|
|
class Block(nn.Module):
    """Pre-norm transformer block: attention then feed-forward, each with a residual."""

    def __init__(self, cfg: GPTConfig):
        """Create both sub-layers and the RMSNorm that precedes each of them."""
        super().__init__()
        self.ln1 = RMSNorm(cfg.d_model)
        self.attn = CausalSelfAttention(cfg)
        self.ln2 = RMSNorm(cfg.d_model)
        self.ff = SwiGLU(cfg)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` after adding the attention and feed-forward updates."""
        attn_update = self.attn(self.ln1(x))
        x = x + attn_update
        ff_update = self.ff(self.ln2(x))
        return x + ff_update
|
|
|
|
class GPT(nn.Module):
    """Decoder-only language model: embedding, stacked blocks, final norm, tied head."""

    def __init__(self, cfg: GPTConfig):
        """Build all layers, tie the output head to the embedding and init weights.

        Args:
            cfg: Architecture configuration; kept on the instance as ``self.cfg``.
        """
        super().__init__()
        self.cfg = cfg
        self.tok_emb = nn.Embedding(cfg.vocab_size, cfg.d_model)
        self.blocks = nn.ModuleList([Block(cfg) for _ in range(cfg.n_layers)])
        self.ln_f = RMSNorm(cfg.d_model)
        self.lm_head = nn.Linear(cfg.d_model, cfg.vocab_size, bias=False)
        # Weight tying: the output projection shares the embedding matrix.
        self.lm_head.weight = self.tok_emb.weight
        self.apply(self._init_weights)

    @staticmethod
    def _init_weights(m: nn.Module) -> None:
        """Draw Linear/Embedding weights from N(0, 0.02**2) and zero Linear biases."""
        if not isinstance(m, (nn.Linear, nn.Embedding)):
            return
        nn.init.normal_(m.weight, mean=0.0, std=0.02)
        if isinstance(m, nn.Linear) and m.bias is not None:
            nn.init.zeros_(m.bias)

    def forward(self, input_ids: torch.Tensor, labels: Optional[torch.Tensor] = None):
        """Compute next-token logits and, when ``labels`` is given, the loss.

        Args:
            input_ids: Integer token ids, indexed into the embedding table.
            labels: Optional target ids aligned with ``input_ids``; positions
                equal to ``-100`` are ignored by the loss.

        Returns:
            ``(logits, loss)``; ``loss`` is ``None`` when no labels are passed,
            otherwise the mean cross-entropy over non-ignored positions.
        """
        hidden = self.tok_emb(input_ids)
        for layer in self.blocks:
            hidden = layer(hidden)
        logits = self.lm_head(self.ln_f(hidden))
        if labels is None:
            return logits, None
        flat_logits = logits.reshape(-1, logits.size(-1))
        loss = F.cross_entropy(flat_logits, labels.reshape(-1), ignore_index=-100)
        return logits, loss
|
|
|
|
class LoRALinear(nn.Module):
    """Frozen ``nn.Linear`` plus a trainable low-rank update ``B(A(x)) * scale``."""

    def __init__(self, base_layer: nn.Linear, r: int = 64, alpha: int = 128, dropout: float = 0.05):
        """Wrap ``base_layer`` and create the low-rank adapter matrices.

        Args:
            base_layer: Linear layer to wrap; its parameters are frozen.
            r: Rank of the adapter.
            alpha: Scaling numerator; the update is multiplied by
                ``alpha / max(1, r)``.
            dropout: Dropout probability applied to the adapter input.
        """
        super().__init__()
        self.base = base_layer
        self.r = r
        self.scale = alpha / max(1, r)
        # Adapters live on the same device and in the same dtype as the base weight.
        placement = {
            "device": base_layer.weight.device,
            "dtype": base_layer.weight.dtype,
        }
        self.lora_A = nn.Linear(base_layer.in_features, r, bias=False, **placement)
        self.lora_B = nn.Linear(r, base_layer.out_features, bias=False, **placement)
        self.drop = nn.Dropout(dropout)
        nn.init.kaiming_uniform_(self.lora_A.weight, a=math.sqrt(5))
        # B starts at zero, so a fresh adapter leaves the base output unchanged.
        nn.init.zeros_(self.lora_B.weight)
        self.base.requires_grad_(False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the base projection plus the scaled low-rank correction."""
        base_out = self.base(x)
        low_rank = self.lora_B(self.lora_A(self.drop(x)))
        return base_out + low_rank * self.scale
|
|
|
|
def apply_qlora_for_loading(model: GPT) -> GPT:
    """Wrap the attention and feed-forward linears of ``model`` in ``LoRALinear``.

    Every ``nn.Linear`` whose attribute name is ``qkv``, ``proj``, ``w1``,
    ``w2`` or ``w3`` is replaced in place, so the module tree matches the key
    layout of a LoRA checkpoint. Despite the function name, nothing is
    quantized here.

    Args:
        model: Model to modify in place.

    Returns:
        The same ``model`` instance.
    """
    adapted_names = {"qkv", "proj", "w1", "w2", "w3"}
    # Collect first: swapping modules while named_modules() iterates is unsafe.
    targets = [
        (qualified_name, module)
        for qualified_name, module in model.named_modules()
        if qualified_name.split(".")[-1] in adapted_names and isinstance(module, nn.Linear)
    ]

    for qualified_name, module in targets:
        *parent_path, leaf = qualified_name.split(".")
        parent = model
        for attr in parent_path:
            parent = getattr(parent, attr)
        setattr(parent, leaf, LoRALinear(module))
    return model
|
|
|
|
def is_lora_state_dict(sd: dict) -> bool:
    """Tell whether ``sd`` uses the key layout of LoRA-wrapped layers.

    Returns:
        ``True`` if any key contains ``.lora_A.weight``, ``.lora_B.weight``
        or ``.base.weight``; ``False`` otherwise (including for an empty dict).
    """
    lora_markers = (".lora_A.weight", ".lora_B.weight", ".base.weight")
    for key in sd:
        if any(marker in key for marker in lora_markers):
            return True
    return False
|
|
|
|
| |
| |
| |
|
|
|
|
def load_model_and_tokenizer(
    model_dir: Path,
    checkpoint_name: Optional[str],
    tokenizer_dir: Optional[str],
    device: torch.device,
    dtype: torch.dtype,
):
    """Load the tokenizer, rebuild the model and restore its weights.

    The configuration comes from the ``"config"`` entry of the checkpoint when
    present, otherwise from ``config.json`` in ``model_dir``. Configuration
    keys that ``GPTConfig`` does not define are ignored with a warning instead
    of aborting the load. When the state dict contains LoRA keys, the model is
    wrapped with LoRA layers before the weights are loaded.

    Args:
        model_dir: Directory holding the checkpoint (and possibly the
            tokenizer and ``config.json``).
        checkpoint_name: Explicit checkpoint file name, or ``None`` to
            auto-detect.
        tokenizer_dir: Explicit tokenizer directory, or ``None`` to
            auto-detect.
        device: Target device for the model.
        dtype: Parameter dtype applied when ``device`` is CUDA; on other
            devices the parameters keep their loaded dtype.

    Returns:
        ``(model, tokenizer, ckpt_path)`` with the model in eval mode.

    Raises:
        FileNotFoundError: No checkpoint, tokenizer or configuration found.
    """
    # Local import: the file-level import only brings in ``dataclass``.
    from dataclasses import fields

    ckpt_path = resolve_checkpoint(model_dir, checkpoint_name)
    tok_dir = resolve_tokenizer_dir(model_dir, tokenizer_dir)

    tokenizer = PreTrainedTokenizerFast.from_pretrained(str(tok_dir))
    # NOTE(security): torch.load relies on pickle and, depending on the torch
    # version's ``weights_only`` default, may execute code embedded in the
    # file. Only load checkpoints from a trusted source.
    ckpt = torch.load(ckpt_path, map_location="cpu")

    if "config" in ckpt:
        cfg_dict = ckpt["config"]
    else:
        cfg_path = model_dir / "config.json"
        if not cfg_path.exists():
            raise FileNotFoundError("config.json introuvable et aucune config dans le checkpoint.")
        cfg_dict = json.loads(cfg_path.read_text(encoding="utf-8"))

    # A saved config may carry keys GPTConfig does not define; passing them
    # straight to the constructor would raise TypeError.
    known_fields = {f.name for f in fields(GPTConfig)}
    ignored_keys = sorted(k for k in cfg_dict if k not in known_fields)
    if ignored_keys:
        print(f"[warn] clés de config ignorées: {ignored_keys}")
    cfg = GPTConfig(**{k: v for k, v in cfg_dict.items() if k in known_fields})
    cfg.vocab_size = len(tokenizer)
    cfg.use_checkpointing = False

    model = GPT(cfg)
    sd = normalize_state_dict_keys(ckpt["model"])

    if is_lora_state_dict(sd):
        model = apply_qlora_for_loading(model)

    # strict=False tolerates key mismatches, so report which keys are
    # affected: a bare count hides a badly loaded model.
    missing, unexpected = model.load_state_dict(sd, strict=False)
    if missing:
        print(f"[warn] clés manquantes: {len(missing)}")
        print(f"[warn]   ex.: {list(missing)[:5]}")
    if unexpected:
        print(f"[warn] clés inattendues: {len(unexpected)}")
        print(f"[warn]   ex.: {list(unexpected)[:5]}")

    model.to(device=device)
    if device.type == "cuda":
        model.to(dtype=dtype)
    model.eval()
    return model, tokenizer, ckpt_path
|
|
|
|
| |
| |
| |
|
|
def _apply_repetition_penalty(
    logits: torch.Tensor, seen_ids: torch.Tensor, penalty: float
) -> torch.Tensor:
    """Lower, in place, the score of every id in ``seen_ids`` whatever its sign."""
    seen = logits[:, seen_ids]
    # Dividing a negative logit by penalty > 1 would raise it, so negative
    # scores are multiplied instead.
    logits[:, seen_ids] = torch.where(seen < 0, seen * penalty, seen / penalty)
    return logits


def _keep_top_k(logits: torch.Tensor, top_k: int) -> torch.Tensor:
    """Set to -inf every logit strictly below the k-th largest of its row."""
    k = min(top_k, logits.size(-1))
    kth_largest = torch.topk(logits, k=k, dim=-1).values[:, -1].unsqueeze(-1)
    return logits.masked_fill(logits < kth_largest, float("-inf"))


def _keep_top_p(logits: torch.Tensor, top_p: float) -> torch.Tensor:
    """Keep the smallest high-probability prefix whose mass exceeds ``top_p``."""
    sorted_logits, sorted_indices = torch.sort(logits, descending=True, dim=-1)
    cumprobs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
    drop = cumprobs > top_p
    # Shift right so the token that crosses the threshold is kept, and never
    # drop the most likely token.
    drop[..., 1:] = drop[..., :-1].clone()
    drop[..., 0] = False
    sorted_logits = sorted_logits.masked_fill(drop, float("-inf"))
    filtered = torch.full_like(logits, float("-inf"))
    return filtered.scatter_(dim=-1, index=sorted_indices, src=sorted_logits)


@torch.inference_mode()
def generate_text(
    model: GPT,
    tokenizer: PreTrainedTokenizerFast,
    prompt: str,
    device: torch.device,
    dtype: torch.dtype,
    max_new_tokens: int = 128,
    temperature: float = 0.8,
    top_k: int = 50,
    top_p: float = 0.95,
    repetition_penalty: float = 1.05,
    do_sample: bool = True,
) -> str:
    """Generate a continuation of ``prompt`` one token at a time.

    Args:
        model: Model returning ``(logits, loss)`` and exposing ``cfg.block_size``.
        tokenizer: Tokenizer used to encode the prompt and decode the result.
        prompt: Input text; a blank prompt is replaced by ``"Bonjour"``.
        device: Device on which the input tensor is created.
        dtype: Autocast dtype for the forward pass.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Softmax temperature; ``<= 0`` selects greedy decoding.
        top_k: Keep only the ``top_k`` best tokens; ``<= 0`` disables it.
        top_p: Nucleus threshold; applied only when strictly between 0 and 1.
        repetition_penalty: Penalty applied to tokens already present in the
            context window; ``1.0`` disables it.
        do_sample: ``False`` selects greedy decoding.

    Returns:
        The decoded prompt plus continuation, special tokens removed.
        Generation stops early once the tokenizer's EOS id is produced.
    """
    if not prompt.strip():
        prompt = "Bonjour"

    input_ids = tokenizer.encode(prompt, add_special_tokens=True)
    x = torch.tensor([input_ids], dtype=torch.long, device=device)
    block_size = model.cfg.block_size
    eos_id = tokenizer.eos_token_id
    greedy = not do_sample or temperature <= 0

    for _ in range(max_new_tokens):
        # Only the last block_size tokens are fed to the model.
        x_cond = x[:, -block_size:]
        with autocast_context(device, dtype):
            logits, _ = model(x_cond)
        # Filter and sample in float32: half-precision logits lose resolution
        # in softmax/cumsum and can overflow when divided by a small temperature.
        next_token_logits = logits[:, -1, :].float()

        if repetition_penalty != 1.0:
            next_token_logits = _apply_repetition_penalty(
                next_token_logits, torch.unique(x_cond), repetition_penalty
            )

        if greedy:
            next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)
        else:
            next_token_logits = next_token_logits / temperature
            if top_k > 0:
                next_token_logits = _keep_top_k(next_token_logits, top_k)
            if 0.0 < top_p < 1.0:
                next_token_logits = _keep_top_p(next_token_logits, top_p)
            probs = F.softmax(next_token_logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)

        x = torch.cat([x, next_token], dim=1)
        if eos_id is not None and int(next_token.item()) == eos_id:
            break

    return tokenizer.decode(x[0].tolist(), skip_special_tokens=True)
|
|
|
|
| |
| |
| |
|
|
|
|
def read_texts_from_file(path: Path) -> list[str]:
    """Read a UTF-8 text file and split it into non-empty, stripped entries.

    If the file contains at least one blank line (``"\\n\\n"``), entries are
    the blank-line-separated paragraphs; otherwise every line is an entry.
    Undecodable bytes are silently dropped.

    Args:
        path: File to read.

    Returns:
        The entries in file order; empty list if nothing but whitespace.
    """
    raw = path.read_text(encoding="utf-8", errors="ignore")
    pieces = raw.split("\n\n") if "\n\n" in raw else raw.splitlines()
    stripped = (piece.strip() for piece in pieces)
    return [piece for piece in stripped if piece]
|
|
|
|
class PackedEvalDataset(torch.utils.data.Dataset):
    """Fixed-length next-token samples cut from one packed token stream.

    Each non-empty text is encoded, wrapped in the tokenizer's BOS/EOS ids
    (when defined) and appended to a single stream. The stream is cut into
    non-overlapping windows of ``block_size + 1`` tokens; each window gives
    ``input_ids`` (all but the last token) and ``labels`` (all but the first).
    A trailing partial window is discarded.
    """

    def __init__(self, texts: list[str], tokenizer: PreTrainedTokenizerFast, block_size: int):
        """Tokenize ``texts`` and build every full window up front.

        Args:
            texts: Raw documents to pack.
            tokenizer: Object providing ``encode``, ``bos_token_id`` and
                ``eos_token_id``.
            block_size: Number of input tokens per sample.
        """
        bos = tokenizer.bos_token_id
        eos = tokenizer.eos_token_id
        # A tokenizer without BOS/EOS reports None; a None in the stream
        # would make torch.tensor fail, so such markers are left out.
        prefix = [] if bos is None else [bos]
        suffix = [] if eos is None else [eos]

        tokens: list[int] = []
        for text in texts:
            ids = tokenizer.encode(text, add_special_tokens=False)
            if ids:
                tokens.extend(prefix + ids + suffix)

        window = block_size + 1
        self.samples = []
        # The stop bound lets the last window start at len(tokens) - window,
        # so a stream that is an exact multiple of the window keeps its final
        # chunk and every start yields a full window.
        for start in range(0, len(tokens) - window + 1, window):
            chunk = tokens[start:start + window]
            self.samples.append({
                "input_ids": torch.tensor(chunk[:-1], dtype=torch.long),
                "labels": torch.tensor(chunk[1:], dtype=torch.long),
            })

    def __len__(self):
        """Return the number of full windows."""
        return len(self.samples)

    def __getitem__(self, idx: int):
        """Return the ``{"input_ids", "labels"}`` dict of window ``idx``."""
        return self.samples[idx]
|
|
|
|
@torch.inference_mode()
def evaluate_file(
    model: GPT,
    tokenizer: PreTrainedTokenizerFast,
    eval_texts_file: Path,
    device: torch.device,
    dtype: torch.dtype,
    eval_batch_size: int,
):
    """Compute the average loss and perplexity of ``model`` on a text file.

    The file is split into texts, packed into fixed-length blocks of
    ``model.cfg.block_size`` tokens and scored batch by batch.

    Args:
        model: Model returning ``(logits, loss)`` for ``(input_ids, labels)``.
        tokenizer: Tokenizer used to encode the texts.
        eval_texts_file: Path of the evaluation text file.
        device: Device the batches are moved to.
        dtype: Autocast dtype for the forward pass.
        eval_batch_size: Number of blocks per batch.

    Returns:
        Dict with ``num_texts``, ``num_batches``, ``avg_loss`` and
        ``perplexity`` (``exp`` of the average loss, capped at 20.0 before
        exponentiation).

    Raises:
        ValueError: The file holds no usable text, or too few tokens to form
            a single block.
    """
    texts = read_texts_from_file(eval_texts_file)
    if not texts:
        raise ValueError(f"Aucun texte exploitable dans {eval_texts_file}")

    dataset = PackedEvalDataset(texts, tokenizer, model.cfg.block_size)
    if len(dataset) == 0:
        raise ValueError("Pas assez de tokens pour former un bloc d'évaluation.")

    loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=eval_batch_size,
        shuffle=False,
        num_workers=0,
        # Pinned host memory only helps when batches are copied to a GPU.
        pin_memory=device.type == "cuda",
    )

    weighted_loss_sum = 0.0
    n_samples = 0
    n_batches = 0
    for batch in loader:
        inp = batch["input_ids"].to(device, non_blocking=True)
        lbl = batch["labels"].to(device, non_blocking=True)
        with autocast_context(device, dtype):
            _, loss = model(inp, lbl)
        # The model returns a per-batch mean. All samples have the same
        # length, so weighting by batch size gives the exact per-token mean
        # even when the last batch is smaller than the others.
        batch_samples = inp.size(0)
        weighted_loss_sum += float(loss.item()) * batch_samples
        n_samples += batch_samples
        n_batches += 1

    avg_loss = weighted_loss_sum / max(1, n_samples)
    # Cap the exponent so a diverged loss cannot overflow math.exp.
    ppl = math.exp(min(avg_loss, 20.0))
    return {
        "num_texts": len(texts),
        "num_batches": n_batches,
        "avg_loss": avg_loss,
        "perplexity": ppl,
    }
|
|
|
|
| |
| |
| |
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser of the script.

    Options cover model/tokenizer location, device and dtype, generation
    settings (prompt sources and sampling parameters) and file-based
    evaluation.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    # (flag, add_argument keyword arguments), registered in this order so the
    # --help listing keeps the same layout.
    option_specs = [
        # Model location and runtime.
        ("--model-dir", dict(type=str, required=True, help="Dossier de sortie du modèle fine-tuné.")),
        ("--checkpoint", dict(type=str, default=None, help="Nom du checkpoint dans model-dir (ex: model_best.pt).")),
        ("--tokenizer-dir", dict(type=str, default=None, help="Dossier du tokenizer si différent.")),
        ("--device", dict(type=str, default="auto", help="auto, cpu, cuda, cuda:0...")),
        ("--dtype", dict(type=str, default="bfloat16", choices=["float16", "bfloat16", "float32"])),
        # Generation.
        ("--prompt", dict(type=str, default=None, help="Prompt unique à générer.")),
        ("--prompt-file", dict(type=str, default=None, help="Fichier texte avec prompts, un par ligne.")),
        ("--max-new-tokens", dict(type=int, default=160)),
        ("--temperature", dict(type=float, default=0.8)),
        ("--top-k", dict(type=int, default=50)),
        ("--top-p", dict(type=float, default=0.95)),
        ("--repetition-penalty", dict(type=float, default=1.05)),
        ("--greedy", dict(action="store_true", help="Désactive le sampling.")),
        # Evaluation.
        ("--eval-texts-file", dict(type=str, default=None, help="Fichier texte local pour calculer loss/perplexité.")),
        ("--eval-batch-size", dict(type=int, default=4)),
    ]

    parser = argparse.ArgumentParser(description="Test / génération pour GPT custom NLP.")
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser
|
|
|
|
def main() -> None:
    """Script entry point: load the model, then evaluate and/or generate.

    Steps:
      1. Parse the CLI, resolve device and dtype, check the model directory.
      2. Enable TF32 matmuls when running on CUDA.
      3. Load model and tokenizer and print a summary banner.
      4. If ``--eval-texts-file`` is given, print loss/perplexity as JSON.
      5. Generate for ``--prompt`` and/or the entries of ``--prompt-file``;
         when neither prompts nor an evaluation file are given, fall back to
         three built-in prompts.

    Raises:
        FileNotFoundError: ``--model-dir`` does not exist.
    """
    args = build_parser().parse_args()
    device = get_device(args.device)
    dtype = DTYPE_MAP[args.dtype]
    model_dir = Path(args.model_dir)

    if not model_dir.exists():
        raise FileNotFoundError(f"model-dir introuvable: {model_dir}")

    if device.type == "cuda":
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True
        torch.set_float32_matmul_precision("high")

    model, tokenizer, ckpt_path = load_model_and_tokenizer(
        model_dir=model_dir,
        checkpoint_name=args.checkpoint,
        tokenizer_dir=args.tokenizer_dir,
        device=device,
        dtype=dtype,
    )

    # Summary banner.
    rule = "=" * 88
    n_params_millions = sum(p.numel() for p in model.parameters()) / 1e6
    banner_lines = [
        rule,
        "TEST DU MODÈLE GPT CUSTOM",
        rule,
        f"Device : {device}",
        f"DType : {dtype}",
        f"Checkpoint : {ckpt_path}",
        f"Tokenizer vocab: {len(tokenizer)}",
        f"Block size : {model.cfg.block_size}",
        f"Paramètres : {n_params_millions:.1f} M",
    ]
    print("\n".join(banner_lines))

    # Optional evaluation pass.
    if args.eval_texts_file:
        stats = evaluate_file(
            model=model,
            tokenizer=tokenizer,
            eval_texts_file=Path(args.eval_texts_file),
            device=device,
            dtype=dtype,
            eval_batch_size=args.eval_batch_size,
        )
        print("\n[ÉVALUATION]")
        print(json.dumps(stats, indent=2, ensure_ascii=False))

    # Collect prompts: the single --prompt first, then the file entries.
    prompts: list[str] = [args.prompt] if args.prompt else []
    if args.prompt_file:
        prompts += read_texts_from_file(Path(args.prompt_file))

    # Built-in prompts are used only when the user asked for nothing at all.
    if not (prompts or args.eval_texts_file):
        prompts = [
            "Bonjour, présente-toi en quelques lignes.",
            "Résume ce texte en français simple : l'intelligence artificielle transforme plusieurs secteurs.",
            "Écris un petit paragraphe en arabe sur l'éducation.",
        ]

    if not prompts:
        return

    sampling_options = dict(
        max_new_tokens=args.max_new_tokens,
        temperature=args.temperature,
        top_k=args.top_k,
        top_p=args.top_p,
        repetition_penalty=args.repetition_penalty,
        do_sample=not args.greedy,
    )
    separator = "-" * 88
    print("\n[GÉNÉRATION]")
    for index, prompt in enumerate(prompts, start=1):
        out = generate_text(
            model=model,
            tokenizer=tokenizer,
            prompt=prompt,
            device=device,
            dtype=dtype,
            **sampling_options,
        )
        print(separator)
        print(f"Prompt {index}:")
        print(prompt)
        print("\nSortie:")
        print(out)


if __name__ == "__main__":
    main()
|
|