# Hugging Face Hub page residue (upload by Mayank022 via huggingface_hub,
# commit a4f74f3, verified) — kept as a comment so the file stays valid Python.
#!/usr/bin/env python3
"""
GRPO Training Script for the API Testing Environment.
Trains a small LLM (Qwen 1.7B) to become an intelligent API tester
using Group Relative Policy Optimization (GRPO).
The environment IS the dataset — each reset(seed=N) creates a unique
episode with different users, tasks, and data. No external dataset needed.
Features:
- Auto-push trained model weights to HuggingFace Hub
- Weights & Biases logging for metrics, loss, rewards
- Baseline agent evaluation before GRPO (random, sequential, smart)
- Base model evaluation before GRPO for comparison
- Post-training evaluation with delta reporting
- Saves metrics, comparison tables, and plots to output dir
Usage:
# Quick test (CPU, 2 minutes)
python -m training.grpo --test-mode
# Real training (GPU required)
python -m training.grpo --model-id Qwen/Qwen3-1.7B --num-episodes 100
# With HF Hub push
python -m training.grpo --push-to-hub --hf-repo-id your-username/api-tester-grpo
# With Weights & Biases
python -m training.grpo --use-wandb --wandb-project api-testing-grpo
# See what prompts look like (no GPU needed)
SHOW_PROMPTS=1 python -m training.grpo
# Resume from checkpoint
python -m training.grpo --model-id ./checkpoints/step_50
"""
import argparse
import json
import logging
import os
import sys
import time
# Make the repository root importable when this file is run directly
# (e.g. `python training/grpo.py`) instead of as `python -m training.grpo`.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# --- Suppress noisy HTTP/download logs ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
# These libraries log every HTTP request / file download at INFO; demote
# them to WARNING so the training progress output stays readable.
for _noisy in ["httpx", "httpcore", "urllib3", "huggingface_hub", "filelock",
               "transformers.configuration_utils", "transformers.modeling_utils"]:
    logging.getLogger(_noisy).setLevel(logging.WARNING)
# --- MONKEY PATCH FOR LLM-BLENDER ---
# llm-blender requires TRANSFORMERS_CACHE which was removed in transformers 4.42+
try:
    import transformers.utils.hub
    if not hasattr(transformers.utils.hub, "TRANSFORMERS_CACHE"):
        # Restore the removed attribute, pointing at the standard HF cache location.
        transformers.utils.hub.TRANSFORMERS_CACHE = os.getenv("HF_HOME", os.path.expanduser("~/.cache/huggingface/hub"))
except ImportError:
    pass  # transformers not installed — later imports will fail loudly anyway
# ------------------------------------
from server.environment import APITestEnvironment
from .prompts import PLAN_SYSTEM_PROMPT, format_plan_prompt
from .rewards import format_reward_fn, plan_reward_fn, diversity_reward_fn
from .evaluate import run_rollout, run_baseline_local
def build_training_prompts(
    num_episodes: int = 50,
    task_ids: list[str] | None = None,
) -> list[dict]:
    """Generate training prompts for GRPO plan-based training.

    Each prompt asks the model to output a COMPLETE TEST PLAN (a JSON array
    of actions). The reward function later executes the plan on a fresh
    environment seeded identically and scores the outcome.

    Args:
        num_episodes: how many prompts to generate; tasks cycle round-robin.
        task_ids: task identifiers to cycle through (defaults to the three
            built-in tasks when None).

    Returns:
        List of dicts with keys "prompt" (chat messages), "task_id", "seed".
    """
    if task_ids is None:
        task_ids = ["basic_validation", "edge_cases", "security_workflows"]
    env = APITestEnvironment()
    episodes: list[dict] = []
    for idx in range(num_episodes):
        episode_task = task_ids[idx % len(task_ids)]
        # Spread seeds far apart so each episode gets distinct users/data.
        episode_seed = idx * 1000 + 42
        observation = env.reset(seed=episode_seed, task_id=episode_task)
        episodes.append({
            "prompt": [
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": format_plan_prompt(observation)},
            ],
            "task_id": episode_task,
            "seed": episode_seed,
        })
    logger.info(f"Generated {len(episodes)} training prompts across tasks: {task_ids}")
    return episodes
def run_baseline_evaluation(seed: int = 9999) -> dict:
    """Run all baseline agents and return results for comparison.

    Prints a formatted summary table to stdout as a side effect.

    Returns:
        dict with structure: {agent_name: {task_id: result_dict}}
    """
    logger.info("=" * 60)
    logger.info("Running BASELINE AGENT evaluation...")
    logger.info("=" * 60)
    raw_results = run_baseline_local(agent_name="all", task_id="all", seed=seed)
    # Group the flat result list into agent -> task -> result.
    by_agent: dict = {}
    for entry in raw_results:
        by_agent.setdefault(entry["agent"], {})[entry["task_id"]] = entry
    # Print summary table
    print("\n" + "=" * 90)
    print("BASELINE AGENT RESULTS")
    print("=" * 90)
    print(f"{'Agent':<15} {'Task':<25} {'Reward':<10} {'Bugs':<12} {'Coverage':<10}")
    print("-" * 90)
    for agent in ["random", "sequential", "smart"]:
        if agent not in by_agent:
            continue
        for task in ["basic_validation", "edge_cases", "security_workflows"]:
            row = by_agent[agent].get(task, {})
            print(
                f"{agent:<15} {task:<25} "
                f"{row.get('total_reward', 0):<10.4f} "
                f"{row.get('bugs_found', 0)}/{row.get('total_bugs', 0):<10} "
                f"{row.get('coverage_pct', 0):<10.1f}%"
            )
    print("-" * 90)
    print("=" * 90 + "\n")
    return by_agent
def save_metrics(
    output_dir: str,
    baseline_results: dict,
    base_model_results: dict,
    trained_model_results: dict,
    training_args: dict,
    training_time_s: float,
):
    """Save all metrics and comparison data to output_dir/metrics/.

    Writes two files: results.json (machine-readable) and results.md
    (markdown comparison table with reward deltas vs the base model).
    """
    metrics_dir = os.path.join(output_dir, "metrics")
    os.makedirs(metrics_dir, exist_ok=True)

    task_order = ["basic_validation", "edge_cases", "security_workflows"]

    # Flatten baseline results down to plain scalars for JSON serialization.
    flattened_baselines = {
        agent: {
            task: {
                "total_reward": r.get("total_reward", 0),
                "bugs_found": r.get("bugs_found", 0),
                "total_bugs": r.get("total_bugs", 0),
                "coverage_pct": r.get("coverage_pct", 0),
            }
            for task, r in tasks.items()
        }
        for agent, tasks in baseline_results.items()
    }
    payload = {
        "training_args": training_args,
        "training_time_seconds": round(training_time_s, 1),
        "baseline_agents": flattened_baselines,
        "base_model": base_model_results,
        "trained_model": trained_model_results,
    }
    with open(os.path.join(metrics_dir, "results.json"), "w") as f:
        json.dump(payload, f, indent=2)

    # Comparison table as markdown
    report = [
        "# Training Results\n",
        f"**Model**: {training_args.get('model_id', 'unknown')}",
        f"**Training time**: {training_time_s / 60:.1f} minutes",
        f"**Episodes**: {training_args.get('num_episodes', 0)}",
        f"**Max steps**: {training_args.get('max_steps', 0)}\n",
        "## Comparison Table\n",
        "| Agent/Model | Task | Reward | Bugs | Coverage |",
        "|---|---|---|---|---|",
    ]
    # Baseline agent rows
    for agent in ["random", "sequential", "smart"]:
        if agent not in baseline_results:
            continue
        for task in task_order:
            r = baseline_results[agent].get(task, {})
            report.append(
                f"| {agent} | {task} | "
                f"{r.get('total_reward', 0):.4f} | "
                f"{r.get('bugs_found', 0)}/{r.get('total_bugs', 0)} | "
                f"{r.get('coverage_pct', 0):.1f}% |"
            )
    # Base model rows
    for task in task_order:
        r = base_model_results.get(task, {})
        report.append(
            f"| **base model** | {task} | "
            f"{r.get('total_reward', 0):.4f} | "
            f"{r.get('bugs_found', 0)}/{r.get('total_bugs', 0)} | "
            f"{r.get('coverage_pct', 0):.1f}% |"
        )
    # Trained model rows, with reward delta vs the base model
    for task in task_order:
        r = trained_model_results.get(task, {})
        base = base_model_results.get(task, {})
        delta = r.get("total_reward", 0) - base.get("total_reward", 0)
        report.append(
            f"| **GRPO trained** | {task} | "
            f"{r.get('total_reward', 0):.4f} ({delta:+.4f}) | "
            f"{r.get('bugs_found', 0)}/{r.get('total_bugs', 0)} | "
            f"{r.get('coverage_pct', 0):.1f}% |"
        )
    report.append("")
    with open(os.path.join(metrics_dir, "results.md"), "w") as f:
        f.write("\n".join(report))
    logger.info(f"Metrics saved to {metrics_dir}/")
def save_plots(output_dir: str, baseline_results: dict, base_model_results: dict, trained_model_results: dict):
    """Generate and save comparison plots.

    Writes three grouped bar charts to output_dir/metrics/plots/:
    reward_comparison.png, bugs_comparison.png, coverage_comparison.png.
    Skips (with a warning) if matplotlib is not installed.
    """
    try:
        import matplotlib
        matplotlib.use("Agg")  # headless backend: render to files, no display needed
        import matplotlib.pyplot as plt
        import numpy as np
    except ImportError:
        logger.warning("matplotlib not installed — skipping plot generation. pip install matplotlib")
        return
    plots_dir = os.path.join(output_dir, "metrics", "plots")
    os.makedirs(plots_dir, exist_ok=True)
    tasks = ["basic_validation", "edge_cases", "security_workflows"]
    task_labels = ["Basic", "Edge Cases", "Security"]
    # --- Plot 1: Reward comparison bar chart ---
    fig, ax = plt.subplots(figsize=(12, 6))
    x = np.arange(len(tasks))  # one bar group per task
    width = 0.15  # width of each individual bar within a group
    # Build (label, per-task rewards) pairs for every series, baselines first.
    agents_to_plot = []
    for agent_name in ["random", "sequential", "smart"]:
        if agent_name in baseline_results:
            rewards = [baseline_results[agent_name].get(t, {}).get("total_reward", 0) for t in tasks]
            agents_to_plot.append((agent_name, rewards))
    base_rewards = [base_model_results.get(t, {}).get("total_reward", 0) for t in tasks]
    agents_to_plot.append(("Base Model", base_rewards))
    trained_rewards = [trained_model_results.get(t, {}).get("total_reward", 0) for t in tasks]
    agents_to_plot.append(("GRPO Trained", trained_rewards))
    colors = ["#95a5a6", "#3498db", "#e67e22", "#9b59b6", "#2ecc71"]
    for i, (name, rewards) in enumerate(agents_to_plot):
        # Center each series' bars around the task tick position.
        offset = (i - len(agents_to_plot) / 2 + 0.5) * width
        bars = ax.bar(x + offset, rewards, width, label=name, color=colors[i % len(colors)])
        for bar, val in zip(bars, rewards):
            if val > 0.01:  # skip value labels on near-zero bars to avoid clutter
                ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01,
                        f"{val:.2f}", ha="center", va="bottom", fontsize=7)
    ax.set_xlabel("Task")
    ax.set_ylabel("Total Reward")
    ax.set_title("Reward Comparison: Baselines vs Base Model vs GRPO Trained")
    ax.set_xticks(x)
    ax.set_xticklabels(task_labels)
    ax.legend()
    ax.set_ylim(bottom=0)
    plt.tight_layout()
    fig.savefig(os.path.join(plots_dir, "reward_comparison.png"), dpi=150)
    plt.close(fig)
    # --- Plot 2: Bugs found comparison ---
    fig, ax = plt.subplots(figsize=(12, 6))
    for i, (name, _) in enumerate(agents_to_plot):
        # Baseline series are keyed by agent name; "Base Model" / "GRPO Trained"
        # are display labels that never appear as keys in baseline_results.
        if name in baseline_results:
            bugs = [baseline_results[name].get(t, {}).get("bugs_found", 0) for t in tasks]
        elif name == "Base Model":
            bugs = [base_model_results.get(t, {}).get("bugs_found", 0) for t in tasks]
        else:
            bugs = [trained_model_results.get(t, {}).get("bugs_found", 0) for t in tasks]
        offset = (i - len(agents_to_plot) / 2 + 0.5) * width
        ax.bar(x + offset, bugs, width, label=name, color=colors[i % len(colors)])
    # Total-bugs reference line; `or` falls back to the trained results when
    # the base-model entry is 0 (e.g. the --skip-eval zero placeholders).
    total_bugs = [base_model_results.get(t, {}).get("total_bugs", 0) or
                  trained_model_results.get(t, {}).get("total_bugs", 0) for t in tasks]
    ax.plot(x, total_bugs, "k--", marker="D", label="Total Bugs", linewidth=1.5)
    ax.set_xlabel("Task")
    ax.set_ylabel("Bugs Found")
    ax.set_title("Bug Discovery: Baselines vs Base Model vs GRPO Trained")
    ax.set_xticks(x)
    ax.set_xticklabels(task_labels)
    ax.legend()
    ax.set_ylim(bottom=0)
    plt.tight_layout()
    fig.savefig(os.path.join(plots_dir, "bugs_comparison.png"), dpi=150)
    plt.close(fig)
    # --- Plot 3: Coverage comparison ---
    fig, ax = plt.subplots(figsize=(12, 6))
    for i, (name, _) in enumerate(agents_to_plot):
        if name in baseline_results:
            cov = [baseline_results[name].get(t, {}).get("coverage_pct", 0) for t in tasks]
        elif name == "Base Model":
            cov = [base_model_results.get(t, {}).get("coverage_pct", 0) for t in tasks]
        else:
            cov = [trained_model_results.get(t, {}).get("coverage_pct", 0) for t in tasks]
        offset = (i - len(agents_to_plot) / 2 + 0.5) * width
        ax.bar(x + offset, cov, width, label=name, color=colors[i % len(colors)])
    ax.set_xlabel("Task")
    ax.set_ylabel("Coverage %")
    ax.set_title("API Coverage: Baselines vs Base Model vs GRPO Trained")
    ax.set_xticks(x)
    ax.set_xticklabels(task_labels)
    ax.legend()
    ax.set_ylim(0, 105)  # coverage is a percentage; small headroom above 100
    plt.tight_layout()
    fig.savefig(os.path.join(plots_dir, "coverage_comparison.png"), dpi=150)
    plt.close(fig)
    logger.info(f"Plots saved to {plots_dir}/")
def train_grpo(args):
    """Run GRPO training with TRL.

    Executes the full 11-step pipeline: baseline-agent evaluation, model
    load, base-model evaluation, LoRA setup, prompt generation, GRPO
    training, local save, optional HF Hub push, trained-model evaluation,
    comparison table, and metrics/plot export.

    Args:
        args: argparse.Namespace built by main() — model id, training
            hyperparameters, W&B and HF Hub flags, eval options.
    """
    try:
        from datasets import Dataset
        from peft import LoraConfig
        from transformers import AutoModelForCausalLM, AutoTokenizer
        from trl import GRPOConfig, GRPOTrainer
        # --- MONKEY PATCH FOR TRL GRPOTrainer ---
        # trl 0.15 lacks `dataset` argument in `_get_train_sampler` required by transformers 4.57+
        import inspect
        if hasattr(GRPOTrainer, "_get_train_sampler"):
            sig = inspect.signature(GRPOTrainer._get_train_sampler)
            if "dataset" not in sig.parameters:
                _old_sampler = GRPOTrainer._get_train_sampler
                # Accept (and discard) the extra `dataset` arg newer transformers passes.
                def _new_sampler(self, dataset=None, **kwargs):
                    return _old_sampler(self)
                GRPOTrainer._get_train_sampler = _new_sampler
        # ----------------------------------------
    except ImportError as e:
        logger.error(
            f"Missing dependency: {e}\n"
            "Install with: pip install trl transformers peft datasets torch"
        )
        sys.exit(1)
    # --- W&B setup ---
    wandb_run = None
    report_to = "none"  # handed to GRPOConfig; becomes "wandb" if init succeeds
    if args.use_wandb:
        try:
            import wandb
            wandb_run = wandb.init(
                project=args.wandb_project,
                name=args.wandb_run_name or f"grpo-{args.model_id.split('/')[-1]}-{int(time.time())}",
                config={
                    "model_id": args.model_id,
                    "num_episodes": args.num_episodes,
                    "num_generations": args.num_generations,
                    "max_steps": args.max_steps,
                    "learning_rate": args.learning_rate,
                    "batch_size": args.batch_size,
                    "max_completion_length": args.max_completion_length,
                    # LoRA hyperparameters mirror the hard-coded LoraConfig below.
                    "lora_r": 16,
                    "lora_alpha": 32,
                },
            )
            report_to = "wandb"
            logger.info(f"W&B initialized: project={args.wandb_project}, run={wandb_run.name}")
        except ImportError:
            logger.warning("wandb not installed — skipping W&B logging. pip install wandb")
            args.use_wandb = False
    # Hyperparameter snapshot persisted alongside the metrics at step 11.
    training_args_dict = {
        "model_id": args.model_id,
        "num_episodes": args.num_episodes,
        "num_generations": args.num_generations,
        "max_steps": args.max_steps,
        "learning_rate": args.learning_rate,
        "batch_size": args.batch_size,
        "max_completion_length": args.max_completion_length,
        "output_dir": args.output_dir,
        "test_mode": args.test_mode,
    }
    # ================================================================
    # PIPELINE OVERVIEW
    # ================================================================
    total_pipeline_steps = 11

    def _step(n, msg):
        # Print a banner with a filled/empty progress bar for pipeline step n.
        bar = "█" * n + "░" * (total_pipeline_steps - n)
        print(f"\n{'='*70}")
        print(f" [{bar}] Step {n}/{total_pipeline_steps}: {msg}")
        print(f"{'='*70}\n")

    # --- Step 1: Run baseline agent evaluation ---
    _step(1, "Running baseline agents (random, sequential, smart)")
    baseline_results = run_baseline_evaluation(seed=9999)
    if args.use_wandb and wandb_run:
        import wandb
        for agent_name, tasks in baseline_results.items():
            for task_id, r in tasks.items():
                wandb.log({
                    f"baseline/{agent_name}/{task_id}/reward": r["total_reward"],
                    f"baseline/{agent_name}/{task_id}/bugs": r["bugs_found"],
                    f"baseline/{agent_name}/{task_id}/coverage": r["coverage_pct"],
                })
    # --- Step 2: Load model and tokenizer ---
    _step(2, f"Loading model: {args.model_id}")
    print(" Downloading tokenizer...", flush=True)
    tokenizer = AutoTokenizer.from_pretrained(args.model_id, trust_remote_code=True)
    if tokenizer.pad_token is None:
        # Some models ship without a pad token; reuse EOS so batching works.
        tokenizer.pad_token = tokenizer.eos_token
    print(" Tokenizer loaded.", flush=True)
    import torch
    # --- Force GPU detection ---
    if torch.cuda.is_available():
        device_map = "auto"
        dtype = torch.bfloat16
        gpu_name = torch.cuda.get_device_name(0)
        gpu_mem = torch.cuda.get_device_properties(0).total_memory / 1e9
        print(f" GPU: {gpu_name} ({gpu_mem:.1f} GB)", flush=True)
        print(f" CUDA version: {torch.version.cuda}", flush=True)
    elif torch.backends.mps.is_available():
        device_map = "auto"
        dtype = torch.float16
        print(" Device: Apple MPS", flush=True)
    else:
        # Still try to use GPU — sometimes torch.cuda.is_available() is False
        # because of driver issues but CUDA can still work
        device_map = None
        dtype = torch.float32
        print(" !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", flush=True)
        print(" !! WARNING: No GPU detected — running on CPU !!", flush=True)
        print(" !! Training will be EXTREMELY slow. !!", flush=True)
        print(" !! Check: python -c 'import torch; print(torch.cuda.is_available())'", flush=True)
        print(" !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", flush=True)
    print(" Downloading model weights...", flush=True)
    model = AutoModelForCausalLM.from_pretrained(
        args.model_id,
        trust_remote_code=True,
        torch_dtype=dtype,
        device_map=device_map,
    )
    # Verify model is actually on GPU
    actual_device = next(model.parameters()).device
    param_count = sum(p.numel() for p in model.parameters()) / 1e6
    print(f" Model loaded: {param_count:.0f}M parameters on {actual_device}", flush=True)
    if torch.cuda.is_available() and actual_device.type != "cuda":
        print(" Model not on GPU — forcing move to CUDA...", flush=True)
        model = model.to("cuda")
        print(f" Moved to: {next(model.parameters()).device}", flush=True)
    # --- Step 3: Evaluate base model BEFORE training ---
    _step(3, f"Evaluating BASE model (before GRPO, max {args.eval_max_steps} steps/task)")
    base_results = {}
    if not args.skip_eval:
        for task_id in ["basic_validation", "edge_cases", "security_workflows"]:
            result = run_rollout(model, tokenizer, task_id=task_id, seed=9999, max_steps=args.eval_max_steps)
            base_results[task_id] = result
            logger.info(
                f" [BASE] {task_id}: reward={result['total_reward']:.3f}, "
                f"bugs={result['bugs_found']}/{result['total_bugs']}, "
                f"coverage={result['coverage_pct']:.1f}%"
            )
            if args.use_wandb and wandb_run:
                import wandb
                wandb.log({
                    f"base_model/{task_id}/reward": result["total_reward"],
                    f"base_model/{task_id}/bugs": result["bugs_found"],
                    f"base_model/{task_id}/coverage": result["coverage_pct"],
                })
    else:
        logger.info("Skipping base model evaluation (--skip-eval)")
        # Zero-filled placeholders keep the downstream delta math well-defined.
        for task_id in ["basic_validation", "edge_cases", "security_workflows"]:
            base_results[task_id] = {"total_reward": 0, "bugs_found": 0, "total_bugs": 0, "coverage_pct": 0}
    # --- Step 4: LoRA config ---
    _step(4, "Configuring LoRA adapters")
    lora_config = LoraConfig(
        r=16,
        lora_alpha=32,
        lora_dropout=0.05,
        target_modules=["q_proj", "v_proj"],
        task_type="CAUSAL_LM",
    )
    print(f" LoRA: r=16, alpha=32, targets=q_proj+v_proj", flush=True)
    # --- Step 5: Generate training prompts ---
    _step(5, f"Generating {args.num_episodes} training episodes")
    raw_prompts = build_training_prompts(num_episodes=args.num_episodes)
    print(f" {len(raw_prompts)} prompts across 3 tasks (each with unique seed)", flush=True)
    # Qwen3 thinking mode: let the model reason before outputting JSON
    # Requires higher max_completion_length (~2048) to fit <think>...</think> + JSON
    chat_template_kwargs = {}
    if "qwen3" in args.model_id.lower():
        chat_template_kwargs["enable_thinking"] = True
        logger.info("Qwen3 detected — thinking mode ENABLED (model will reason before acting)")
    # Render chat messages to plain text so GRPOTrainer receives string prompts.
    formatted_prompts = []
    for p in raw_prompts:
        text = tokenizer.apply_chat_template(
            p["prompt"], tokenize=False, add_generation_prompt=True,
            **chat_template_kwargs,
        )
        formatted_prompts.append({"prompt": text, "task_id": p["task_id"], "seed": p["seed"]})
    dataset = Dataset.from_list(formatted_prompts)
    # Store prompt metadata for the reward function to create fresh envs
    prompts_meta = [{"seed": p["seed"], "task_id": p["task_id"]} for p in raw_prompts]

    # Combined reward: format (valid JSON array?) + plan (execute all actions) + diversity (varied requests?)
    # Each generation gets a FRESH environment — no shared state pollution
    def combined_reward_fn(completions, **kwargs):
        fmt = format_reward_fn(completions)
        plan = plan_reward_fn(completions, prompts_meta=prompts_meta)
        div = diversity_reward_fn(completions)
        # Element-wise sum of the three component rewards per completion.
        return [f + p + d for f, p, d in zip(fmt, plan, div)]

    # --- Step 6: GRPO training ---
    _step(6, f"GRPO training ({args.max_steps} steps, {args.num_generations} generations/prompt)")
    config = GRPOConfig(
        output_dir=args.output_dir,
        num_generations=args.num_generations,
        max_completion_length=args.max_completion_length,
        learning_rate=args.learning_rate,
        per_device_train_batch_size=args.batch_size,
        num_train_epochs=1,
        max_steps=args.max_steps,
        logging_steps=5,
        save_steps=50,
        save_total_limit=3,
        report_to=report_to,
        temperature=0.8,
    )
    trainer = GRPOTrainer(
        model=model,
        args=config,
        reward_funcs=[combined_reward_fn],
        train_dataset=dataset,
        peft_config=lora_config,
        processing_class=tokenizer,
    )
    print(f" Config: lr={args.learning_rate}, batch={args.batch_size}, "
          f"max_completion={args.max_completion_length}, temp=0.8", flush=True)
    print(f" Rewards: format_reward + plan_reward + diversity_reward", flush=True)
    print(f" Training begins... (progress bar below)\n", flush=True)
    train_start = time.time()
    trainer.train()
    training_time = time.time() - train_start
    print(f"\n Training completed in {training_time / 60:.1f} minutes", flush=True)
    # --- Step 7: Save model locally ---
    _step(7, f"Saving model to {args.output_dir}")
    trainer.save_model(args.output_dir)
    tokenizer.save_pretrained(args.output_dir)
    print(f" Model + tokenizer saved.", flush=True)
    # --- Step 8: Push to HuggingFace Hub ---
    _step(8, "Pushing to HuggingFace Hub" if args.push_to_hub else "HF Hub push (skipped — use --push-to-hub)")
    if args.push_to_hub:
        hf_repo = args.hf_repo_id
        if not hf_repo:
            logger.error("--hf-repo-id is required when using --push-to-hub")
        else:
            try:
                logger.info(f"Pushing model to HuggingFace Hub: {hf_repo}")
                trainer.push_to_hub(repo_id=hf_repo, commit_message="GRPO trained API testing agent")
                tokenizer.push_to_hub(repo_id=hf_repo, commit_message="GRPO trained API testing agent")
                logger.info(f"Model pushed to https://huggingface.co/{hf_repo}")
            except Exception as e:
                # Best-effort push: a Hub failure must not abort the rest of the pipeline.
                logger.error(f"Failed to push to HF Hub: {e}")
                logger.info("Make sure you're logged in: huggingface-cli login")
    # --- Step 9: Evaluate AFTER training ---
    _step(9, f"Evaluating TRAINED model (max {args.eval_max_steps} steps/task)")
    trained_results = {}
    if not args.skip_eval:
        for task_id in ["basic_validation", "edge_cases", "security_workflows"]:
            result = run_rollout(model, tokenizer, task_id=task_id, seed=9999, max_steps=args.eval_max_steps)
            trained_results[task_id] = result
            base = base_results[task_id]
            # Deltas vs the step-3 base-model run on the same seed/task.
            reward_delta = result["total_reward"] - base.get("total_reward", 0)
            bug_delta = result["bugs_found"] - base.get("bugs_found", 0)
            cov_delta = result["coverage_pct"] - base.get("coverage_pct", 0)
            logger.info(
                f" [TRAINED] {task_id}: reward={result['total_reward']:.3f} ({reward_delta:+.3f}), "
                f"bugs={result['bugs_found']}/{result['total_bugs']} ({bug_delta:+d}), "
                f"coverage={result['coverage_pct']:.1f}% ({cov_delta:+.1f}%)"
            )
            if args.use_wandb and wandb_run:
                import wandb
                wandb.log({
                    f"trained_model/{task_id}/reward": result["total_reward"],
                    f"trained_model/{task_id}/bugs": result["bugs_found"],
                    f"trained_model/{task_id}/coverage": result["coverage_pct"],
                    f"delta/{task_id}/reward": reward_delta,
                    f"delta/{task_id}/bugs": bug_delta,
                    f"delta/{task_id}/coverage": cov_delta,
                })
    else:
        logger.info("Skipping trained model evaluation (--skip-eval)")
        for task_id in ["basic_validation", "edge_cases", "security_workflows"]:
            trained_results[task_id] = {"total_reward": 0, "bugs_found": 0, "total_bugs": 0, "coverage_pct": 0}
    # --- Step 10: Print final comparison table ---
    _step(10, "Results comparison table")
    print("=" * 95)
    print("FINAL COMPARISON: All Agents & Models")
    print("=" * 95)
    print(f"{'Agent/Model':<18} {'Task':<25} {'Reward':<10} {'Bugs':<12} {'Coverage':<10}")
    print("-" * 95)
    for agent_name in ["random", "sequential", "smart"]:
        if agent_name in baseline_results:
            for task_id in ["basic_validation", "edge_cases", "security_workflows"]:
                r = baseline_results[agent_name].get(task_id, {})
                print(
                    f"{agent_name:<18} {task_id:<25} "
                    f"{r.get('total_reward', 0):<10.4f} "
                    f"{r.get('bugs_found', 0)}/{r.get('total_bugs', 0):<10} "
                    f"{r.get('coverage_pct', 0):<10.1f}%"
                )
    print("-" * 95)
    for task_id in ["basic_validation", "edge_cases", "security_workflows"]:
        r = base_results[task_id]
        print(
            f"{'Base Model':<18} {task_id:<25} "
            f"{r['total_reward']:<10.4f} "
            f"{r['bugs_found']}/{r['total_bugs']:<10} "
            f"{r['coverage_pct']:<10.1f}%"
        )
    print("-" * 95)
    for task_id in ["basic_validation", "edge_cases", "security_workflows"]:
        r = trained_results[task_id]
        base = base_results[task_id]
        delta = r["total_reward"] - base["total_reward"]
        print(
            f"{'GRPO Trained':<18} {task_id:<25} "
            f"{r['total_reward']:<10.4f} "
            f"{r['bugs_found']}/{r['total_bugs']:<10} "
            f"{r['coverage_pct']:<10.1f}% ({delta:+.4f})"
        )
    print("=" * 95)
    # --- Step 11: Save metrics & plots ---
    _step(11, "Saving metrics, plots, and finalizing")
    save_metrics(
        output_dir=args.output_dir,
        baseline_results=baseline_results,
        base_model_results=base_results,
        trained_model_results=trained_results,
        training_args=training_args_dict,
        training_time_s=training_time,
    )
    save_plots(
        output_dir=args.output_dir,
        baseline_results=baseline_results,
        base_model_results=base_results,
        trained_model_results=trained_results,
    )
    # --- Finalize W&B ---
    if args.use_wandb and wandb_run:
        import wandb
        # Log plots as artifacts
        plots_dir = os.path.join(args.output_dir, "metrics", "plots")
        if os.path.exists(plots_dir):
            for fname in os.listdir(plots_dir):
                if fname.endswith(".png"):
                    wandb.log({f"plots/{fname.replace('.png', '')}": wandb.Image(os.path.join(plots_dir, fname))})
        wandb.finish()
    # ================================================================
    print(f"\n{'='*70}")
    print(f" PIPELINE COMPLETE")
    print(f" Training time: {training_time / 60:.1f} minutes")
    print(f" Model saved to: {args.output_dir}")
    print(f" Metrics: {args.output_dir}/metrics/")
    print(f" Plots: {args.output_dir}/metrics/plots/")
    if args.use_wandb:
        print(f" W&B: https://wandb.ai/{args.wandb_project}")
    if args.push_to_hub and args.hf_repo_id:
        print(f" HF Hub: https://huggingface.co/{args.hf_repo_id}")
    print(f"{'='*70}\n")
def main():
    """CLI entry point: parse arguments, apply test-mode overrides, and run.

    Supports a SHOW_PROMPTS=1 dry-run mode that prints sample training
    prompts and exits without touching any model.
    """
    arg_parser = argparse.ArgumentParser(description="GRPO Training for API Testing Agent")
    # Model & training
    arg_parser.add_argument("--model-id", default="Qwen/Qwen3-1.7B", help="Base model to fine-tune")
    arg_parser.add_argument("--output-dir", default="./checkpoints/grpo_api_tester")
    arg_parser.add_argument("--num-episodes", type=int, default=50, help="Number of training episodes")
    arg_parser.add_argument("--num-generations", type=int, default=4, help="GRPO parallel rollouts per prompt")
    arg_parser.add_argument("--max-completion-length", type=int, default=4096,
                            help="Max tokens per generation. 4096 needed for Qwen3 thinking + JSON plan")
    arg_parser.add_argument("--max-steps", type=int, default=200, help="Max training steps")
    arg_parser.add_argument("--learning-rate", type=float, default=2e-5)
    arg_parser.add_argument("--batch-size", type=int, default=4)
    arg_parser.add_argument("--test-mode", action="store_true", help="Quick test with tiny config")
    # HuggingFace Hub
    arg_parser.add_argument("--push-to-hub", action="store_true", help="Push trained model to HF Hub")
    arg_parser.add_argument("--hf-repo-id", type=str, default=None,
                            help="HF Hub repo ID (e.g., your-username/api-tester-grpo)")
    # Evaluation
    arg_parser.add_argument("--skip-eval", action="store_true", help="Skip base/trained model evaluation")
    arg_parser.add_argument("--eval-max-steps", type=int, default=10,
                            help="Max steps per task during evaluation (default: 10, reduces eval time)")
    # Weights & Biases
    arg_parser.add_argument("--use-wandb", action="store_true", help="Enable Weights & Biases logging")
    arg_parser.add_argument("--wandb-project", type=str, default="api-testing-grpo",
                            help="W&B project name")
    arg_parser.add_argument("--wandb-run-name", type=str, default=None,
                            help="W&B run name (auto-generated if not set)")
    args = arg_parser.parse_args()

    if args.test_mode:
        logger.info("=== TEST MODE — quick sanity check ===")
        # Shrink every knob so the whole pipeline finishes in minutes.
        for attr, value in (
            ("num_episodes", 3),
            ("num_generations", 4),
            ("batch_size", 2),
            ("max_steps", 10),
            ("max_completion_length", 2048),
        ):
            setattr(args, attr, value)

    if os.environ.get("SHOW_PROMPTS"):
        # Dry run: dump sample prompts and exit — no GPU or model load needed.
        for sample in build_training_prompts(num_episodes=3):
            print(f"\n{'='*60}")
            print(f"Task: {sample['task_id']} | Seed: {sample['seed']}")
            print(f"{'='*60}")
            for message in sample["prompt"]:
                print(f"[{message['role']}]: {message['content'][:300]}...")
        return

    train_grpo(args)
# Script entry point (run via `python -m training.grpo` or directly).
if __name__ == "__main__":
    main()