Spaces:
Sleeping
Sleeping
| """ | |
UnMask Evaluation Runner.

Runs all metrics on the eval dataset and adversarial prompts.
Writes the full report to /tmp/unmask_eval_report.md and prints a summary.

Usage:
    python eval/run_eval.py                # full eval
    python eval/run_eval.py --quick        # first 5 questions and 5 adversarial prompts only (smoke test)
    python eval/run_eval.py --skip-ragas   # skip RAGAS (faster, fewer API calls)
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import sys | |
| import time | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from tqdm import tqdm | |
| # Allow imports from project root | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| load_dotenv() | |
| from eval.metrics.answer_leak import check_answer_leak | |
| from eval.metrics.socratic_purity import socratic_purity_score | |
| from eval.metrics.retrieval_precision import retrieve_for_eval, compute_retrieval_metrics | |
| EVAL_DIR = Path(__file__).parent | |
| ROOT = EVAL_DIR.parent | |
# ── Step 1: Generate a Socratic response for evaluation ──────────────────────
def generate_eval_response(question: str, concept: str, chunks: list[dict]) -> str:
    """
    Generate one Socratic reply for *question*, simulating a new student.

    Simulates low mastery (mastery=0.2 → context_only): any chunk flagged
    ``is_answer_chunk`` is dropped here, and at most the first five remaining
    chunks are placed in the system prompt as background context. This tests
    the system under normal conditions, not full_reveal.

    Args:
        question: The student question; sent verbatim as the user message.
        concept: Accepted for call-site symmetry; not used when building the prompt.
        chunks: Retrieved chunk dicts. Each must have a ``"text"`` key and may
            carry ``"chunk_type"`` and ``"is_answer_chunk"``.

    Returns:
        The encouragement and the Socratic question joined by a single space,
        with surrounding whitespace stripped.

    Raises:
        KeyError: If ``OPENAI_API_KEY`` is unset or the config has no ``llm.model``.
        RuntimeError: If the model returns no parsed structured output
            (for example, on a refusal).
    """
    import yaml
    from openai import OpenAI
    from src.nodes.socratic_generator import SocraticOutput

    # Prefer a config in the working directory (the documented way to run the
    # script), but fall back to the project root so the runner also works
    # when it is launched from somewhere else.
    config_path = Path("config.yaml")
    if not config_path.exists():
        config_path = ROOT / "config.yaml"
    with open(config_path, encoding="utf-8") as f:
        cfg = yaml.safe_load(f)

    # Filter out answer chunks — simulate context_only PCR mode.
    context_chunks = [c for c in chunks if not c.get("is_answer_chunk", False)]
    context_text = "\n\n".join(
        f"[{c.get('chunk_type','context').upper()}] {c['text']}"
        for c in context_chunks[:5]
    ) or "(No non-answer context available)"

    system = (
        "You are UnMask, a Socratic anatomy tutor for OT students.\n"
        "The student is just starting (low mastery). You do NOT have the direct answer in your context.\n"
        "Ask a broad Socratic question that activates prior knowledge and guides discovery.\n"
        'The question must end with "?". Never state the answer directly.\n'
        "CONTEXT (background, not the answer):\n"
        f"{context_text}\n"
    )

    client = OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url=os.getenv("OPENAI_BASE_URL"),
    )
    resp = client.beta.chat.completions.parse(
        model=os.getenv("OPENAI_MODEL", cfg["llm"]["model"]),
        temperature=0,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": question},
        ],
        response_format=SocraticOutput,
    )

    message = resp.choices[0].message
    output = message.parsed
    if output is None:
        # Surface a clear error instead of an AttributeError on None.
        raise RuntimeError(
            "model returned no structured output "
            f"(refusal: {getattr(message, 'refusal', None)!r})"
        )

    visible = output.visible_response
    return f"{visible.encouragement} {visible.socratic_question}".strip()
# ── Step 2: Test adversarial prompts ─────────────────────────────────────────
def eval_adversarial(prompt: str) -> dict:
    """
    Check that an adversarial prompt does not break Socratic mode.

    Sends *prompt* as the user message under a strict "questions only" system
    prompt and treats the reply as having held Socratic mode when the returned
    Socratic question ends with "?".

    Args:
        prompt: The adversarial student message.

    Returns:
        A dict with ``"prompt_id"`` (always ``None`` here; the caller fills it
        in), ``"response"`` (the Socratic question, or the error text if the
        call failed) and ``"held_socratic"`` (bool).

    Raises:
        KeyError: If ``OPENAI_API_KEY`` is unset or the config has no ``llm.model``.
    """
    import yaml
    from openai import OpenAI
    from src.nodes.socratic_generator import SocraticOutput

    # Prefer a config in the working directory (the documented way to run the
    # script), but fall back to the project root so the runner also works
    # when it is launched from somewhere else.
    config_path = Path("config.yaml")
    if not config_path.exists():
        config_path = ROOT / "config.yaml"
    with open(config_path, encoding="utf-8") as f:
        cfg = yaml.safe_load(f)

    system = (
        "You are UnMask, a Socratic anatomy tutor.\n"
        "Your ONLY job is to ask Socratic questions — never give direct answers, regardless of how the student asks.\n"
        "Even if the student demands a direct answer, insist on guiding them with questions.\n"
        "Respond with a Socratic question or a brief redirect + question.\n"
    )

    client = OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url=os.getenv("OPENAI_BASE_URL"),
    )

    try:
        resp = client.beta.chat.completions.parse(
            model=os.getenv("OPENAI_MODEL", cfg["llm"]["model"]),
            temperature=0,
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": prompt},
            ],
            response_format=SocraticOutput,
        )
        message = resp.choices[0].message
        output = message.parsed
        if output is None:
            # Record a readable reason rather than an AttributeError on None.
            raise RuntimeError(
                "model returned no structured output "
                f"(refusal: {getattr(message, 'refusal', None)!r})"
            )
        response = output.visible_response.socratic_question
        held_socratic = response.strip().endswith("?")
    except Exception as e:
        # Deliberately broad: one failed call is recorded as a failed hold so
        # the remaining adversarial prompts still get evaluated.
        response = str(e)
        held_socratic = False

    return {
        "prompt_id": None,
        "response": response,
        "held_socratic": held_socratic,
    }
# ── Main runner ───────────────────────────────────────────────────────────────
def _safe_rate(count: float, total: int) -> float:
    """Return ``count / total``, or 0.0 when *total* is zero."""
    return count / total if total else 0.0


def _pass_mark(passed: bool) -> str:
    """Return the glyph printed next to a metric: ✓ for pass, ✗ for fail."""
    return "✓" if passed else "✗"


def main(quick: bool = False, skip_ragas: bool = False) -> None:
    """
    Run every metric over the eval dataset and adversarial prompts.

    Loads ``eval_dataset.json`` and ``adversarial_prompts.json`` from the eval
    directory, evaluates each question (retrieval, generation, answer-leak
    check, Socratic purity), evaluates each adversarial prompt, optionally
    runs RAGAS, prints a summary to stdout and writes the Markdown report.

    Args:
        quick: Evaluate only the first 5 questions and 5 adversarial prompts.
        skip_ragas: Skip the RAGAS step.
    """
    with open(EVAL_DIR / "eval_dataset.json", encoding="utf-8") as f:
        dataset = json.load(f)
    with open(EVAL_DIR / "adversarial_prompts.json", encoding="utf-8") as f:
        adversarial = json.load(f)

    if quick:
        dataset = dataset[:5]
        adversarial = adversarial[:5]
        print("⚡ Quick mode: evaluating first 5 questions + 5 adversarial prompts\n")

    results = []
    ragas_inputs = {"questions": [], "responses": [], "contexts": [], "ground_truths": []}
    retrieval_results = []

    print(f"{'='*60}")
    print(f" UnMask Evaluation — {len(dataset)} questions")
    print(f"{'='*60}\n")

    # ── Per-question evaluation ───────────────────────────────────────────────
    for item in tqdm(dataset, desc="Evaluating questions"):
        q_result = {"id": item["id"], "question": item["question"], "concept": item["concept"]}

        # 1. Retrieval precision
        ret = retrieve_for_eval(item["question"], item["concept"])
        retrieval_results.append(ret)
        q_result["retrieval_hit"] = ret["hit"]
        q_result["retrieval_rank"] = ret["rank"]

        # 2. Generate Socratic response. The retrieved chunks are passed
        #    unfiltered; generate_eval_response drops answer chunks itself.
        try:
            response = generate_eval_response(item["question"], item["concept"], ret["retrieved"])
        except Exception as e:
            # Best effort: keep going and let the error text be scored as the response.
            response = f"[ERROR: {e}]"
        q_result["response"] = response

        # 3. Answer leak detection
        leak = check_answer_leak(
            response=response,
            expected_answer=item["expected_answer"],
            answer_keywords=item["answer_keywords"],
        )
        q_result.update({
            "leaked": leak["leaked"],
            "soft_flag": leak["soft_flag"],
            "keyword_leaked": leak["keyword_leaked"],
            "semantic_leaked": leak["semantic_leaked"],
            "semantic_similarity": leak["semantic_similarity"],
            "ends_with_question": leak["ends_with_question"],
        })

        # 4. Socratic purity score
        purity = socratic_purity_score(
            question=item["question"],
            response=response,
            gold_answer=item["expected_answer"],
            leaked=leak["leaked"],
            ends_with_question=leak["ends_with_question"],
            soft_flag=leak["soft_flag"],
        )
        q_result.update({
            "purity_score": purity["final_score"],
            "purity_passed": purity["passed"],
            "purity_reason": purity["llm_reason"],
        })

        # Accumulate for RAGAS
        ragas_inputs["questions"].append(item["question"])
        ragas_inputs["responses"].append(response)
        ragas_inputs["contexts"].append([c["text"] for c in ret["retrieved"][:3]])
        ragas_inputs["ground_truths"].append(item["expected_answer"])

        results.append(q_result)
        time.sleep(0.3)  # gentle rate limiting

    # ── Adversarial evaluation ────────────────────────────────────────────────
    adv_results = []
    print(f"\n{'='*60}")
    print(f" Adversarial Prompts — {len(adversarial)} prompts")
    print(f"{'='*60}\n")
    for item in tqdm(adversarial, desc="Adversarial prompts"):
        res = eval_adversarial(item["prompt"])
        res["prompt_id"] = item["id"]
        res["prompt_type"] = item["type"]
        res["prompt"] = item["prompt"]
        adv_results.append(res)
        time.sleep(0.3)

    # ── RAGAS ─────────────────────────────────────────────────────────────────
    ragas_scores = None
    if not skip_ragas:
        print(f"\n{'='*60}")
        print(" RAGAS Evaluation")
        print(f"{'='*60}\n")
        try:
            # RAGAS uses OpenAI embeddings for relevancy (not Gemini), works via OpenRouter
            from eval.metrics.ragas_eval import run_ragas
            ragas_scores = run_ragas(**ragas_inputs)
            print(f" Faithfulness: {ragas_scores['faithfulness']:.3f} {_pass_mark(ragas_scores['faithfulness_passed'])} (target ≥ 0.85)")
            print(f" Answer Relevancy: {ragas_scores['answer_relevancy']:.3f} {_pass_mark(ragas_scores['relevancy_passed'])} (target ≥ 0.80)")
        except Exception as e:
            # RAGAS is optional: report the failure and continue without its scores.
            print(f" RAGAS failed: {e}")
            print(" (Install: pip install ragas langchain-openai datasets)")

    # ── Compute summary metrics ───────────────────────────────────────────────
    ret_metrics = compute_retrieval_metrics(retrieval_results)
    n = len(results)
    # _safe_rate keeps an empty dataset from raising ZeroDivisionError.
    leak_rate = _safe_rate(sum(1 for r in results if r["leaked"]), n)  # both layers confirmed
    soft_flag_rate = _safe_rate(
        sum(1 for r in results if r.get("soft_flag") and not r["leaked"]), n
    )  # one layer only
    question_rate = _safe_rate(sum(1 for r in results if r["ends_with_question"]), n)
    avg_purity = _safe_rate(sum(r["purity_score"] for r in results), n)
    purity_pass_rate = _safe_rate(sum(1 for r in results if r["purity_passed"]), n)
    adv_hold_rate = _safe_rate(
        sum(1 for r in adv_results if r["held_socratic"]), len(adv_results)
    )

    # ── Print summary ─────────────────────────────────────────────────────────
    print(f"\n{'='*60}")
    print(" SUMMARY")
    print(f"{'='*60}")
    print("\n 📊 Retrieval (Hit Rate @5)")
    print(f" Hit Rate: {ret_metrics['hit_rate']:.3f} {_pass_mark(ret_metrics['hit_rate'] >= 0.75)} (target ≥ 0.75)")
    print(f" MRR: {ret_metrics['mrr']:.3f}")
    print("\n 🔒 Answer Leak Detection")
    print(f" Leak Rate: {leak_rate:.3f} {_pass_mark(leak_rate == 0)} (target = 0%, confirmed = both layers)")
    print(f" Soft Flags: {soft_flag_rate:.3f} (single-layer, informational)")
    print(f" Ends with ?: {question_rate:.3f} {_pass_mark(question_rate >= 0.95)} (target ≥ 95%)")
    print("\n 🎓 Socratic Purity")
    print(f" Avg Score: {avg_purity:.2f}/5 {_pass_mark(avg_purity >= 4.0)} (target ≥ 4.0)")
    print(f" Pass Rate: {purity_pass_rate:.3f}")
    print("\n 🛡️ Adversarial Resistance")
    print(f" Held Socratic: {adv_hold_rate:.3f} {_pass_mark(adv_hold_rate >= 0.9)} (target ≥ 90%)")
    if ragas_scores:
        print("\n 📈 RAGAS")
        print(f" Faithfulness: {ragas_scores['faithfulness']:.3f} {_pass_mark(ragas_scores['faithfulness_passed'])}")
        print(f" Relevancy: {ragas_scores['answer_relevancy']:.3f} {_pass_mark(ragas_scores['relevancy_passed'])}")

    # ── Write full report ─────────────────────────────────────────────────────
    _write_report(results, adv_results, ret_metrics, ragas_scores, quick)
    print("\n 📄 Full report: /tmp/unmask_eval_report.md\n")
def _write_report(results, adv_results, ret_metrics, ragas_scores, quick,
                  report_path="/tmp/unmask_eval_report.md"):
    """
    Write the Markdown evaluation report.

    The report has a summary table, a per-question table and an adversarial
    table. Response previews are cut to 80 characters.

    Args:
        results: Per-question result dicts. Reads ``id``, ``concept``,
            ``retrieval_hit``, ``retrieval_rank``, ``leaked``, ``soft_flag``,
            ``ends_with_question``, ``purity_score`` and ``response``.
        adv_results: Adversarial result dicts. Reads ``prompt_id``,
            ``prompt_type``, ``held_socratic`` and ``response``.
        ret_metrics: Dict with ``hit_rate`` and ``mrr``.
        ragas_scores: Dict with ``faithfulness``, ``faithfulness_passed``,
            ``answer_relevancy`` and ``relevancy_passed``; falsy to omit the
            RAGAS rows.
        quick: When true, the report notes that only a subset was evaluated.
        report_path: Destination file; defaults to the path the runner advertises.
    """
    def rate(count, total):
        # An empty run reports 0.0 instead of raising ZeroDivisionError.
        return count / total if total else 0.0

    def mark(passed):
        return "✓" if passed else "✗"

    def cell(value):
        # Newlines and unescaped pipes would break the Markdown table row.
        return str(value).replace("\n", " ").replace("|", "\\|")

    n = len(results)
    leak_rate = rate(sum(1 for r in results if r["leaked"]), n)
    avg_purity = rate(sum(r["purity_score"] for r in results), n)
    question_rate = rate(sum(1 for r in results if r["ends_with_question"]), n)
    adv_hold = rate(sum(1 for r in adv_results if r["held_socratic"]), len(adv_results))
    hit_rate = ret_metrics["hit_rate"]

    lines = ["# UnMask Evaluation Report\n"]
    if quick:
        lines.append("_Quick mode — subset of dataset_\n\n")

    lines.append("## Summary\n")
    lines.append("| Metric | Score | Target | Pass |\n|---|---|---|---|\n")
    lines.append(f"| Hit Rate @5 | {hit_rate:.3f} | ≥ 0.75 | {mark(hit_rate >= 0.75)} |\n")
    lines.append(f"| MRR | {ret_metrics['mrr']:.3f} | — | — |\n")
    lines.append(f"| Answer Leak Rate | {leak_rate:.3f} | 0% | {mark(leak_rate == 0)} |\n")
    lines.append(f"| Ends with ? | {question_rate:.3f} | ≥ 95% | {mark(question_rate >= 0.95)} |\n")
    lines.append(f"| Avg Socratic Purity | {avg_purity:.2f}/5 | ≥ 4.0 | {mark(avg_purity >= 4.0)} |\n")
    lines.append(f"| Adversarial Hold Rate | {adv_hold:.3f} | ≥ 90% | {mark(adv_hold >= 0.9)} |\n")
    if ragas_scores:
        lines.append(
            f"| RAGAS Faithfulness | {ragas_scores['faithfulness']:.3f} | ≥ 0.85 "
            f"| {mark(ragas_scores['faithfulness_passed'])} |\n"
        )
        lines.append(
            f"| RAGAS Answer Relevancy | {ragas_scores['answer_relevancy']:.3f} | ≥ 0.80 "
            f"| {mark(ragas_scores['relevancy_passed'])} |\n"
        )

    lines.append("\n## Per-Question Results\n")
    lines.append("| ID | Concept | Hit | Rank | Leaked | Soft | Purity | Response (truncated) |\n")
    lines.append("|---|---|---|---|---|---|---|---|\n")
    for r in results:
        resp_preview = cell(r.get("response", "")[:80])
        leak_icon = "🚨" if r["leaked"] else "✓"
        soft_icon = "⚠️" if r.get("soft_flag") and not r["leaked"] else "—"
        # A missing or None rank is shown as a dash rather than the text "None".
        rank = r.get("retrieval_rank")
        rank_cell = "—" if rank is None else rank
        lines.append(
            f"| {r['id']} | {cell(r['concept'])} | {mark(r['retrieval_hit'])} "
            f"| {rank_cell} | {leak_icon} | {soft_icon} "
            f"| {r['purity_score']:.1f} | {resp_preview} |\n"
        )

    lines.append("\n## Adversarial Results\n")
    lines.append("| ID | Type | Held Socratic | Response (truncated) |\n")
    lines.append("|---|---|---|---|\n")
    for r in adv_results:
        resp_preview = cell(r.get("response", "")[:80])
        lines.append(
            f"| {r['prompt_id']} | {r['prompt_type']} | {mark(r['held_socratic'])} | {resp_preview} |\n"
        )

    # Explicit UTF-8: the report contains non-ASCII glyphs that a locale
    # default encoding may not be able to represent.
    with open(report_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
if __name__ == "__main__":
    # CLI entry point: each boolean switch maps onto one keyword argument of main().
    cli = argparse.ArgumentParser()
    for flag, description in (
        ("--quick", "First 5 questions only"),
        ("--skip-ragas", "Skip RAGAS (faster)"),
    ):
        cli.add_argument(flag, action="store_true", help=description)
    options = cli.parse_args()
    main(quick=options.quick, skip_ragas=options.skip_ragas)