#!/usr/bin/env python3
"""
Evaluate models on Nguyen benchmarks with R² scoring.
Generates candidate expressions and calculates fit quality.
"""
import argparse
import json
import logging
import os
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
sys.path.insert(0, str(Path(__file__).parent.parent))

from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel

from classes.expression import Expression

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def load_model_auto(model_path: str):
    """Load model with automatic base model detection"""
    adapter_config_path = os.path.join(model_path, "adapter_config.json")
    if not os.path.exists(adapter_config_path):
        raise FileNotFoundError(f"No adapter_config.json in {model_path}")
    with open(adapter_config_path) as f:
        adapter_config = json.load(f)
    base_model_name = adapter_config.get("base_model_name_or_path", "gpt2")
    logger.info(f"Loading base model: {base_model_name}")

    device = "cuda" if torch.cuda.is_available() else "cpu"
    logger.info(f"Using device: {device}")
    model = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        torch_dtype=torch.float16 if device == "cuda" else torch.float32,
        device_map="auto" if device == "cuda" else None
    )
    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    tokenizer.pad_token = tokenizer.eos_token

    logger.info(f"Loading LoRA adapter from {model_path}")
    model = PeftModel.from_pretrained(model, model_path)
    model = model.merge_and_unload()
    model.eval()
    return model, tokenizer, base_model_name

def load_nguyen_benchmark(csv_path: str):
    """Load Nguyen benchmark data"""
    df = pd.read_csv(csv_path)

    # Extract X and y
    y_col = 'y'
    x_cols = [col for col in df.columns if col != y_col]
    X = df[x_cols].values
    y = df[y_col].values

    # Read metadata if available
    meta_path = csv_path.replace('.csv', '.meta.txt')
    true_formula = None
    if os.path.exists(meta_path):
        with open(meta_path) as f:
            for line in f:
                if 'formula:' in line.lower() or 'expression:' in line.lower():
                    true_formula = line.split(':', 1)[1].strip()
                    break
    return X, y, x_cols, true_formula
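
# Expected data layout (as read by the loader above): one CSV column per input variable
# plus a 'y' target column; an optional sibling '<name>.meta.txt' may contain a line such
# as 'formula: ...' or 'expression: ...' holding the ground-truth expression.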

def create_json_prompt(x_cols, operators=None):
    """Create JSON format prompt for expression generation"""
    if operators is None:
        operators = ["+", "-", "*", "/", "sin", "cos", "exp", "log", "sqrt", "abs"]
    prompt = {
        "vars": x_cols,
        "ops": operators,
        "cons": "C",
        "expr": ""
    }
    prompt_str = json.dumps(prompt, ensure_ascii=False)
    prompt_str = prompt_str.rsplit('"expr":', 1)[0] + '"expr": "'
    return prompt_str
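
# For x_cols == ["x"] and the default operators, the prompt produced above is the
# open-ended JSON fragment (shown here for illustration):
#   {"vars": ["x"], "ops": ["+", "-", "*", "/", "sin", "cos", "exp", "log", "sqrt", "abs"], "cons": "C", "expr": "
# i.e. closing the "expr" string is left to the model.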

def extract_expression_json(output: str):
    """Extract expression from JSON output"""
    import re

    # Try to find complete JSON "expr": "..."
    match = re.search(r'"expr":\s*"([^"]*)"', output)
    if match:
        return match.group(1)

    # Try to find partial JSON
    match = re.search(r'"expr":\s*"([^"]+)', output)
    if match:
        expr = match.group(1)
        expr = expr.split('"')[0].split('}')[0].strip()
        return expr
    return None
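
# Illustrative behaviour of the two regexes above (inputs are made-up model outputs):
#   extract_expression_json('... "expr": "C * sin(x1)"}')  ->  'C * sin(x1)'   (well-formed JSON)
#   extract_expression_json('... "expr": "x1 + C')         ->  'x1 + C'        (truncated generation)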

def evaluate_model_on_benchmark(model, tokenizer, X, y, x_cols, num_samples=100):
    """Evaluate model on a single benchmark"""
    device = model.device
    results = {
        "expressions": [],
        "valid_count": 0,
        "r2_scores": [],
        "best_r2": -float('inf'),
        "best_expression": None
    }
    logger.info(f"Generating {num_samples} candidate expressions...")
    for i in tqdm(range(num_samples), desc="Generating"):
        prompt = create_json_prompt(x_cols)
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=100,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )
        generated = tokenizer.decode(outputs[0], skip_special_tokens=True)
        expr_str = extract_expression_json(generated)

        is_valid = False
        r2 = -float('inf')
        error_msg = None
        if expr_str:
            try:
                expr = Expression(expr_str, is_prefix=False)
                # Check if expression is valid
                if expr.sympy_expression is not None:
                    # Try to evaluate on dataset
                    try:
                        if expr.is_valid_on_dataset(X):
                            is_valid = True
                            results["valid_count"] += 1
                            # Fit constants and compute R²
                            try:
                                r2 = expr.fit_constants(X, y)
                                if np.isfinite(r2):
                                    results["r2_scores"].append(r2)
                                    if r2 > results["best_r2"]:
                                        results["best_r2"] = r2
                                        results["best_expression"] = expr_str
                                else:
                                    r2 = -float('inf')
                                    error_msg = "Non-finite R²"
                            except Exception as e:
                                error_msg = f"Fit error: {str(e)[:100]}"
                        else:
                            error_msg = "Invalid on dataset"
                    except Exception as e:
                        error_msg = f"Evaluation error: {str(e)[:100]}"
            except Exception as e:
                error_msg = f"Parse error: {str(e)[:100]}"
        else:
            error_msg = "Failed to extract expression"

        results["expressions"].append({
            "index": i,
            "expression": expr_str,
            "valid": is_valid,
            "r2": float(r2) if np.isfinite(r2) else None,
            "error": error_msg
        })

    # Compute summary statistics
    valid_rate = results["valid_count"] / num_samples if num_samples > 0 else 0
    r2_scores = results["r2_scores"]
    results["summary"] = {
        "num_samples": num_samples,
        "valid_count": results["valid_count"],
        "valid_rate": valid_rate,
        "num_with_r2": len(r2_scores),
        "best_r2": float(results["best_r2"]) if np.isfinite(results["best_r2"]) else None,
        "mean_r2": float(np.mean(r2_scores)) if r2_scores else None,
        "median_r2": float(np.median(r2_scores)) if r2_scores else None,
        "std_r2": float(np.std(r2_scores)) if r2_scores else None,
        "best_expression": results["best_expression"]
    }
    return results
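
# Shape of the dict returned above: per-candidate records under "expressions", raw R² values
# under "r2_scores", and aggregate statistics (valid_rate, best/mean/median/std R², best
# expression) under "summary"; main() attaches a "metadata" block before the results are saved.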

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_path", type=str, required=True, help="Path to model")
    parser.add_argument("--benchmark_csv", type=str, required=True, help="Path to Nguyen benchmark CSV")
    parser.add_argument("--num_samples", type=int, default=100, help="Number of candidate expressions to generate")
    parser.add_argument("--output_file", type=str, required=True, help="Output JSON file")
    args = parser.parse_args()

    logger.info("="*60)
    logger.info(f"Evaluating: {os.path.basename(args.model_path)}")
    logger.info(f"Benchmark: {os.path.basename(args.benchmark_csv)}")
    logger.info("="*60)

    # Load model
    model, tokenizer, base_model_name = load_model_auto(args.model_path)

    # Load benchmark
    X, y, x_cols, true_formula = load_nguyen_benchmark(args.benchmark_csv)
    logger.info(f"Loaded benchmark: {X.shape[0]} samples, {len(x_cols)} variables")
    if true_formula:
        logger.info(f"True formula: {true_formula}")

    # Evaluate
    results = evaluate_model_on_benchmark(model, tokenizer, X, y, x_cols, args.num_samples)

    # Add metadata
    results["metadata"] = {
        "model_path": args.model_path,
        "base_model": base_model_name,
        "benchmark_csv": args.benchmark_csv,
        "true_formula": true_formula,
        "num_variables": len(x_cols),
        "num_data_points": len(y)
    }

    # Save results
    output_dir = os.path.dirname(args.output_file)
    if output_dir:  # guard against an empty dirname when only a bare filename is given
        os.makedirs(output_dir, exist_ok=True)
    with open(args.output_file, 'w') as f:
        json.dump(results, f, indent=2)

    # Print summary
    logger.info("\n" + "="*60)
    logger.info("RESULTS SUMMARY")
    logger.info("="*60)
    logger.info(f"Valid expressions: {results['summary']['valid_count']}/{results['summary']['num_samples']} ({results['summary']['valid_rate']*100:.1f}%)")
    logger.info(f"Expressions with R²: {results['summary']['num_with_r2']}")
    if results['summary']['best_r2'] is not None:
        logger.info(f"Best R²: {results['summary']['best_r2']:.6f}")
        logger.info(f"Mean R²: {results['summary']['mean_r2']:.6f}")
        logger.info(f"Median R²: {results['summary']['median_r2']:.6f}")
        logger.info(f"Best expression: {results['summary']['best_expression']}")
    else:
        logger.info("No valid R² scores obtained")
    logger.info(f"\nResults saved to: {args.output_file}")


if __name__ == "__main__":
    main()