api-testing-env / training /evaluate.py
Mayank022's picture
Upload folder using huggingface_hub
a4f74f3 verified
#!/usr/bin/env python3
"""
Evaluation and rollout runner.
- run_rollout(): Run a single episode with a HuggingFace model
- run_baseline_local(): Run baseline agents against the local environment
- run_episode(): Run one baseline episode against a remote server
- main_async(): Run the selected agent/task pairs remotely and print a summary
- main(): CLI for running baselines
"""
import argparse
import asyncio
import logging
import random
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
from models import APITestAction, HTTPMethod
from server.environment import APITestEnvironment
from .prompts import (
PLAN_SYSTEM_PROMPT, format_plan_prompt,
parse_action, parse_test_plan,
)
from .agents import AGENTS
def run_rollout(
    model,
    tokenizer,
    task_id: str = "basic_validation",
    seed: int = 42,
    max_steps: int | None = None,
) -> dict:
    """Run a single episode with a HuggingFace model.

    Uses PLAN mode: the model generates a full test plan (JSON array) in one
    shot, then all parsed actions are executed sequentially against a fresh
    local ``APITestEnvironment``.

    If no plan can be parsed from the completion, a single action is parsed
    from the same completion instead; if that also fails, the episode runs
    with zero actions.

    Args:
        model: Model object; this function calls ``parameters()``, ``to()``
            and ``generate()`` on it and reads ``name_or_path`` if present.
        tokenizer: Tokenizer used for ``apply_chat_template``, encoding the
            prompt, and decoding the completion.
        task_id: Task identifier forwarded to ``env.reset``.
        seed: Seed forwarded to ``env.reset``. Generation samples with
            ``do_sample=True`` and is not seeded here.
        max_steps: Upper bound on executed actions. A falsy value (``None``
            or ``0``) falls back to ``obs.max_steps`` from the reset
            observation.

    Returns:
        Dict with ``task_id``, ``seed``, ``steps`` (number of planned actions
        after truncation), ``total_reward`` (rounded to 4 decimals),
        ``bugs_found``, ``total_bugs``, ``coverage_pct`` and
        ``bugs_found_ids`` read from ``env.state``.
    """
    import time as _time

    import torch

    # Force GPU if available
    if torch.cuda.is_available():
        device = torch.device("cuda")
        # Move model to GPU if it's on CPU
        if next(model.parameters()).device.type == "cpu":
            logger.info(" Moving model to GPU...")
            model = model.to(device)
    else:
        device = next(model.parameters()).device

    env = APITestEnvironment()
    obs = env.reset(seed=seed, task_id=task_id)
    actual_max = max_steps or obs.max_steps
    logger.info(f" Rollout: {task_id} | max_steps={actual_max} | device={device}")

    # --- Try plan mode first (matches training) ---
    plan_prompt = format_plan_prompt(obs)
    messages = [
        {"role": "system", "content": PLAN_SYSTEM_PROMPT},
        {"role": "user", "content": plan_prompt},
    ]

    # Qwen3 thinking support
    chat_kwargs = {}
    if "qwen3" in str(getattr(model, "name_or_path", "") or "").lower():
        chat_kwargs["enable_thinking"] = True

    prompt_text = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True, **chat_kwargs,
    )
    inputs = tokenizer(prompt_text, return_tensors="pt").to(device)

    # A pad token id of 0 is valid but falsy, so test against None explicitly
    # instead of using `or`, which would silently substitute the EOS id.
    pad_token_id = tokenizer.pad_token_id
    if pad_token_id is None:
        pad_token_id = tokenizer.eos_token_id

    gen_start = _time.time()
    print(" Generating test plan...", end="", flush=True)
    with torch.no_grad():
        output = model.generate(
            **inputs,
            max_new_tokens=4096,  # Match training max_completion_length
            temperature=0.7,
            do_sample=True,
            pad_token_id=pad_token_id,
        )
    # Drop the prompt tokens so only the newly generated text is decoded.
    completion = tokenizer.decode(output[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
    gen_time = _time.time() - gen_start
    print(f" done ({gen_time:.1f}s, {len(completion)} chars)")

    # Parse the plan
    actions = parse_test_plan(completion)
    if actions:
        logger.info(f" Plan generated: {len(actions)} actions")
    else:
        # Fallback: try single action parse
        single = parse_action(completion)
        if single:
            actions = [single]
            logger.info(" Plan parse failed, got 1 action from fallback")
        else:
            logger.warning(" Failed to parse any actions from model output")
            # Print first 500 chars of completion for debugging
            preview = completion[:500].replace("\n", " ")
            logger.warning(f" Model output preview: {preview}...")
            actions = []

    # Limit to max_steps
    actions = actions[:actual_max]

    # Execute all actions
    total_reward = 0.0
    for i, action in enumerate(actions):
        try:
            obs = env.step(action)
            # The reward may be None; normalise once so that both the running
            # total and the formatted progress line use the same numeric value
            # (formatting None with ':.3f' would raise and mislabel the step
            # as an error).
            reward = obs.reward or 0.0
            total_reward += reward
            method_str = action.method.value if hasattr(action.method, "value") else str(action.method)
            print(f" Step {i+1}/{len(actions)}: {method_str} {action.endpoint} -> "
                  f"{obs.status_code} | reward={reward:.3f} | bugs={obs.bugs_found_so_far}")
        except Exception as e:
            # Best effort: a failing step is reported and the plan continues.
            print(f" Step {i+1}/{len(actions)}: ERROR - {e}")

    # If no actions were generated, show that
    if not actions:
        print(" (no valid actions generated)")

    state = env.state
    return {
        "task_id": task_id,
        "seed": seed,
        "steps": len(actions),
        "total_reward": round(total_reward, 4),
        "bugs_found": state.bugs_found,
        "total_bugs": state.total_bugs,
        "coverage_pct": state.coverage_pct,
        "bugs_found_ids": state.bugs_found_ids,
    }
def run_baseline_local(
    agent_name: str = "all",
    task_id: str = "all",
    seed: int = 42,
) -> list[dict]:
    """Evaluate baseline agents in-process against a local environment.

    Every selected task is paired with every selected agent; each pair gets a
    freshly constructed agent and ``APITestEnvironment`` and is run until the
    observation reports ``done`` or the step count reaches ``obs.max_steps``.

    Args:
        agent_name: A key of ``AGENTS``, or "all" to run every registered agent.
        task_id: A task ID, or "all" for the three built-in tasks.
        seed: Seed applied to ``random`` and passed to ``env.reset``.

    Returns:
        One summary dict per (task, agent) pair, with the keys ``agent``,
        ``task_id``, ``seed``, ``steps``, ``total_reward``, ``bugs_found``,
        ``total_bugs``, ``coverage_pct`` and ``bugs_found_ids``.
    """
    if task_id == "all":
        task_ids = ["basic_validation", "edge_cases", "security_workflows"]
    else:
        task_ids = [task_id]

    if agent_name == "all":
        selected_agents = list(AGENTS.items())
    else:
        selected_agents = [(agent_name, AGENTS[agent_name])]

    # Observation attributes handed to the agent, in the order they are read.
    observed_fields = (
        "status_code",
        "response_body",
        "feedback",
        "bugs_found_so_far",
        "coverage_summary",
        "known_resource_ids",
        "auth_tokens",
        "steps_taken",
        "max_steps",
    )

    summaries = []
    for current_task in task_ids:
        for label, make_agent in selected_agents:
            # Re-seed before each pairing so runs do not influence each other.
            random.seed(seed)
            agent = make_agent()
            env = APITestEnvironment()
            obs = env.reset(seed=seed, task_id=current_task)

            reward_sum = 0.0
            steps_done = 0
            while True:
                if obs.done or steps_done >= obs.max_steps:
                    break
                snapshot = {field: getattr(obs, field) for field in observed_fields}
                obs = env.step(agent.act(snapshot))
                reward_sum += obs.reward or 0.0
                steps_done += 1

            final_state = env.state
            result = {
                "agent": label,
                "task_id": current_task,
                "seed": seed,
                "steps": steps_done,
                "total_reward": round(reward_sum, 4),
                "bugs_found": final_state.bugs_found,
                "total_bugs": final_state.total_bugs,
                "coverage_pct": final_state.coverage_pct,
                "bugs_found_ids": final_state.bugs_found_ids,
            }
            summaries.append(result)
            logger.info(
                f" [{label}] {current_task}: reward={result['total_reward']:.4f}, "
                f"bugs={result['bugs_found']}/{result['total_bugs']}, "
                f"coverage={result['coverage_pct']:.1f}%"
            )
    return summaries
# =====================================================================
# Remote baseline runner (against server via WebSocket client)
# =====================================================================
async def run_episode(url: str, task_id: str, agent_cls, seed: int = 42) -> dict:
    """Run one baseline episode against a remote server.

    Args:
        url: Base URL passed to the ``APITestEnv`` client.
        task_id: Task identifier forwarded to ``env.reset``.
        agent_cls: Zero-argument callable that builds the agent; the agent
            must expose ``name`` and ``act(obs_dict)``.
        seed: Seed applied to Python's ``random`` module before the agent is
            built. It is not forwarded to ``env.reset`` here.

    Returns:
        Dict with ``task_id``, ``agent``, ``total_reward`` (rounded to 4
        decimals), ``bugs_found``, ``total_bugs``, ``coverage_pct`` and
        ``steps``.
    """
    from client import APITestEnv

    random.seed(seed)
    agent = agent_cls()
    async with APITestEnv(base_url=url) as env:
        result = await env.reset(task_id=task_id)
        obs = result.observation
        logger.info(f"Starting {agent.name} agent on task '{task_id}'")

        total_reward = 0.0
        step = 0
        while not result.done:
            obs_dict = {
                "status_code": obs.status_code,
                "response_body": obs.response_body,
                "feedback": obs.feedback,
                "bugs_found_so_far": obs.bugs_found_so_far,
                "coverage_summary": obs.coverage_summary,
                "known_resource_ids": obs.known_resource_ids,
                "auth_tokens": obs.auth_tokens,
                "steps_taken": obs.steps_taken,
                "max_steps": obs.max_steps,
            }
            action = agent.act(obs_dict)
            result = await env.step(action)
            obs = result.observation

            # The reward may be None; normalise once so the accumulator and
            # the log line agree (formatting None with ':.4f' would raise
            # TypeError and abort the episode).
            reward = result.reward or 0.0
            total_reward += reward
            step += 1

            method = action.method.value if hasattr(action.method, "value") else str(action.method)
            logger.info(
                f" Step {step}: {method} {action.endpoint} -> "
                f"{obs.status_code} | reward={reward:.4f} | bugs={obs.bugs_found_so_far}"
            )

        # Fetch the final state while the client connection is still open.
        state = await env.state()
        return {
            "task_id": task_id,
            "agent": agent.name,
            "total_reward": round(total_reward, 4),
            "bugs_found": state.bugs_found,
            "total_bugs": state.total_bugs,
            "coverage_pct": state.coverage_pct,
            "steps": step,
        }
async def main_async(args):
    """Run the selected baseline agents on the selected tasks remotely.

    Each (task, agent) pair is run through ``run_episode``. A failing pair is
    logged with its traceback and skipped, so one failure does not stop the
    remaining runs. When at least one episode succeeds, a summary table is
    printed to stdout.

    Args:
        args: Namespace with ``url``, ``task``, ``agent`` and ``seed``
            attributes (as produced by the parser in ``main``).

    Returns:
        List of the result dicts returned by the successful episodes.
    """
    tasks = ["basic_validation", "edge_cases", "security_workflows"] if args.task == "all" else [args.task]
    agents = list(AGENTS.values()) if args.agent == "all" else [AGENTS[args.agent]]

    results = []
    for task_id in tasks:
        for agent_cls in agents:
            try:
                result = await run_episode(args.url, task_id, agent_cls, seed=args.seed)
                results.append(result)
                logger.info(
                    f"\nRESULT: {result['agent']} on {result['task_id']}: "
                    f"reward={result['total_reward']}, bugs={result['bugs_found']}/{result['total_bugs']}, "
                    f"coverage={result['coverage_pct']:.1f}%"
                )
            except Exception as e:
                # `name` is read from an agent *instance* elsewhere; it may not
                # exist on the class, so fall back to the class name rather
                # than raising a second error inside the handler.
                agent_label = getattr(agent_cls, "name", None) or getattr(
                    agent_cls, "__name__", repr(agent_cls)
                )
                logger.error(f"Error running {agent_label} on {task_id}: {e}", exc_info=True)

    if results:
        print("\n" + "=" * 80)
        print("BASELINE RESULTS SUMMARY")
        print("=" * 80)
        print(f"{'Agent':<15} {'Task':<25} {'Score':<10} {'Bugs':<10} {'Coverage':<10}")
        print("-" * 80)
        for r in results:
            # Build each composite cell first, then pad the whole cell, so the
            # columns line up with the header and "%" stays next to its number.
            bugs_cell = f"{r['bugs_found']}/{r['total_bugs']}"
            coverage_cell = f"{r['coverage_pct']:.1f}%"
            print(
                f"{r['agent']:<15} {r['task_id']:<25} "
                f"{r['total_reward']:<10.4f} "
                f"{bugs_cell:<10} "
                f"{coverage_cell:<10}"
            )
        print("=" * 80)
    return results
def main():
    """Command-line entry point: parse options and run the remote baselines.

    Recognised options are ``--url``, ``--task``, ``--agent`` and ``--seed``;
    the parsed namespace is handed to ``main_async`` on a new event loop.
    """
    cli = argparse.ArgumentParser(description="Baseline agents for API Testing Environment")

    # Target server
    cli.add_argument(
        "--url",
        default="http://localhost:8000",
        help="Environment server URL",
    )
    # What to run
    cli.add_argument(
        "--task",
        default="all",
        choices=["basic_validation", "edge_cases", "security_workflows", "all"],
    )
    cli.add_argument(
        "--agent",
        default="all",
        choices=["random", "sequential", "smart", "all"],
    )
    # Reproducibility
    cli.add_argument("--seed", type=int, default=42)

    options = cli.parse_args()
    asyncio.run(main_async(options))


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()