| |
| """ |
| Run PPO experiments on multiple test datasets and compare results. |
| |
| This script: |
| 1. Creates test datasets (if they don't exist) |
| 2. Runs PPO on each dataset |
| 3. Compares PPO vs baseline (no training) |
| 4. Generates a summary report |
| """ |
|
|
| import os |
| import sys |
| import json |
| import argparse |
| import subprocess |
| import datetime |
| from pathlib import Path |
| import numpy as np |
|
|
| |
| PROJECT_ROOT = Path(__file__).parent.parent |
| sys.path.insert(0, str(PROJECT_ROOT)) |
| sys.path.insert(0, str(PROJECT_ROOT / "classes")) |
|
|
| |
# Ground-truth targets for the PPO experiments. Each entry maps a dataset name
# (also the CSV filename stem under data/ppo_test/) to:
#   formula:    the expression the model should recover,
#   n_vars:     number of input variables x_1..x_n,
#   difficulty: informal label used for grouping in the summary report.
TEST_DATASETS = {
    # Easy: a single binary operator on two variables.
    "add_x1_x2": {
        "formula": "x_1 + x_2",
        "n_vars": 2,
        "difficulty": "easy",
    },
    "mul_x1_x2": {
        "formula": "x_1 * x_2",
        "n_vars": 2,
        "difficulty": "easy",
    },
    "sub_x1_x2": {
        "formula": "x_1 - x_2",
        "n_vars": 2,
        "difficulty": "easy",
    },
    # Medium: a unary function or self-product of one variable.
    "sin_x1": {
        "formula": "sin(x_1)",
        "n_vars": 1,
        "difficulty": "medium",
    },
    "cos_x1": {
        "formula": "cos(x_1)",
        "n_vars": 1,
        "difficulty": "medium",
    },
    "square_x1": {
        "formula": "x_1 * x_1",
        "n_vars": 1,
        "difficulty": "medium",
    },
    # Hard: compositions mixing unary functions with binary operators.
    "sin_x1_plus_x2": {
        "formula": "sin(x_1) + x_2",
        "n_vars": 2,
        "difficulty": "hard",
    },
    "x1_mul_sin_x2": {
        "formula": "x_1 * sin(x_2)",
        "n_vars": 2,
        "difficulty": "hard",
    },
}
|
|
|
|
def create_datasets():
    """Generate the PPO test datasets unless they are already present."""
    creator_script = PROJECT_ROOT / "scripts" / "data" / "create_ppo_test_datasets.py"
    data_dir = PROJECT_ROOT / "data" / "ppo_test"

    # One CSV per configured dataset means generation has already run.
    have_all = data_dir.exists() and sum(1 for _ in data_dir.glob("*.csv")) >= len(TEST_DATASETS)
    if have_all:
        print("Test datasets already exist.")
        return

    print("Creating test datasets...")
    subprocess.run([sys.executable, str(creator_script)], check=True)
|
|
|
|
def run_baseline_evaluation(model_path: str, dataset_path: str, n_samples: int = 100):
    """
    Evaluate the no-training baseline: sample expressions from the model
    without PPO fine-tuning and score them on the dataset.

    Args:
        model_path: Path to a PEFT adapter (preferred) or a full causal-LM
            checkpoint loadable by `AutoModelForCausalLM`.
        dataset_path: Path to the CSV regression dataset.
        n_samples: Number of expressions to sample.

    Returns:
        Dict with keys "best_r2", "best_expr", "valid_rate", "mean_r2",
        "n_samples". R² fields are None when no expression scored.
    """
    from transformers import AutoTokenizer, AutoModelForCausalLM
    from peft import PeftModel
    from expression import Expression
    from dataset import RegressionDataset
    import torch

    print(f" Running baseline evaluation ({n_samples} samples)...")

    base_model = AutoModelForCausalLM.from_pretrained("gpt2")
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token  # GPT-2 ships without a pad token

    # Prefer loading model_path as a PEFT adapter on top of GPT-2; fall back
    # to treating it as a standalone full checkpoint.
    # Fix: was a bare `except:` — narrowed so Ctrl-C / SystemExit propagate.
    try:
        model = PeftModel.from_pretrained(base_model, model_path)
        model = model.merge_and_unload()
    except Exception:
        model = AutoModelForCausalLM.from_pretrained(model_path)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    # Load the regression data: X has shape (n_points, n_vars), y the targets.
    dataset_path = Path(dataset_path)
    reg = RegressionDataset(str(dataset_path.parent), dataset_path.name)
    X, y = reg.get_numpy()
    n_vars = X.shape[1]

    # Build the JSON-style prompt; [:-3] strips the trailing `""}` so the
    # model is left to complete the "expr" value.
    vars_list = [f"x_{i+1}" for i in range(n_vars)]
    prompt = json.dumps({
        "vars": vars_list,
        "ops": ["+", "-", "*", "sin", "cos"],
        "cons": None,
        "expr": ""
    })[:-3]

    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    best_r2 = -np.inf
    best_expr = None
    valid_count = 0
    all_r2 = []

    for _ in range(n_samples):
        with torch.no_grad():
            output = model.generate(
                **inputs,
                max_new_tokens=50,
                do_sample=True,
                top_k=50,
                top_p=0.9,
                temperature=0.7,
                pad_token_id=tokenizer.pad_token_id,
            )

        text = tokenizer.decode(output[0], skip_special_tokens=True)

        # Extract the generated expression from the (possibly malformed) JSON.
        # Fix: was a bare `except:`; only str.index raising ValueError is the
        # expected failure here.
        try:
            if '"expr": "' in text:
                expr_start = text.index('"expr": "') + len('"expr": "')
                expr_end = text.index('"', expr_start)
                expr_str = text[expr_start:expr_end].strip()
            else:
                expr_str = text.split('"expr"')[-1].strip(' ":}')
        except ValueError:
            continue

        # Skip empty output and expressions containing an unfitted constant 'C'.
        if 'C' in expr_str or not expr_str:
            continue

        try:
            expr = Expression(expr_str, is_prefix=False)
            if not expr.is_valid_on_dataset(X):
                continue

            # NOTE(review): valid_count also counts expressions later rejected
            # for non-finite predictions — preserved from the original metric.
            valid_count += 1
            y_pred = expr.evaluate(X)

            if not np.all(np.isfinite(y_pred)):
                continue

            ss_res = np.sum((y - y_pred) ** 2)
            ss_tot = np.sum((y - np.mean(y)) ** 2)
            # Fix: constant-target fallback is now 0.0 (float) instead of int 0.
            r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0.0

            all_r2.append(r2)

            if r2 > best_r2:
                best_r2 = r2
                best_expr = expr_str
        except Exception:  # invalid expression syntax / evaluation failure
            continue

    return {
        "best_r2": float(best_r2) if best_r2 > -np.inf else None,
        "best_expr": best_expr,
        "valid_rate": valid_count / n_samples,
        "mean_r2": float(np.mean(all_r2)) if all_r2 else None,
        "n_samples": n_samples,
    }
|
|
|
|
def run_ppo_experiment(model_path: str, dataset_path: str, output_dir: str,
                       batch_size: int = 32, epochs: int = 5):
    """Train PPO on a single dataset and return a compact result summary."""
    from ppo_experiment import PPOSymbolicRegression

    print(f" Running PPO experiment...")

    trainer = PPOSymbolicRegression(
        model_path=model_path,
        dataset_path=dataset_path,
        output_dir=output_dir,
        batch_size=batch_size,
        learning_rate=1e-5,
        max_retries=5,
    )
    results = trainer.run(n_epochs=epochs, early_stop_r2=0.95)

    epoch_log = results["epochs"]
    summary = {
        "best_r2": results["best_r2"],
        "best_expr": results["best_expression"],
        "epochs_run": len(epoch_log),
        "final_valid_rate": epoch_log[-1]["valid_rate"] if epoch_log else 0,
    }
    return summary
|
|
|
|
def run_all_experiments(model_path: str, batch_size: int = 32, epochs: int = 5,
                        baseline_samples: int = 100, skip_baseline: bool = False):
    """
    Run baseline and PPO experiments on every dataset in TEST_DATASETS.

    Args:
        model_path: Path to the trained JSON-format model.
        batch_size: PPO batch size.
        epochs: Number of PPO epochs per dataset.
        baseline_samples: Sample count for the no-training baseline.
        skip_baseline: When True, skip baseline evaluation entirely.

    Returns:
        Dict with the run config and per-dataset results; also written to
        <results_dir>/summary.json.
    """
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    results_dir = PROJECT_ROOT / "output" / "ppo_experiments" / timestamp
    results_dir.mkdir(parents=True, exist_ok=True)

    all_results = {
        "timestamp": timestamp,
        "config": {
            "model_path": model_path,
            "batch_size": batch_size,
            "epochs": epochs,
            "baseline_samples": baseline_samples,
        },
        "experiments": {},
    }

    print("=" * 70)
    print("PPO SYMBOLIC REGRESSION EXPERIMENTS")
    print("=" * 70)
    print(f"Model: {model_path}")
    print(f"Output: {results_dir}")
    print(f"Datasets: {len(TEST_DATASETS)}")
    print("=" * 70)

    for dataset_name, dataset_info in TEST_DATASETS.items():
        print(f"\n{'='*70}")
        print(f"DATASET: {dataset_name}")
        print(f"Ground truth: {dataset_info['formula']}")
        print(f"Difficulty: {dataset_info['difficulty']}")
        print(f"{'='*70}")

        dataset_path = PROJECT_ROOT / "data" / "ppo_test" / f"{dataset_name}.csv"

        if not dataset_path.exists():
            print(f" ERROR: Dataset not found: {dataset_path}")
            continue

        exp_output_dir = results_dir / dataset_name

        if not skip_baseline:
            baseline_results = run_baseline_evaluation(
                model_path, str(dataset_path), baseline_samples
            )
            # Fix: an R² of exactly 0.0 is a valid score — test for None,
            # not truthiness, or it would print "No valid expressions".
            if baseline_results["best_r2"] is not None:
                print(f" Baseline: R²={baseline_results['best_r2']:.4f}")
            else:
                print(" Baseline: No valid expressions")
        else:
            baseline_results = None

        ppo_results = run_ppo_experiment(
            model_path, str(dataset_path), str(exp_output_dir),
            batch_size, epochs
        )
        if ppo_results["best_r2"] is not None:
            print(f" PPO: R²={ppo_results['best_r2']:.4f}")
        else:
            print(" PPO: No valid expressions")

        all_results["experiments"][dataset_name] = {
            "ground_truth": dataset_info["formula"],
            "difficulty": dataset_info["difficulty"],
            "baseline": baseline_results,
            "ppo": ppo_results,
        }

        # Fix: same None-vs-falsy issue — a 0.0 baseline previously
        # suppressed the improvement line entirely.
        if (baseline_results is not None
                and baseline_results["best_r2"] is not None
                and ppo_results["best_r2"] is not None):
            improvement = ppo_results["best_r2"] - baseline_results["best_r2"]
            print(f" Improvement: {improvement:+.4f}")
            all_results["experiments"][dataset_name]["improvement"] = improvement

    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)

    summary_table = []
    for name, exp in all_results["experiments"].items():
        baseline = exp.get("baseline")
        # Fix: use explicit None checks so R²=0.0 is reported, not "N/A".
        baseline_r2 = baseline["best_r2"] if baseline and baseline.get("best_r2") is not None else "N/A"
        ppo_r2 = exp["ppo"]["best_r2"] if exp["ppo"]["best_r2"] is not None else "N/A"
        improvement = exp.get("improvement", "N/A")

        if isinstance(baseline_r2, float):
            baseline_r2 = f"{baseline_r2:.4f}"
        if isinstance(ppo_r2, float):
            ppo_r2 = f"{ppo_r2:.4f}"
        if isinstance(improvement, float):
            improvement = f"{improvement:+.4f}"

        summary_table.append({
            "Dataset": name,
            "Difficulty": exp["difficulty"],
            "Ground Truth": exp["ground_truth"],
            "Baseline R²": baseline_r2,
            "PPO R²": ppo_r2,
            "Improvement": improvement,
            "PPO Expression": exp["ppo"].get("best_expr", "N/A"),
        })

    print(f"\n{'Dataset':<25} {'Diff':<8} {'Baseline':<10} {'PPO':<10} {'Improve':<10}")
    print("-" * 70)
    for row in summary_table:
        print(f"{row['Dataset']:<25} {row['Difficulty']:<8} {row['Baseline R²']:<10} {row['PPO R²']:<10} {row['Improvement']:<10}")

    results_file = results_dir / "summary.json"
    with open(results_file, 'w') as f:
        json.dump(all_results, f, indent=2)
    print(f"\nResults saved to: {results_file}")

    return all_results
|
|
|
|
def main():
    """Parse CLI arguments, ensure datasets exist, then launch the experiments."""
    cli = argparse.ArgumentParser(description="Run PPO experiments on test datasets")
    cli.add_argument("--model_path", type=str, default="./output/exp_a_json",
                     help="Path to trained JSON format model")
    cli.add_argument("--batch_size", type=int, default=32,
                     help="Batch size for PPO")
    cli.add_argument("--epochs", type=int, default=5,
                     help="Number of PPO epochs per dataset")
    cli.add_argument("--baseline_samples", type=int, default=100,
                     help="Number of samples for baseline evaluation")
    cli.add_argument("--skip_baseline", action="store_true",
                     help="Skip baseline evaluation")
    cli.add_argument("--create_datasets_only", action="store_true",
                     help="Only create datasets, don't run experiments")
    opts = cli.parse_args()

    # Datasets are a precondition for every mode.
    create_datasets()

    if opts.create_datasets_only:
        print("Datasets created. Exiting.")
        return

    run_all_experiments(
        model_path=opts.model_path,
        batch_size=opts.batch_size,
        epochs=opts.epochs,
        baseline_samples=opts.baseline_samples,
        skip_baseline=opts.skip_baseline,
    )
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|