""" |
|
|
Goodhart Gap Benchmark Evaluation Script |
|
|
|
|
|
Evaluate any model on the Goodhart Gap benchmark to detect the gap |
|
|
between understanding and execution in multi-step reasoning. |
|
|
|
|
|
Usage: |
|
|
# Using OpenAI API |
|
|
python evaluate.py --provider openai --model gpt-4o |
|
|
|
|
|
# Using Anthropic API |
|
|
python evaluate.py --provider anthropic --model claude-3-5-haiku-latest |
|
|
|
|
|
# Using local Ollama |
|
|
python evaluate.py --provider ollama --model llama3.1:8b |
|
|
|
|
|
# Using HuggingFace transformers |
|
|
python evaluate.py --provider huggingface --model meta-llama/Llama-3.1-8B-Instruct |
|
|
|
|
|
# Custom API endpoint |
|
|
python evaluate.py --provider custom --model mymodel --api-url http://localhost:8000/v1 |
|
|
|
|
|
Environment Variables: |
|
|
OPENAI_API_KEY - Required for OpenAI provider |
|
|
ANTHROPIC_API_KEY - Required for Anthropic provider |
|
|
HF_TOKEN - Optional for gated HuggingFace models |
|
|
""" |
|
|
|
|
|
import argparse
import json
import os
import re
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

try:
    import requests
    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False
|
|
@dataclass
class TestResult:
    """Outcome of a single benchmark problem."""
    id: str
    domain: str
    problem: str
    expected: str
    response: str
    extracted_answer: str
    passed: bool
    latency_ms: float
|
|
def extract_answer(response: str, expected: str) -> str:
    """Extract a short answer from the model response, guided by the expected format."""
    response = response.strip()

    numbers = re.findall(r'-?[\d,]+\.?\d*', response)

    # Yes/no questions: require a whole-word match, and guard against the opposite
    # word appearing among the first few tokens.
    if expected.lower() in ['yes', 'no']:
        resp_lower = response.lower()
        leading = resp_lower.split()[:3]
        if re.search(r'\byes\b', resp_lower) and 'no' not in leading:
            return 'yes'
        if re.search(r'\bno\b', resp_lower) and 'yes' not in leading:
            return 'no'
        if 'cannot determine' in resp_lower or 'cannot be determined' in resp_lower:
            return 'cannot determine'

    # Clock times such as "3:45 PM".
    time_match = re.search(r'(\d{1,2}:\d{2})\s*(AM|PM|am|pm)?', response)
    if time_match:
        time_str = time_match.group(1)
        period = time_match.group(2) or ''
        return f"{time_str} {period}".strip()

    # Comma-separated list answers (e.g., orderings): keep up to the first five items.
    if ',' in expected and any(c.isalpha() for c in expected):
        parts = [p.strip() for p in response.split(',') if p.strip()]
        if len(parts) >= 3:
            return ', '.join(parts[:5])

    # Numeric answers: take the first number found, dropping thousands separators.
    if numbers:
        return numbers[0].replace(',', '')

    # Fallback: the first line, truncated to 50 characters.
    first_line = response.split('\n')[0]
    return first_line[:50] if len(first_line) > 50 else first_line
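
# Illustrative behaviour of the extraction heuristics (made-up responses, not
# benchmark data):
#   extract_answer("The answer is 1,250 miles.", "1250")   -> "1250"
#   extract_answer("Yes, the train arrives first.", "yes") -> "yes"
#   extract_answer("She finishes at 3:45 PM.", "3:45 PM")  -> "3:45 PM"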
|
|
|
|
|
def validate_answer(response: str, expected: str, domain: str) -> bool:
    """Validate whether the response matches the expected answer."""
    response = response.lower().strip()
    expected = expected.lower().strip()

    # Exact substring match.
    if expected in response:
        return True

    # Numeric match with a small absolute (0.01) or relative (0.5%) tolerance.
    expected_nums = re.findall(r'-?[\d,]+\.?\d*', expected)
    response_nums = re.findall(r'-?[\d,]+\.?\d*', response)

    if expected_nums and response_nums:
        try:
            exp_val = float(expected_nums[0].replace(',', ''))
            for resp_num in response_nums:
                resp_val = float(resp_num.replace(',', ''))
                if abs(exp_val - resp_val) < 0.01:
                    return True
                if exp_val != 0 and abs(exp_val - resp_val) / abs(exp_val) < 0.005:
                    return True
        except ValueError:
            pass

    # Time answers: compare with whitespace removed, so "3:45 pm" matches "3:45pm".
    if domain == 'time':
        def normalize_time(t):
            return t.lower().replace(' ', '')

        if normalize_time(expected) in normalize_time(response):
            return True

    # Yes/no/cannot-determine answers: whole-word match, guarding against the
    # opposite word appearing among the first few tokens.
    if expected in ['yes', 'no', 'cannot determine']:
        leading = response.split()[:5]
        if expected == 'yes' and re.search(r'\byes\b', response) and 'no' not in leading:
            return True
        if expected == 'no' and re.search(r'\bno\b', response) and 'yes' not in leading:
            return True
        if expected == 'cannot determine' and ('cannot' in response or 'unable' in response):
            return True

    # Ordering answers (logic domain): every expected item must appear, in the expected order.
    if ',' in expected and domain == 'logic':
        expected_items = [x.strip() for x in expected.split(',')]

        positions = []
        for item in expected_items:
            pos = response.find(item)
            if pos == -1:
                return False
            positions.append(pos)
        return positions == sorted(positions)

    return False
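
# Illustrative checks (made-up responses):
#   validate_answer("It comes to approximately 41.999", "42", "math")  -> True
#       (within the 0.01 absolute tolerance)
#   validate_answer("Alice finishes first, then Bob, then Carol.",
#                   "Alice, Bob, Carol", "logic")                      -> True
#       (all items present, in the expected order)
#   validate_answer("Roughly forty", "42", "math")                     -> False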
|
|
|
|
|
class ModelProvider:
    """Base class for model providers."""

    def generate(self, prompt: str) -> tuple[str, float]:
        """Generate response. Returns (response, latency_ms)."""
        raise NotImplementedError
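
# Illustrative subclass (not wired into the CLI): generate() must return a
# (response_text, latency_in_ms) tuple. A stub like this can smoke-test the
# harness without API keys or a local model.
class DryRunProvider(ModelProvider):
    def __init__(self, canned_response: str = "I cannot determine the answer."):
        self.canned_response = canned_response

    def generate(self, prompt: str) -> tuple[str, float]:
        start = time.time()
        return self.canned_response, (time.time() - start) * 1000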
|
|
|
|
|
class OpenAIProvider(ModelProvider):
    def __init__(self, model: str, api_key: Optional[str] = None):
        self.model = model
        self.api_key = api_key or os.environ.get('OPENAI_API_KEY')
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY not set")

    def generate(self, prompt: str) -> tuple[str, float]:
        start = time.time()
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.1,
            "max_tokens": 200
        }
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers, json=payload, timeout=60
        )
        latency = (time.time() - start) * 1000

        if response.status_code == 200:
            return response.json()["choices"][0]["message"]["content"].strip(), latency
        else:
            return f"ERROR: {response.status_code}", latency
|
|
class AnthropicProvider(ModelProvider):
    def __init__(self, model: str, api_key: Optional[str] = None):
        self.model = model
        self.api_key = api_key or os.environ.get('ANTHROPIC_API_KEY')
        if not self.api_key:
            raise ValueError("ANTHROPIC_API_KEY not set")

    def generate(self, prompt: str) -> tuple[str, float]:
        start = time.time()
        headers = {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "max_tokens": 200,
            "messages": [{"role": "user", "content": prompt}]
        }
        response = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers=headers, json=payload, timeout=60
        )
        latency = (time.time() - start) * 1000

        if response.status_code == 200:
            return response.json()["content"][0]["text"].strip(), latency
        else:
            return f"ERROR: {response.status_code}", latency
|
|
class OllamaProvider(ModelProvider):
    def __init__(self, model: str, host: str = "http://localhost:11434"):
        self.model = model
        self.host = host

    def generate(self, prompt: str) -> tuple[str, float]:
        start = time.time()
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": 0.1}
        }
        response = requests.post(
            f"{self.host}/api/generate",
            json=payload, timeout=120
        )
        latency = (time.time() - start) * 1000

        if response.status_code == 200:
            return response.json().get("response", "").strip(), latency
        else:
            return f"ERROR: {response.status_code}", latency
|
|
class CustomProvider(ModelProvider):
    """Provider for any OpenAI-compatible chat completions endpoint."""

    def __init__(self, model: str, api_url: str):
        self.model = model
        self.api_url = api_url

    def generate(self, prompt: str) -> tuple[str, float]:
        start = time.time()

        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.1,
            "max_tokens": 200
        }
        response = requests.post(
            f"{self.api_url}/chat/completions",
            json=payload, timeout=120
        )
        latency = (time.time() - start) * 1000

        if response.status_code == 200:
            return response.json()["choices"][0]["message"]["content"].strip(), latency
        else:
            return f"ERROR: {response.status_code}", latency
|
|
|
|
|
def load_dataset(path: str = "data/test.jsonl") -> list[dict]:
    """Load the benchmark dataset (one JSON object per line)."""
    problems = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:
                problems.append(json.loads(line))
    return problems
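
# Each dataset line must provide at least the fields read by evaluate_model:
# "id", "domain", "problem", and "correct_answer". An illustrative (made-up) record:
#
#   {"id": "math_001", "domain": "math",
#    "problem": "A train travels 60 mph for 2.5 hours. How far does it travel?",
#    "correct_answer": "150"}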
|
|
|
|
|
def evaluate_model(
    provider: ModelProvider,
    problems: list[dict],
    verbose: bool = False
) -> tuple[list[TestResult], dict]:
    """Evaluate a model on the benchmark."""

    results = []
    domain_stats = {}

    for i, problem in enumerate(problems):
        if verbose:
            print(f"[{i+1}/{len(problems)}] {problem['id']}...", end=" ", flush=True)

        response, latency = provider.generate(problem['problem'])
        extracted = extract_answer(response, problem['correct_answer'])
        passed = validate_answer(response, problem['correct_answer'], problem['domain'])

        result = TestResult(
            id=problem['id'],
            domain=problem['domain'],
            problem=problem['problem'],
            expected=problem['correct_answer'],
            response=response[:200],
            extracted_answer=extracted,
            passed=passed,
            latency_ms=latency
        )
        results.append(result)

        # Per-domain pass/fail tallies.
        domain = problem['domain']
        if domain not in domain_stats:
            domain_stats[domain] = {'pass': 0, 'fail': 0}
        domain_stats[domain]['pass' if passed else 'fail'] += 1

        if verbose:
            status = "PASS" if passed else "FAIL"
            print(f"{status} (got: {extracted[:20]})")

    total_pass = sum(r.passed for r in results)
    total = len(results)

    summary = {
        'total': total,
        'passed': total_pass,
        'failed': total - total_pass,
        'pass_rate': total_pass / total if total > 0 else 0,
        'by_domain': {
            d: {
                'passed': s['pass'],
                'total': s['pass'] + s['fail'],
                'pass_rate': s['pass'] / (s['pass'] + s['fail'])
            }
            for d, s in domain_stats.items()
        },
        'avg_latency_ms': sum(r.latency_ms for r in results) / len(results) if results else 0
    }

    return results, summary
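
# Shape of the returned summary (hypothetical numbers, truncated):
#   {"total": 50, "passed": 38, "failed": 12, "pass_rate": 0.76,
#    "by_domain": {"math": {"passed": 9, "total": 10, "pass_rate": 0.9}, ...},
#    "avg_latency_ms": 842.3}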
|
|
|
|
|
def save_results(
    results: list[TestResult],
    summary: dict,
    model_name: str,
    output_dir: str = "results"
):
    """Save evaluation results."""
    os.makedirs(output_dir, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_model = re.sub(r'[^\w\-]', '_', model_name)

    results_file = f"{output_dir}/{safe_model}_{timestamp}_results.jsonl"
    with open(results_file, 'w') as f:
        for r in results:
            f.write(json.dumps({
                'id': r.id,
                'domain': r.domain,
                'expected': r.expected,
                'response': r.response,
                'extracted': r.extracted_answer,
                'passed': r.passed,
                'latency_ms': r.latency_ms
            }) + '\n')

    summary_file = f"{output_dir}/{safe_model}_{timestamp}_summary.json"
    summary['model'] = model_name
    summary['timestamp'] = timestamp
    with open(summary_file, 'w') as f:
        json.dump(summary, f, indent=2)

    return results_file, summary_file
|
|
def print_summary(summary: dict, model_name: str):
    """Print evaluation summary."""
    print("\n" + "=" * 60)
    print("GOODHART GAP BENCHMARK RESULTS")
    print(f"Model: {model_name}")
    print("=" * 60)

    print(f"\nOverall: {summary['passed']}/{summary['total']} ({summary['pass_rate']*100:.1f}%)")
    print(f"Average latency: {summary['avg_latency_ms']:.0f}ms")

    print("\nBy Domain:")
    print("-" * 40)
    for domain, stats in sorted(summary['by_domain'].items()):
        bar = "█" * int(stats['pass_rate'] * 10) + "░" * (10 - int(stats['pass_rate'] * 10))
        print(f"  {domain:<15} {stats['passed']:>2}/{stats['total']:<2} {bar} {stats['pass_rate']*100:>5.1f}%")

    print("\n" + "=" * 60)

    pass_rate = summary['pass_rate']
    if pass_rate >= 0.9:
        print("Assessment: LOW GOODHART GAP - Model executes well")
    elif pass_rate >= 0.7:
        print("Assessment: MODERATE GOODHART GAP - Some execution issues")
    elif pass_rate >= 0.5:
        print("Assessment: SIGNIFICANT GOODHART GAP - Frequent execution failures")
    else:
        print("Assessment: SEVERE GOODHART GAP - Major execution problems")
|
|
def main():
    parser = argparse.ArgumentParser(
        description="Evaluate a model on the Goodhart Gap Benchmark",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument('--provider', required=True,
                        choices=['openai', 'anthropic', 'ollama', 'huggingface', 'custom'],
                        help='Model provider')
    parser.add_argument('--model', required=True,
                        help='Model name/identifier')
    parser.add_argument('--api-url', default=None,
                        help='API URL for custom provider')
    parser.add_argument('--data', default='data/test.jsonl',
                        help='Path to test data')
    parser.add_argument('--output', default='results',
                        help='Output directory')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Show progress')
    parser.add_argument('--limit', type=int, default=None,
                        help='Limit number of problems (for testing)')

    args = parser.parse_args()

    # The HTTP-based providers need requests; local HuggingFace inference does not.
    if args.provider != 'huggingface' and not HAS_REQUESTS:
        print("ERROR: requests library required. Install with: pip install requests")
        sys.exit(1)

    if args.provider == 'openai':
        provider = OpenAIProvider(args.model)
    elif args.provider == 'anthropic':
        provider = AnthropicProvider(args.model)
    elif args.provider == 'ollama':
        provider = OllamaProvider(args.model)
    elif args.provider == 'huggingface':
        provider = HuggingFaceProvider(args.model)
    elif args.provider == 'custom':
        if not args.api_url:
            print("ERROR: --api-url required for custom provider")
            sys.exit(1)
        provider = CustomProvider(args.model, args.api_url)

    print(f"Loading dataset from {args.data}...")
    problems = load_dataset(args.data)
    if args.limit:
        problems = problems[:args.limit]
    print(f"Loaded {len(problems)} problems")

    print(f"\nEvaluating {args.model}...")
    results, summary = evaluate_model(provider, problems, verbose=args.verbose)

    results_file, summary_file = save_results(results, summary, args.model, args.output)
    print_summary(summary, args.model)

    print("\nResults saved to:")
    print(f"  {results_file}")
    print(f"  {summary_file}")
|
|
|
|
|
if __name__ == "__main__":
    main()