# Source: gpt2_medium_prefix_682k / scripts / evaluate_nguyen_benchmarks.py
# Uploaded by augustocsc
# Model: GPT-2 Medium trained on prefix dataset (682K)
# Commit: a1190da (verified)
#!/usr/bin/env python3
"""
Evaluate models on Nguyen benchmarks with R² scoring.
Generates candidate expressions and calculates fit quality.
"""
import argparse
import json
import logging
import os
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
sys.path.insert(0, str(Path(__file__).parent.parent))
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
from classes.expression import Expression
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def load_model_auto(model_path: str):
    """Load a LoRA-adapted causal LM, discovering its base model from the adapter.

    The base model name is read from ``adapter_config.json`` inside
    ``model_path`` (key ``base_model_name_or_path``, falling back to
    ``"gpt2"``). The base model is loaded in float16 with
    ``device_map="auto"`` when CUDA is available, otherwise in float32. The
    adapter is then applied, merged into the base weights, and the merged
    model is switched to eval mode.

    Args:
        model_path: Directory containing the adapter and its
            ``adapter_config.json``.

    Returns:
        A ``(model, tokenizer, base_model_name)`` tuple. The tokenizer is
        loaded from the base model and uses its EOS token for padding.

    Raises:
        FileNotFoundError: If ``adapter_config.json`` is not in ``model_path``.
    """
    config_file = os.path.join(model_path, "adapter_config.json")
    if not os.path.exists(config_file):
        raise FileNotFoundError(f"No adapter_config.json in {model_path}")

    with open(config_file) as handle:
        base_model_name = json.load(handle).get("base_model_name_or_path", "gpt2")
    logger.info(f"Loading base model: {base_model_name}")

    on_gpu = torch.cuda.is_available()
    device = "cuda" if on_gpu else "cpu"
    logger.info(f"Using device: {device}")

    # Half precision and automatic placement only make sense on GPU.
    if on_gpu:
        dtype, placement = torch.float16, "auto"
    else:
        dtype, placement = torch.float32, None
    base_model = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        torch_dtype=dtype,
        device_map=placement,
    )

    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    # Reuse EOS as the padding token.
    tokenizer.pad_token = tokenizer.eos_token

    logger.info(f"Loading LoRA adapter from {model_path}")
    merged = PeftModel.from_pretrained(base_model, model_path).merge_and_unload()
    merged.eval()
    return merged, tokenizer, base_model_name
def load_nguyen_benchmark(csv_path: str):
    """Load a Nguyen benchmark CSV and, if present, its sidecar metadata.

    The CSV must contain a target column named ``y``; every other column is
    treated as an input variable. A sidecar file with the same name but the
    extension ``.meta.txt`` (e.g. ``nguyen_1.csv`` -> ``nguyen_1.meta.txt``)
    is scanned for the first line containing ``formula:`` or ``expression:``
    (case-insensitive); the text after the first colon on that line is
    returned as the ground-truth formula.

    Args:
        csv_path: Path to the benchmark CSV file.

    Returns:
        A ``(X, y, x_cols, true_formula)`` tuple: the input values as a 2-D
        array, the target values as a 1-D array, the list of input column
        names, and the formula string (``None`` when no metadata is found).

    Raises:
        KeyError: If the CSV has no ``y`` column.
    """
    df = pd.read_csv(csv_path)

    # Extract X and y
    y_col = 'y'
    if y_col not in df.columns:
        raise KeyError(f"Benchmark CSV {csv_path} has no '{y_col}' column")
    x_cols = [col for col in df.columns if col != y_col]
    X = df[x_cols].values
    y = df[y_col].values

    # Derive the sidecar path from the file's own suffix. A plain
    # str.replace('.csv', ...) would also rewrite '.csv' inside directory
    # names and would leave the path unchanged (pointing back at the CSV
    # itself) when the file does not end in a lowercase '.csv'.
    meta_path = Path(csv_path).with_suffix('.meta.txt')
    true_formula = None
    if meta_path.exists():
        with open(meta_path, encoding='utf-8') as f:
            for line in f:
                lowered = line.lower()
                if 'formula:' in lowered or 'expression:' in lowered:
                    true_formula = line.split(':', 1)[1].strip()
                    break

    return X, y, x_cols, true_formula
def create_json_prompt(x_cols, operators=None):
    """Build the open-ended JSON prompt the model is asked to complete.

    The prompt is a JSON object with the keys ``vars``, ``ops`` and ``cons``
    followed by an ``expr`` key whose string value is left unterminated, so
    that the model's continuation is the expression itself.

    Args:
        x_cols: Variable names to list under ``vars``.
        operators: Operator names to list under ``ops``; defaults to the
            arithmetic operators plus sin, cos, exp, log, sqrt and abs.

    Returns:
        The prompt string, ending in ``"expr": "``.
    """
    if operators is None:
        operators = ["+", "-", "*", "/", "sin", "cos", "exp", "log", "sqrt", "abs"]

    # Serialize the fixed fields, then swap the closing brace for an opened
    # "expr" string value.
    fixed_fields = {"vars": x_cols, "ops": operators, "cons": "C"}
    serialized = json.dumps(fixed_fields, ensure_ascii=False)
    return serialized[:-1] + ', "expr": "'
def extract_expression_json(output: str):
    """Pull the value of the ``"expr"`` field out of generated text.

    A properly closed ``"expr": "..."`` string is preferred and returned
    verbatim. If the string is never closed, the text after the opening
    quote is taken up to the first ``}``, with surrounding whitespace
    removed.

    Args:
        output: Decoded model output.

    Returns:
        The expression text, or ``None`` when no ``"expr"`` value is found.
    """
    import re

    closed = re.search(r'"expr":\s*"([^"]*)"', output)
    if closed is not None:
        return closed.group(1)

    unclosed = re.search(r'"expr":\s*"([^"]+)', output)
    if unclosed is None:
        return None

    # The captured group cannot contain a double quote, so only a stray
    # closing brace needs to be cut off.
    head, _, _ = unclosed.group(1).partition('}')
    return head.strip()
def _score_expression(expr_str, X, y):
    """Parse, validate and fit one candidate expression.

    Returns an ``(is_valid, r2, error_msg)`` tuple. ``r2`` is a finite Python
    float when fitting succeeded and ``None`` otherwise; ``error_msg`` is
    ``None`` only when a finite score was obtained.
    """
    if not expr_str:
        return False, None, "Failed to extract expression"

    try:
        expr = Expression(expr_str, is_prefix=False)
        parsed = expr.sympy_expression is not None
    except Exception as e:
        return False, None, f"Parse error: {str(e)[:100]}"
    if not parsed:
        # Previously this case was recorded as invalid with no explanation.
        return False, None, "Parse error: no sympy expression produced"

    try:
        # bool() stays inside the try so an ambiguous truth value is reported
        # as an evaluation error instead of escaping.
        usable = bool(expr.is_valid_on_dataset(X))
    except Exception as e:
        return False, None, f"Evaluation error: {str(e)[:100]}"
    if not usable:
        return False, None, "Invalid on dataset"

    try:
        # fit_constants is used as the R² of the fitted expression; float()
        # rejects non-numeric results here rather than later.
        r2 = float(expr.fit_constants(X, y))
    except Exception as e:
        return True, None, f"Fit error: {str(e)[:100]}"
    if not np.isfinite(r2):
        return True, None, "Non-finite R²"
    return True, r2, None


def evaluate_model_on_benchmark(model, tokenizer, X, y, x_cols, num_samples=100, *,
                                max_new_tokens=100, temperature=0.7, top_p=0.9):
    """Sample candidate expressions from the model and score them on a benchmark.

    One JSON prompt is built from ``x_cols`` and sampled ``num_samples``
    times. Each completion is parsed into an expression, checked against
    ``X``, fitted against ``y``, and recorded together with its outcome.

    Args:
        model: Causal LM exposing ``device`` and ``generate``.
        tokenizer: Tokenizer matching ``model``.
        X: Benchmark input values.
        y: Benchmark target values.
        x_cols: Variable names placed in the prompt.
        num_samples: Number of completions to sample.
        max_new_tokens: Generation length limit per sample.
        temperature: Sampling temperature.
        top_p: Nucleus sampling threshold.

    Returns:
        A dict with the keys ``expressions`` (one record per sample),
        ``valid_count``, ``r2_scores`` (finite scores only), ``best_r2``
        (``-inf`` when nothing scored), ``best_expression`` and ``summary``
        (aggregate statistics, with ``None`` where no score is available).
    """
    device = model.device
    results = {
        "expressions": [],
        "valid_count": 0,
        "r2_scores": [],
        "best_r2": -float('inf'),
        "best_expression": None
    }

    # The prompt depends only on x_cols, so build and tokenize it once
    # instead of on every iteration.
    prompt = create_json_prompt(x_cols)
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    logger.info(f"Generating {num_samples} candidate expressions...")
    for i in tqdm(range(num_samples), desc="Generating"):
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_p=top_p,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )
        generated = tokenizer.decode(outputs[0], skip_special_tokens=True)
        expr_str = extract_expression_json(generated)

        is_valid, r2, error_msg = _score_expression(expr_str, X, y)
        if is_valid:
            results["valid_count"] += 1
        if r2 is not None:
            results["r2_scores"].append(r2)
            if r2 > results["best_r2"]:
                results["best_r2"] = r2
                results["best_expression"] = expr_str

        results["expressions"].append({
            "index": i,
            "expression": expr_str,
            "valid": is_valid,
            "r2": r2,
            "error": error_msg
        })

    # Compute summary statistics
    valid_rate = results["valid_count"] / num_samples if num_samples > 0 else 0
    r2_scores = results["r2_scores"]
    results["summary"] = {
        "num_samples": num_samples,
        "valid_count": results["valid_count"],
        "valid_rate": valid_rate,
        "num_with_r2": len(r2_scores),
        "best_r2": float(results["best_r2"]) if np.isfinite(results["best_r2"]) else None,
        "mean_r2": float(np.mean(r2_scores)) if r2_scores else None,
        "median_r2": float(np.median(r2_scores)) if r2_scores else None,
        "std_r2": float(np.std(r2_scores)) if r2_scores else None,
        "best_expression": results["best_expression"]
    }
    return results
def main():
    """Command-line entry point: evaluate one model on one benchmark CSV.

    Parses ``--model_path``, ``--benchmark_csv``, ``--num_samples`` and
    ``--output_file``, loads the model and benchmark, runs the evaluation,
    writes the results (plus run metadata) as JSON to ``--output_file`` and
    logs a short summary.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_path", type=str, required=True, help="Path to model")
    parser.add_argument("--benchmark_csv", type=str, required=True, help="Path to Nguyen benchmark CSV")
    parser.add_argument("--num_samples", type=int, default=100, help="Number of candidate expressions to generate")
    parser.add_argument("--output_file", type=str, required=True, help="Output JSON file")
    args = parser.parse_args()

    logger.info("="*60)
    logger.info(f"Evaluating: {os.path.basename(args.model_path)}")
    logger.info(f"Benchmark: {os.path.basename(args.benchmark_csv)}")
    logger.info("="*60)

    # Load model
    model, tokenizer, base_model_name = load_model_auto(args.model_path)

    # Load benchmark
    X, y, x_cols, true_formula = load_nguyen_benchmark(args.benchmark_csv)
    logger.info(f"Loaded benchmark: {X.shape[0]} samples, {len(x_cols)} variables")
    if true_formula:
        logger.info(f"True formula: {true_formula}")

    # Evaluate
    results = evaluate_model_on_benchmark(model, tokenizer, X, y, x_cols, args.num_samples)

    # Add metadata
    results["metadata"] = {
        "model_path": args.model_path,
        "base_model": base_model_name,
        "benchmark_csv": args.benchmark_csv,
        "true_formula": true_formula,
        "num_variables": len(x_cols),
        "num_data_points": len(y)
    }

    # The "nothing scored" sentinel is -inf, which json.dump would write as
    # the non-standard token -Infinity; store null instead.
    best = results.get("best_r2")
    if best is not None and not np.isfinite(best):
        results["best_r2"] = None

    # Save results. os.path.dirname() is '' for a bare filename and
    # os.makedirs('') raises, so only create a directory when one is named.
    output_dir = os.path.dirname(args.output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(args.output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2)

    # Print summary
    summary = results['summary']
    logger.info("\n" + "="*60)
    logger.info("RESULTS SUMMARY")
    logger.info("="*60)
    logger.info(f"Valid expressions: {summary['valid_count']}/{summary['num_samples']} ({summary['valid_rate']*100:.1f}%)")
    logger.info(f"Expressions with R²: {summary['num_with_r2']}")
    if summary['best_r2'] is not None:
        logger.info(f"Best R²: {summary['best_r2']:.6f}")
        logger.info(f"Mean R²: {summary['mean_r2']:.6f}")
        logger.info(f"Median R²: {summary['median_r2']:.6f}")
        logger.info(f"Best expression: {summary['best_expression']}")
    else:
        logger.info("No valid R² scores obtained")
    logger.info(f"\nResults saved to: {args.output_file}")


if __name__ == "__main__":
    main()