#!/usr/bin/env python3 """ Goodhart Gap Benchmark Evaluation Script Evaluate any model on the Goodhart Gap benchmark to detect the gap between understanding and execution in multi-step reasoning. Usage: # Using OpenAI API python evaluate.py --provider openai --model gpt-4o # Using Anthropic API python evaluate.py --provider anthropic --model claude-3-5-haiku-latest # Using local Ollama python evaluate.py --provider ollama --model llama3.1:8b # Using HuggingFace transformers python evaluate.py --provider huggingface --model meta-llama/Llama-3.1-8B-Instruct # Custom API endpoint python evaluate.py --provider custom --model mymodel --api-url http://localhost:8000/v1 Environment Variables: OPENAI_API_KEY - Required for OpenAI provider ANTHROPIC_API_KEY - Required for Anthropic provider HF_TOKEN - Optional for gated HuggingFace models """ import argparse import json import os import re import sys from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Optional, Callable import time # Optional imports try: import requests HAS_REQUESTS = True except ImportError: HAS_REQUESTS = False @dataclass class TestResult: id: str domain: str problem: str expected: str response: str extracted_answer: str passed: bool latency_ms: float def extract_answer(response: str, expected: str) -> str: """Extract the answer from model response.""" response = response.strip() # Try to find numbers in the response numbers = re.findall(r'-?[\d,]+\.?\d*', response) # For yes/no questions if expected.lower() in ['yes', 'no']: resp_lower = response.lower() if 'yes' in resp_lower and 'no' not in resp_lower.split()[:3]: return 'yes' if 'no' in resp_lower and 'yes' not in resp_lower.split()[:3]: return 'no' if 'cannot determine' in resp_lower or 'cannot be determined' in resp_lower: return 'cannot determine' # For time answers time_match = re.search(r'(\d{1,2}:\d{2})\s*(AM|PM|am|pm)?', response) if time_match: time_str = time_match.group(1) period = time_match.group(2) or '' return f"{time_str} {period}".strip() # For ordering questions (comma-separated names) if ',' in expected and any(c.isalpha() for c in expected): # Try to extract comma-separated list parts = [p.strip() for p in response.split(',') if p.strip()] if len(parts) >= 3: return ', '.join(parts[:5]) # Return first number found if numbers: return numbers[0].replace(',', '') # Return first line or truncated response first_line = response.split('\n')[0] return first_line[:50] if len(first_line) > 50 else first_line def validate_answer(response: str, expected: str, domain: str) -> bool: """Validate if the response matches the expected answer.""" response = response.lower().strip() expected = expected.lower().strip() # Direct match if expected in response: return True # Numeric comparison expected_nums = re.findall(r'-?[\d,]+\.?\d*', expected) response_nums = re.findall(r'-?[\d,]+\.?\d*', response) if expected_nums and response_nums: try: exp_val = float(expected_nums[0].replace(',', '')) for resp_num in response_nums: resp_val = float(resp_num.replace(',', '')) # Allow small floating point tolerance if abs(exp_val - resp_val) < 0.01: return True # Check if it's within 0.5% (for rounding) if exp_val != 0 and abs(exp_val - resp_val) / abs(exp_val) < 0.005: return True except ValueError: pass # Time validation if domain == 'time': # Normalize time formats def normalize_time(t): t = t.lower().replace(' ', '') t = re.sub(r'(\d{1,2}):(\d{2})(am|pm)?', r'\1:\2\3', t) return t if normalize_time(expected) in normalize_time(response): return True # Yes/no validation if expected in ['yes', 'no', 'cannot determine']: if expected == 'yes' and 'yes' in response and 'no' not in response.split()[:5]: return True if expected == 'no' and 'no' in response and 'yes' not in response.split()[:5]: return True if expected == 'cannot determine' and ('cannot' in response or 'unable' in response): return True # Ordering validation (check sequence) if ',' in expected and domain == 'logic': expected_items = [x.strip().lower() for x in expected.split(',')] response_lower = response.lower() # Check if items appear in correct order positions = [] for item in expected_items: pos = response_lower.find(item) if pos == -1: return False positions.append(pos) return positions == sorted(positions) return False class ModelProvider: """Base class for model providers.""" def generate(self, prompt: str) -> tuple[str, float]: """Generate response. Returns (response, latency_ms).""" raise NotImplementedError class OpenAIProvider(ModelProvider): def __init__(self, model: str, api_key: Optional[str] = None): self.model = model self.api_key = api_key or os.environ.get('OPENAI_API_KEY') if not self.api_key: raise ValueError("OPENAI_API_KEY not set") def generate(self, prompt: str) -> tuple[str, float]: start = time.time() headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } payload = { "model": self.model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.1, "max_tokens": 200 } response = requests.post( "https://api.openai.com/v1/chat/completions", headers=headers, json=payload, timeout=60 ) latency = (time.time() - start) * 1000 if response.status_code == 200: return response.json()["choices"][0]["message"]["content"].strip(), latency else: return f"ERROR: {response.status_code}", latency class AnthropicProvider(ModelProvider): def __init__(self, model: str, api_key: Optional[str] = None): self.model = model self.api_key = api_key or os.environ.get('ANTHROPIC_API_KEY') if not self.api_key: raise ValueError("ANTHROPIC_API_KEY not set") def generate(self, prompt: str) -> tuple[str, float]: start = time.time() headers = { "x-api-key": self.api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json" } payload = { "model": self.model, "max_tokens": 200, "messages": [{"role": "user", "content": prompt}] } response = requests.post( "https://api.anthropic.com/v1/messages", headers=headers, json=payload, timeout=60 ) latency = (time.time() - start) * 1000 if response.status_code == 200: return response.json()["content"][0]["text"].strip(), latency else: return f"ERROR: {response.status_code}", latency class OllamaProvider(ModelProvider): def __init__(self, model: str, host: str = "http://localhost:11434"): self.model = model self.host = host def generate(self, prompt: str) -> tuple[str, float]: start = time.time() payload = { "model": self.model, "prompt": prompt, "stream": False, "options": {"temperature": 0.1} } response = requests.post( f"{self.host}/api/generate", json=payload, timeout=120 ) latency = (time.time() - start) * 1000 if response.status_code == 200: return response.json().get("response", "").strip(), latency else: return f"ERROR: {response.status_code}", latency class CustomProvider(ModelProvider): def __init__(self, model: str, api_url: str): self.model = model self.api_url = api_url def generate(self, prompt: str) -> tuple[str, float]: start = time.time() # Assume OpenAI-compatible API payload = { "model": self.model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.1, "max_tokens": 200 } response = requests.post( f"{self.api_url}/chat/completions", json=payload, timeout=120 ) latency = (time.time() - start) * 1000 if response.status_code == 200: return response.json()["choices"][0]["message"]["content"].strip(), latency else: return f"ERROR: {response.status_code}", latency def load_dataset(path: str = "data/test.jsonl") -> list[dict]: """Load the benchmark dataset.""" problems = [] with open(path) as f: for line in f: problems.append(json.loads(line)) return problems def evaluate_model( provider: ModelProvider, problems: list[dict], verbose: bool = False ) -> tuple[list[TestResult], dict]: """Evaluate a model on the benchmark.""" results = [] domain_stats = {} for i, problem in enumerate(problems): if verbose: print(f"[{i+1}/{len(problems)}] {problem['id']}...", end=" ", flush=True) response, latency = provider.generate(problem['problem']) extracted = extract_answer(response, problem['correct_answer']) passed = validate_answer(response, problem['correct_answer'], problem['domain']) result = TestResult( id=problem['id'], domain=problem['domain'], problem=problem['problem'], expected=problem['correct_answer'], response=response[:200], extracted_answer=extracted, passed=passed, latency_ms=latency ) results.append(result) # Track domain stats domain = problem['domain'] if domain not in domain_stats: domain_stats[domain] = {'pass': 0, 'fail': 0} domain_stats[domain]['pass' if passed else 'fail'] += 1 if verbose: status = "PASS" if passed else "FAIL" print(f"{status} (got: {extracted[:20]})") # Calculate summary total_pass = sum(r.passed for r in results) total = len(results) summary = { 'total': total, 'passed': total_pass, 'failed': total - total_pass, 'pass_rate': total_pass / total if total > 0 else 0, 'by_domain': { d: { 'passed': s['pass'], 'total': s['pass'] + s['fail'], 'pass_rate': s['pass'] / (s['pass'] + s['fail']) } for d, s in domain_stats.items() }, 'avg_latency_ms': sum(r.latency_ms for r in results) / len(results) if results else 0 } return results, summary def save_results( results: list[TestResult], summary: dict, model_name: str, output_dir: str = "results" ): """Save evaluation results.""" os.makedirs(output_dir, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") safe_model = re.sub(r'[^\w\-]', '_', model_name) # Save detailed results results_file = f"{output_dir}/{safe_model}_{timestamp}_results.jsonl" with open(results_file, 'w') as f: for r in results: f.write(json.dumps({ 'id': r.id, 'domain': r.domain, 'expected': r.expected, 'response': r.response, 'extracted': r.extracted_answer, 'passed': r.passed, 'latency_ms': r.latency_ms }) + '\n') # Save summary summary_file = f"{output_dir}/{safe_model}_{timestamp}_summary.json" summary['model'] = model_name summary['timestamp'] = timestamp with open(summary_file, 'w') as f: json.dump(summary, f, indent=2) return results_file, summary_file def print_summary(summary: dict, model_name: str): """Print evaluation summary.""" print("\n" + "=" * 60) print(f"GOODHART GAP BENCHMARK RESULTS") print(f"Model: {model_name}") print("=" * 60) print(f"\nOverall: {summary['passed']}/{summary['total']} ({summary['pass_rate']*100:.1f}%)") print(f"Average latency: {summary['avg_latency_ms']:.0f}ms") print("\nBy Domain:") print("-" * 40) for domain, stats in sorted(summary['by_domain'].items()): bar = "█" * int(stats['pass_rate'] * 10) + "░" * (10 - int(stats['pass_rate'] * 10)) print(f" {domain:<15} {stats['passed']:>2}/{stats['total']:<2} {bar} {stats['pass_rate']*100:>5.1f}%") print("\n" + "=" * 60) # Interpret results pass_rate = summary['pass_rate'] if pass_rate >= 0.9: print("Assessment: LOW GOODHART GAP - Model executes well") elif pass_rate >= 0.7: print("Assessment: MODERATE GOODHART GAP - Some execution issues") elif pass_rate >= 0.5: print("Assessment: SIGNIFICANT GOODHART GAP - Frequent execution failures") else: print("Assessment: SEVERE GOODHART GAP - Major execution problems") def main(): parser = argparse.ArgumentParser( description="Evaluate a model on the Goodhart Gap Benchmark", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__ ) parser.add_argument('--provider', required=True, choices=['openai', 'anthropic', 'ollama', 'custom'], help='Model provider') parser.add_argument('--model', required=True, help='Model name/identifier') parser.add_argument('--api-url', default=None, help='API URL for custom provider') parser.add_argument('--data', default='data/test.jsonl', help='Path to test data') parser.add_argument('--output', default='results', help='Output directory') parser.add_argument('--verbose', '-v', action='store_true', help='Show progress') parser.add_argument('--limit', type=int, default=None, help='Limit number of problems (for testing)') args = parser.parse_args() if not HAS_REQUESTS: print("ERROR: requests library required. Install with: pip install requests") sys.exit(1) # Create provider if args.provider == 'openai': provider = OpenAIProvider(args.model) elif args.provider == 'anthropic': provider = AnthropicProvider(args.model) elif args.provider == 'ollama': provider = OllamaProvider(args.model) elif args.provider == 'custom': if not args.api_url: print("ERROR: --api-url required for custom provider") sys.exit(1) provider = CustomProvider(args.model, args.api_url) # Load dataset print(f"Loading dataset from {args.data}...") problems = load_dataset(args.data) if args.limit: problems = problems[:args.limit] print(f"Loaded {len(problems)} problems") # Evaluate print(f"\nEvaluating {args.model}...") results, summary = evaluate_model(provider, problems, verbose=args.verbose) # Save and print results results_file, summary_file = save_results(results, summary, args.model, args.output) print_summary(summary, args.model) print(f"\nResults saved to:") print(f" {results_file}") print(f" {summary_file}") if __name__ == "__main__": main()