#!/usr/bin/env python3
"""
test_statistical_e2e.py — Statistically rigorous JIT LoRA training evaluation.

Dynamically loads real-world facts (post model training cutoff), pre-tests each
against the model to confirm it's truly unknown, trains via LoRA, then evaluates
with proper statistical analysis across multiple independent trials.

Usage:
    # Ensure daemon is running with model activated
    python3 test_statistical_e2e.py

    # Custom options
    python3 test_statistical_e2e.py --facts-file raw_facts_2026.txt --trials 3 --max-facts 80

Data source: facts are loaded from a file generated by web search (not hardcoded).
The file format is one record per fact, using these line prefixes:

    CATEGORY: <category name>
    Q: <question>
    A: <answer>
    KEYWORDS: <comma-separated keywords>

Blank lines and lines starting with "#" or "=" are ignored. A CATEGORY line
applies to every following Q: record until the next CATEGORY line.
"""

import argparse
import json
import math
import os
import random
import re
import statistics
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

try:
    import requests
except ImportError:
    # Keep the pure helpers (fact parsing, keyword matching, statistics)
    # importable without the HTTP client; main() reports the missing package.
    requests = None

# ─── Configuration ───────────────────────────────────────────────────────────

DAEMON_URL = "http://localhost:8766"
DEFAULT_FACTS_FILE = os.path.join(os.path.dirname(__file__), "raw_facts_2026.txt")
MAX_TOKENS_PRETEST = 80  # Short response — just need to check if it knows
MAX_TOKENS_POSTTEST = 100  # Enough for a factual answer
TRAIN_EPOCHS = 15
REGULARIZATION_RATIO = 0.33  # ≥33% real-world data to prevent catastrophic forgetting


# ─── Data Structures ────────────────────────────────────────────────────────

@dataclass
class Fact:
    """One question/answer fact plus the keywords used to grade a response."""

    category: str
    question: str
    answer: str
    keywords: list  # minimum keywords to verify correctness
    pretest_response: str = ""
    pretest_known: bool = False  # True if model already knows this fact

    def to_training_pair(self):
        """Return the fact as a {"user": ..., "assistant": ...} training pair."""
        return {"user": self.question, "assistant": self.answer}


@dataclass
class TrialResult:
    """Counts and timings collected from a single train-and-evaluate trial."""

    trial_id: int
    n_facts_tested: int
    n_confirmed_unknown: int
    n_training_pairs: int
    n_regularization_pairs: int
    training_steps: int
    training_time_s: float
    initial_loss: float
    final_loss: float
    # Post-training scores (count correct)
    recall_correct: int
    recall_total: int
    general_correct: int
    general_total: int
    # Per-category breakdown: {category: {"correct": int, "total": int}}
    category_scores: dict = field(default_factory=dict)


# ─── Fact Loading ────────────────────────────────────────────────────────────

def _fact_from_record(record: dict) -> Fact:
    """Build a Fact from a parsed record; keywords are split on "," and lower-cased."""
    raw_keywords = record.get("keywords", "")
    return Fact(
        category=record.get("category", "Unknown"),
        question=record["question"],
        answer=record.get("answer", ""),
        keywords=[k.strip().lower() for k in raw_keywords.split(",") if k.strip()],
    )


def load_facts_from_file(filepath: str) -> list:
    """Parse the raw facts file into Fact objects.

    Args:
        filepath: Path to a facts file in the format described in the module
            docstring.

    Returns:
        A list of Fact objects in file order. Records without a question are
        dropped; a missing category becomes "Unknown" and a missing answer "".
    """
    facts = []
    current = {}
    with open(filepath, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith(("#", "=")):
                continue
            if line.startswith("CATEGORY:"):
                # A new category closes whatever fact was being collected.
                if current.get("question"):
                    facts.append(_fact_from_record(current))
                current = {"category": line.split(":", 1)[1].strip()}
            elif line.startswith("Q:"):
                # A new question closes the pending fact but keeps its category.
                if current.get("question"):
                    facts.append(_fact_from_record(current))
                    current = {"category": current.get("category", "Unknown")}
                current["question"] = line[2:].strip()
            elif line.startswith("A:"):
                current["answer"] = line[2:].strip()
            elif line.startswith("KEYWORDS:"):
                current["keywords"] = line[9:].strip()
    # Don't forget the last fact
    if current.get("question"):
        facts.append(_fact_from_record(current))
    return facts


# ─── General Knowledge Test Set ──────────────────────────────────────────────

# NOTE(review): most of these questions also appear in REGULARIZATION_PAIRS, so
# the "general knowledge" score is measured partly on questions the adapter was
# trained on — confirm this overlap is intended.
GENERAL_KNOWLEDGE = [
    {"question": "What is the capital of France?", "keywords": ["paris"]},
    {"question": "Who wrote Romeo and Juliet?", "keywords": ["shakespeare"]},
    {"question": "What is the chemical symbol for water?", "keywords": ["h2o"]},
    {"question": "What planet is closest to the Sun?", "keywords": ["mercury"]},
    {"question": "What year did World War II end?", "keywords": ["1945"]},
    {"question": "What is the speed of light in km/s approximately?", "keywords": ["299", "km"]},
    {"question": "Who painted the Mona Lisa?", "keywords": ["vinci", "leonardo"]},
    {"question": "What is the largest ocean on Earth?", "keywords": ["pacific"]},
    {"question": "What gas do plants absorb from the atmosphere?", "keywords": ["co2", "carbon dioxide"]},
    {"question": "What is the square root of 144?", "keywords": ["12"]},
    {"question": "Who developed the theory of general relativity?", "keywords": ["einstein"]},
    {"question": "What is the capital of Japan?", "keywords": ["tokyo"]},
    {"question": "How many chromosomes do humans have?", "keywords": ["46", "23 pairs"]},
    {"question": "What element has the atomic number 1?", "keywords": ["hydrogen"]},
    {"question": "Who was the first person to walk on the Moon?", "keywords": ["armstrong"]},
    {"question": "What is the boiling point of water in Celsius?", "keywords": ["100"]},
    {"question": "What is the capital of Australia?", "keywords": ["canberra"]},
    {"question": "What year was the United Nations founded?", "keywords": ["1945"]},
    {"question": "What is the chemical formula for table salt?", "keywords": ["nacl"]},
    {"question": "Who wrote 1984?", "keywords": ["orwell"]},
]

# ─── Regularization pairs (real-world Q&A to prevent forgetting) ─────────────

REGULARIZATION_PAIRS = [
    {"user": "What is the capital of France?", "assistant": "The capital of France is Paris."},
    {"user": "Who wrote Romeo and Juliet?", "assistant": "William Shakespeare wrote Romeo and Juliet."},
    {"user": "What is the chemical symbol for water?", "assistant": "The chemical symbol for water is H2O."},
    {"user": "What planet is closest to the Sun?", "assistant": "Mercury is the closest planet to the Sun."},
    {"user": "What year did World War II end?", "assistant": "World War II ended in 1945."},
    {"user": "Who painted the Mona Lisa?", "assistant": "Leonardo da Vinci painted the Mona Lisa."},
    {"user": "What is the largest ocean on Earth?", "assistant": "The Pacific Ocean is the largest ocean on Earth."},
    {"user": "What gas do plants absorb from the atmosphere?", "assistant": "Plants absorb carbon dioxide (CO2) from the atmosphere."},
    {"user": "What is the square root of 144?", "assistant": "The square root of 144 is 12."},
    {"user": "Who developed the theory of general relativity?", "assistant": "Albert Einstein developed the theory of general relativity."},
    {"user": "What is the capital of Japan?", "assistant": "The capital of Japan is Tokyo."},
    {"user": "How many chromosomes do humans have?", "assistant": "Humans have 46 chromosomes, or 23 pairs."},
    {"user": "What element has the atomic number 1?", "assistant": "Hydrogen has the atomic number 1."},
    {"user": "Who was the first person to walk on the Moon?", "assistant": "Neil Armstrong was the first person to walk on the Moon in 1969."},
    {"user": "What is the boiling point of water in Celsius?", "assistant": "The boiling point of water is 100 degrees Celsius."},
    {"user": "What is the capital of Australia?", "assistant": "The capital of Australia is Canberra."},
    {"user": "What year was the United Nations founded?", "assistant": "The United Nations was founded in 1945."},
    {"user": "What is the chemical formula for table salt?", "assistant": "The chemical formula for table salt is NaCl (sodium chloride)."},
    {"user": "Who wrote the novel 1984?", "assistant": "George Orwell wrote the novel 1984."},
    {"user": "What is the tallest mountain in the world?", "assistant": "Mount Everest is the tallest mountain in the world at 8,849 meters."},
]


# ─── Daemon API ──────────────────────────────────────────────────────────────

def daemon_status():
    """GET /status and return the decoded JSON body; raises on HTTP errors."""
    r = requests.get(f"{DAEMON_URL}/status", timeout=10)
    r.raise_for_status()
    return r.json()


def daemon_reset(retries=3):
    """Reset adapter and data buffers for a clean trial.

    Retries up to `retries` times with a 5 s pause between attempts and
    re-raises the last error if every attempt fails.
    """
    for attempt in range(retries):
        try:
            r = requests.post(f"{DAEMON_URL}/reset", json={"clear_data": True}, timeout=60)
            r.raise_for_status()
            return r.json()
        except Exception as e:
            if attempt < retries - 1:
                print(f" Reset attempt {attempt+1} failed: {e}, retrying in 5s...")
                time.sleep(5)
            else:
                raise


def daemon_query(question: str, max_tokens: int = 100) -> str:
    """Query the model and collect the full response.

    Sends a streaming /chat request and concatenates the `delta.content`
    pieces from each `data: ` line until `[DONE]`.

    Returns:
        The stripped response text. Errors are printed, not raised: a failed
        request yields "" and a broken stream yields whatever was received.
    """
    try:
        r = requests.post(
            f"{DAEMON_URL}/chat",
            json={"messages": [{"role": "user", "content": question}],
                  "max_tokens": max_tokens, "stream": True},
            stream=True,
            timeout=180,
        )
        r.raise_for_status()
    except Exception as e:
        print(f" [Query error: {e}]")
        return ""

    text = ""
    try:
        for line in r.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue
            payload = line[6:].strip()
            if payload == "[DONE]":
                break
            try:
                obj = json.loads(payload)
            except json.JSONDecodeError:
                continue
            # Tolerate chunks with a missing or empty "choices" list.
            choices = obj.get("choices") or [{}]
            delta = choices[0].get("delta") or {}
            content = delta.get("content", "")
            # Filter out special tokens
            if content and not content.startswith("<|"):
                text += content
    except Exception as e:
        print(f" [Stream error: {e}, got so far: {text[:50]}]")
    finally:
        # Release the connection even when we stop reading early at [DONE].
        r.close()
    return text.strip()


def daemon_inject_and_train(training_pairs: list, epochs: int = TRAIN_EPOCHS) -> dict:
    """Inject training data and run epoch-based training.

    Converts {"user": ..., "assistant": ...} pairs to the daemon's expected format:
    [{"role": "user", "content": ...}, {"role": "assistant", "content": ...}]

    The /train endpoint is async — it starts training in background and returns
    immediately. We poll /status until training completes.

    Returns:
        A dict with "steps", "final_loss", "initial_loss", "epochs_completed"
        and "early_stopped". Only the first three are filled in from /status;
        the last two keep their defaults. "initial_loss" is the first positive
        loss seen while training was running, or the final loss if none was
        seen. If training is still running after 10 minutes a warning is
        printed and the partially filled dict is returned.

    Raises:
        requests.HTTPError: if the /train request itself is rejected.
    """
    # Convert pair format to message format
    messages = [
        [
            {"role": "user", "content": pair["user"]},
            {"role": "assistant", "content": pair["assistant"]},
        ]
        for pair in training_pairs
    ]

    r = requests.post(
        f"{DAEMON_URL}/train",
        json={"messages": messages, "epochs": epochs},
        timeout=30,
    )
    r.raise_for_status()
    start_response = r.json()
    print(f" Train started: injected={start_response.get('injected', 0)}, epochs={start_response.get('epochs', 0)}")

    # Poll until training completes
    poll_interval = 2
    max_wait = 600  # 10 minutes max
    elapsed = 0
    finished = False
    result = {"steps": 0, "final_loss": 0, "initial_loss": 0,
              "epochs_completed": 0, "early_stopped": False}

    while elapsed < max_wait:
        time.sleep(poll_interval)
        elapsed += poll_interval
        try:
            status = daemon_status()
            current_steps = status.get("total_steps", 0)
            current_loss = status.get("last_loss", 0)

            if not status.get("training", False):
                # Training finished
                result["steps"] = current_steps
                result["final_loss"] = current_loss
                # Fall back to the final loss when no in-progress poll
                # captured an initial one (e.g. very short runs).
                result["initial_loss"] = result["initial_loss"] or current_loss
                finished = True
                break

            # Update initial loss from first poll
            if result["initial_loss"] == 0 and current_loss > 0:
                result["initial_loss"] = current_loss

            if elapsed % 30 == 0:
                print(f" ... training: step={current_steps}, loss={current_loss:.4f}")
        except Exception as e:
            # Best-effort polling: report and try again on the next tick.
            print(f" [Poll error: {e}]")

    if not finished:
        print(f" [Warning: training did not finish within {max_wait}s; results are incomplete]")
    return result


def daemon_set_auto_train(enabled: bool):
    """Enable/disable auto_train on the daemon (best effort; failures only warn)."""
    try:
        r = requests.put(
            f"{DAEMON_URL}/config",
            json={"auto_train": enabled},
            timeout=10,
        )
        r.raise_for_status()
    except Exception as e:
        print(f" [Warning: could not set auto_train={enabled}: {e}]")


# ─── Evaluation Logic ────────────────────────────────────────────────────────

# Maps Unicode subscript/superscript digits to their ASCII digit.
_SUB_SUPERSCRIPT_DIGITS = str.maketrans(
    "₀₁₂₃₄₅₆₇₈₉⁰¹²³⁴⁵⁶⁷⁸⁹",
    "01234567890123456789",
)


def normalize_unicode(text: str) -> str:
    """Normalize Unicode subscript/superscript digits to ASCII equivalents.

    For example "H₂O" becomes "H2O". All other characters are left unchanged.
    """
    return text.translate(_SUB_SUPERSCRIPT_DIGITS)


def check_keywords(response: str, keywords: list, min_matches: int = 2) -> bool:
    """Check if response contains enough of the expected keywords.

    Matching is a case-insensitive substring test against the response after
    subscript/superscript digits are normalized. At least `min_matches`
    keywords must match, to avoid false positives from base models that
    hallucinate topic-relevant but factually wrong responses. When the list
    has fewer than `min_matches` keywords, all of them must match.

    Returns:
        True when enough keywords are present; always False for an empty
        keyword list.
    """
    if not keywords:
        return False
    response_lower = normalize_unicode(response.lower())
    # The response is lower-cased, so compare lower-cased keywords too.
    matches = sum(1 for kw in keywords if kw.lower() in response_lower)
    required = min(min_matches, len(keywords))  # Don't require more than we have
    return matches >= required


def pretest_facts(facts: list) -> tuple:
    """Pre-test all facts against the model. Return (unknown, known) split.

    Each fact's `pretest_response` and `pretest_known` fields are updated in
    place. A fact counts as known when its keywords match the response.
    """
    unknown = []
    known = []
    print(f"\n Pre-testing {len(facts)} facts against model...")
    for i, fact in enumerate(facts):
        response = daemon_query(fact.question, max_tokens=MAX_TOKENS_PRETEST)
        fact.pretest_response = response
        fact.pretest_known = check_keywords(response, fact.keywords)
        status = "KNOWN" if fact.pretest_known else "unknown"
        # Log every 10th fact, plus every fact the model already knows.
        if (i + 1) % 10 == 0 or fact.pretest_known:
            print(f" [{i+1}/{len(facts)}] {status}: {fact.question[:60]}...")
        (known if fact.pretest_known else unknown).append(fact)
    print(f" Pre-test complete: {len(unknown)} unknown, {len(known)} already known")
    return unknown, known


def evaluate_recall(facts: list) -> list:
    """Post-training: test recall of each fact. Returns list of (fact, correct, response)."""
    results = []
    for i, fact in enumerate(facts):
        response = daemon_query(fact.question, max_tokens=MAX_TOKENS_POSTTEST)
        correct = check_keywords(response, fact.keywords)
        results.append((fact, correct, response))
        if (i + 1) % 10 == 0:
            print(f" [{i+1}/{len(facts)}] recall testing...")
    return results


def evaluate_general_knowledge() -> list:
    """Test general knowledge preservation. Returns list of (item, correct, response)."""
    results = []
    for item in GENERAL_KNOWLEDGE:
        response = daemon_query(item["question"], max_tokens=100)
        correct = check_keywords(response, item["keywords"])
        results.append((item, correct, response))
    return results


# ─── Statistics ──────────────────────────────────────────────────────────────

def clopper_pearson(k: int, n: int, alpha: float = 0.05) -> tuple:
    """Clopper-Pearson exact binomial confidence interval.

    Args:
        k: Number of successes.
        n: Number of trials; (0.0, 0.0) is returned when this is 0.
        alpha: Two-sided significance level (0.05 gives a 95% interval).

    Returns:
        (lower, upper) bounds on the success probability. Requires scipy,
        which is imported lazily so the rest of the script works without it.
    """
    if n == 0:
        return (0.0, 0.0)
    from scipy import stats as scipy_stats
    lo = scipy_stats.beta.ppf(alpha / 2, k, n - k + 1) if k > 0 else 0.0
    hi = scipy_stats.beta.ppf(1 - alpha / 2, k + 1, n - k) if k < n else 1.0
    return (lo, hi)


def wilson_interval(k: int, n: int, z: float = 1.96) -> tuple:
    """Wilson score confidence interval (no scipy needed).

    Args:
        k: Number of successes.
        n: Number of trials; (0.0, 0.0) is returned when this is 0.
        z: Normal quantile for the desired coverage (1.96 for ~95%).

    Returns:
        (lower, upper) bounds clamped to [0, 1].
    """
    if n == 0:
        return (0.0, 0.0)
    p_hat = k / n
    denom = 1 + z**2 / n
    center = (p_hat + z**2 / (2 * n)) / denom
    margin = z * math.sqrt((p_hat * (1 - p_hat) + z**2 / (4 * n)) / n) / denom
    return (max(0.0, center - margin), min(1.0, center + margin))


# ─── Single Trial ────────────────────────────────────────────────────────────

def _print_trial_banner(trial_id: int):
    """Print the header that separates one trial's log output from the next."""
    print(f"\n{'='*70}")
    print(f" TRIAL {trial_id}")
    print(f"{'='*70}")


def _regularization_count(n_novel: int) -> int:
    """Return how many regularization pairs give at least REGULARIZATION_RATIO.

    Rounds up: truncating would leave small training sets below the target
    (10 novel facts would get 4 pairs, i.e. 28.6% instead of ≥33%).
    """
    needed = math.ceil(n_novel * REGULARIZATION_RATIO / (1 - REGULARIZATION_RATIO))
    return max(1, needed)


def _build_training_set(unknown_facts: list) -> tuple:
    """Mix novel fact pairs with regularization pairs; return (all_pairs, n_reg_used)."""
    novel_pairs = [fact.to_training_pair() for fact in unknown_facts]
    n_reg_needed = _regularization_count(len(novel_pairs))
    n_reg_used = min(n_reg_needed, len(REGULARIZATION_PAIRS))
    if n_reg_used < n_reg_needed:
        # The fixed pool is too small, so the achieved ratio will be below target.
        print(f" WARNING: only {n_reg_used} regularization pairs available "
              f"({n_reg_needed} needed for the {REGULARIZATION_RATIO*100:.0f}% target)")
    all_pairs = novel_pairs + REGULARIZATION_PAIRS[:n_reg_used]
    random.shuffle(all_pairs)
    print(f" Training data: {len(novel_pairs)} novel + {n_reg_used} regularization = {len(all_pairs)} total")
    print(f" Regularization ratio: {n_reg_used / len(all_pairs) * 100:.1f}%")
    return all_pairs, n_reg_used


def _category_breakdown(recall_results: list) -> dict:
    """Tally recall results into {category: {"correct": int, "total": int}}."""
    scores = {}
    for fact, correct, _ in recall_results:
        bucket = scores.setdefault(fact.category, {"correct": 0, "total": 0})
        bucket["total"] += 1
        if correct:
            bucket["correct"] += 1
    return scores


def _print_trial_report(result: TrialResult, recall_results: list, gen_results: list):
    """Print one trial's summary plus the failed recall and general-knowledge items."""
    print(f"\n Trial {result.trial_id} Results:")
    print(f" Recall: {result.recall_correct}/{result.recall_total} ({result.recall_correct/max(1,result.recall_total)*100:.1f}%)")
    print(f" General Knowledge: {result.general_correct}/{result.general_total} ({result.general_correct/max(1,result.general_total)*100:.1f}%)")
    print(f" Training: {result.training_steps} steps, {result.training_time_s:.1f}s, loss {result.initial_loss:.3f} → {result.final_loss:.3f}")

    # Print failures for debugging
    failures = [(f, r) for f, c, r in recall_results if not c]
    if failures:
        print(f"\n Failed recalls ({len(failures)}):")
        for fact, resp in failures[:10]:  # cap the listing at 10 entries
            print(f" Q: {fact.question[:70]}")
            print(f" Expected keywords: {fact.keywords}")
            print(f" Got: {resp[:100]}")
            print()

    gen_failures = [(item, r) for item, c, r in gen_results if not c]
    if gen_failures:
        print(f" General knowledge failures ({len(gen_failures)}):")
        for item, resp in gen_failures:
            print(f" Q: {item['question']}")
            print(f" Expected: {item['keywords']}")
            print(f" Got: {resp[:100]}")


def _train_and_evaluate(unknown_facts: list, trial_id: int, epochs: int,
                        n_facts_tested: int) -> TrialResult:
    """Train on `unknown_facts` plus regularization data, then score the model."""
    all_pairs, n_reg_used = _build_training_set(unknown_facts)

    # Train (auto_train stays off — we train explicitly via /train)
    print(f" Training ({epochs} epochs max, early stopping enabled)...")
    t0 = time.time()
    train_result = daemon_inject_and_train(all_pairs, epochs=epochs)
    train_time = time.time() - t0
    print(f" Training complete: {train_time:.1f}s")
    summary_keys = ['steps', 'final_loss', 'initial_loss', 'epochs_completed', 'early_stopped']
    train_summary = json.dumps({k: train_result.get(k) for k in summary_keys}, default=str)
    print(f" {train_summary}")

    time.sleep(2)  # Let model settle

    # Post-test: recall of unknown facts (auto_train disabled to avoid contamination)
    daemon_set_auto_train(False)
    print(f"\n Evaluating recall ({len(unknown_facts)} facts)...")
    recall_results = evaluate_recall(unknown_facts)
    recall_correct = sum(1 for _, c, _ in recall_results if c)

    # General knowledge preservation
    print(f" Evaluating general knowledge ({len(GENERAL_KNOWLEDGE)} questions)...")
    gen_results = evaluate_general_knowledge()
    gen_correct = sum(1 for _, c, _ in gen_results if c)

    result = TrialResult(
        trial_id=trial_id,
        n_facts_tested=n_facts_tested,
        n_confirmed_unknown=len(unknown_facts),
        n_training_pairs=len(all_pairs),
        n_regularization_pairs=n_reg_used,
        training_steps=train_result.get("steps", 0),
        training_time_s=train_time,
        initial_loss=train_result.get("initial_loss", 0),
        final_loss=train_result.get("final_loss", 0),
        recall_correct=recall_correct,
        recall_total=len(unknown_facts),
        general_correct=gen_correct,
        general_total=len(GENERAL_KNOWLEDGE),
        category_scores=_category_breakdown(recall_results),
    )
    _print_trial_report(result, recall_results, gen_results)
    return result


def run_trial(facts: list, trial_id: int, epochs: int = TRAIN_EPOCHS) -> TrialResult:
    """Run a single trial: reset → pre-test → train → evaluate.

    Args:
        facts: Candidate Fact objects; only those the model fails in the
            pre-test are trained on and scored.
        trial_id: Label used in the log output and in the result.
        epochs: Epoch count sent to the daemon.

    Returns:
        A TrialResult whose `n_facts_tested` is the number of candidates.
    """
    _print_trial_banner(trial_id)

    # Reset adapter for clean slate
    print(" Resetting adapter and data buffers...")
    daemon_reset()
    time.sleep(2)

    # Pre-test: confirm model doesn't know these facts. auto_train is switched
    # off first, as in run_evaluation, so the pre-test queries cannot train it.
    daemon_set_auto_train(False)
    unknown_facts, _known_facts = pretest_facts(facts)

    if len(unknown_facts) < 10:
        # Still proceed but flag it
        print(f" WARNING: Only {len(unknown_facts)} unknown facts — insufficient for evaluation")

    return _train_and_evaluate(unknown_facts, trial_id, epochs, n_facts_tested=len(facts))


def run_trial_prefiltered(unknown_facts: list, trial_id: int, epochs: int = TRAIN_EPOCHS) -> TrialResult:
    """Run a trial with pre-filtered facts (already confirmed unknown). Skips pre-testing.

    Resets the daemon, trains on `unknown_facts` plus regularization pairs and
    evaluates recall and general knowledge.
    """
    _print_trial_banner(trial_id)

    # Reset adapter for clean slate
    print(" Resetting adapter and data buffers...")
    daemon_reset()
    time.sleep(2)

    return _train_and_evaluate(unknown_facts, trial_id, epochs,
                               n_facts_tested=len(unknown_facts))


# ─── Multi-Trial Analysis ────────────────────────────────────────────────────

def _stdev_or_zero(values: list):
    """Sample standard deviation, or 0 when fewer than two values are given."""
    return statistics.stdev(values) if len(values) > 1 else 0


def run_evaluation(facts: list, n_trials: int = 3, epochs: int = TRAIN_EPOCHS):
    """Run multiple independent trials and report aggregate statistics.

    Pre-tests all facts once, runs `n_trials` trials on the facts the model
    did not know, prints pooled and per-category results with Wilson
    intervals, and writes them to evaluation_results.json next to this script.

    Returns:
        The results dict that was written to disk, or None when `n_trials` is
        below 1 or fewer than 10 facts are unknown to the model.
    """
    if n_trials < 1:
        print(" ERROR: n_trials must be at least 1.")
        return None

    model_key = daemon_status().get("model_key", "unknown")

    print(f"\n{'#'*70}")
    print(" STATISTICAL JIT LoRA EVALUATION")
    print(f" Model: {model_key}")
    print(f" Facts available: {len(facts)}")
    print(f" Trials: {n_trials}")
    print(f" Epochs: {epochs} (with early stopping)")
    print(f" Regularization target: {REGULARIZATION_RATIO*100:.0f}%")
    print(f"{'#'*70}")

    # Disable auto_train during pre-testing to avoid contamination
    daemon_set_auto_train(False)

    # Pre-test once (base model is the same for all trials after reset)
    print(f"\n === Pre-testing all {len(facts)} facts (one-time baseline) ===")
    daemon_reset()
    time.sleep(2)
    unknown_facts, known_facts = pretest_facts(facts)
    print(f"\n Baseline: {len(unknown_facts)} confirmed unknown, {len(known_facts)} already known")
    print(f" Will train on {len(unknown_facts)} unknown facts across {n_trials} trials\n")

    if len(unknown_facts) < 10:
        print(" ERROR: Too few unknown facts for meaningful evaluation.")
        print(" The model already knows most of the dataset.")
        return None

    results = []
    for trial in range(1, n_trials + 1):
        # Shuffle facts for each trial to avoid ordering effects
        trial_unknown = unknown_facts.copy()
        random.shuffle(trial_unknown)
        results.append(run_trial_prefiltered(trial_unknown, trial, epochs))

    # ─── Aggregate Statistics ────────────────────────────────────────────
    print(f"\n{'='*70}")
    print(f" AGGREGATE RESULTS ({n_trials} trials)")
    print(f"{'='*70}")

    # Recall rates across trials
    recall_rates = [r.recall_correct / max(1, r.recall_total) for r in results]
    general_rates = [r.general_correct / max(1, r.general_total) for r in results]
    training_times = [r.training_time_s for r in results]
    training_steps_list = [r.training_steps for r in results]
    n_unknown_list = [r.n_confirmed_unknown for r in results]

    # Pooled counts for CI calculation
    pooled_recall_k = sum(r.recall_correct for r in results)
    pooled_recall_n = sum(r.recall_total for r in results)
    pooled_gen_k = sum(r.general_correct for r in results)
    pooled_gen_n = sum(r.general_total for r in results)

    recall_ci = wilson_interval(pooled_recall_k, pooled_recall_n)
    general_ci = wilson_interval(pooled_gen_k, pooled_gen_n)
    time_stdev = _stdev_or_zero(training_times)

    print(f"\n Confirmed unknown facts per trial: {n_unknown_list}")
    print(" (facts the model verified it did NOT know before training)")

    print("\n ┌─────────────────────────────────────────────────────────────────┐")
    print(" │ RECALL (post-training) │")
    print(f" │ Pooled: {pooled_recall_k}/{pooled_recall_n} ({pooled_recall_k/max(1,pooled_recall_n)*100:.1f}%) │")
    print(f" │ Per-trial rates: {[f'{r:.1%}' for r in recall_rates]}")
    if len(recall_rates) > 1:
        print(f" │ Mean ± StdDev: {statistics.mean(recall_rates):.1%} ± {statistics.stdev(recall_rates):.1%}")
    print(f" │ 95% CI (Wilson): [{recall_ci[0]:.1%}, {recall_ci[1]:.1%}]")
    print(" │ │")
    print(" │ GENERAL KNOWLEDGE (preservation) │")
    print(f" │ Pooled: {pooled_gen_k}/{pooled_gen_n} ({pooled_gen_k/max(1,pooled_gen_n)*100:.1f}%) │")
    print(f" │ Per-trial rates: {[f'{r:.1%}' for r in general_rates]}")
    if len(general_rates) > 1:
        print(f" │ Mean ± StdDev: {statistics.mean(general_rates):.1%} ± {statistics.stdev(general_rates):.1%}")
    print(f" │ 95% CI (Wilson): [{general_ci[0]:.1%}, {general_ci[1]:.1%}]")
    print(" │ │")
    print(" │ TRAINING │")
    print(f" │ Mean time: {statistics.mean(training_times):.1f}s ± {time_stdev:.1f}s")
    print(f" │ Mean steps: {statistics.mean(training_steps_list):.0f}")
    print(" └─────────────────────────────────────────────────────────────────┘")

    # Per-category aggregation
    all_categories = set()
    for r in results:
        all_categories.update(r.category_scores.keys())

    print("\n Per-Category Recall (pooled across trials):")
    print(f" {'Category':<25} {'Correct':>8} {'Total':>8} {'Rate':>8} {'95% CI':>16}")
    print(f" {'-'*25} {'-'*8} {'-'*8} {'-'*8} {'-'*16}")
    for cat in sorted(all_categories):
        cat_k = sum(r.category_scores.get(cat, {}).get("correct", 0) for r in results)
        cat_n = sum(r.category_scores.get(cat, {}).get("total", 0) for r in results)
        if cat_n > 0:
            cat_ci = wilson_interval(cat_k, cat_n)
            print(f" {cat:<25} {cat_k:>8} {cat_n:>8} {cat_k/cat_n:>8.1%} [{cat_ci[0]:.1%}, {cat_ci[1]:.1%}]")

    # Save results to JSON
    output = {
        "model": model_key,
        "n_trials": n_trials,
        "epochs": epochs,
        "regularization_ratio": REGULARIZATION_RATIO,
        "aggregate": {
            "recall": {
                "pooled_correct": pooled_recall_k,
                "pooled_total": pooled_recall_n,
                "pooled_rate": pooled_recall_k / max(1, pooled_recall_n),
                "per_trial_rates": recall_rates,
                "mean": statistics.mean(recall_rates),
                "stdev": _stdev_or_zero(recall_rates),
                "ci_95_lower": recall_ci[0],
                "ci_95_upper": recall_ci[1],
            },
            "general_knowledge": {
                "pooled_correct": pooled_gen_k,
                "pooled_total": pooled_gen_n,
                "pooled_rate": pooled_gen_k / max(1, pooled_gen_n),
                "per_trial_rates": general_rates,
                "mean": statistics.mean(general_rates),
                "stdev": _stdev_or_zero(general_rates),
                "ci_95_lower": general_ci[0],
                "ci_95_upper": general_ci[1],
            },
            "training": {
                "mean_time_s": statistics.mean(training_times),
                "stdev_time_s": time_stdev,
                "mean_steps": statistics.mean(training_steps_list),
                "per_trial_times": training_times,
            },
        },
        "trials": [
            {
                "trial_id": r.trial_id,
                "n_confirmed_unknown": r.n_confirmed_unknown,
                "n_training_pairs": r.n_training_pairs,
                "training_steps": r.training_steps,
                "training_time_s": r.training_time_s,
                "initial_loss": r.initial_loss,
                "final_loss": r.final_loss,
                "recall_correct": r.recall_correct,
                "recall_total": r.recall_total,
                "recall_rate": r.recall_correct / max(1, r.recall_total),
                "general_correct": r.general_correct,
                "general_total": r.general_total,
                "general_rate": r.general_correct / max(1, r.general_total),
                "category_scores": r.category_scores,
            }
            for r in results
        ],
    }

    results_path = os.path.join(os.path.dirname(__file__), "evaluation_results.json")
    with open(results_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2)
    print(f"\n Results saved to: {results_path}")

    return output


# ─── Main ────────────────────────────────────────────────────────────────────

def main():
    """Parse CLI options, load and dedupe the facts, run the evaluation, print a verdict."""
    parser = argparse.ArgumentParser(description="Statistical JIT LoRA evaluation")
    parser.add_argument("--facts-file", default=DEFAULT_FACTS_FILE,
                        help="Path to raw facts file (default: raw_facts_2026.txt)")
    parser.add_argument("--trials", type=int, default=3,
                        help="Number of independent trials (default: 3)")
    parser.add_argument("--max-facts", type=int, default=0,
                        help="Max facts to use (0 = all, default: 0)")
    parser.add_argument("--epochs", type=int, default=TRAIN_EPOCHS,
                        help=f"Training epochs per trial (default: {TRAIN_EPOCHS})")
    parser.add_argument("--seed", type=int, default=42,
                        help="Random seed for reproducibility (default: 42)")
    args = parser.parse_args()

    if args.trials < 1:
        parser.error("--trials must be at least 1")
    if requests is None:
        print("ERROR: the 'requests' package is required to talk to the daemon.")
        sys.exit(1)

    random.seed(args.seed)

    # Verify daemon is running
    try:
        status = daemon_status()
        if not status.get("active"):
            print("ERROR: Daemon not active. Call /activate first.")
            sys.exit(1)
        print(f"Daemon OK: {status['model_key']}, {status.get('trainable_params', '?')} trainable params")
    except Exception as e:
        print(f"ERROR: Cannot reach daemon at {DAEMON_URL}: {e}")
        sys.exit(1)

    # Load facts
    if not os.path.exists(args.facts_file):
        print(f"ERROR: Facts file not found: {args.facts_file}")
        print("Generate it first by running the web scraper or provide a path.")
        sys.exit(1)

    facts = load_facts_from_file(args.facts_file)
    print(f"Loaded {len(facts)} facts from {args.facts_file}")

    # Deduplicate by question (case-insensitive); the first occurrence wins
    seen = set()
    unique_facts = []
    for f in facts:
        key = f.question.lower().strip()
        if key not in seen:
            seen.add(key)
            unique_facts.append(f)
    facts = unique_facts
    print(f"After dedup: {len(facts)} unique facts")

    # Category distribution
    cats = {}
    for f in facts:
        cats[f.category] = cats.get(f.category, 0) + 1
    print(f"Categories: {dict(sorted(cats.items()))}")

    if 0 < args.max_facts < len(facts):
        # Uniform random sample over all facts (not stratified by category)
        facts = random.sample(facts, args.max_facts)
        print(f"Sampled down to {len(facts)} facts")

    # Run evaluation
    output = run_evaluation(facts, n_trials=args.trials, epochs=args.epochs)
    if output is None:
        # run_evaluation has already printed the reason.
        sys.exit(1)

    # Final verdict
    recall_rate = output["aggregate"]["recall"]["mean"]
    gen_rate = output["aggregate"]["general_knowledge"]["mean"]

    print(f"\n{'='*70}")
    if recall_rate >= 0.50 and gen_rate >= 0.80:
        print(" ✓ EVALUATION PASSED")
        print(f" Recall: {recall_rate:.1%} (≥50% threshold)")
        print(f" General Knowledge: {gen_rate:.1%} (≥80% threshold)")
    else:
        print(" ✗ EVALUATION BELOW THRESHOLD")
        print(f" Recall: {recall_rate:.1%} {'✓' if recall_rate >= 0.50 else '✗ (<50%)'}")
        print(f" General Knowledge: {gen_rate:.1%} {'✓' if gen_rate >= 0.80 else '✗ (<80%)'}")
    print(f"{'='*70}")


if __name__ == "__main__":
    main()