#!/usr/bin/env python3
"""
Simple quality evaluation without requiring specific dataset.
Generates expressions with random prompts and measures validity.
"""
import argparse
import json
import logging
import os
import random
import re
import sys
from pathlib import Path

import torch
from peft import PeftModel
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer
sys.path.insert(0, str(Path(__file__).parent.parent))
from classes.expression import Expression
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Common variables and operators for symbolic regression
COMMON_VARS = ['x_1', 'x_2', 'x_3', 'x_4', 'x_5']
COMMON_OPS = ['+', '-', '*', '/', 'sin', 'cos', 'exp', 'log', 'sqrt', 'abs', 'tan']


def load_model_auto(model_path: str):
    """Load a LoRA model, detecting the base model from its adapter config."""
    adapter_config_path = os.path.join(model_path, "adapter_config.json")
    if not os.path.exists(adapter_config_path):
        raise FileNotFoundError(f"No adapter_config.json in {model_path}")
    with open(adapter_config_path) as f:
        adapter_config = json.load(f)
    base_model_name = adapter_config.get("base_model_name_or_path", "gpt2")
    logger.info(f"Loading base model: {base_model_name}")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    logger.info(f"Using device: {device}")
    model = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        torch_dtype=torch.float16 if device == "cuda" else torch.float32,
        device_map="auto" if device == "cuda" else None,
    )
    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    # GPT-2-style models ship without a pad token; reuse EOS for padding
    tokenizer.pad_token = tokenizer.eos_token
    logger.info(f"Loading LoRA adapter from {model_path}")
    model = PeftModel.from_pretrained(model, model_path)
    # Fold the LoRA weights into the base model for faster inference
    model = model.merge_and_unload()
    model.eval()
    return model, tokenizer, base_model_name
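
# e.g. model, tokenizer, base = load_model_auto("./checkpoints/my_adapter")
# (the adapter path above is illustrative)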


def create_random_prompt():
    """Create a random JSON prompt for expression generation."""
    num_vars = random.randint(1, 3)
    num_ops = random.randint(3, 7)
    vars_list = random.sample(COMMON_VARS, num_vars)
    ops_list = random.sample(COMMON_OPS, num_ops)
    prompt = {
        "vars": vars_list,
        "ops": ops_list,
        "cons": "C",
        "expr": ""
    }
    prompt_str = json.dumps(prompt, ensure_ascii=False)
    # Cut the serialized JSON right after the opening quote of "expr"
    # so the model is asked to complete the expression value
    prompt_str = prompt_str.rsplit('"expr":', 1)[0] + '"expr": "'
    return prompt_str, vars_list, ops_list
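
# Example of a resulting prompt (illustrative; vars/ops are sampled at random):
#   {"vars": ["x_1", "x_3"], "ops": ["sin", "+", "*", "/"], "cons": "C", "expr": "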


def extract_expression_json(output: str):
    """Extract the expression string from the model's JSON output."""
    # First try a fully closed "expr" value
    match = re.search(r'"expr":\s*"([^"]*)"', output)
    if match:
        return match.group(1)
    # Fall back to a truncated value (generation stopped before the closing quote)
    match = re.search(r'"expr":\s*"([^"]+)', output)
    if match:
        expr = match.group(1)
        expr = expr.split('"')[0].split('}')[0].strip()
        return expr
    return None
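
# e.g. extract_expression_json('{"expr": "sin(x_1) + C"}') -> 'sin(x_1) + C';
# the fallback recovers 'x_1 * C' from a truncated '... "expr": "x_1 * C'.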


def evaluate_model(model, tokenizer, num_samples=500):
    """Evaluate the model on random prompts."""
    device = model.device
    results = []
    valid_count = 0
    parseable_count = 0
    unique_expressions = set()
    random.seed(42)  # fixed seed so prompt sampling is reproducible
    logger.info(f"Evaluating on {num_samples} random prompts...")
    for i in tqdm(range(num_samples), desc="Generating"):
        prompt, vars_list, ops_list = create_random_prompt()
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=100,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )
        generated = tokenizer.decode(outputs[0], skip_special_tokens=True)
        expr_str = extract_expression_json(generated)
        is_valid = False
        is_parseable = False
        error_msg = None
        if expr_str:
            try:
                expr = Expression(expr_str, is_prefix=False)
                is_parseable = True
                # Expression has no validate() method; treat successful
                # construction with a non-None sympy expression as valid
                is_valid = expr.sympy_expression is not None
                if is_valid:
                    unique_expressions.add(expr_str)
            except Exception as e:
                error_msg = str(e)[:100]
        else:
            error_msg = "Failed to extract expression"
        if is_valid:
            valid_count += 1
        if is_parseable:
            parseable_count += 1
        results.append({
            "sample_idx": i,
            "prompt": prompt[:200],
            "generated": generated[:500],
            "expression": expr_str,
            "valid": is_valid,
            "parseable": is_parseable,
            "error": error_msg
        })
    total = len(results)
    metrics = {
        "num_samples": total,
        "valid_rate": valid_count / total if total > 0 else 0,
        "parseable_rate": parseable_count / total if total > 0 else 0,
        "unique_expressions": len(unique_expressions),
        "diversity_rate": len(unique_expressions) / total if total > 0 else 0,
    }
    return metrics, results
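
# Metric summary (as computed above):
#   parseable_rate  fraction of samples whose extracted string Expression() accepts
#   valid_rate      fraction that also yielded a non-None sympy expression
#   diversity_rate  unique valid expression strings divided by total samples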


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_path", type=str, required=True)
    parser.add_argument("--num_samples", type=int, default=500)
    parser.add_argument("--output_dir", type=str, default="./results")
    args = parser.parse_args()
    model, tokenizer, base_model_name = load_model_auto(args.model_path)
    metrics, results = evaluate_model(model, tokenizer, args.num_samples)
    print("\n" + "=" * 60)
    print(f"EVALUATION RESULTS - {os.path.basename(args.model_path)}")
    print("=" * 60)
    print(f"Base model: {base_model_name}")
    print(f"Valid rate: {metrics['valid_rate'] * 100:.1f}%")
    print(f"Parseable rate: {metrics['parseable_rate'] * 100:.1f}%")
    print(f"Unique expressions: {metrics['unique_expressions']}")
    print(f"Diversity rate: {metrics['diversity_rate'] * 100:.1f}%")
    print("=" * 60)
    os.makedirs(args.output_dir, exist_ok=True)
    model_name = os.path.basename(args.model_path)
    metrics_path = os.path.join(args.output_dir, f"{model_name}_metrics.json")
    with open(metrics_path, 'w') as f:
        json.dump(metrics, f, indent=2)
    results_path = os.path.join(args.output_dir, f"{model_name}_results.json")
    with open(results_path, 'w') as f:
        json.dump(results, f, indent=2)
    logger.info(f"Results saved to {args.output_dir}")


if __name__ == "__main__":
    main()