#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ test_modele_nlp.py ================== Test / inférence pour le modèle GPT custom entraîné par train_nlp_ft_10epochs_8hours.py. Fonctions: - charger automatiquement le tokenizer, la config et le checkpoint - générer du texte à partir d'un prompt - tester plusieurs prompts - évaluer loss / perplexité sur un fichier texte local Exemples: python test_modele_nlp.py \ --model-dir ./nlp_1b_h100_ft_nlp_mix_10ep_8h \ --prompt "Bonjour, voici un résumé de l'actualité" python test_modele_nlp.py \ --model-dir ./nlp_1b_h100_ft_nlp_mix_10ep_8h \ --prompt-file prompts.txt \ --max-new-tokens 200 python test_modele_nlp.py \ --model-dir ./nlp_1b_h100_ft_nlp_mix_10ep_8h \ --eval-texts-file eval.txt \ --eval-batch-size 4 """ from __future__ import annotations import argparse import json import math import os from collections import OrderedDict from contextlib import nullcontext from dataclasses import dataclass from pathlib import Path from typing import Optional import torch import torch.nn as nn import torch.nn.functional as F from torch.nn.attention import SDPBackend, sdpa_kernel from transformers import PreTrainedTokenizerFast # ============================================================================= # CONFIG / UTILS # ============================================================================= PAD_TOKEN = "" BOS_TOKEN = "" EOS_TOKEN = "" UNK_TOKEN = "" DTYPE_MAP = { "float16": torch.float16, "bfloat16": torch.bfloat16, "float32": torch.float32, } def normalize_state_dict_keys(sd: dict) -> OrderedDict: out = OrderedDict() for k, v in sd.items(): for prefix in ("module._orig_mod.", "_orig_mod.", "module."): if k.startswith(prefix): k = k[len(prefix):] break out[k] = v return out def get_device(device_arg: str) -> torch.device: if device_arg == "auto": if torch.cuda.is_available(): return torch.device("cuda") return torch.device("cpu") return torch.device(device_arg) def autocast_context(device: torch.device, dtype: torch.dtype): if device.type == "cuda" and dtype in (torch.float16, torch.bfloat16): return torch.autocast(device_type="cuda", dtype=dtype) return nullcontext() def resolve_checkpoint(model_dir: Path, checkpoint_name: Optional[str]) -> Path: if checkpoint_name: ckpt = model_dir / checkpoint_name if not ckpt.exists(): raise FileNotFoundError(f"Checkpoint introuvable: {ckpt}") return ckpt candidates = [ model_dir / "model_best.pt", model_dir / "model.pt", model_dir / "train_state.pt", ] for ckpt in candidates: if ckpt.exists(): return ckpt raise FileNotFoundError( f"Aucun checkpoint trouvé dans {model_dir}. Cherchés: model_best.pt, model.pt, train_state.pt" ) def resolve_tokenizer_dir(model_dir: Path, explicit_tokenizer_dir: Optional[str]) -> Path: if explicit_tokenizer_dir: tok_dir = Path(explicit_tokenizer_dir) if not tok_dir.exists(): raise FileNotFoundError(f"Tokenizer dir introuvable: {tok_dir}") return tok_dir candidates = [ model_dir / "tokenizer_32k", model_dir.parent / "nlp_1b_h100_opt" / "tokenizer_32k", Path("./nlp_1b_h100_opt/tokenizer_32k"), ] for tok_dir in candidates: if (tok_dir / "tokenizer.json").exists(): return tok_dir raise FileNotFoundError( "Tokenizer introuvable. Passe --tokenizer-dir explicitement." ) # ============================================================================= # MODEL # ============================================================================= @dataclass class GPTConfig: vocab_size: int = 32000 block_size: int = 1024 d_model: int = 1536 n_heads: int = 24 n_layers: int = 24 d_ff: int = 6144 dropout: float = 0.0 use_checkpointing: bool = False class RMSNorm(nn.Module): def __init__(self, dim: int, eps: float = 1e-6): super().__init__() self.weight = nn.Parameter(torch.ones(dim)) self.eps = eps def forward(self, x: torch.Tensor) -> torch.Tensor: return self.weight * x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps) class RotaryEmbedding(nn.Module): def __init__(self, dim: int, base: int = 10_000, max_seq: int = 16_384): super().__init__() inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float() / dim)) t = torch.arange(max_seq).float() freqs = torch.outer(t, inv_freq) self.register_buffer("cos_cache", torch.repeat_interleave(freqs.cos(), 2, dim=-1), persistent=False) self.register_buffer("sin_cache", torch.repeat_interleave(freqs.sin(), 2, dim=-1), persistent=False) def forward(self, seq_len: int, dtype: torch.dtype, device: torch.device): cos = self.cos_cache[:seq_len].to(device=device, dtype=dtype) sin = self.sin_cache[:seq_len].to(device=device, dtype=dtype) return cos, sin def rotate_half(x: torch.Tensor) -> torch.Tensor: x1 = x[..., ::2] x2 = x[..., 1::2] return torch.stack((-x2, x1), dim=-1).flatten(-2) def apply_rope(x: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor) -> torch.Tensor: return x * cos.unsqueeze(0).unsqueeze(0) + rotate_half(x) * sin.unsqueeze(0).unsqueeze(0) class CausalSelfAttention(nn.Module): def __init__(self, cfg: GPTConfig): super().__init__() assert cfg.d_model % cfg.n_heads == 0 self.n_heads = cfg.n_heads self.head_dim = cfg.d_model // cfg.n_heads self.qkv = nn.Linear(cfg.d_model, 3 * cfg.d_model, bias=False) self.proj = nn.Linear(cfg.d_model, cfg.d_model, bias=False) self.dropout_p = cfg.dropout self.rope = RotaryEmbedding(self.head_dim) def forward(self, x: torch.Tensor) -> torch.Tensor: b, t, c = x.shape q, k, v = self.qkv(x).split(c, dim=-1) q = q.view(b, t, self.n_heads, self.head_dim).transpose(1, 2) k = k.view(b, t, self.n_heads, self.head_dim).transpose(1, 2) v = v.view(b, t, self.n_heads, self.head_dim).transpose(1, 2) cos, sin = self.rope(t, x.dtype, x.device) q = apply_rope(q, cos, sin) k = apply_rope(k, cos, sin) with sdpa_kernel([SDPBackend.FLASH_ATTENTION, SDPBackend.EFFICIENT_ATTENTION, SDPBackend.MATH]): y = F.scaled_dot_product_attention( q, k, v, dropout_p=0.0, is_causal=True, ) y = y.transpose(1, 2).contiguous().view(b, t, c) return self.proj(y) class SwiGLU(nn.Module): def __init__(self, cfg: GPTConfig): super().__init__() self.w1 = nn.Linear(cfg.d_model, cfg.d_ff, bias=False) self.w2 = nn.Linear(cfg.d_model, cfg.d_ff, bias=False) self.w3 = nn.Linear(cfg.d_ff, cfg.d_model, bias=False) def forward(self, x: torch.Tensor) -> torch.Tensor: return self.w3(F.silu(self.w1(x)) * self.w2(x)) class Block(nn.Module): def __init__(self, cfg: GPTConfig): super().__init__() self.ln1 = RMSNorm(cfg.d_model) self.attn = CausalSelfAttention(cfg) self.ln2 = RMSNorm(cfg.d_model) self.ff = SwiGLU(cfg) def forward(self, x: torch.Tensor) -> torch.Tensor: x = x + self.attn(self.ln1(x)) x = x + self.ff(self.ln2(x)) return x class GPT(nn.Module): def __init__(self, cfg: GPTConfig): super().__init__() self.cfg = cfg self.tok_emb = nn.Embedding(cfg.vocab_size, cfg.d_model) self.blocks = nn.ModuleList([Block(cfg) for _ in range(cfg.n_layers)]) self.ln_f = RMSNorm(cfg.d_model) self.lm_head = nn.Linear(cfg.d_model, cfg.vocab_size, bias=False) self.lm_head.weight = self.tok_emb.weight self.apply(self._init_weights) @staticmethod def _init_weights(m: nn.Module) -> None: if isinstance(m, (nn.Linear, nn.Embedding)): nn.init.normal_(m.weight, mean=0.0, std=0.02) if isinstance(m, nn.Linear) and m.bias is not None: nn.init.zeros_(m.bias) def forward(self, input_ids: torch.Tensor, labels: Optional[torch.Tensor] = None): x = self.tok_emb(input_ids) for block in self.blocks: x = block(x) logits = self.lm_head(self.ln_f(x)) loss = None if labels is not None: loss = F.cross_entropy( logits.reshape(-1, logits.size(-1)), labels.reshape(-1), ignore_index=-100, ) return logits, loss class LoRALinear(nn.Module): def __init__(self, base_layer: nn.Linear, r: int = 64, alpha: int = 128, dropout: float = 0.05): super().__init__() self.base = base_layer self.r = r self.scale = alpha / max(1, r) in_f, out_f = base_layer.in_features, base_layer.out_features device = base_layer.weight.device dtype = base_layer.weight.dtype self.lora_A = nn.Linear(in_f, r, bias=False, device=device, dtype=dtype) self.lora_B = nn.Linear(r, out_f, bias=False, device=device, dtype=dtype) self.drop = nn.Dropout(dropout) nn.init.kaiming_uniform_(self.lora_A.weight, a=math.sqrt(5)) nn.init.zeros_(self.lora_B.weight) for p in self.base.parameters(): p.requires_grad = False def forward(self, x: torch.Tensor) -> torch.Tensor: return self.base(x) + self.lora_B(self.lora_A(self.drop(x))) * self.scale def apply_qlora_for_loading(model: GPT) -> GPT: targets = [] for name, module in model.named_modules(): if name.split(".")[-1] in {"qkv", "proj", "w1", "w2", "w3"} and isinstance(module, nn.Linear): targets.append((name, module)) for name, module in targets: parts = name.split(".") parent = model for part in parts[:-1]: parent = getattr(parent, part) setattr(parent, parts[-1], LoRALinear(module)) return model def is_lora_state_dict(sd: dict) -> bool: return any(s in k for k in sd.keys() for s in (".lora_A.weight", ".lora_B.weight", ".base.weight")) # ============================================================================= # LOAD # ============================================================================= def load_model_and_tokenizer( model_dir: Path, checkpoint_name: Optional[str], tokenizer_dir: Optional[str], device: torch.device, dtype: torch.dtype, ): ckpt_path = resolve_checkpoint(model_dir, checkpoint_name) tok_dir = resolve_tokenizer_dir(model_dir, tokenizer_dir) tokenizer = PreTrainedTokenizerFast.from_pretrained(str(tok_dir)) ckpt = torch.load(ckpt_path, map_location="cpu") if "config" in ckpt: cfg_dict = ckpt["config"] else: cfg_path = model_dir / "config.json" if not cfg_path.exists(): raise FileNotFoundError("config.json introuvable et aucune config dans le checkpoint.") cfg_dict = json.loads(cfg_path.read_text(encoding="utf-8")) cfg = GPTConfig(**cfg_dict) cfg.vocab_size = len(tokenizer) cfg.use_checkpointing = False model = GPT(cfg) sd = normalize_state_dict_keys(ckpt["model"]) if is_lora_state_dict(sd): model = apply_qlora_for_loading(model) missing, unexpected = model.load_state_dict(sd, strict=False) if missing: print(f"[warn] clés manquantes: {len(missing)}") if unexpected: print(f"[warn] clés inattendues: {len(unexpected)}") model.to(device=device) if device.type == "cuda": model.to(dtype=dtype) model.eval() return model, tokenizer, ckpt_path # ============================================================================= # GENERATION # ============================================================================= @torch.inference_mode() def generate_text( model: GPT, tokenizer: PreTrainedTokenizerFast, prompt: str, device: torch.device, dtype: torch.dtype, max_new_tokens: int = 128, temperature: float = 0.8, top_k: int = 50, top_p: float = 0.95, repetition_penalty: float = 1.05, do_sample: bool = True, ) -> str: if not prompt.strip(): prompt = "Bonjour" input_ids = tokenizer.encode(prompt, add_special_tokens=True) x = torch.tensor([input_ids], dtype=torch.long, device=device) block_size = model.cfg.block_size eos_id = tokenizer.eos_token_id for _ in range(max_new_tokens): x_cond = x[:, -block_size:] with autocast_context(device, dtype): logits, _ = model(x_cond) next_token_logits = logits[:, -1, :] if repetition_penalty != 1.0: unique_tokens = torch.unique(x_cond) next_token_logits[:, unique_tokens] /= repetition_penalty if not do_sample or temperature <= 0: next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True) else: next_token_logits = next_token_logits / temperature if top_k > 0: values, _ = torch.topk(next_token_logits, k=min(top_k, next_token_logits.size(-1)), dim=-1) min_keep = values[:, -1].unsqueeze(-1) next_token_logits = torch.where( next_token_logits < min_keep, torch.full_like(next_token_logits, float("-inf")), next_token_logits, ) if 0.0 < top_p < 1.0: sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True, dim=-1) probs = F.softmax(sorted_logits, dim=-1) cumprobs = torch.cumsum(probs, dim=-1) sorted_mask = cumprobs > top_p sorted_mask[..., 1:] = sorted_mask[..., :-1].clone() sorted_mask[..., 0] = False sorted_logits = sorted_logits.masked_fill(sorted_mask, float("-inf")) next_token_logits = torch.full_like(next_token_logits, float("-inf")) next_token_logits.scatter_(dim=-1, index=sorted_indices, src=sorted_logits) probs = F.softmax(next_token_logits, dim=-1) next_token = torch.multinomial(probs, num_samples=1) x = torch.cat([x, next_token], dim=1) if eos_id is not None and int(next_token.item()) == eos_id: break return tokenizer.decode(x[0].tolist(), skip_special_tokens=True) # ============================================================================= # EVAL # ============================================================================= def read_texts_from_file(path: Path) -> list[str]: raw = path.read_text(encoding="utf-8", errors="ignore") if "\n\n" in raw: chunks = [x.strip() for x in raw.split("\n\n") if x.strip()] else: chunks = [x.strip() for x in raw.splitlines() if x.strip()] return chunks class PackedEvalDataset(torch.utils.data.Dataset): def __init__(self, texts: list[str], tokenizer: PreTrainedTokenizerFast, block_size: int): bos = tokenizer.bos_token_id eos = tokenizer.eos_token_id tokens: list[int] = [] for text in texts: ids = tokenizer.encode(text, add_special_tokens=False) if ids: tokens.extend([bos] + ids + [eos]) self.samples = [] for i in range(0, max(0, len(tokens) - block_size - 1), block_size + 1): chunk = tokens[i: i + block_size + 1] if len(chunk) == block_size + 1: self.samples.append({ "input_ids": torch.tensor(chunk[:-1], dtype=torch.long), "labels": torch.tensor(chunk[1:], dtype=torch.long), }) def __len__(self): return len(self.samples) def __getitem__(self, idx: int): return self.samples[idx] @torch.inference_mode() def evaluate_file( model: GPT, tokenizer: PreTrainedTokenizerFast, eval_texts_file: Path, device: torch.device, dtype: torch.dtype, eval_batch_size: int, ): texts = read_texts_from_file(eval_texts_file) if not texts: raise ValueError(f"Aucun texte exploitable dans {eval_texts_file}") dataset = PackedEvalDataset(texts, tokenizer, model.cfg.block_size) if len(dataset) == 0: raise ValueError("Pas assez de tokens pour former un bloc d'évaluation.") loader = torch.utils.data.DataLoader( dataset, batch_size=eval_batch_size, shuffle=False, num_workers=0, pin_memory=torch.cuda.is_available(), ) loss_sum = 0.0 n_batches = 0 for batch in loader: inp = batch["input_ids"].to(device, non_blocking=True) lbl = batch["labels"].to(device, non_blocking=True) with autocast_context(device, dtype): _, loss = model(inp, lbl) loss_sum += float(loss.item()) n_batches += 1 avg_loss = loss_sum / max(1, n_batches) ppl = math.exp(min(avg_loss, 20.0)) return { "num_texts": len(texts), "num_batches": n_batches, "avg_loss": avg_loss, "perplexity": ppl, } # ============================================================================= # MAIN # ============================================================================= def build_parser() -> argparse.ArgumentParser: p = argparse.ArgumentParser(description="Test / génération pour GPT custom NLP.") p.add_argument("--model-dir", type=str, required=True, help="Dossier de sortie du modèle fine-tuné.") p.add_argument("--checkpoint", type=str, default=None, help="Nom du checkpoint dans model-dir (ex: model_best.pt).") p.add_argument("--tokenizer-dir", type=str, default=None, help="Dossier du tokenizer si différent.") p.add_argument("--device", type=str, default="auto", help="auto, cpu, cuda, cuda:0...") p.add_argument("--dtype", type=str, default="bfloat16", choices=["float16", "bfloat16", "float32"]) p.add_argument("--prompt", type=str, default=None, help="Prompt unique à générer.") p.add_argument("--prompt-file", type=str, default=None, help="Fichier texte avec prompts, un par ligne.") p.add_argument("--max-new-tokens", type=int, default=160) p.add_argument("--temperature", type=float, default=0.8) p.add_argument("--top-k", type=int, default=50) p.add_argument("--top-p", type=float, default=0.95) p.add_argument("--repetition-penalty", type=float, default=1.05) p.add_argument("--greedy", action="store_true", help="Désactive le sampling.") p.add_argument("--eval-texts-file", type=str, default=None, help="Fichier texte local pour calculer loss/perplexité.") p.add_argument("--eval-batch-size", type=int, default=4) return p def main() -> None: args = build_parser().parse_args() device = get_device(args.device) dtype = DTYPE_MAP[args.dtype] model_dir = Path(args.model_dir) if not model_dir.exists(): raise FileNotFoundError(f"model-dir introuvable: {model_dir}") if device.type == "cuda": torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True torch.set_float32_matmul_precision("high") model, tokenizer, ckpt_path = load_model_and_tokenizer( model_dir=model_dir, checkpoint_name=args.checkpoint, tokenizer_dir=args.tokenizer_dir, device=device, dtype=dtype, ) print("=" * 88) print("TEST DU MODÈLE GPT CUSTOM") print("=" * 88) print(f"Device : {device}") print(f"DType : {dtype}") print(f"Checkpoint : {ckpt_path}") print(f"Tokenizer vocab: {len(tokenizer)}") print(f"Block size : {model.cfg.block_size}") print(f"Paramètres : {sum(p.numel() for p in model.parameters()) / 1e6:.1f} M") if args.eval_texts_file: stats = evaluate_file( model=model, tokenizer=tokenizer, eval_texts_file=Path(args.eval_texts_file), device=device, dtype=dtype, eval_batch_size=args.eval_batch_size, ) print("\n[ÉVALUATION]") print(json.dumps(stats, indent=2, ensure_ascii=False)) prompts: list[str] = [] if args.prompt: prompts.append(args.prompt) if args.prompt_file: prompts.extend(read_texts_from_file(Path(args.prompt_file))) if not prompts and not args.eval_texts_file: prompts = [ "Bonjour, présente-toi en quelques lignes.", "Résume ce texte en français simple : l'intelligence artificielle transforme plusieurs secteurs.", "Écris un petit paragraphe en arabe sur l'éducation.", ] if prompts: print("\n[GÉNÉRATION]") for i, prompt in enumerate(prompts, start=1): out = generate_text( model=model, tokenizer=tokenizer, prompt=prompt, device=device, dtype=dtype, max_new_tokens=args.max_new_tokens, temperature=args.temperature, top_k=args.top_k, top_p=args.top_p, repetition_penalty=args.repetition_penalty, do_sample=not args.greedy, ) print("-" * 88) print(f"Prompt {i}:") print(prompt) print("\nSortie:") print(out) if __name__ == "__main__": main()