"""
Benchmark harness for the reasoning engine.

Train/test split → teach train → evaluate on held-out test.
Reports: exact match, F1, abstention rate, latency.
An optional RLHF loop (--epochs) reinforces or weakens co-occurrence
edges between evaluation passes.

Usage:
    python3 benchmark.py                      # all datasets, 80/20 split
    python3 benchmark.py --split 0.9          # 90/10 split
    python3 benchmark.py --dataset hotpotqa   # single dataset
    python3 benchmark.py --test-only          # skip training, test existing brain
    python3 benchmark.py --epochs 0           # skip the RLHF loop
"""
import sys, os, json, time, argparse, random, re
from pathlib import Path
from collections import defaultdict

sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from brain import Brain


def load_brain(db_path):
    """Load brain — auto-detect CSR > LMDB > SQLite."""
    csr_path = os.path.join(db_path, 'cooc_csr', 'indptr.bin')
    lmdb_path = os.path.join(db_path, 'brain.lmdb')
    if os.path.exists(csr_path) and os.path.exists(lmdb_path):
        from brain_csr_adapter import BrainCSR
        return BrainCSR(db_path=db_path)
    if os.path.exists(lmdb_path):
        from brain_lmdb_adapter import BrainLMDB
        return BrainLMDB(db_path=db_path)
    return Brain(db_path=db_path)


DATA_DIR = Path.home() / "webmind-research" / "data"


def tokenize(text):
    """Simple tokenization for F1 scoring."""
    return re.findall(r'\w+', text.lower())


def exact_match(prediction, gold):
    """Normalized exact match."""
    return prediction.strip().lower() == gold.strip().lower()


def f1_score(prediction, gold):
    """Token-level F1 between prediction and gold."""
    pred_tokens = set(tokenize(prediction))
    gold_tokens = set(tokenize(gold))
    if not gold_tokens:
        return 1.0 if not pred_tokens else 0.0
    if not pred_tokens:
        return 0.0
    common = pred_tokens & gold_tokens
    if not common:
        return 0.0
    precision = len(common) / len(pred_tokens)
    recall = len(common) / len(gold_tokens)
    return 2 * precision * recall / (precision + recall)


def load_qa_records(dataset_path):
    """Load records that have questions and answers (for evaluation)."""
    records = []
    with open(dataset_path) as f:
        for line in f:
            try:
                item = json.loads(line.strip())
            except json.JSONDecodeError:
                continue
            question = item.get('question', item.get('text', '')).strip()
            answer = item.get('answer', '').strip()
            if question and answer:
                records.append({'question': question, 'answer': answer})
    return records


def split_data(records, train_ratio=0.8, seed=42):
    """Deterministic train/test split."""
    rng = random.Random(seed)
    shuffled = list(records)
    rng.shuffle(shuffled)
    split_idx = int(len(shuffled) * train_ratio)
    return shuffled[:split_idx], shuffled[split_idx:]


def teach_records(brain, records):
    """Teach training records to brain."""
    for rec in records:
        q = rec['question']
        a = rec['answer']
        # Teach the answer as knowledge
        if len(a) < 50 and not a.endswith('.'):
            combined = f"{q.rstrip('?')} is {a}"
        else:
            combined = a
        brain.teach(combined, confidence=0.6)

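# Illustrative example of the teach_records rewrite above (hypothetical Q/A pair):
#   {'question': 'Who wrote Hamlet?', 'answer': 'Shakespeare'}
# The answer is short and has no trailing period, so it is taught as the
# declarative statement "Who wrote Hamlet is Shakespeare" with confidence 0.6;
# a long or sentence-final answer would be taught verbatim instead.
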
def evaluate(brain, test_records, verbose=False):
    """Evaluate brain on held-out test records."""
    results = {
        'total': len(test_records),
        'exact_match': 0,
        'f1_sum': 0.0,
        'abstentions': 0,
        'latency_ms': [],
        'examples': [],
    }
    for rec in test_records:
        question = rec['question']
        gold = rec['answer']
        t0 = time.time()
        try:
            result = brain.ask(question, auto_learn=False)
        except TypeError:
            result = brain.ask(question)
        latency = (time.time() - t0) * 1000
        prediction = result['answer']
        strategy = result['strategy']
        confidence = result['confidence']
        results['latency_ms'].append(latency)
        if strategy == 'abstain':
            results['abstentions'] += 1
            em = 0
            f1 = 0.0
        else:
            em = 1 if exact_match(prediction, gold) else 0
            f1 = f1_score(prediction, gold)
            results['exact_match'] += em
            results['f1_sum'] += f1
        if verbose and len(results['examples']) < 10:
            results['examples'].append({
                'question': question[:80],
                'gold': gold[:80],
                'prediction': prediction[:80],
                'em': em,
                'f1': round(f1, 3),
                'confidence': round(confidence, 3),
                'strategy': strategy,
                'latency_ms': round(latency, 1),
            })

    n = results['total']
    results['exact_match_pct'] = round(100 * results['exact_match'] / n, 2) if n else 0
    results['f1_avg'] = round(results['f1_sum'] / n, 4) if n else 0
    results['abstention_rate'] = round(100 * results['abstentions'] / n, 2) if n else 0
    results['latency_avg_ms'] = round(sum(results['latency_ms']) / n, 1) if n else 0
    results['latency_p95_ms'] = round(sorted(results['latency_ms'])[int(n * 0.95)], 1) if n else 0
    return results


def rlhf_epoch(brain, test_records, epoch_num):
    """
    One RLHF epoch: evaluate → reinforce good edges → weaken bad ones.

    For each test question:
      - Ask the brain (get answer + participating words)
      - Score against gold (F1)
      - If F1 > 0.5: boost co-occurrence edges for answer words (×1.5)
      - If F1 < 0.2: weaken co-occurrence edges (×0.5) and teach the gold answer
      - Between: no change (uncertain)

    Works with both BrainCSR (WAL) and dict-based Brain (_cooc).
    """
    f1_sum = 0.0
    em_count = 0
    reinforced = 0
    weakened = 0

    # Check which backend we're using
    has_wal = hasattr(brain, '_wal')
    has_cooc = hasattr(brain, '_cooc')

    for rec in test_records:
        question = rec['question']
        gold = rec['answer']
        try:
            result = brain.ask(question, auto_learn=False)
        except TypeError:
            result = brain.ask(question)
        prediction = result['answer']
        strategy = result['strategy']
        if strategy == 'abstain':
            continue

        f1 = f1_score(prediction, gold)
        f1_sum += f1
        if exact_match(prediction, gold):
            em_count += 1

        # Get word indices that participated in the answer
        content_words = [t for t in tokenize(prediction) if t in brain._word_idx]
        word_indices = [brain._word_idx[w] for w in content_words]
        if not word_indices:
            continue

        # Reinforce or weaken based on quality
        if f1 > 0.5:
            # Good answer — reinforce edges
            for widx in word_indices:
                profile = brain._get_profile(widx)
                for neighbor, weight in profile.items():
                    if neighbor == widx:
                        continue
                    boost = weight * 0.5  # 50% boost (was 10%)
                    if has_wal:
                        brain._wal.add_edge(widx, neighbor, boost)
                    elif has_cooc and widx in brain._cooc:
                        brain._cooc[widx][neighbor] = weight * 1.5
            reinforced += 1
        elif f1 < 0.2:
            # Bad answer — weaken edges AND teach correct answer via Q→A map
            if hasattr(brain, 'correct'):
                brain.correct(question, gold)  # direct Q→A mapping + strong edge boost
            for widx in word_indices:
                profile = brain._get_profile(widx)
                for neighbor, weight in profile.items():
                    if neighbor == widx:
                        continue
                    penalty = -weight * 0.5  # 50% reduction (was 10%)
                    if has_wal:
                        brain._wal.add_edge(widx, neighbor, penalty)
                    elif has_cooc and widx in brain._cooc:
                        brain._cooc[widx][neighbor] = weight * 0.5
            weakened += 1

    n = len(test_records)
    f1_avg = f1_sum / n if n else 0
    em_pct = 100 * em_count / n if n else 0
    print(f"  Epoch {epoch_num}: F1={f1_avg:.4f} EM={em_pct:.1f}% "
          f"reinforced={reinforced} weakened={weakened}")
    return f1_avg, em_pct, reinforced, weakened

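# Worked example of the update rule above (numbers are illustrative; assumes the
# WAL backend's add_edge() accumulates the delta onto the existing edge weight):
#   answer F1 = 0.7 (> 0.5), edge weight = 2.0
#     WAL:  add_edge(widx, neighbor, +1.0)  -> weight ~3.0  (+50%)
#     dict: _cooc[widx][neighbor] = 2.0 * 1.5 = 3.0
#   answer F1 = 0.1 (< 0.2), edge weight = 2.0
#     WAL:  add_edge(widx, neighbor, -1.0)  -> weight ~1.0  (-50%)
#     dict: _cooc[widx][neighbor] = 2.0 * 0.5 = 1.0
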
def run_benchmark(args):
    """Main benchmark flow."""
    # Find datasets
    if args.dataset == 'all':
        ds_paths = sorted(DATA_DIR.glob("*.jsonl"))
    else:
        ds_paths = [DATA_DIR / f"{args.dataset}.jsonl"]

    # Load all Q&A records
    all_records = []
    ds_counts = {}
    for ds_path in ds_paths:
        if not ds_path.exists() or ds_path.stat().st_size == 0:
            continue
        records = load_qa_records(ds_path)
        if records:
            ds_counts[ds_path.stem] = len(records)
            all_records.extend(records)
    if not all_records:
        print("No Q&A records found.")
        return

    print(f"Loaded {len(all_records):,} Q&A records from {len(ds_counts)} datasets")
    for ds, count in sorted(ds_counts.items(), key=lambda x: -x[1])[:10]:
        print(f"  {ds}: {count:,}")

    # Split
    train, test = split_data(all_records, train_ratio=args.split, seed=args.seed)
    print(f"\nSplit: {len(train):,} train / {len(test):,} test "
          f"({args.split:.0%}/{1 - args.split:.0%})")

    # Cap test size for speed
    if args.test_limit and len(test) > args.test_limit:
        test = test[:args.test_limit]
        print(f"Test capped at {args.test_limit}")

    # Train
    if not args.test_only:
        db_path = args.db_path or '/tmp/benchmark_brain'
        os.makedirs(db_path, exist_ok=True)
        print(f"\nTraining on {len(train):,} records...")
        brain = Brain(db_path=db_path)
        brain.begin_bulk()
        t0 = time.time()
        teach_records(brain, train)
        brain.end_bulk()
        train_time = time.time() - t0
        print(f"Training: {train_time:.1f}s | "
              f"{len(brain._words):,} words | "
              f"{brain.db.count():,} neurons")
    else:
        db_path = args.db_path or os.path.expanduser('~/nexus-brain')
        brain = load_brain(db_path)
        print(f"\nUsing existing brain: {len(brain._words):,} words")

    # RLHF epochs (if requested)
    if args.epochs > 0:
        print(f"\n=== RLHF: {args.epochs} epochs ===")
        # Use a subset for RLHF (faster iterations)
        rlhf_set = test[:min(200, len(test))]
        epoch_results = []
        for epoch in range(1, args.epochs + 1):
            f1, em, reinforced, weakened = rlhf_epoch(brain, rlhf_set, epoch)
            epoch_results.append({'epoch': epoch, 'f1': f1, 'em': em,
                                  'reinforced': reinforced, 'weakened': weakened})
            if reinforced == 0 and weakened == 0:
                print(f"  Converged at epoch {epoch} (no changes)")
                break
        print(f"  RLHF complete. F1 improvement: "
              f"{epoch_results[0]['f1']:.4f} → {epoch_results[-1]['f1']:.4f}")

    # Evaluate (final, after RLHF if any)
    print(f"\nEvaluating on {len(test):,} held-out questions...")
    t0 = time.time()
    results = evaluate(brain, test, verbose=True)
    eval_time = time.time() - t0

    # Report
    print(f"\n{'=' * 60}")
    print("BENCHMARK RESULTS")
    print(f"{'=' * 60}")
    print(f"  Exact Match:     {results['exact_match_pct']}% "
          f"({results['exact_match']}/{results['total']})")
    print(f"  Token F1:        {results['f1_avg']}")
    print(f"  Abstention Rate: {results['abstention_rate']}% "
          f"({results['abstentions']}/{results['total']})")
    print(f"  Latency (avg):   {results['latency_avg_ms']} ms")
    print(f"  Latency (p95):   {results['latency_p95_ms']} ms")
    print(f"  Eval time:       {eval_time:.1f}s")
    print(f"{'=' * 60}")

    if results['examples']:
        print("\nSample predictions:")
        for ex in results['examples'][:5]:
            status = "✓" if ex['em'] else ("∅" if ex['strategy'] == 'abstain' else "✗")
            print(f"  {status} Q: {ex['question']}")
            print(f"    Gold: {ex['gold']}")
            print(f"    Pred: {ex['prediction']} "
                  f"[{ex['strategy']}, f1={ex['f1']}, {ex['latency_ms']}ms]")
            print()

    # Save results
    out_path = Path(db_path) / 'benchmark_results.json'
    with open(out_path, 'w') as f:
        json.dump({
            'exact_match_pct': results['exact_match_pct'],
            'f1_avg': results['f1_avg'],
            'abstention_rate': results['abstention_rate'],
            'latency_avg_ms': results['latency_avg_ms'],
            'latency_p95_ms': results['latency_p95_ms'],
            'total_test': results['total'],
            'total_train': len(train),
            'datasets': ds_counts,
            'split': args.split,
        }, f, indent=2)
    print(f"Results saved to {out_path}")

    brain.close()

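# Shape of the benchmark_results.json written above (keys come from the dump
# call; all values here are illustrative, including the dataset names):
#   {
#     "exact_match_pct": 12.4,
#     "f1_avg": 0.2213,
#     "abstention_rate": 31.0,
#     "latency_avg_ms": 8.5,
#     "latency_p95_ms": 21.3,
#     "total_test": 500,
#     "total_train": 4000,
#     "datasets": {"hotpotqa": 2500, "triviaqa": 1500},
#     "split": 0.8
#   }
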
def main():
    parser = argparse.ArgumentParser(description="Benchmark the reasoning engine")
    parser.add_argument("--dataset", default="all",
                        help="Dataset name or 'all'")
    parser.add_argument("--split", type=float, default=0.8,
                        help="Train ratio (default 0.8 = 80/20)")
    parser.add_argument("--test-limit", type=int, default=500,
                        help="Max test questions for speed (0=unlimited)")
    parser.add_argument("--test-only", action="store_true",
                        help="Skip training, evaluate existing brain")
    parser.add_argument("--db-path", default=None,
                        help="Brain DB path (default: /tmp/benchmark_brain)")
    parser.add_argument("--epochs", type=int, default=5,
                        help="RLHF epochs (0=skip, default=5)")
    parser.add_argument("--seed", type=int, default=42,
                        help="Shuffle seed for the train/test split")
    args = parser.parse_args()
    run_benchmark(args)


if __name__ == '__main__':
    main()