# FirstChat / test_modele_nlp.py
# Medyassino's picture
# Add files using upload-large-folder tool
# 59dc998 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
test_modele_nlp.py
==================
Test / inférence pour le modèle GPT custom entraîné par train_nlp_ft_10epochs_8hours.py.
Fonctions:
- charger automatiquement le tokenizer, la config et le checkpoint
- générer du texte à partir d'un prompt
- tester plusieurs prompts
- évaluer loss / perplexité sur un fichier texte local
Exemples:
python test_modele_nlp.py \
--model-dir ./nlp_1b_h100_ft_nlp_mix_10ep_8h \
--prompt "Bonjour, voici un résumé de l'actualité"
python test_modele_nlp.py \
--model-dir ./nlp_1b_h100_ft_nlp_mix_10ep_8h \
--prompt-file prompts.txt \
--max-new-tokens 200
python test_modele_nlp.py \
--model-dir ./nlp_1b_h100_ft_nlp_mix_10ep_8h \
--eval-texts-file eval.txt \
--eval-batch-size 4
"""
from __future__ import annotations
import argparse
import json
import math
import os
from collections import OrderedDict
from contextlib import nullcontext
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.attention import SDPBackend, sdpa_kernel
from transformers import PreTrainedTokenizerFast
# =============================================================================
# CONFIG / UTILS
# =============================================================================
# Special-token literals. They are not referenced elsewhere in this file;
# presumably they mirror the training script's tokenizer setup (to confirm).
PAD_TOKEN, BOS_TOKEN, EOS_TOKEN, UNK_TOKEN = "<pad>", "<bos>", "<eos>", "<unk>"

# Maps each accepted --dtype command-line choice to the torch dtype of the
# same name.
DTYPE_MAP = {
    dtype_name: getattr(torch, dtype_name)
    for dtype_name in ("float16", "bfloat16", "float32")
}
def normalize_state_dict_keys(sd: dict) -> OrderedDict:
    """Return a copy of ``sd`` whose keys have wrapper prefixes removed.

    The prefixes ``"_orig_mod."`` and ``"module."`` are stripped from the
    front of every key, repeatedly and in any order, so stacked wrappers such
    as ``"module._orig_mod."`` or ``"_orig_mod.module."`` are all reduced to
    the bare parameter name. (These prefixes are typically introduced by
    ``torch.compile`` and by data-parallel wrappers.)

    Args:
        sd: Mapping from parameter name to value.

    Returns:
        A new ``OrderedDict`` preserving the iteration order of ``sd``. If two
        keys normalize to the same name, the later one wins.
    """
    wrapper_prefixes = ("_orig_mod.", "module.")
    out = OrderedDict()
    for key, value in sd.items():
        # Keep peeling until no wrapper prefix is left at the front: a single
        # pass would leave e.g. "module." behind for "_orig_mod.module.x".
        peeled = True
        while peeled:
            peeled = False
            for prefix in wrapper_prefixes:
                if key.startswith(prefix):
                    key = key[len(prefix):]
                    peeled = True
        out[key] = value
    return out
def get_device(device_arg: str) -> torch.device:
    """Turn the --device command-line value into a ``torch.device``.

    ``"auto"`` selects CUDA when ``torch.cuda.is_available()`` is true and the
    CPU otherwise; any other string is handed to ``torch.device`` unchanged.
    """
    if device_arg != "auto":
        return torch.device(device_arg)
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")
def autocast_context(device: torch.device, dtype: torch.dtype):
    """Return the context manager to wrap forward passes in.

    CUDA autocast is enabled only when the device is CUDA and ``dtype`` is
    float16 or bfloat16; every other combination gets a no-op context.
    """
    reduced_precision = dtype == torch.float16 or dtype == torch.bfloat16
    if not (device.type == "cuda" and reduced_precision):
        return nullcontext()
    return torch.autocast(device_type="cuda", dtype=dtype)
def resolve_checkpoint(model_dir: Path, checkpoint_name: Optional[str]) -> Path:
    """Locate the checkpoint file to load from ``model_dir``.

    Args:
        model_dir: Directory containing the checkpoints.
        checkpoint_name: Explicit file name inside ``model_dir``; when falsy,
            the default names are probed in priority order.

    Returns:
        Path of the first existing checkpoint.

    Raises:
        FileNotFoundError: If the explicit checkpoint, or every default
            candidate, is missing.
    """
    if checkpoint_name:
        explicit = model_dir / checkpoint_name
        if explicit.exists():
            return explicit
        raise FileNotFoundError(f"Checkpoint introuvable: {explicit}")

    # Priority order: best checkpoint, then final model, then training state.
    default_names = ("model_best.pt", "model.pt", "train_state.pt")
    found = next(
        (model_dir / name for name in default_names if (model_dir / name).exists()),
        None,
    )
    if found is None:
        raise FileNotFoundError(
            f"Aucun checkpoint trouvé dans {model_dir}. Cherchés: model_best.pt, model.pt, train_state.pt"
        )
    return found
def resolve_tokenizer_dir(model_dir: Path, explicit_tokenizer_dir: Optional[str]) -> Path:
    """Find the directory that holds the tokenizer files.

    An explicit directory is returned as soon as it exists (its contents are
    not inspected). Otherwise a fixed list of locations is searched and the
    first one containing ``tokenizer.json`` is returned.

    Raises:
        FileNotFoundError: If the explicit directory does not exist, or no
            default location contains ``tokenizer.json``.
    """
    if explicit_tokenizer_dir:
        chosen = Path(explicit_tokenizer_dir)
        if chosen.exists():
            return chosen
        raise FileNotFoundError(f"Tokenizer dir introuvable: {chosen}")

    search_order = (
        model_dir / "tokenizer_32k",
        model_dir.parent / "nlp_1b_h100_opt" / "tokenizer_32k",
        Path("./nlp_1b_h100_opt/tokenizer_32k"),
    )
    for candidate in search_order:
        if candidate.joinpath("tokenizer.json").exists():
            return candidate

    raise FileNotFoundError(
        "Tokenizer introuvable. Passe --tokenizer-dir explicitement."
    )
# =============================================================================
# MODEL
# =============================================================================
@dataclass
class GPTConfig:
    """Hyper-parameters used to build the GPT model defined in this file."""

    # Number of rows in the token embedding / output projection.
    vocab_size: int = 32000
    # Context window: generation truncates to it, evaluation packs to it.
    block_size: int = 1024
    # Width of the residual stream.
    d_model: int = 1536
    # Attention heads; d_model must be divisible by it.
    n_heads: int = 24
    # Number of stacked transformer blocks.
    n_layers: int = 24
    # Hidden width of the SwiGLU feed-forward layer.
    d_ff: int = 6144
    # Stored by the attention module; attention itself runs with dropout 0.0.
    dropout: float = 0.0
    # Forced to False when loading; not read by the model code in this file.
    use_checkpointing: bool = False
class RMSNorm(nn.Module):
    """Root-mean-square normalization over the last axis with a learned gain."""

    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        # Per-feature gain, initialised to ones.
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Scale ``x`` by the reciprocal RMS of its last axis, then by the gain."""
        mean_square = x.pow(2).mean(-1, keepdim=True)
        inv_rms = torch.rsqrt(mean_square + self.eps)
        return self.weight * x * inv_rms
class RotaryEmbedding(nn.Module):
    """Precomputed cosine / sine tables for rotary position embeddings.

    The tables have shape ``(max_seq, dim)``: each of the ``dim // 2``
    frequencies is repeated twice along the last axis. They are registered as
    non-persistent buffers, so they are not part of the state dict.
    """

    def __init__(self, dim: int, base: int = 10_000, max_seq: int = 16_384):
        super().__init__()
        exponents = torch.arange(0, dim, 2).float() / dim
        inv_freq = 1.0 / (base ** exponents)
        positions = torch.arange(max_seq).float()
        angles = torch.outer(positions, inv_freq)
        for buffer_name, table in (("cos_cache", angles.cos()), ("sin_cache", angles.sin())):
            self.register_buffer(
                buffer_name,
                torch.repeat_interleave(table, 2, dim=-1),
                persistent=False,
            )

    def forward(self, seq_len: int, dtype: torch.dtype, device: torch.device):
        """Return the first ``seq_len`` rows of both tables as ``(cos, sin)``.

        Both slices are converted to the requested ``dtype`` and ``device``.
        """
        cos = self.cos_cache[:seq_len].to(device=device, dtype=dtype)
        sin = self.sin_cache[:seq_len].to(device=device, dtype=dtype)
        return cos, sin
def rotate_half(x: torch.Tensor) -> torch.Tensor:
    """Rotate each interleaved pair of the last axis by 90 degrees.

    Every consecutive pair ``(a, b)`` along the last axis becomes ``(-b, a)``;
    the output has the same shape as ``x``.
    """
    even, odd = x[..., 0::2], x[..., 1::2]
    paired = torch.stack((-odd, even), dim=-1)
    return paired.flatten(-2)
def apply_rope(x: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor) -> torch.Tensor:
    """Apply the rotary embedding described by ``cos`` / ``sin`` to ``x``.

    Two leading singleton axes are added to ``cos`` and ``sin`` so they
    broadcast against ``x``; the result is ``x * cos + rotate_half(x) * sin``.
    """
    cos_b = cos[None, None]
    sin_b = sin[None, None]
    return x * cos_b + rotate_half(x) * sin_b
class CausalSelfAttention(nn.Module):
    """Multi-head causal self-attention with rotary position embeddings.

    Queries, keys and values come from one fused bias-free projection.
    ``cfg.dropout`` is stored in ``dropout_p`` but the attention call itself
    always uses ``dropout_p=0.0``.
    """

    def __init__(self, cfg: GPTConfig):
        super().__init__()
        assert cfg.d_model % cfg.n_heads == 0
        self.n_heads = cfg.n_heads
        self.head_dim = cfg.d_model // cfg.n_heads
        self.qkv = nn.Linear(cfg.d_model, 3 * cfg.d_model, bias=False)
        self.proj = nn.Linear(cfg.d_model, cfg.d_model, bias=False)
        self.dropout_p = cfg.dropout
        self.rope = RotaryEmbedding(self.head_dim)

    def _to_heads(self, tensor: torch.Tensor, batch: int, seq_len: int) -> torch.Tensor:
        """Reshape ``(batch, seq, d_model)`` into ``(batch, heads, seq, head_dim)``."""
        return tensor.view(batch, seq_len, self.n_heads, self.head_dim).transpose(1, 2)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Attend causally over ``x`` of shape ``(batch, seq, d_model)``."""
        batch, seq_len, width = x.shape
        q, k, v = (
            self._to_heads(part, batch, seq_len)
            for part in self.qkv(x).split(width, dim=-1)
        )

        # Rotary embeddings are applied to queries and keys only.
        cos, sin = self.rope(seq_len, x.dtype, x.device)
        q = apply_rope(q, cos, sin)
        k = apply_rope(k, cos, sin)

        allowed_backends = [
            SDPBackend.FLASH_ATTENTION,
            SDPBackend.EFFICIENT_ATTENTION,
            SDPBackend.MATH,
        ]
        with sdpa_kernel(allowed_backends):
            attended = F.scaled_dot_product_attention(
                q, k, v, dropout_p=0.0, is_causal=True
            )

        merged = attended.transpose(1, 2).contiguous().view(batch, seq_len, width)
        return self.proj(merged)
class SwiGLU(nn.Module):
    """Gated feed-forward layer: ``w3(silu(w1(x)) * w2(x))``, all bias-free."""

    def __init__(self, cfg: GPTConfig):
        super().__init__()
        width, hidden = cfg.d_model, cfg.d_ff
        self.w1 = nn.Linear(width, hidden, bias=False)  # gate projection
        self.w2 = nn.Linear(width, hidden, bias=False)  # value projection
        self.w3 = nn.Linear(hidden, width, bias=False)  # output projection

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Project up through the SiLU gate and back down to ``d_model``."""
        gate = F.silu(self.w1(x))
        value = self.w2(x)
        return self.w3(gate * value)
class Block(nn.Module):
    """Pre-norm transformer block: attention then feed-forward, each residual."""

    def __init__(self, cfg: GPTConfig):
        super().__init__()
        self.ln1 = RMSNorm(cfg.d_model)
        self.attn = CausalSelfAttention(cfg)
        self.ln2 = RMSNorm(cfg.d_model)
        self.ff = SwiGLU(cfg)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Add the attention output, then the feed-forward output, to ``x``."""
        after_attention = x + self.attn(self.ln1(x))
        return after_attention + self.ff(self.ln2(after_attention))
class GPT(nn.Module):
    """Decoder-only language model: embedding, ``n_layers`` blocks, final norm.

    The output projection ``lm_head`` shares its weight tensor with the token
    embedding ``tok_emb``.
    """

    def __init__(self, cfg: GPTConfig):
        super().__init__()
        self.cfg = cfg
        self.tok_emb = nn.Embedding(cfg.vocab_size, cfg.d_model)
        self.blocks = nn.ModuleList([Block(cfg) for _ in range(cfg.n_layers)])
        self.ln_f = RMSNorm(cfg.d_model)
        self.lm_head = nn.Linear(cfg.d_model, cfg.vocab_size, bias=False)
        # Weight tying: both modules point at the same parameter.
        self.lm_head.weight = self.tok_emb.weight
        self.apply(self._init_weights)

    @staticmethod
    def _init_weights(m: nn.Module) -> None:
        """Initialise Linear / Embedding weights with N(0, 0.02), biases with 0."""
        if not isinstance(m, (nn.Linear, nn.Embedding)):
            return
        nn.init.normal_(m.weight, mean=0.0, std=0.02)
        if isinstance(m, nn.Linear) and m.bias is not None:
            nn.init.zeros_(m.bias)

    def forward(self, input_ids: torch.Tensor, labels: Optional[torch.Tensor] = None):
        """Compute logits and, when ``labels`` is given, the cross-entropy loss.

        Args:
            input_ids: Token ids fed to the embedding.
            labels: Optional targets; positions equal to ``-100`` are ignored.

        Returns:
            ``(logits, loss)`` where ``loss`` is ``None`` without labels.
        """
        hidden = self.tok_emb(input_ids)
        for layer in self.blocks:
            hidden = layer(hidden)
        logits = self.lm_head(self.ln_f(hidden))

        if labels is None:
            return logits, None

        flat_logits = logits.reshape(-1, logits.size(-1))
        loss = F.cross_entropy(flat_logits, labels.reshape(-1), ignore_index=-100)
        return logits, loss
class LoRALinear(nn.Module):
    """Frozen linear layer plus a trainable low-rank update.

    The output is ``base(x) + lora_B(lora_A(drop(x))) * scale`` with
    ``scale = alpha / max(1, r)``. ``lora_B`` starts at zero, so a freshly
    built layer reproduces ``base`` exactly.
    """

    def __init__(self, base_layer: nn.Linear, r: int = 64, alpha: int = 128, dropout: float = 0.05):
        super().__init__()
        self.base = base_layer
        self.r = r
        self.scale = alpha / max(1, r)

        # The adapters live on the same device / dtype as the wrapped weight.
        placement = {
            "device": base_layer.weight.device,
            "dtype": base_layer.weight.dtype,
        }
        self.lora_A = nn.Linear(base_layer.in_features, r, bias=False, **placement)
        self.lora_B = nn.Linear(r, base_layer.out_features, bias=False, **placement)
        self.drop = nn.Dropout(dropout)

        nn.init.kaiming_uniform_(self.lora_A.weight, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B.weight)

        # Freeze every parameter of the wrapped layer.
        self.base.requires_grad_(False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the frozen projection plus the scaled low-rank correction."""
        frozen_out = self.base(x)
        low_rank_out = self.lora_B(self.lora_A(self.drop(x)))
        return frozen_out + low_rank_out * self.scale
def apply_qlora_for_loading(model: GPT) -> GPT:
    """Wrap the attention / feed-forward linears of ``model`` in ``LoRALinear``.

    Every ``nn.Linear`` whose attribute name is ``qkv``, ``proj``, ``w1``,
    ``w2`` or ``w3`` is replaced in place by a ``LoRALinear`` built with its
    default rank and alpha, so that a LoRA-style state dict can be loaded.

    Returns:
        The same ``model`` object, modified in place.
    """
    target_leaf_names = {"qkv", "proj", "w1", "w2", "w3"}

    # Collect first: replacing modules while iterating named_modules() would
    # change the tree being walked.
    to_wrap = [
        (qualified_name, layer)
        for qualified_name, layer in model.named_modules()
        if qualified_name.split(".")[-1] in target_leaf_names
        and isinstance(layer, nn.Linear)
    ]

    for qualified_name, layer in to_wrap:
        *ancestors, leaf = qualified_name.split(".")
        owner = model
        for attr in ancestors:
            owner = getattr(owner, attr)
        setattr(owner, leaf, LoRALinear(layer))
    return model
def is_lora_state_dict(sd: dict) -> bool:
    """Tell whether ``sd`` has keys produced by ``LoRALinear`` wrappers.

    Returns True as soon as any key contains ``.lora_A.weight``,
    ``.lora_B.weight`` or ``.base.weight``.
    """
    markers = (".lora_A.weight", ".lora_B.weight", ".base.weight")
    for key in sd:
        for marker in markers:
            if marker in key:
                return True
    return False
# =============================================================================
# LOAD
# =============================================================================
def load_model_and_tokenizer(
    model_dir: Path,
    checkpoint_name: Optional[str],
    tokenizer_dir: Optional[str],
    device: torch.device,
    dtype: torch.dtype,
):
    """Build the model from a checkpoint and load its tokenizer.

    The configuration is read from the checkpoint's ``"config"`` entry when
    present, otherwise from ``model_dir / "config.json"``. Configuration keys
    that are not ``GPTConfig`` fields are ignored with a warning instead of
    crashing the constructor. ``vocab_size`` is always overridden with the
    tokenizer length. The weights come from the checkpoint's ``"model"``
    entry, or from the checkpoint itself when it is a bare state dict (every
    value a tensor). LoRA wrappers are installed first when the state dict
    contains LoRA keys.

    Args:
        model_dir: Directory holding the checkpoint (and possibly config.json).
        checkpoint_name: Explicit checkpoint file name, or None for defaults.
        tokenizer_dir: Explicit tokenizer directory, or None to search.
        device: Device the model is moved to.
        dtype: Parameter dtype applied only when the device is CUDA.

    Returns:
        ``(model, tokenizer, ckpt_path)`` with the model in eval mode.

    Raises:
        FileNotFoundError: If the checkpoint, tokenizer or configuration
            cannot be found.
        KeyError: If the checkpoint has no ``"model"`` entry and is not a bare
            state dict.
    """
    from dataclasses import fields

    ckpt_path = resolve_checkpoint(model_dir, checkpoint_name)
    tok_dir = resolve_tokenizer_dir(model_dir, tokenizer_dir)
    tokenizer = PreTrainedTokenizerFast.from_pretrained(str(tok_dir))

    # NOTE(security): torch.load unpickles the file, which can execute
    # arbitrary code depending on the installed PyTorch version's defaults.
    # Only load checkpoints from a trusted source; consider weights_only=True
    # if the checkpoints contain nothing but tensors and plain containers.
    ckpt = torch.load(ckpt_path, map_location="cpu")

    if "config" in ckpt:
        cfg_dict = ckpt["config"]
    else:
        cfg_path = model_dir / "config.json"
        if not cfg_path.exists():
            raise FileNotFoundError("config.json introuvable et aucune config dans le checkpoint.")
        cfg_dict = json.loads(cfg_path.read_text(encoding="utf-8"))

    # Keep only the keys GPTConfig knows about: a saved config may carry extra
    # entries that would make GPTConfig(**cfg_dict) raise a TypeError.
    known_fields = {f.name for f in fields(GPTConfig)}
    ignored_keys = sorted(k for k in cfg_dict if k not in known_fields)
    if ignored_keys:
        print(f"[warn] clés de config ignorées: {ignored_keys}")
    cfg = GPTConfig(**{k: v for k, v in cfg_dict.items() if k in known_fields})
    cfg.vocab_size = len(tokenizer)
    cfg.use_checkpointing = False

    model = GPT(cfg)

    if "model" in ckpt:
        raw_sd = ckpt["model"]
    elif ckpt and all(isinstance(v, torch.Tensor) for v in ckpt.values()):
        # The file is a bare state dict rather than a wrapper dictionary.
        raw_sd = ckpt
    else:
        raise KeyError(f"Aucune entrée 'model' dans le checkpoint {ckpt_path}")
    sd = normalize_state_dict_keys(raw_sd)

    if is_lora_state_dict(sd):
        model = apply_qlora_for_loading(model)

    # strict=False tolerates key mismatches (reported below); tensor shape
    # mismatches still raise.
    missing, unexpected = model.load_state_dict(sd, strict=False)
    if missing:
        print(f"[warn] clés manquantes: {len(missing)}")
    if unexpected:
        print(f"[warn] clés inattendues: {len(unexpected)}")

    model.to(device=device)
    if device.type == "cuda":
        model.to(dtype=dtype)
    model.eval()
    return model, tokenizer, ckpt_path
# =============================================================================
# GENERATION
# =============================================================================
def _apply_repetition_penalty(
    logits: torch.Tensor, seen_ids: torch.Tensor, penalty: float
) -> torch.Tensor:
    """Return a copy of ``logits`` with the tokens in ``seen_ids`` penalised.

    Positive logits are divided by ``penalty`` and negative ones multiplied by
    it, so a penalty above 1 always lowers the score of a seen token. Dividing
    unconditionally would raise the score of tokens with negative logits.
    """
    seen = logits[:, seen_ids]
    penalised = torch.where(seen < 0, seen * penalty, seen / penalty)
    out = logits.clone()
    out[:, seen_ids] = penalised
    return out


def _filter_top_k_top_p(logits: torch.Tensor, top_k: int, top_p: float) -> torch.Tensor:
    """Mask logits outside the top-k set and outside the top-p nucleus with -inf.

    ``top_k <= 0`` disables the top-k filter; ``top_p`` outside ``(0, 1)``
    disables the nucleus filter. The highest-scoring token is always kept.
    """
    if top_k > 0:
        k = min(top_k, logits.size(-1))
        kth_best = torch.topk(logits, k=k, dim=-1).values[:, -1:]
        logits = logits.masked_fill(logits < kth_best, float("-inf"))

    if 0.0 < top_p < 1.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True, dim=-1)
        cumulative = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
        drop = cumulative > top_p
        # Shift right by one so the token that crosses the threshold is kept.
        drop[..., 1:] = drop[..., :-1].clone()
        drop[..., 0] = False
        sorted_logits = sorted_logits.masked_fill(drop, float("-inf"))
        logits = torch.full_like(logits, float("-inf")).scatter(
            -1, sorted_indices, sorted_logits
        )
    return logits


@torch.inference_mode()
def generate_text(
    model: GPT,
    tokenizer: PreTrainedTokenizerFast,
    prompt: str,
    device: torch.device,
    dtype: torch.dtype,
    max_new_tokens: int = 128,
    temperature: float = 0.8,
    top_k: int = 50,
    top_p: float = 0.95,
    repetition_penalty: float = 1.05,
    do_sample: bool = True,
) -> str:
    """Autoregressively continue ``prompt`` and return the decoded text.

    Args:
        model: Model returning ``(logits, loss)``; its ``cfg.block_size``
            bounds the context fed at each step.
        tokenizer: Tokenizer used to encode the prompt and decode the result.
        prompt: Text to continue; a blank prompt is replaced by "Bonjour".
        device: Device the token tensor is created on.
        dtype: Autocast dtype for the forward pass.
        max_new_tokens: Maximum number of tokens to append.
        temperature: Sampling temperature; ``<= 0`` forces greedy decoding.
        top_k: Keep only the k best tokens (``<= 0`` disables).
        top_p: Nucleus threshold (outside ``(0, 1)`` disables).
        repetition_penalty: Penalty for tokens already in the context window;
            ``1.0`` disables it.
        do_sample: Sample when True, take the arg-max when False.

    Returns:
        Prompt plus continuation, decoded with special tokens skipped.
        Generation stops early when the EOS token is produced.

    Raises:
        ValueError: If the prompt encodes to no tokens and the tokenizer has
            no BOS token to start from.
    """
    if not prompt.strip():
        prompt = "Bonjour"
    input_ids = tokenizer.encode(prompt, add_special_tokens=True)
    if not input_ids:
        # The model needs at least one position to produce a next-token logit.
        if tokenizer.bos_token_id is None:
            raise ValueError("Le prompt ne produit aucun token.")
        input_ids = [tokenizer.bos_token_id]

    x = torch.tensor([input_ids], dtype=torch.long, device=device)
    block_size = model.cfg.block_size
    eos_id = tokenizer.eos_token_id
    greedy = not do_sample or temperature <= 0

    for _ in range(max_new_tokens):
        x_cond = x[:, -block_size:]
        with autocast_context(device, dtype):
            logits, _ = model(x_cond)

        # Post-process in float32: under autocast the logits may be half
        # precision, which is too coarse for the cumulative sums of top-p.
        next_token_logits = logits[:, -1, :].float()

        if repetition_penalty != 1.0:
            next_token_logits = _apply_repetition_penalty(
                next_token_logits, torch.unique(x_cond), repetition_penalty
            )

        if greedy:
            next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)
        else:
            filtered = _filter_top_k_top_p(next_token_logits / temperature, top_k, top_p)
            probs = F.softmax(filtered, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)

        x = torch.cat([x, next_token], dim=1)
        if eos_id is not None and int(next_token.item()) == eos_id:
            break

    return tokenizer.decode(x[0].tolist(), skip_special_tokens=True)
# =============================================================================
# EVAL
# =============================================================================
def read_texts_from_file(path: Path) -> list[str]:
    """Read a UTF-8 text file and split it into non-empty, stripped entries.

    When the file contains at least one blank line (``"\\n\\n"``), entries are
    the blank-line-separated paragraphs; otherwise each line is an entry.
    Undecodable bytes are silently dropped.
    """
    raw = path.read_text(encoding="utf-8", errors="ignore")
    pieces = raw.split("\n\n") if "\n\n" in raw else raw.splitlines()
    return [entry for entry in map(str.strip, pieces) if entry]
class PackedEvalDataset(torch.utils.data.Dataset):
    """Fixed-length next-token samples cut from a packed token stream.

    Every non-empty text is encoded without special tokens, wrapped in the
    tokenizer's BOS / EOS ids (each skipped when the tokenizer does not define
    it), and appended to a single stream. The stream is cut into consecutive,
    non-overlapping windows of ``block_size + 1`` tokens; a trailing partial
    window is discarded. Each sample holds ``input_ids`` (the window minus its
    last token) and ``labels`` (the window minus its first token).
    """

    def __init__(self, texts: list[str], tokenizer: PreTrainedTokenizerFast, block_size: int):
        bos = tokenizer.bos_token_id
        eos = tokenizer.eos_token_id
        # A tokenizer without BOS/EOS reports None; never put None in the stream.
        prefix = [] if bos is None else [bos]
        suffix = [] if eos is None else [eos]

        tokens: list[int] = []
        for text in texts:
            ids = tokenizer.encode(text, add_special_tokens=False)
            if ids:
                tokens.extend(prefix + list(ids) + suffix)

        window = block_size + 1
        self.samples = []
        # Stop at the last start index that still leaves a full window, so the
        # final window is kept when the stream length is an exact multiple.
        for start in range(0, len(tokens) - window + 1, window):
            chunk = tokens[start:start + window]
            self.samples.append({
                "input_ids": torch.tensor(chunk[:-1], dtype=torch.long),
                "labels": torch.tensor(chunk[1:], dtype=torch.long),
            })

    def __len__(self):
        """Return the number of full windows."""
        return len(self.samples)

    def __getitem__(self, idx: int):
        """Return the sample dict (``input_ids``, ``labels``) at ``idx``."""
        return self.samples[idx]
@torch.inference_mode()
def evaluate_file(
    model: GPT,
    tokenizer: PreTrainedTokenizerFast,
    eval_texts_file: Path,
    device: torch.device,
    dtype: torch.dtype,
    eval_batch_size: int,
):
    """Compute average loss and perplexity of ``model`` on a local text file.

    The file is split into texts, packed into blocks of ``model.cfg.block_size``
    tokens and evaluated batch by batch.

    Args:
        model: Model returning ``(logits, loss)`` for ``(input_ids, labels)``.
        tokenizer: Tokenizer used to encode the texts.
        eval_texts_file: Path of the UTF-8 text file to evaluate on.
        device: Device the batches are moved to.
        dtype: Autocast dtype for the forward pass.
        eval_batch_size: Number of blocks per batch.

    Returns:
        Dict with ``num_texts``, ``num_batches``, ``avg_loss`` and
        ``perplexity`` (``exp`` of the loss, with the loss clamped to 20).

    Raises:
        ValueError: If the file yields no text, or too few tokens to form a
            single block.
    """
    texts = read_texts_from_file(eval_texts_file)
    if not texts:
        raise ValueError(f"Aucun texte exploitable dans {eval_texts_file}")

    dataset = PackedEvalDataset(texts, tokenizer, model.cfg.block_size)
    if len(dataset) == 0:
        raise ValueError("Pas assez de tokens pour former un bloc d'évaluation.")

    loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=eval_batch_size,
        shuffle=False,
        num_workers=0,
        # Pinned host memory only helps when batches are copied to a GPU.
        pin_memory=device.type == "cuda",
    )

    loss_sum = 0.0
    n_samples = 0
    n_batches = 0
    for batch in loader:
        inp = batch["input_ids"].to(device, non_blocking=True)
        lbl = batch["labels"].to(device, non_blocking=True)
        with autocast_context(device, dtype):
            _, loss = model(inp, lbl)
        # The model returns the mean loss of the batch. Every block has the
        # same number of tokens, so weighting by the number of blocks yields
        # the exact token-level mean even when the last batch is smaller.
        batch_samples = inp.size(0)
        loss_sum += float(loss.item()) * batch_samples
        n_samples += batch_samples
        n_batches += 1

    avg_loss = loss_sum / max(1, n_samples)
    ppl = math.exp(min(avg_loss, 20.0))
    return {
        "num_texts": len(texts),
        "num_batches": n_batches,
        "avg_loss": avg_loss,
        "perplexity": ppl,
    }
# =============================================================================
# MAIN
# =============================================================================
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for model loading, generation and eval."""
    parser = argparse.ArgumentParser(description="Test / génération pour GPT custom NLP.")
    add = parser.add_argument

    # Model / runtime selection.
    add("--model-dir", type=str, required=True, help="Dossier de sortie du modèle fine-tuné.")
    add("--checkpoint", type=str, default=None, help="Nom du checkpoint dans model-dir (ex: model_best.pt).")
    add("--tokenizer-dir", type=str, default=None, help="Dossier du tokenizer si différent.")
    add("--device", type=str, default="auto", help="auto, cpu, cuda, cuda:0...")
    add("--dtype", type=str, default="bfloat16", choices=["float16", "bfloat16", "float32"])

    # Prompt sources.
    add("--prompt", type=str, default=None, help="Prompt unique à générer.")
    add("--prompt-file", type=str, default=None, help="Fichier texte avec prompts, un par ligne.")

    # Decoding controls.
    add("--max-new-tokens", type=int, default=160)
    add("--temperature", type=float, default=0.8)
    add("--top-k", type=int, default=50)
    add("--top-p", type=float, default=0.95)
    add("--repetition-penalty", type=float, default=1.05)
    add("--greedy", action="store_true", help="Désactive le sampling.")

    # Evaluation.
    add("--eval-texts-file", type=str, default=None, help="Fichier texte local pour calculer loss/perplexité.")
    add("--eval-batch-size", type=int, default=4)
    return parser
def main() -> None:
    """Command-line entry point: load the model, then evaluate and/or generate.

    Evaluation runs when --eval-texts-file is given. Generation runs for the
    prompts from --prompt and --prompt-file; when neither prompts nor an
    evaluation file are given, three built-in prompts are used.

    Raises:
        FileNotFoundError: If --model-dir does not exist.
    """
    args = build_parser().parse_args()
    device = get_device(args.device)
    dtype = DTYPE_MAP[args.dtype]

    model_dir = Path(args.model_dir)
    if not model_dir.exists():
        raise FileNotFoundError(f"model-dir introuvable: {model_dir}")

    if device.type == "cuda":
        # Allow TF32 kernels for float32 matmuls / convolutions on CUDA.
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True
        torch.set_float32_matmul_precision("high")

    model, tokenizer, ckpt_path = load_model_and_tokenizer(
        model_dir=model_dir,
        checkpoint_name=args.checkpoint,
        tokenizer_dir=args.tokenizer_dir,
        device=device,
        dtype=dtype,
    )

    banner = "=" * 88
    param_count = sum(p.numel() for p in model.parameters())
    summary_lines = (
        banner,
        "TEST DU MODÈLE GPT CUSTOM",
        banner,
        f"Device : {device}",
        f"DType : {dtype}",
        f"Checkpoint : {ckpt_path}",
        f"Tokenizer vocab: {len(tokenizer)}",
        f"Block size : {model.cfg.block_size}",
        f"Paramètres : {param_count / 1e6:.1f} M",
    )
    for line in summary_lines:
        print(line)

    wants_eval = bool(args.eval_texts_file)
    if wants_eval:
        stats = evaluate_file(
            model=model,
            tokenizer=tokenizer,
            eval_texts_file=Path(args.eval_texts_file),
            device=device,
            dtype=dtype,
            eval_batch_size=args.eval_batch_size,
        )
        print("\n[ÉVALUATION]")
        print(json.dumps(stats, indent=2, ensure_ascii=False))

    prompts: list[str] = [args.prompt] if args.prompt else []
    if args.prompt_file:
        prompts += read_texts_from_file(Path(args.prompt_file))

    # Built-in prompts are a fallback only for a run with nothing else to do.
    if not prompts and not wants_eval:
        prompts = [
            "Bonjour, présente-toi en quelques lignes.",
            "Résume ce texte en français simple : l'intelligence artificielle transforme plusieurs secteurs.",
            "Écris un petit paragraphe en arabe sur l'éducation.",
        ]

    if not prompts:
        return

    print("\n[GÉNÉRATION]")
    for index, prompt in enumerate(prompts, start=1):
        completion = generate_text(
            model=model,
            tokenizer=tokenizer,
            prompt=prompt,
            device=device,
            dtype=dtype,
            max_new_tokens=args.max_new_tokens,
            temperature=args.temperature,
            top_k=args.top_k,
            top_p=args.top_p,
            repetition_penalty=args.repetition_penalty,
            do_sample=not args.greedy,
        )
        print("-" * 88)
        print(f"Prompt {index}:")
        print(prompt)
        print("\nSortie:")
        print(completion)


if __name__ == "__main__":
    main()