"""
PPO Evaluation Script for Seriguela Block 3

Tests whether PPO fine-tuning can find symbolic regression expressions.
"""
|
|
|
|
|
import os |
|
|
import sys |
|
|
import json |
|
|
import numpy as np |
|
|
import torch |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Tuple |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
|
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM, StoppingCriteria, StoppingCriteriaList |
|
|
from classes.expression import Expression |
|
|
|
|
|
|
|
|
class ExpressionStoppingCriteria(StoppingCriteria):
    """Halt generation once the output ends with one of the stop sequences.

    Every stop string is tokenized once at construction time (without
    special tokens added). On each call, the tail of the first sequence in
    ``input_ids`` is compared with those token-id lists.
    """

    def __init__(self, tokenizer, stop_sequences):
        """Store the tokenizer and pre-tokenize each entry of ``stop_sequences``."""
        self.tokenizer = tokenizer
        self.stop_ids = []
        for sequence in stop_sequences:
            self.stop_ids.append(
                tokenizer.encode(sequence, add_special_tokens=False)
            )

    def __call__(self, input_ids, scores, **kwargs):
        """Return True when ``input_ids[0]`` ends with any stop token sequence.

        Only the first row of ``input_ids`` is inspected; ``scores`` and any
        extra keyword arguments are ignored.
        """
        generated = input_ids[0]
        total = len(generated)

        for candidate in self.stop_ids:
            width = len(candidate)
            # Skip stop strings that tokenized to nothing, and ones longer
            # than what has been produced so far.
            if width == 0 or total < width:
                continue
            if generated[-width:].tolist() == candidate:
                return True

        return False
|
|
|
|
|
class PPOEvaluator:
    """Evaluates if PPO training works for symbolic regression.

    The evaluator loads the base ``gpt2`` checkpoint, registers the
    ``<|startofex|>`` / ``<|endofex|>`` tokens, applies the weights named by
    ``model_name`` (as a PEFT adapter, falling back to a full model) and then
    samples expressions that are scored against synthetic datasets. Each test
    writes its results as a JSON file into ``output_dir``.
    """

    # Prompt shared by both tests; the model continues the text after "expr:".
    PROMPT = "vars: x_1, x_2\noper: *, +, -, sin, cos\ncons: C\nexpr:"

    # Text markers that terminate a generated expression. The same strings
    # drive the stopping criteria and the post-hoc trimming of decoded text.
    STOP_SEQUENCES = ("<|endofex|>", "\n\nvars:")

    def __init__(self, model_name: str, output_dir: str):
        """Load model and tokenizer and prepare the sampling configuration.

        Args:
            model_name: Identifier or path passed to ``PeftModel.from_pretrained``
                and, if that fails, to ``AutoModelForCausalLM.from_pretrained``.
            output_dir: Directory for result files; created if missing.
        """
        self.model_name = model_name
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        print(f"Loading model: {model_name}")

        print("Loading base GPT-2 model...")
        self.model = AutoModelForCausalLM.from_pretrained(
            "gpt2",
            torch_dtype=torch.float16,
            device_map="auto"
        )

        print("Configuring tokenizer with special tokens...")
        self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
        self.tokenizer.add_special_tokens({
            "additional_special_tokens": ["<|startofex|>", "<|endofex|>"]
        })

        # The embedding matrix must grow to cover the two tokens added above
        # before adapter weights are applied on top of it.
        print(f"Resizing embeddings from {self.model.get_input_embeddings().weight.shape[0]} to {len(self.tokenizer)}...")
        self.model.resize_token_embeddings(len(self.tokenizer))

        print(f"Loading V2 adapter from {model_name}...")
        try:
            from peft import PeftModel
            self.model = PeftModel.from_pretrained(self.model, model_name)
            print("V2 adapter loaded successfully (LoRA weights)")
            print("Merging adapter into base model...")
            self.model = self.model.merge_and_unload()
            print("Adapter merged successfully")
        except Exception as e:
            # Deliberate best-effort: if the checkpoint is not a PEFT adapter
            # (or peft is missing), treat ``model_name`` as a full model.
            print(f"Warning: Could not load as PEFT model: {e}")
            print("Attempting to load as full model...")

            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16,
                device_map="auto"
            )

        self.model.eval()

        self.generation_config = {
            "temperature": 0.7,
            "top_k": 0,  # 0 turns top-k filtering off; only top_p applies
            "top_p": 0.8,
            "repetition_penalty": 1.0,
            "max_new_tokens": 128,
            "do_sample": True,
            # GPT-2 defines no pad token, so EOS is reused for padding.
            "pad_token_id": self.tokenizer.eos_token_id,
        }

        print("Model loaded. Using optimal V2 configuration.")

    def create_synthetic_dataset(self, formula: str, n_samples: int = 100) -> Tuple[np.ndarray, np.ndarray]:
        """Create synthetic dataset from a known formula.

        Args:
            formula: Infix expression handed to ``Expression``.
            n_samples: Number of rows to draw.

        Returns:
            ``(X, y)`` where ``X`` has shape ``(n_samples, 2)`` with values
            drawn uniformly from [-2, 2) and ``y`` is whatever
            ``Expression.evaluate(X)`` returns.

        Raises:
            Exception: Any error from parsing or evaluating the formula is
                printed and re-raised unchanged.
        """
        print(f"Creating dataset for formula: {formula}")

        X = np.random.uniform(-2, 2, (n_samples, 2))

        try:
            expr = Expression(formula, is_prefix=False)
            y = expr.evaluate(X)
            return X, y
        except Exception as e:
            print(f"Error creating dataset: {e}")
            raise

    @classmethod
    def _extract_expression(cls, text: str) -> str:
        """Pull the first generated expression out of decoded model output.

        The text after the first ``expr:`` marker is cut at the earliest stop
        sequence and stripped. Text without an ``expr:`` marker is returned
        unchanged.
        """
        if "expr:" not in text:
            return text

        candidate = text.split("expr:", 1)[1]
        # Cut before stripping: "\n\nvars:" starts with whitespace that a
        # premature strip() could remove, hiding the marker.
        for stop in cls.STOP_SEQUENCES:
            candidate = candidate.split(stop, 1)[0]
        return candidate.strip()

    def _prepare_generation(self):
        """Tokenize the shared prompt and build the stopping criteria."""
        inputs = self.tokenizer(self.PROMPT, return_tensors="pt").to(self.model.device)
        stopping_criteria = StoppingCriteriaList([
            ExpressionStoppingCriteria(self.tokenizer, list(self.STOP_SEQUENCES))
        ])
        return inputs, stopping_criteria

    def _generate_expression(self, inputs, stopping_criteria) -> Tuple[str, str]:
        """Sample one completion; return ``(raw decoded text, expression)``."""
        output = self.model.generate(
            **inputs,
            **self.generation_config,
            stopping_criteria=stopping_criteria
        )
        # Special tokens are kept so "<|endofex|>" remains visible to the
        # extraction step.
        text = self.tokenizer.decode(output[0], skip_special_tokens=False)
        return text, self._extract_expression(text)

    def _score_expression(self, expr_str: str, X, y) -> tuple:
        """Validate and fit one expression string.

        Returns:
            ``(is_valid, r2)``. ``is_valid`` is True when the expression
            parses and ``is_valid_on_dataset(X)`` holds. ``r2`` is the finite
            float returned by ``fit_constants`` or ``None`` when fitting
            failed or produced a non-finite value.
        """
        # Malformed model output is expected here, so failures are treated as
        # "invalid" rather than errors. ``Exception`` (not a bare except)
        # keeps Ctrl-C and SystemExit working.
        try:
            expr = Expression(expr_str, is_prefix=False)
            if not expr.is_valid_on_dataset(X):
                return False, None
        except Exception:
            return False, None

        try:
            r2 = float(expr.fit_constants(X, y))
        except Exception:
            return True, None

        return True, (r2 if np.isfinite(r2) else None)

    def _save_results(self, results: Dict, filename: str) -> Path:
        """Write ``results`` as indented JSON into the output directory."""
        output_file = self.output_dir / filename
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        print(f"\nResults saved to: {output_file}")
        return output_file

    def test_baseline_generation(self, n_samples: int = 10) -> Dict:
        """Test baseline: V2 generates valid expressions but not fitted to data.

        Samples ``n_samples`` expressions, scores each against a 50-row
        dataset generated from ``x_1 * x_2`` and saves the outcome to
        ``baseline_results.json``.

        Args:
            n_samples: Number of expressions to generate.

        Returns:
            Dict with keys ``test``, ``timestamp``, ``generations`` (one
            record per sample) and ``summary``.
        """
        print("\n" + "="*60)
        print("BASELINE TEST: V2 Generation Without PPO")
        print("="*60)

        X, y = self.create_synthetic_dataset("x_1 * x_2", n_samples=50)

        results = {
            "test": "baseline_generation",
            "timestamp": datetime.now().isoformat(),
            "generations": [],
            "summary": {}
        }

        inputs, stopping_criteria = self._prepare_generation()

        valid_count = 0
        r2_scores = []

        print(f"\nGenerating {n_samples} expressions...")
        for i in range(n_samples):
            text, expr_str = self._generate_expression(inputs, stopping_criteria)

            # Show the first few raw generations to help spot prompt or
            # extraction problems.
            if i < 3:
                print(f"\n DEBUG Sample {i+1}:")
                print(f" Raw output: {text[:200]}")
                print(f" Extracted: {expr_str[:100]}")

            is_valid, r2 = self._score_expression(expr_str, X, y)
            if is_valid:
                valid_count += 1
            if r2 is not None:
                r2_scores.append(r2)

            results["generations"].append({
                "index": i + 1,
                "expression": expr_str,
                "valid": is_valid,
                "r2_score": r2
            })

            if (i + 1) % 5 == 0:
                print(f"Generated {i + 1}/{n_samples} - Valid: {valid_count}, Avg R²: {np.mean(r2_scores) if r2_scores else 'N/A'}")

        results["summary"] = {
            "total_generations": n_samples,
            "valid_count": valid_count,
            # Guard against n_samples == 0.
            "valid_rate": valid_count / n_samples if n_samples else 0.0,
            "r2_scores": r2_scores,
            "mean_r2": float(np.mean(r2_scores)) if r2_scores else None,
            "max_r2": float(np.max(r2_scores)) if r2_scores else None,
            # NOTE: static text, not derived from the measurements above.
            "conclusion": "Baseline generates valid expressions but R² is low (not fitted to target)"
        }

        print("\n" + "-"*60)
        print("BASELINE RESULTS:")
        print(f" Valid Rate: {results['summary']['valid_rate']:.1%} ({valid_count}/{n_samples})")
        print(f" Mean R²: {results['summary']['mean_r2']:.4f}" if r2_scores else " Mean R²: N/A")
        print(f" Max R²: {results['summary']['max_r2']:.4f}" if r2_scores else " Max R²: N/A")
        print(" Interpretation: V2 generates valid expressions (good!), but doesn't fit target data (expected without PPO)")
        print("-"*60)

        self._save_results(results, "baseline_results.json")

        return results

    def test_ppo_simulation(self, target_formula: str = "x_1 * x_2", n_iterations: int = 10) -> Dict:
        """Simulate PPO: Generate expressions and check if best reward improves.

        No training happens here. The model is sampled ``n_iterations`` times
        and the best R² against a 100-row dataset built from
        ``target_formula`` is tracked. Results are saved to
        ``ppo_simulation_results.json``.

        Args:
            target_formula: Infix formula used to generate the target data.
            n_iterations: Number of expressions to sample.

        Returns:
            Dict with keys ``test``, ``timestamp``, ``target_formula``,
            ``iterations`` (one record per sample) and ``summary``.
        """
        print("\n" + "="*60)
        print("PPO SIMULATION TEST: Check if Reward Can Improve")
        print("="*60)
        print(f"Target formula: {target_formula}")
        print("Note: This simulates PPO by generating multiple expressions")
        print(" and tracking best R² score. Real PPO would optimize")
        print(" the model to generate better expressions over time.")

        X, y = self.create_synthetic_dataset(target_formula, n_samples=100)

        inputs, stopping_criteria = self._prepare_generation()

        results = {
            "test": "ppo_simulation",
            "timestamp": datetime.now().isoformat(),
            "target_formula": target_formula,
            "iterations": [],
            "summary": {}
        }

        print(f"\nGenerating {n_iterations} expressions and tracking best R²...")

        best_r2 = -np.inf
        best_expr = None
        r2_history = []
        valid_count = 0

        for i in range(n_iterations):
            _, expr_str = self._generate_expression(inputs, stopping_criteria)

            is_valid, r2 = self._score_expression(expr_str, X, y)
            if is_valid:
                valid_count += 1
            if r2 is not None:
                r2_history.append(r2)
                if r2 > best_r2:
                    best_r2 = r2
                    best_expr = expr_str

            results["iterations"].append({
                "iteration": i + 1,
                "expression": expr_str,
                "valid": is_valid,
                # None (not a sentinel number) when no score is available,
                # matching the baseline test's records.
                "r2": r2,
                # True when this sample equals the best score seen so far.
                "is_best": r2 is not None and r2 == best_r2
            })

            if (i + 1) % 5 == 0:
                print(f"Iteration {i + 1}/{n_iterations} - Valid: {valid_count}, Best R²: {best_r2:.4f}")

        results["summary"] = {
            "total_iterations": n_iterations,
            "valid_count": valid_count,
            # Guard against n_iterations == 0.
            "valid_rate": valid_count / n_iterations if n_iterations else 0.0,
            "best_r2": float(best_r2) if np.isfinite(best_r2) else None,
            "best_expression": best_expr,
            "r2_history": [float(r) for r in r2_history],
            "mean_r2": float(np.mean(r2_history)) if r2_history else None,
            "conclusion": self._analyze_ppo_simulation(best_r2, r2_history)
        }

        print("\n" + "-"*60)
        print("PPO SIMULATION RESULTS:")
        print(f" Valid expressions: {valid_count}/{n_iterations}")
        print(f" Best R²: {best_r2:.4f}" if np.isfinite(best_r2) else " Best R²: N/A")
        print(f" Mean R²: {results['summary']['mean_r2']:.4f}" if r2_history else " Mean R²: N/A")
        print(f" Best expression: {best_expr}")
        print("\n Interpretation:")
        print(" - Baseline (Test 1) shows random expressions have low R² (~0.2)")
        print(" - PPO should improve this by learning to generate fitted expressions")
        print(f" - Best R² of {best_r2:.4f} shows what's possible with current model")
        if best_r2 >= 0.9:
            print(" ✅ Model CAN find high-quality solutions (R² >= 0.9)")
        elif best_r2 >= 0.5:
            print(" ⚠️ Model can find partial solutions (R² >= 0.5)")
        else:
            print(" ❌ Model struggles to find good solutions (R² < 0.5)")
        print("-"*60)

        self._save_results(results, "ppo_simulation_results.json")

        return results

    def _analyze_ppo_simulation(self, best_r2: float, r2_history: List[float]) -> str:
        """Analyze PPO simulation results.

        Args:
            best_r2: Highest R² observed (``-inf`` when none was recorded).
            r2_history: All finite R² values collected during the run.

        Returns:
            A one-line verdict: "no valid expressions" for an empty history,
            otherwise a tier chosen by the 0.9 and 0.5 thresholds on
            ``best_r2``.
        """
        if not r2_history:
            return "❌ No valid expressions generated"

        if best_r2 >= 0.9:
            return f"✅ EXCELLENT: Found high-quality solution (R² = {best_r2:.4f}). PPO training should work well."
        elif best_r2 >= 0.5:
            return f"⚠️ MODERATE: Found partial solution (R² = {best_r2:.4f}). PPO may help but needs tuning."
        else:
            return f"❌ POOR: Best solution is weak (R² = {best_r2:.4f}). PPO will struggle with current model."

    def _analyze_ppo_results(self, training_results: Dict) -> str:
        """Analyze PPO training results and provide conclusion.

        Args:
            training_results: Mapping expected to carry an ``epoch_rewards``
                sequence of per-epoch reward values.

        Returns:
            A one-line verdict based on the best reward and on the change
            between the first and last epoch. A missing or empty reward
            history yields the "Unable to analyze" message.
        """
        rewards = training_results.get("epoch_rewards")
        # An empty history would otherwise fail on rewards[0].
        if not rewards:
            return "Unable to analyze: No reward history found"

        initial = rewards[0]
        final = rewards[-1]
        best = max(rewards)
        improvement = final - initial

        if best >= 0.9:
            return f"✅ EXCELLENT: Found high-quality solution (R² = {best:.4f})"
        elif improvement > 0.2:
            return f"✅ GOOD: Significant improvement ({improvement:+.4f}), PPO is working"
        elif improvement > 0.05:
            return f"⚠️ MODERATE: Some improvement ({improvement:+.4f}), may need more epochs"
        elif improvement > 0:
            return f"⚠️ WEAK: Minimal improvement ({improvement:+.4f}), check hyperparameters"
        else:
            return f"❌ POOR: No improvement or decline ({improvement:+.4f}), PPO not working properly"
|
|
|
|
|
|
|
|
def main():
    """Run the baseline and PPO-simulation tests and print a final verdict.

    Builds a ``PPOEvaluator`` for the V2 checkpoint, runs both tests and
    summarizes the outcome using the 0.9 / 0.5 thresholds on the best R².
    """
    banner = "=" * 60

    header_lines = (
        banner,
        "SERIGUELA BLOCK 3: PPO EVALUATION",
        banner,
        "Objective: Test if PPO finetuning works for symbolic regression",
        "Model: V2 (augustocsc/Se124M_700K_infix_v2)",
        banner,
    )
    for line in header_lines:
        print(line)

    evaluator = PPOEvaluator(
        model_name="augustocsc/Se124M_700K_infix_v2",
        output_dir="./logs/ppo_evaluation",
    )

    print("\n📊 TEST 1: Baseline Generation (V2 without PPO)")
    baseline_results = evaluator.test_baseline_generation(n_samples=30)

    print("\n🎯 TEST 2: PPO Simulation (Check if reward CAN improve)")
    ppo_results = evaluator.test_ppo_simulation(
        target_formula="x_1 * x_2", n_iterations=50
    )

    print("\n" + banner)
    print("EVALUATION COMPLETE")
    print(banner)
    print("\nResults saved to: ./logs/ppo_evaluation/")
    print("\nKey Questions Answered:")
    print("1. Does V2 generate valid expressions? Check baseline_results.json")
    print(f" Answer: {baseline_results['summary']['valid_rate']:.1%} valid rate")
    print("2. Can model find high R² expressions? Check ppo_simulation_results.json")

    # A missing best score (no finite R² recorded) is treated as -1.0 so it
    # lands in the lowest tier below.
    best_r2 = ppo_results['summary'].get('best_r2')
    if best_r2 is None:
        best_r2 = -1.0

    # Pick the answer pair for questions 2 and 3 from the same tier.
    if best_r2 >= 0.9:
        quality_answer = f" Answer: YES! Best R² = {best_r2:.4f} (excellent)"
        outlook_answer = " Answer: YES - Model can find solutions, PPO should learn to find them consistently"
    elif best_r2 >= 0.5:
        quality_answer = f" Answer: PARTIAL. Best R² = {best_r2:.4f} (moderate)"
        outlook_answer = " Answer: MAYBE - Model finds partial solutions, PPO may need tuning"
    else:
        quality_answer = f" Answer: NO. Best R² = {best_r2:.4f} (poor)"
        outlook_answer = " Answer: UNLIKELY - Model struggles to find solutions even randomly"

    print(quality_answer)
    print("3. Would PPO training work?")
    print(outlook_answer)

    closing_lines = (
        "\nNext steps:",
        "- Review results to understand baseline performance",
        "- If simulation shows high R², PPO training is worth trying",
        "- If simulation shows low R², may need to retrain base model",
        banner,
    )
    for line in closing_lines:
        print(line)


# Run the evaluation only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|