Spaces:
Running
Running
| """ | |
| inference.py | |
| ============ | |
| Correct D3PM inference for Sanskrit paraphrase generation. | |
| The model's forward() takes CLEAN tgt and noises it internally. | |
| So inference passes x0_estimate (starting all-[MASK]) as tgt each step, | |
| letting the model noise it and then predict a cleaner version. | |
| Also includes: robust checkpoint loading (auto-detects architecture | |
| from saved weights — no CONFIG mismatch crashes). | |
| """ | |
| import json | |
| import torch | |
| import os, sys | |
| import re | |
| from tqdm import tqdm | |
| from torch.utils.data import DataLoader, Subset | |
| sys.path.append(os.path.dirname(os.path.abspath(__file__))) | |
| from config import CONFIG | |
| # ── Checkpoint loader ───────────────────────────────────────────────── | |
def _resolve_device(cfg_device: str) -> torch.device:
    """
    Translate a configured device name into a torch.device usable on this host.

    An explicit "cuda" or "mps" request is honoured only when that backend
    reports itself available; otherwise any recognised name ("cpu", "cuda",
    "mps") resolves to CPU. An empty or unrecognised name triggers
    auto-detection in the order CUDA, MPS, CPU.
    """
    requested = (cfg_device or "").lower()

    if requested in {"cpu", "cuda", "mps"}:
        # Recognised name: use the requested backend if present, else CPU.
        if requested == "cuda" and torch.cuda.is_available():
            return torch.device("cuda")
        if requested == "mps" and torch.backends.mps.is_available():
            return torch.device("mps")
        return torch.device("cpu")

    # Nothing usable was configured: pick the best backend that exists.
    if torch.cuda.is_available():
        return torch.device("cuda")
    if torch.backends.mps.is_available():
        return torch.device("mps")
    return torch.device("cpu")
def load_model(ckpt_path: str, base_cfg: dict, device: torch.device):
    """
    Build a SanskritModel whose architecture matches a checkpoint, then load it.

    The architecture hyper-parameters are read off the saved weight shapes
    and written into a deep copy of ``base_cfg`` (the caller's dict is never
    mutated):

    * ``vocab_size`` / ``d_model`` from ``model.src_embed.token_emb.weight``
      (``d_ff`` is then set to ``4 * d_model``),
    * ``n_layers`` from the highest ``model.encoder_blocks.<i>`` index,
    * ``max_seq_len`` from dimension 1 of ``model.src_embed.pos_enc.pe``,
    * ``n_heads`` is kept if it divides ``d_model``, otherwise replaced by
      the first of 8, 6, 4, 2, 1 that does.

    Tensors whose shape disagrees with the freshly built model are skipped
    rather than raising, and the remainder is loaded non-strictly.

    Args:
        ckpt_path: Path to a file readable by ``torch.load`` that holds a
            flat state dict.
        base_cfg: Project configuration; must contain a ``'model'`` section.
        device: Device the constructed model is moved to.

    Returns:
        ``(model, cfg)`` — the loaded model and the adjusted config copy.
    """
    import copy
    from model.sanskrit_model import SanskritModel

    cfg = copy.deepcopy(base_cfg)
    model_cfg = cfg['model']

    # NOTE(security): torch.load unpickles the file; only open checkpoints
    # from a trusted source.
    # The checkpoint is read once, onto CPU, and reused for both shape
    # inspection and weight loading; load_state_dict copies each tensor onto
    # the parameter's own device.
    state = torch.load(ckpt_path, map_location='cpu')

    # d_model + vocab_size
    ek = 'model.src_embed.token_emb.weight'
    if ek in state:
        vocab, d = state[ek].shape
        model_cfg['vocab_size'] = vocab
        model_cfg['d_model'] = d
        model_cfg['d_ff'] = d * 4

    # n_layers
    ids = {int(k.split('.')[2]) for k in state if k.startswith('model.encoder_blocks.')}
    if ids:
        model_cfg['n_layers'] = max(ids) + 1

    # max_seq_len
    pk = 'model.src_embed.pos_enc.pe'
    if pk in state:
        model_cfg['max_seq_len'] = state[pk].shape[1]

    # n_heads must divide d_model; fall back to the first candidate that does.
    d = model_cfg['d_model']
    h = model_cfg.get('n_heads', 6)
    if d % h != 0:
        h = next(x for x in [8, 6, 4, 2, 1] if d % x == 0)
    model_cfg['n_heads'] = h

    print(f"🔍 Detected: d_model={cfg['model']['d_model']}, "
          f"n_layers={cfg['model']['n_layers']}, "
          f"max_seq_len={cfg['model']['max_seq_len']}, "
          f"n_heads={cfg['model']['n_heads']}")

    model = SanskritModel(cfg).to(device)
    model_state = model.state_dict()

    # load_state_dict(strict=False) still raises on shape mismatches, so
    # tensors that disagree with the built model are dropped up front.
    filtered_state = {}
    skipped_mismatch = []
    for k, v in state.items():
        if k in model_state and hasattr(v, "shape") and hasattr(model_state[k], "shape"):
            if tuple(v.shape) != tuple(model_state[k].shape):
                skipped_mismatch.append((k, tuple(v.shape), tuple(model_state[k].shape)))
                continue
        filtered_state[k] = v

    missing, unexpected = model.load_state_dict(filtered_state, strict=False)

    # hint_gate may be absent in older checkpoints — initialise safely
    allowed = {'model.hint_gate.0.weight', 'model.hint_gate.0.bias'}
    real_missing = [k for k in missing if k not in allowed]
    if real_missing:
        print(f"⚠️ Missing keys: {real_missing[:3]} …")
    if unexpected:
        print(f"⚠️ Unexpected keys: {unexpected[:3]} …")
    if skipped_mismatch:
        print(f"⚠️ Shape-mismatched keys skipped: {len(skipped_mismatch)}")

    # Enable compact-attention branch only when checkpoint actually provides it.
    has_compact = any(".compact_out_proj.weight" in k for k in filtered_state)
    if has_compact and hasattr(model, "model") and hasattr(model.model, "decoder_blocks"):
        for block in model.model.decoder_blocks:
            if hasattr(block, "cross_attn") and hasattr(block.cross_attn, "use_compact"):
                block.cross_attn.use_compact = True
        print("ℹ️ Compact cross-attention branch enabled from checkpoint.")

    if hasattr(model.model, 'hint_gate') and 'model.hint_gate.0.weight' in missing:
        with torch.no_grad():
            w = model.model.hint_gate[0].weight
            torch.nn.init.zeros_(model.model.hint_gate[0].bias)
            # Identity is only defined for a square weight; otherwise fall
            # back to Xavier initialisation.
            if w.shape[0] == w.shape[1]:
                torch.nn.init.eye_(w)
            else:
                torch.nn.init.xavier_uniform_(w)
        print("ℹ️ hint_gate initialised to identity (not in checkpoint).")

    print("✅ Model loaded.")
    return model, cfg
| # ── Core inference function (same path as validation) ──────────────── | |
def run_inference(model, input_ids, cfg):
    """
    Reverse diffusion sampling (clean path).

    Uses ``model.generate_cached`` when the model has it, otherwise
    ``model.generate``. All sampling runs under ``torch.no_grad()`` so no
    autograd graph is built during inference.

    When ``cfg['inference']['auto_retry_on_repetition']`` is true (the
    default), the mean adjacent-repeat ratio of the output is measured; if it
    reaches ``repeat_ratio_threshold`` (default 0.40) one retry is sampled
    with a lower temperature / top-k and stronger repetition / diversity
    penalties, and kept only if it repeats less. The chosen output is then
    passed through ``_dedup_repeated_ids`` with ``max_repeat_run``
    (default 4).

    Args:
        model: Object exposing ``eval()`` and ``generate`` (optionally
            ``generate_cached``).
        input_ids: Source token ids forwarded unchanged to the sampler.
        cfg: Config whose ``'inference'`` section provides ``num_steps``,
            ``temperature`` and ``top_k`` (required) plus the optional keys
            described above.

    Returns:
        Whatever the sampler returns, possibly de-duplicated.
    """
    inf = cfg['inference']
    model.eval()

    kwargs = dict(
        num_steps=inf['num_steps'],
        temperature=inf['temperature'],
        top_k=inf['top_k'],
        repetition_penalty=inf.get('repetition_penalty', 1.2),
        diversity_penalty=inf.get('diversity_penalty', 0.0),
    )

    # Resolve the sampler once so the first pass and the retry share a path.
    if hasattr(model, "generate_cached"):
        generate = model.generate_cached
    else:
        generate = model.generate

    with torch.no_grad():
        out = generate(input_ids, **kwargs)

        # Optional retry with stronger anti-repetition settings.
        # NOTE(review): the dedup step is kept inside this branch because
        # max_repeat_run is only defined here — confirm against the original
        # indentation.
        if inf.get("auto_retry_on_repetition", True):
            repeat_threshold = float(inf.get("repeat_ratio_threshold", 0.40))
            max_repeat_run = int(inf.get("max_repeat_run", 4))

            baseline_ratio = _mean_repeat_ratio(out)
            if baseline_ratio >= repeat_threshold:
                retry_kwargs = dict(kwargs)
                retry_kwargs["temperature"] = max(0.6, float(kwargs["temperature"]) - 0.1)
                retry_kwargs["top_k"] = max(20, int(kwargs["top_k"]) - 10)
                retry_kwargs["repetition_penalty"] = max(float(kwargs["repetition_penalty"]), 1.6)
                retry_kwargs["diversity_penalty"] = max(float(kwargs["diversity_penalty"]), 0.3)

                retry = generate(input_ids, **retry_kwargs)
                if _mean_repeat_ratio(retry) < baseline_ratio:
                    out = retry

            out = _dedup_repeated_ids(out, max_repeat_run=max_repeat_run)

    return out
def _mean_repeat_ratio(ids_tensor: torch.Tensor) -> float:
    """
    Average, over rows, of the share of adjacent token pairs that are equal.

    Ids <= 4 are dropped before counting (presumably reserved special
    tokens — confirm against the tokenizer). A row with fewer than two
    remaining ids contributes 0.0. ``None`` or an empty tensor yields 0.0.
    """
    if ids_tensor is None or ids_tensor.numel() == 0:
        return 0.0

    def _row_ratio(row) -> float:
        content = [int(tok) for tok in row.tolist() if int(tok) > 4]
        if len(content) < 2:
            return 0.0
        adjacent_equal = sum(a == b for a, b in zip(content, content[1:]))
        return adjacent_equal / max(1, len(content) - 1)

    per_row = [_row_ratio(row) for row in ids_tensor]
    return float(sum(per_row) / max(1, len(per_row)))
def _dedup_repeated_ids(ids_tensor: torch.Tensor, max_repeat_run: int = 4) -> torch.Tensor:
    """
    Trim runaway runs of one repeated token id, row by row.

    The generation path itself is untouched; only the final ids are cleaned.
    Ids <= 4 are always kept and restart the run counter. For any other id,
    at most ``max_repeat_run`` consecutive copies survive. Each row is then
    right-padded with id 1 back to its original length, so the result has
    the same shape, dtype and device as the input. ``None`` or an empty
    tensor is returned unchanged.
    """
    if ids_tensor is None or ids_tensor.numel() == 0:
        return ids_tensor

    rebuilt = []
    for original in ids_tensor.tolist():
        width = len(original)
        kept = []
        last_tok, streak = None, 0
        for tok in original:
            if tok <= 4 or tok != last_tok:
                # Low ids and any change of token start a fresh run.
                streak = 1
            else:
                streak += 1
                if streak > max_repeat_run:
                    continue
            kept.append(tok)
            last_tok = tok
        # Preserve original length for downstream decode assumptions.
        rebuilt.append((kept + [1] * (width - len(kept)))[:width])

    return torch.tensor(rebuilt, dtype=ids_tensor.dtype, device=ids_tensor.device)
def _decode_clean(tgt_tok, ids):
    """
    Decode generated ids to text, dropping low ids and stopping early.

    Only ids > 4 are kept. Scanning stops at the first id 1 or 4 seen after
    at least one id has been kept; earlier occurrences are skipped. The kept
    ids go through ``tgt_tok.decode``, and the stripped text is finally
    passed through ``_clean_repetition_text``.
    """
    content_ids = []
    for tok in ids:
        if content_ids and tok in (1, 4):
            break
        if tok > 4:
            content_ids.append(tok)

    decoded = tgt_tok.decode(content_ids)
    return _clean_repetition_text(decoded.strip())
def _clean_repetition_text(text: str, max_repeat_run: int = 3) -> str:
    """
    Cap runs of one repeated whitespace-separated word.

    At most ``max_repeat_run`` consecutive copies of a word are kept (the
    first copy always survives). Words are re-joined with single spaces.
    Text without any words is returned stripped.
    """
    tokens = text.split()
    if not tokens:
        return text.strip()

    kept = []
    streak = 0
    for idx, word in enumerate(tokens):
        # A dropped word always equals the last kept one, so comparing with
        # the immediate predecessor is sufficient.
        if idx and word == tokens[idx - 1]:
            streak += 1
        else:
            streak = 1
        if streak == 1 or streak <= max_repeat_run:
            kept.append(word)

    return " ".join(kept).strip()
| # ── Cleanup heuristics from UI inference pipeline ───────────────────── | |
# IAST -> Devanagari lookup tables. Each list is scanned in order with a
# prefix match, so multi-character keys must precede their single-character
# prefixes ("ai" before "a", "kh" before "k").

# Independent vowel letters (used when no consonant is waiting for a vowel).
_IAST_VOWELS = [
    ("ai", "ऐ"), ("au", "औ"),
    ("ā", "आ"), ("ī", "ई"), ("ū", "ऊ"),
    ("ṛ", "ऋ"), ("ṝ", "ॠ"), ("ḷ", "ऌ"), ("ḹ", "ॡ"),
    ("a", "अ"), ("i", "इ"), ("u", "उ"),
    ("e", "ए"), ("o", "ओ"),
]
# Dependent vowel signs attached to a preceding consonant; the inherent "a"
# needs no sign.
_IAST_MATRAS = [
    ("ai", "ै"), ("au", "ौ"),
    ("ā", "ा"), ("ī", "ी"), ("ū", "ू"),
    ("ṛ", "ृ"), ("ṝ", "ॄ"), ("ḷ", "ॢ"), ("ḹ", "ॣ"),
    ("a", ""), ("i", "ि"), ("u", "ु"),
    ("e", "े"), ("o", "ो"),
]
# Consonants, plus visarga / anusvara (see _IAST_MARKS).
_IAST_CONS = [
    ("kṣ", "क्ष"), ("jñ", "ज्ञ"), ("tr", "त्र"),
    ("kh", "ख"), ("gh", "घ"), ("ch", "छ"), ("jh", "झ"),
    ("ṭh", "ठ"), ("ḍh", "ढ"), ("th", "थ"), ("dh", "ध"),
    ("ph", "फ"), ("bh", "भ"),
    ("ṅ", "ङ"), ("ñ", "ञ"), ("ṭ", "ट"), ("ḍ", "ड"),
    ("ṇ", "ण"), ("ś", "श"), ("ṣ", "ष"), ("ḥ", "ः"),
    ("ṃ", "ं"), ("ṁ", "ं"),
    ("y", "य"), ("r", "र"), ("l", "ल"), ("v", "व"),
    ("s", "स"), ("h", "ह"),
    ("k", "क"), ("g", "ग"), ("c", "च"), ("j", "ज"),
    ("t", "त"), ("d", "द"), ("n", "न"),
    ("p", "प"), ("b", "ब"), ("m", "म"),
]
# Keys of _IAST_CONS that are signs, not consonants: they never carry a
# vowel and must never receive a virama.
_IAST_MARKS = frozenset({"ḥ", "ṃ", "ṁ"})
_PUNCT = {".": "।", "|": "।", "||": "॥", ",": ",", "?": "?", "!": "!"}
# Devanagari sign virama (halant): suppresses a consonant's inherent vowel.
_VIRAMA = "\u094d"


def _iast_to_deva(text: str) -> str:
    """
    Rule-based transliteration of IAST-romanised text into Devanagari.

    The input is NFC-normalised (so decomposed diacritics match the tables)
    and lower-cased, then scanned left to right:

    * whitespace is copied through; ``||`` and the keys of ``_PUNCT`` are
      mapped to their Devanagari punctuation;
    * a vowel becomes a dependent sign when it follows a consonant and an
      independent letter otherwise;
    * a consonant that is not followed by a vowel receives a virama — both
      inside clusters and before whitespace, punctuation or end of text;
    * anusvara / visarga are emitted as signs and are never treated as a
      consonant awaiting a vowel;
    * any other character is copied unchanged.

    Args:
        text: IAST string; ``None`` or empty input is treated as "".

    Returns:
        The transliterated string with surrounding whitespace stripped.
    """
    import unicodedata

    s = unicodedata.normalize("NFC", text or "").lower()
    matra_for = dict(_IAST_MATRAS)
    out = []
    i = 0
    # True while the last emitted piece is a consonant still awaiting a vowel.
    pending_consonant = False

    def _match_any(pairs, pos):
        for k, v in pairs:
            if s.startswith(k, pos):
                return k, v
        return None, None

    def _seal_pending():
        # A consonant with no vowel after it gets an explicit virama.
        nonlocal pending_consonant
        if pending_consonant:
            out[-1] += _VIRAMA
        pending_consonant = False

    while i < len(s):
        ch = s[i]

        if ch.isspace():
            _seal_pending()
            out.append(ch)
            i += 1
            continue

        if s.startswith("||", i):
            _seal_pending()
            out.append(_PUNCT["||"])
            i += 2
            continue

        if ch in _PUNCT:
            _seal_pending()
            out.append(_PUNCT[ch])
            i += 1
            continue

        v_key, v_deva = _match_any(_IAST_VOWELS, i)
        if v_key:
            if pending_consonant:
                out[-1] += matra_for.get(v_key, "")
                pending_consonant = False
            else:
                out.append(v_deva)
            i += len(v_key)
            continue

        c_key, c_deva = _match_any(_IAST_CONS, i)
        if c_key:
            _seal_pending()
            out.append(c_deva)
            pending_consonant = c_key not in _IAST_MARKS
            i += len(c_key)
            continue

        _seal_pending()
        out.append(ch)
        i += 1

    _seal_pending()
    return "".join(out).strip()
def _compute_cer(pred: str, ref: str) -> float:
    """
    Character error rate: Levenshtein distance over the longer string's length.

    Identical inputs give 0.0; if exactly one side is empty the result is
    1.0. Otherwise the value lies in (0, 1].
    """
    if pred == ref:
        return 0.0
    if not (pred and ref):
        return 1.0

    # Two-row dynamic programme: `previous` holds distances for the prefix of
    # `pred` one character shorter than the row being built.
    previous = list(range(len(ref) + 1))
    for row, p_ch in enumerate(pred, start=1):
        current = [row]
        for col, r_ch in enumerate(ref, start=1):
            deletion = previous[col] + 1
            insertion = current[col - 1] + 1
            substitution = previous[col - 1] + (0 if p_ch == r_ch else 1)
            current.append(min(deletion, insertion, substitution))
        previous = current

    return previous[-1] / max(len(pred), len(ref))
def _cleanup_thresholds(temperature: float, top_k: int):
    """
    Derive the fallback thresholds from the sampling settings.

    Temperature is rescaled from [0.4, 1.0] and top-k (floored at 1) from
    [20, 100] onto [0, 1], each clamped, then blended 60/40 into a diversity
    score. Higher diversity gives a larger CER threshold (0.10 to 0.28) and
    a smaller Devanagari-ratio threshold (0.60 down to 0.40).

    Returns:
        ``(cer_threshold, deva_ratio_threshold)``.
    """
    def _unit(value):
        return max(0.0, min(value, 1.0))

    temp_share = _unit((float(temperature) - 0.4) / 0.6)
    topk_share = _unit((max(1, int(top_k)) - 20) / 80.0)
    diversity = 0.6 * temp_share + 0.4 * topk_share

    return 0.10 + 0.18 * diversity, 0.60 - 0.20 * diversity
def _decode_with_cleanup(tgt_tok, ids, src_text: str, inf_cfg: dict):
    """
    Decode the model output, falling back to rule-based transliteration.

    The decoded text is compared with ``_iast_to_deva(src_text)``. The
    rule-based text is returned instead of the model text when any of these
    holds:

    * the share of characters in U+0900..U+097F is below the ratio threshold,
    * the model text is more than twice as long as the rule-based text,
    * the character error rate between the two exceeds the CER threshold.

    Thresholds come from ``_cleanup_thresholds`` using ``temperature``
    (default 0.8) and ``top_k`` (default 40) from ``inf_cfg``.
    """
    candidate = _decode_clean(tgt_tok, ids)
    fallback = _iast_to_deva(src_text.strip())

    deva_count = len([ch for ch in candidate if "\u0900" <= ch <= "\u097F"])
    deva_share = deva_count / max(1, len(candidate))
    error_rate = _compute_cer(candidate, fallback)

    max_cer, min_deva_share = _cleanup_thresholds(
        inf_cfg.get("temperature", 0.8),
        inf_cfg.get("top_k", 40),
    )

    too_little_deva = deva_share < min_deva_share
    too_long = len(candidate) > 2.0 * max(1, len(fallback))
    too_far = error_rate > max_cer

    return fallback if (too_little_deva or too_long or too_far) else candidate
| # ── Interactive demo ────────────────────────────────────────────────── | |
def interactive_demo(checkpoint=None, single_text=None):
    """
    Load a checkpoint and print generated output for typed-in verses.

    Args:
        checkpoint: Path to a checkpoint; defaults to
            ``results/<model_type>_neg_<include_negative_examples>/best_model.pt``.
        single_text: When given, process just this text once and return
            instead of prompting on stdin.

    The prompt loop ends on EOF, Ctrl-C, an empty line, or one of
    ``quit`` / ``exit`` / ``q``.

    Raises:
        FileNotFoundError: If the checkpoint file does not exist.
    """
    from model.tokenizer import SanskritSourceTokenizer, SanskritTargetTokenizer

    cfg = CONFIG
    device = _resolve_device(cfg['training'].get('device', 'cpu'))

    model_name = cfg['model_type']
    has_neg = cfg['data']['include_negative_examples']
    ckpt = checkpoint or f"results/{model_name}_neg_{has_neg}/best_model.pt"
    if not os.path.exists(ckpt):
        raise FileNotFoundError(f"No checkpoint at {ckpt} — train first.")

    # load_model returns a config copy adjusted to the checkpoint; use that
    # one from here on.
    model, cfg = load_model(ckpt, cfg, device)
    model.eval()

    max_len = cfg['model']['max_seq_len']
    src_tok = SanskritSourceTokenizer(
        vocab_size=cfg['model'].get('src_vocab_size', 16000),
        max_len=max_len,
    )
    tgt_tok = SanskritTargetTokenizer(
        vocab_size=cfg['model'].get('tgt_vocab_size', 16000),
        max_len=max_len,
    )

    banner = "=" * 55
    print("\n" + banner)
    print("Sanskrit D3PM Paraphrase — type verse, get paraphrase")
    print(banner + "\n")

    one_shot = single_text is not None
    while True:
        try:
            raw = single_text if one_shot else input("INPUT > ")
            text = raw.strip()
        except (EOFError, KeyboardInterrupt):
            break

        if not text or text.lower() in ('quit', 'exit', 'q'):
            break

        encoded = src_tok.encode(text)[:max_len]
        ids = torch.tensor([encoded], dtype=torch.long, device=device)
        out = run_inference(model, ids, cfg)
        cleaned = _decode_with_cleanup(tgt_tok, out[0].tolist(), text, cfg["inference"])
        print(f"PARAPHRASE → {cleaned}\n")

        if one_shot:
            break
| # ── Batch evaluation ────────────────────────────────────────────────── | |
def batch_evaluate(sample_size=500, checkpoint=None):
    """
    Generate predictions for the first ``sample_size`` test items and score them.

    Writes two files into ``results/<model_type>_neg_<include_negative_examples>/``
    (created if missing): ``evaluation_results.txt`` (settings, metrics and
    up to 20 samples) and ``evaluation_predictions.jsonl`` (one
    input/reference/prediction record per line).

    BLEU (nltk) and BERTScore (``evaluate``) are best-effort: if either
    cannot be computed a warning is printed and the metric is reported
    as 0.0.

    Args:
        sample_size: Maximum number of test examples to evaluate.
        checkpoint: Path to a checkpoint; defaults to ``best_model.pt`` in
            the experiment directory.

    Returns:
        ``(all_preds, all_refs)`` — lists of prediction and reference strings.

    Raises:
        FileNotFoundError: If the checkpoint file does not exist.
    """
    from data.dataset import OptimizedSanskritDataset
    from model.tokenizer import SanskritSourceTokenizer, SanskritTargetTokenizer

    cfg = CONFIG
    device = _resolve_device(cfg['training'].get('device', 'cpu'))

    model_name = cfg['model_type']
    has_neg = cfg['data']['include_negative_examples']
    exp_dir = f"results/{model_name}_neg_{has_neg}"
    ckpt = checkpoint or f"{exp_dir}/best_model.pt"
    if not os.path.exists(ckpt):
        raise FileNotFoundError(f"No checkpoint at {ckpt}")

    model, cfg = load_model(ckpt, cfg, device)
    model.eval()
    inf_cfg = cfg["inference"]

    src_tok = SanskritSourceTokenizer(
        vocab_size=cfg['model'].get('src_vocab_size', 16000),
        max_len=cfg['model']['max_seq_len'],
    )
    tgt_tok = SanskritTargetTokenizer(
        vocab_size=cfg['model'].get('tgt_vocab_size', 16000),
        max_len=cfg['model']['max_seq_len'],
    )

    def collate(batch):
        return {
            'input_ids': torch.stack([b['input_ids'].long() for b in batch]),
            'target_text': [b['target_text'] for b in batch],
            'input_text': [b['input_text'] for b in batch],
        }

    dataset = OptimizedSanskritDataset(
        split='test',
        max_len=cfg['model']['max_seq_len'],
        cfg=cfg,
        src_tokenizer=src_tok,
        tgt_tokenizer=tgt_tok,
    )

    indices = list(range(min(sample_size, len(dataset))))
    loader = DataLoader(
        Subset(dataset, indices),
        batch_size=cfg['training']['batch_size'],
        shuffle=False, collate_fn=collate
    )

    all_preds, all_refs, all_inputs = [], [], []
    print(f"⏳ Generating {len(indices)} paraphrases …")

    # Pure inference: no autograd graph is needed for evaluation.
    with torch.no_grad():
        for batch in tqdm(loader):
            ids = batch['input_ids'].to(device)
            out = run_inference(model, ids, cfg)
            for i in range(out.size(0)):
                all_preds.append(_decode_with_cleanup(
                    tgt_tok, out[i].tolist(), batch['input_text'][i], inf_cfg
                ))
                all_refs.append(batch['target_text'][i].strip())
                all_inputs.append(batch['input_text'][i].strip())

    # Metrics are optional extras; a failure is reported, not raised, so the
    # predictions that were just generated still get saved.
    bleu_score, bert_f1 = 0.0, 0.0
    try:
        from nltk.translate.bleu_score import corpus_bleu
        bleu_score = corpus_bleu(
            [[r.split()] for r in all_refs],
            [p.split() for p in all_preds]
        )
    except Exception as exc:
        print(f"⚠️ BLEU unavailable ({type(exc).__name__}: {exc}); reporting 0.0")

    try:
        import evaluate as hf_eval
        res = hf_eval.load('bertscore').compute(
            predictions=all_preds, references=all_refs, lang='hi'
        )
        bert_f1 = sum(res['f1']) / len(res['f1'])
    except Exception as exc:
        print(f"⚠️ BERTScore unavailable ({type(exc).__name__}: {exc}); reporting 0.0")

    # Save. The penalties are optional in the config; the defaults used here
    # are the ones run_inference applies (1.2 and 0.0).
    rep_pen = inf_cfg.get('repetition_penalty', 1.2)
    div_pen = inf_cfg.get('diversity_penalty', 0.0)

    # A custom checkpoint may live elsewhere, so the experiment directory is
    # not guaranteed to exist yet.
    os.makedirs(exp_dir, exist_ok=True)
    out_path = f"{exp_dir}/evaluation_results.txt"
    pred_path = f"{exp_dir}/evaluation_predictions.jsonl"

    with open(out_path, 'w', encoding='utf-8') as f:
        f.write(f"Model : {model_name}\n")
        f.write(f"Negatives: {has_neg}\n")
        f.write(f"Steps : {inf_cfg['num_steps']}\n")
        f.write(f"Temp : {inf_cfg['temperature']}\n")
        f.write(f"RepPen : {rep_pen}\n")
        f.write(f"DivPen : {div_pen}\n")
        f.write(f"BLEU : {bleu_score:.4f}\n")
        f.write(f"BERTScore: {bert_f1:.4f}\n\n")
        f.write("=== SAMPLES ===\n")
        for src, ref, pred in list(zip(all_inputs, all_refs, all_preds))[:20]:
            f.write(f"IN : {src}\n")
            f.write(f"REF : {ref}\n")
            f.write(f"PRED: {pred}\n")
            f.write("-" * 60 + "\n")

    with open(pred_path, 'w', encoding='utf-8') as f:
        for src, ref, pred in zip(all_inputs, all_refs, all_preds):
            row = {"input": src, "reference": ref, "prediction": pred}
            f.write(json.dumps(row, ensure_ascii=False) + "\n")

    print(f"\n✅ Results → {out_path}")
    print(f"🗂️ Saved predictions → {pred_path}")
    print(f"📊 BLEU: {bleu_score:.4f} | BERTScore: {bert_f1:.4f}")
    return all_preds, all_refs
if __name__ == '__main__':
    # Command-line entry point: "demo" (default) runs the interactive /
    # one-shot demo, "eval" runs batch evaluation on the test split.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--mode', choices=['demo', 'eval'], default='demo')
    parser.add_argument('--samples', type=int, default=500)
    parser.add_argument('--checkpoint', type=str, default=None)
    parser.add_argument('--text', type=str, default=None, help='Run one-shot demo input and exit')
    cli = parser.parse_args()

    if cli.mode == 'eval':
        batch_evaluate(cli.samples, checkpoint=cli.checkpoint)
    else:
        interactive_demo(checkpoint=cli.checkpoint, single_text=cli.text)