"""
Run PPO experiments on multiple test datasets and compare results.

This script:

1. Creates test datasets (if they don't exist)
2. Runs PPO on each dataset
3. Compares PPO vs baseline (no training)
4. Generates a summary report
"""

import sys
import json
import argparse
import subprocess
import datetime
from pathlib import Path

import numpy as np

# Make the project root and its `classes` package importable.
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
sys.path.insert(0, str(PROJECT_ROOT / "classes"))


TEST_DATASETS = {
    # easy: single binary operator
    "add_x1_x2": {
        "formula": "x_1 + x_2",
        "n_vars": 2,
        "difficulty": "easy",
    },
    "mul_x1_x2": {
        "formula": "x_1 * x_2",
        "n_vars": 2,
        "difficulty": "easy",
    },
    "sub_x1_x2": {
        "formula": "x_1 - x_2",
        "n_vars": 2,
        "difficulty": "easy",
    },
    # medium: unary functions
    "sin_x1": {
        "formula": "sin(x_1)",
        "n_vars": 1,
        "difficulty": "medium",
    },
    "cos_x1": {
        "formula": "cos(x_1)",
        "n_vars": 1,
        "difficulty": "medium",
    },
    "square_x1": {
        "formula": "x_1 * x_1",
        "n_vars": 1,
        "difficulty": "medium",
    },
    # hard: compositions
    "sin_x1_plus_x2": {
        "formula": "sin(x_1) + x_2",
        "n_vars": 2,
        "difficulty": "hard",
    },
    "x1_mul_sin_x2": {
        "formula": "x_1 * sin(x_2)",
        "n_vars": 2,
        "difficulty": "hard",
    },
}
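
# Each entry maps to a CSV at data/ppo_test/<name>.csv, generated by
# scripts/data/create_ppo_test_datasets.py via create_datasets() below.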


def create_datasets():
    """Create test datasets if they don't exist."""
    script_path = PROJECT_ROOT / "scripts" / "data" / "create_ppo_test_datasets.py"
    data_dir = PROJECT_ROOT / "data" / "ppo_test"

    # Skip generation when every expected CSV is already present.
    if data_dir.exists() and len(list(data_dir.glob("*.csv"))) >= len(TEST_DATASETS):
        print("Test datasets already exist.")
        return

    print("Creating test datasets...")
    subprocess.run([sys.executable, str(script_path)], check=True)


def run_baseline_evaluation(model_path: str, dataset_path: str, n_samples: int = 100):
    """
    Evaluate the baseline: sample expressions from the model without any PPO
    training and return the best R² found among the samples.
    """
    from transformers import AutoTokenizer, AutoModelForCausalLM
    from peft import PeftModel
    from expression import Expression
    from dataset import RegressionDataset
    import torch

    print(f" Running baseline evaluation ({n_samples} samples)...")

    # Try to load the model as a LoRA adapter on top of GPT-2; fall back to
    # loading it as a full fine-tuned model.
    base_model = AutoModelForCausalLM.from_pretrained("gpt2")
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token

    try:
        model = PeftModel.from_pretrained(base_model, model_path)
        model = model.merge_and_unload()
    except Exception:
        model = AutoModelForCausalLM.from_pretrained(model_path)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    # Load the regression dataset and infer the number of input variables.
    dataset_path = Path(dataset_path)
    reg = RegressionDataset(str(dataset_path.parent), dataset_path.name)
    X, y = reg.get_numpy()
    n_vars = X.shape[1]

    # Build the generation prompt: a JSON header whose closing '""}' is
    # stripped so the model completes the "expr" field itself.
    vars_list = [f"x_{i+1}" for i in range(n_vars)]
    prompt = json.dumps({
        "vars": vars_list,
        "ops": ["+", "-", "*", "sin", "cos"],
        "cons": None,
        "expr": ""
    })[:-3]
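
    # For a two-variable dataset the resulting prompt is:
    #   {"vars": ["x_1", "x_2"], "ops": ["+", "-", "*", "sin", "cos"], "cons": null, "expr":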

    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    # Track the best expression found across the sampled generations.
    best_r2 = -np.inf
    best_expr = None
    valid_count = 0
    all_r2 = []

    for _ in range(n_samples):
        with torch.no_grad():
            output = model.generate(
                **inputs,
                max_new_tokens=50,
                do_sample=True,
                top_k=50,
                top_p=0.9,
                temperature=0.7,
                pad_token_id=tokenizer.pad_token_id,
            )

        text = tokenizer.decode(output[0], skip_special_tokens=True)

        # Extract the generated expression from the completed JSON.
        try:
            if '"expr": "' in text:
                expr_start = text.index('"expr": "') + len('"expr": "')
                expr_end = text.index('"', expr_start)
                expr_str = text[expr_start:expr_end].strip()
            else:
                expr_str = text.split('"expr"')[-1].strip(' ":}')
        except ValueError:
            continue

        # Skip empty expressions and ones containing unfitted constant
        # placeholders ('C').
        if 'C' in expr_str or not expr_str:
            continue

        # Evaluate the expression on the dataset and score it with R².
        try:
            expr = Expression(expr_str, is_prefix=False)
            if not expr.is_valid_on_dataset(X):
                continue

            valid_count += 1
            y_pred = expr.evaluate(X)

            if not np.all(np.isfinite(y_pred)):
                continue

            # R² = 1 - SS_res / SS_tot
            ss_res = np.sum((y - y_pred) ** 2)
            ss_tot = np.sum((y - np.mean(y)) ** 2)
            r2 = (1 - ss_res / ss_tot) if ss_tot != 0 else 0.0

            all_r2.append(r2)

            if r2 > best_r2:
                best_r2 = r2
                best_expr = expr_str
        except Exception:
            continue

    return {
        "best_r2": float(best_r2) if best_r2 > -np.inf else None,
        "best_expr": best_expr,
        "valid_rate": valid_count / n_samples,
        "mean_r2": float(np.mean(all_r2)) if all_r2 else None,
        "n_samples": n_samples,
    }
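
# Minimal standalone check of the baseline evaluator (paths are illustrative
# and assume the test datasets already exist):
#   stats = run_baseline_evaluation("./output/exp_a_json",
#                                   "data/ppo_test/add_x1_x2.csv", n_samples=20)
#   print(stats["best_r2"], stats["valid_rate"])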


def run_ppo_experiment(model_path: str, dataset_path: str, output_dir: str,
                       batch_size: int = 32, epochs: int = 5):
    """Run a PPO experiment on a single dataset."""
    from ppo_experiment import PPOSymbolicRegression

    print(" Running PPO experiment...")

    experiment = PPOSymbolicRegression(
        model_path=model_path,
        dataset_path=dataset_path,
        output_dir=output_dir,
        batch_size=batch_size,
        learning_rate=1e-5,
        max_retries=5,
    )

    # Train, stopping early once R² exceeds 0.95.
    results = experiment.run(n_epochs=epochs, early_stop_r2=0.95)

    return {
        "best_r2": results["best_r2"],
        "best_expr": results["best_expression"],
        "epochs_run": len(results["epochs"]),
        "final_valid_rate": results["epochs"][-1]["valid_rate"] if results["epochs"] else 0,
    }
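
# Analogous standalone sketch for a single PPO run (paths illustrative):
#   ppo_stats = run_ppo_experiment("./output/exp_a_json",
#                                  "data/ppo_test/sin_x1.csv", "output/ppo_single")
#   print(ppo_stats["best_r2"], ppo_stats["best_expr"])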


def run_all_experiments(model_path: str, batch_size: int = 32, epochs: int = 5,
                        baseline_samples: int = 100, skip_baseline: bool = False):
    """Run experiments on all test datasets."""
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    results_dir = PROJECT_ROOT / "output" / "ppo_experiments" / timestamp
    results_dir.mkdir(parents=True, exist_ok=True)

    all_results = {
        "timestamp": timestamp,
        "config": {
            "model_path": model_path,
            "batch_size": batch_size,
            "epochs": epochs,
            "baseline_samples": baseline_samples,
        },
        "experiments": {},
    }
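
    # all_results is what ultimately gets serialized to summary.json, with one
    # per-dataset entry added under "experiments" in the loop below.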

    print("=" * 70)
    print("PPO SYMBOLIC REGRESSION EXPERIMENTS")
    print("=" * 70)
    print(f"Model: {model_path}")
    print(f"Output: {results_dir}")
    print(f"Datasets: {len(TEST_DATASETS)}")
    print("=" * 70)

    for dataset_name, dataset_info in TEST_DATASETS.items():
        print(f"\n{'='*70}")
        print(f"DATASET: {dataset_name}")
        print(f"Ground truth: {dataset_info['formula']}")
        print(f"Difficulty: {dataset_info['difficulty']}")
        print(f"{'='*70}")

        dataset_path = PROJECT_ROOT / "data" / "ppo_test" / f"{dataset_name}.csv"

        if not dataset_path.exists():
            print(f" ERROR: Dataset not found: {dataset_path}")
            continue

        exp_output_dir = results_dir / dataset_name

        # Baseline: sample from the untrained policy for comparison.
        if not skip_baseline:
            baseline_results = run_baseline_evaluation(
                model_path, str(dataset_path), baseline_samples
            )
            if baseline_results["best_r2"] is not None:
                print(f" Baseline: R²={baseline_results['best_r2']:.4f}")
            else:
                print(" Baseline: No valid expressions")
        else:
            baseline_results = None

        # PPO training on this dataset.
        ppo_results = run_ppo_experiment(
            model_path, str(dataset_path), str(exp_output_dir),
            batch_size, epochs
        )
        if ppo_results["best_r2"] is not None:
            print(f" PPO: R²={ppo_results['best_r2']:.4f}")
        else:
            print(" PPO: No valid expressions")

        all_results["experiments"][dataset_name] = {
            "ground_truth": dataset_info["formula"],
            "difficulty": dataset_info["difficulty"],
            "baseline": baseline_results,
            "ppo": ppo_results,
        }

        # Report PPO-over-baseline improvement when both runs produced a
        # valid expression (an R² of exactly 0.0 still counts as valid).
        if (baseline_results and baseline_results["best_r2"] is not None
                and ppo_results["best_r2"] is not None):
            improvement = ppo_results["best_r2"] - baseline_results["best_r2"]
            print(f" Improvement: {improvement:+.4f}")
            all_results["experiments"][dataset_name]["improvement"] = improvement

    # Print a compact summary table and persist the full results as JSON.
    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)

    summary_table = []
    for name, exp in all_results["experiments"].items():
        baseline = exp.get("baseline")
        baseline_r2 = baseline["best_r2"] if baseline and baseline["best_r2"] is not None else "N/A"
        ppo_r2 = exp["ppo"]["best_r2"] if exp["ppo"]["best_r2"] is not None else "N/A"
        improvement = exp.get("improvement", "N/A")

        if isinstance(baseline_r2, float):
            baseline_r2 = f"{baseline_r2:.4f}"
        if isinstance(ppo_r2, float):
            ppo_r2 = f"{ppo_r2:.4f}"
        if isinstance(improvement, float):
            improvement = f"{improvement:+.4f}"

        summary_table.append({
            "Dataset": name,
            "Difficulty": exp["difficulty"],
            "Ground Truth": exp["ground_truth"],
            "Baseline R²": baseline_r2,
            "PPO R²": ppo_r2,
            "Improvement": improvement,
            "PPO Expression": exp["ppo"].get("best_expr", "N/A"),
        })

    print(f"\n{'Dataset':<25} {'Diff':<8} {'Baseline':<10} {'PPO':<10} {'Improve':<10}")
    print("-" * 70)
    for row in summary_table:
        print(f"{row['Dataset']:<25} {row['Difficulty']:<8} {row['Baseline R²']:<10} {row['PPO R²']:<10} {row['Improvement']:<10}")

    results_file = results_dir / "summary.json"
    with open(results_file, 'w') as f:
        json.dump(all_results, f, indent=2)
    print(f"\nResults saved to: {results_file}")

    return all_results


def main():
    parser = argparse.ArgumentParser(description="Run PPO experiments on test datasets")
    parser.add_argument("--model_path", type=str, default="./output/exp_a_json",
                        help="Path to trained JSON-format model")
    parser.add_argument("--batch_size", type=int, default=32,
                        help="Batch size for PPO")
    parser.add_argument("--epochs", type=int, default=5,
                        help="Number of PPO epochs per dataset")
    parser.add_argument("--baseline_samples", type=int, default=100,
                        help="Number of samples for baseline evaluation")
    parser.add_argument("--skip_baseline", action="store_true",
                        help="Skip baseline evaluation")
    parser.add_argument("--create_datasets_only", action="store_true",
                        help="Only create datasets, don't run experiments")

    args = parser.parse_args()

    create_datasets()

    if args.create_datasets_only:
        print("Datasets created. Exiting.")
        return

    run_all_experiments(
        model_path=args.model_path,
        batch_size=args.batch_size,
        epochs=args.epochs,
        baseline_samples=args.baseline_samples,
        skip_baseline=args.skip_baseline,
    )


if __name__ == "__main__":
    main()