#!/usr/bin/env python3 """Comprehensive quality evaluation harness for HYDRA. Computes: PPL, BLEU-1, BLEU-4, ROUGE-1, ROUGE-L, factual accuracy, coherence metrics (distinct-2, repetition-rate, self-BLEU), and a composite quality_score. Usage: python scripts/eval_quality.py # eval latest model python scripts/eval_quality.py --checkpoint ckpt.pt # eval from checkpoint All metrics printed as key=value (grep-friendly). Runs in <30s on RTX 3060. """ from __future__ import annotations import math import os import sys import time from collections import Counter from typing import Optional # Ensure project root is on path _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _PROJECT_ROOT not in sys.path: sys.path.insert(0, _PROJECT_ROOT) import torch import torch.nn.functional as F from hydra.config import ( D_MODEL, D_STATE, DEVICE_BATCH_SIZE, ENGRAM_KEY_DIM, ENGRAM_LAYER_IDX, ENGRAM_N_COLUMNS, EXPAND, HEADDIM, N_HEADS, N_LAYER, PostSemClawConfig, ) from hydra.eval import FACTUAL_EVAL from prepare import MAX_SEQ_LEN, Tokenizer, evaluate_bpb # --------------------------------------------------------------------------- # Eval prompts (hardcoded for reproducibility) # --------------------------------------------------------------------------- EVAL_PROMPTS = [ "The capital of France is", "In 1969, humans first", "Water boils at a temperature of", "The theory of relativity was developed by", "The largest planet in our solar system is", "Photosynthesis is the process by which", "The stock market crashed in", "DNA stands for", "The speed of light is approximately", "Shakespeare wrote the play", "The mitochondria is often called the", "In computer science, an algorithm is", "The chemical symbol for gold is", "The Great Wall of China was built to", "Gravity is a force that", "The human heart pumps blood through", "The Amazon rainforest is located in", "Pi is approximately equal to", "The first President of the United States was", "Oxygen makes up approximately", ] # Reference continuations (approximate, for BLEU/ROUGE) EVAL_REFERENCES = [ "Paris, which is also the largest city in France.", "landed on the Moon during the Apollo 11 mission.", "100 degrees Celsius or 212 degrees Fahrenheit at standard atmospheric pressure.", "Albert Einstein in the early twentieth century.", "Jupiter, which is a gas giant.", "plants convert sunlight into chemical energy and produce oxygen.", "1929, leading to the Great Depression.", "deoxyribonucleic acid, which carries genetic information.", "299,792 kilometers per second in a vacuum.", "Romeo and Juliet, one of the most famous tragedies.", "powerhouse of the cell because it produces energy.", "a step by step procedure for solving a problem.", "Au, from the Latin word aurum.", "protect against invasions from the north.", "attracts objects with mass toward each other.", "the circulatory system to deliver oxygen and nutrients.", "South America, primarily within Brazil.", "3.14159, and it represents the ratio of circumference to diameter.", "George Washington, who served from 1789 to 1797.", "21 percent of the Earth's atmosphere.", ] COHERENCE_PROMPTS = [ "The history of science shows that", "In modern society, technology has", "The relationship between education and", "Climate change is affecting the world because", "The development of artificial intelligence has led to", "Throughout human history, art has been", "The economy of a nation depends on", "Medical research has shown that", "The role of government in society is", "The ocean covers more than", ] # --------------------------------------------------------------------------- # Manual BLEU implementation (no nltk dependency) # --------------------------------------------------------------------------- def _get_ngrams(tokens: list[str], n: int) -> Counter: """Extract n-gram counts from token list.""" return Counter(tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)) def _modified_precision(reference_tokens: list[str], hypothesis_tokens: list[str], n: int) -> tuple[int, int]: """Compute modified precision for n-grams.""" ref_ngrams = _get_ngrams(reference_tokens, n) hyp_ngrams = _get_ngrams(hypothesis_tokens, n) clipped_count = 0 total_count = 0 for ngram, count in hyp_ngrams.items(): clipped_count += min(count, ref_ngrams.get(ngram, 0)) total_count += count return clipped_count, max(total_count, 1) def compute_bleu(references: list[list[str]], hypotheses: list[list[str]], max_n: int = 4) -> dict[str, float]: """Corpus-level BLEU-1 through BLEU-max_n. Uses brevity penalty and geometric mean of modified precisions. """ precisions = [] for n in range(1, max_n + 1): total_clip = 0 total_count = 0 for ref, hyp in zip(references, hypotheses): clip, count = _modified_precision(ref, hyp, n) total_clip += clip total_count += count precisions.append(total_clip / max(total_count, 1)) # Brevity penalty ref_len = sum(len(r) for r in references) hyp_len = sum(len(h) for h in hypotheses) if hyp_len == 0: return {f"bleu{n}": 0.0 for n in range(1, max_n + 1)} bp = math.exp(min(0, 1 - ref_len / hyp_len)) result = {} for n in range(1, max_n + 1): # Geometric mean of precisions 1..n log_avg = sum(math.log(max(p, 1e-10)) for p in precisions[:n]) / n result[f"bleu{n}"] = bp * math.exp(log_avg) return result # --------------------------------------------------------------------------- # Manual ROUGE implementation (no rouge_score dependency) # --------------------------------------------------------------------------- def _lcs_length(x: list[str], y: list[str]) -> int: """Longest common subsequence length via DP.""" m, n = len(x), len(y) if m == 0 or n == 0: return 0 # Space-optimized: only keep current and previous row prev = [0] * (n + 1) curr = [0] * (n + 1) for i in range(1, m + 1): for j in range(1, n + 1): if x[i - 1] == y[j - 1]: curr[j] = prev[j - 1] + 1 else: curr[j] = max(prev[j], curr[j - 1]) prev, curr = curr, [0] * (n + 1) return prev[n] def compute_rouge(references: list[list[str]], hypotheses: list[list[str]]) -> dict[str, float]: """Compute ROUGE-1 (unigram F1) and ROUGE-L (LCS-based F1).""" rouge1_scores = [] rougel_scores = [] for ref, hyp in zip(references, hypotheses): if not ref or not hyp: rouge1_scores.append(0.0) rougel_scores.append(0.0) continue # ROUGE-1: unigram overlap ref_unigrams = Counter(ref) hyp_unigrams = Counter(hyp) overlap = sum((ref_unigrams & hyp_unigrams).values()) r1_precision = overlap / max(len(hyp), 1) r1_recall = overlap / max(len(ref), 1) r1_f1 = 2 * r1_precision * r1_recall / max(r1_precision + r1_recall, 1e-10) rouge1_scores.append(r1_f1) # ROUGE-L: LCS-based lcs = _lcs_length(ref, hyp) rl_precision = lcs / max(len(hyp), 1) rl_recall = lcs / max(len(ref), 1) rl_f1 = 2 * rl_precision * rl_recall / max(rl_precision + rl_recall, 1e-10) rougel_scores.append(rl_f1) return { "rouge1": sum(rouge1_scores) / max(len(rouge1_scores), 1), "rouge_l": sum(rougel_scores) / max(len(rougel_scores), 1), } # --------------------------------------------------------------------------- # Greedy generation # --------------------------------------------------------------------------- @torch.no_grad() def greedy_generate(model, tokenizer, prompt: str, max_new_tokens: int = 32, device: str = "cuda") -> str: """Greedy (argmax) autoregressive generation. Deterministic.""" ids = tokenizer.encode(prompt) x = torch.tensor([ids], device=device, dtype=torch.long) for _ in range(max_new_tokens): logits = model(x, targets=None) if logits.dim() == 3: next_logits = logits[0, -1, :] else: next_logits = logits[0] next_id = next_logits.argmax().unsqueeze(0).unsqueeze(0) x = torch.cat([x, next_id], dim=1) if x.size(1) >= MAX_SEQ_LEN: break all_ids = x[0].tolist() return tokenizer.decode(all_ids[len(ids):]) # --------------------------------------------------------------------------- # Coherence metrics # --------------------------------------------------------------------------- def compute_coherence(generations: list[str]) -> dict[str, float]: """Compute distinct-2, repetition rate, and self-BLEU across generations.""" all_bigrams = [] all_fourgrams = [] tokenized_gens = [] for gen in generations: tokens = gen.lower().split() tokenized_gens.append(tokens) bigrams = [tuple(tokens[i:i + 2]) for i in range(len(tokens) - 1)] fourgrams = [tuple(tokens[i:i + 4]) for i in range(len(tokens) - 3)] all_bigrams.extend(bigrams) all_fourgrams.extend(fourgrams) # Distinct-2: fraction of unique bigrams distinct2 = len(set(all_bigrams)) / max(len(all_bigrams), 1) # Repetition rate: fraction of 4-grams that appear more than once fourgram_counts = Counter(all_fourgrams) repeated = sum(1 for c in fourgram_counts.values() if c > 1) repetition_rate = repeated / max(len(fourgram_counts), 1) # Self-BLEU: average BLEU of each generation against all others # Lower = more diverse self_bleu_scores = [] for i, hyp in enumerate(tokenized_gens): if not hyp: continue others = [g for j, g in enumerate(tokenized_gens) if j != i and g] if not others: continue # Average BLEU against each other generation pair_scores = [] for ref in others: result = compute_bleu([ref], [hyp], max_n=4) pair_scores.append(result.get("bleu4", 0.0)) self_bleu_scores.append(sum(pair_scores) / len(pair_scores)) self_bleu = sum(self_bleu_scores) / max(len(self_bleu_scores), 1) return { "distinct2": distinct2, "repetition_rate": repetition_rate, "self_bleu": self_bleu, } # --------------------------------------------------------------------------- # Factual accuracy (reuse existing probes) # --------------------------------------------------------------------------- def compute_factual(model, tokenizer, device: str = "cuda") -> float: """Run factual eval probes, return accuracy [0,1].""" model.eval() hits = 0 with torch.no_grad(), torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16): for prompt, answers in FACTUAL_EVAL: ids = tokenizer.encode(prompt) x = torch.tensor([ids], device=device, dtype=torch.long) logits = model(x, targets=None) if logits.dim() == 3: last_logits = logits[0, -1, :] else: last_logits = logits[0] probs = torch.softmax(last_logits.float(), dim=-1) top_k = min(20, probs.shape[-1]) top_ids = torch.topk(probs, top_k).indices.tolist() top_tokens = [tokenizer.decode([tid]).strip().lower() for tid in top_ids] answers_lower = [a.lower() for a in answers] if any(any(a in tok for a in answers_lower) for tok in top_tokens): hits += 1 return hits / max(len(FACTUAL_EVAL), 1) # --------------------------------------------------------------------------- # PPL (perplexity) via existing evaluate_bpb # --------------------------------------------------------------------------- def compute_ppl(model, tokenizer, batch_size: int = 8) -> tuple[float, float]: """Compute BPB and PPL. Returns (bpb, ppl).""" import prepare as _prepare_mod # Use smaller eval set for speed (<30s budget) orig_eval = _prepare_mod.EVAL_TOKENS _prepare_mod.EVAL_TOKENS = 2 * 524288 # ~1M tokens, fast try: with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16): bpb = evaluate_bpb(model, tokenizer, batch_size) finally: _prepare_mod.EVAL_TOKENS = orig_eval ppl = 2 ** bpb return bpb, ppl # --------------------------------------------------------------------------- # Composite quality score # --------------------------------------------------------------------------- def compute_quality_score(ppl: float, bleu4: float, rouge_l: float, factual: float, repetition_rate: float) -> float: """Single composite metric for autoresearch optimization. Formula rationale: - PPL (30%): Primary language modeling metric, capped at 100 - BLEU-4 (20%): Generation quality vs references - ROUGE-L (20%): Recall of reference content - Factual (15%): Knowledge memorization - 1-repetition (15%): Diversity/coherence """ return ( 0.3 * (1 - min(ppl, 100) / 100) + 0.2 * bleu4 + 0.2 * rouge_l + 0.15 * factual + 0.15 * (1 - repetition_rate) ) # --------------------------------------------------------------------------- # Main evaluation entry point # --------------------------------------------------------------------------- def run_quality_eval( model: torch.nn.Module, tokenizer, device: str = "cuda", batch_size: int = 8, verbose: bool = True, ) -> dict[str, float]: """Run full quality evaluation suite. Returns dict of all metrics.""" model.eval() results: dict[str, float] = {} t0 = time.time() # 1. PPL / BPB if verbose: print("[eval] Computing PPL/BPB...", flush=True) bpb, ppl = compute_ppl(model, tokenizer, batch_size) results["bpb"] = bpb results["ppl"] = ppl # 2. Generate continuations for BLEU/ROUGE if verbose: print("[eval] Generating continuations (20 prompts, greedy)...", flush=True) hypotheses_text = [] with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16): for prompt in EVAL_PROMPTS: gen = greedy_generate(model, tokenizer, prompt, max_new_tokens=32, device=device) hypotheses_text.append(gen) # Tokenize for BLEU/ROUGE (simple whitespace split) ref_tokens = [ref.lower().split() for ref in EVAL_REFERENCES] hyp_tokens = [hyp.lower().split() for hyp in hypotheses_text] # 3. BLEU if verbose: print("[eval] Computing BLEU...", flush=True) bleu = compute_bleu(ref_tokens, hyp_tokens, max_n=4) results["bleu1"] = bleu["bleu1"] results["bleu4"] = bleu["bleu4"] # 4. ROUGE if verbose: print("[eval] Computing ROUGE...", flush=True) rouge = compute_rouge(ref_tokens, hyp_tokens) results["rouge1"] = rouge["rouge1"] results["rouge_l"] = rouge["rouge_l"] # 5. Factual accuracy if verbose: print("[eval] Computing factual accuracy...", flush=True) with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16): factual = compute_factual(model, tokenizer, device) results["factual"] = factual # 6. Coherence if verbose: print("[eval] Generating coherence passages (10 prompts, 64 tokens)...", flush=True) coherence_gens = [] with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16): for prompt in COHERENCE_PROMPTS: gen = greedy_generate(model, tokenizer, prompt, max_new_tokens=64, device=device) coherence_gens.append(gen) coherence = compute_coherence(coherence_gens) results["distinct2"] = coherence["distinct2"] results["repetition_rate"] = coherence["repetition_rate"] results["self_bleu"] = coherence["self_bleu"] # 7. Composite score results["quality_score"] = compute_quality_score( ppl=results["ppl"], bleu4=results["bleu4"], rouge_l=results["rouge_l"], factual=results["factual"], repetition_rate=results["repetition_rate"], ) elapsed = time.time() - t0 results["eval_time_s"] = elapsed # Print all metrics if verbose: print("\n--- Quality Evaluation Results ---") for k, v in sorted(results.items()): print(f"{k}={v:.6f}") print("--- End Quality Evaluation ---\n") # Print sample generations print("--- Sample Generations ---") for i, (prompt, gen) in enumerate(zip(EVAL_PROMPTS[:5], hypotheses_text[:5])): print(f' [{i}] "{prompt}" -> "{gen.strip()[:80]}"') print("--- End Sample Generations ---\n") print("--- Coherence Samples ---") for i, (prompt, gen) in enumerate(zip(COHERENCE_PROMPTS[:3], coherence_gens[:3])): print(f' [{i}] "{prompt}" -> "{gen.strip()[:100]}"') print("--- End Coherence Samples ---\n") return results # --------------------------------------------------------------------------- # Standalone CLI # --------------------------------------------------------------------------- def _build_model_and_tokenizer(checkpoint: Optional[str] = None): """Build model + tokenizer, optionally loading from checkpoint.""" from hydra.model import PostSemClawModel device = torch.device("cuda") tokenizer = Tokenizer.from_directory() vocab_size = tokenizer.get_vocab_size() config = PostSemClawConfig( sequence_len=MAX_SEQ_LEN, vocab_size=vocab_size, n_layer=N_LAYER, d_model=D_MODEL, d_state=D_STATE, headdim=HEADDIM, n_heads=N_HEADS, expand=EXPAND, engram_n_columns=ENGRAM_N_COLUMNS, engram_key_dim=ENGRAM_KEY_DIM, engram_layer_idx=ENGRAM_LAYER_IDX, ) with torch.device("meta"): model = PostSemClawModel(config) model.to_empty(device=device) if checkpoint and os.path.exists(checkpoint): print(f"[eval] Loading checkpoint: {checkpoint}") state = torch.load(checkpoint, map_location=device, weights_only=True) model.load_state_dict(state, strict=False) else: print("[eval] No checkpoint — using freshly initialized weights") model.init_weights() model.eval() return model, tokenizer, device def main(): import argparse parser = argparse.ArgumentParser(description="HYDRA quality evaluation") parser.add_argument("--checkpoint", type=str, default=None, help="Path to model checkpoint") parser.add_argument("--batch-size", type=int, default=DEVICE_BATCH_SIZE, help="Batch size for PPL eval") args = parser.parse_args() model, tokenizer, device = _build_model_and_tokenizer(args.checkpoint) results = run_quality_eval(model, tokenizer, str(device), args.batch_size, verbose=True) # Final summary line (grep-friendly) print(f"QUALITY_SCORE={results['quality_score']:.6f} PPL={results['ppl']:.3f} " f"BPB={results['bpb']:.4f} BLEU4={results['bleu4']:.4f} " f"ROUGE_L={results['rouge_l']:.4f} FACTUAL={results['factual']:.4f} " f"REP_RATE={results['repetition_rate']:.4f}") if __name__ == "__main__": main()