Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Evaluation and rollout runner. | |
| - run_rollout(): Run a single episode with a HuggingFace model | |
| - run_baseline_local(): Run baseline agents against the local environment | |
| - run_baseline(): Run baseline agents against a remote server | |
| - main(): CLI for running baselines | |
| """ | |
| import argparse | |
| import asyncio | |
| import logging | |
| import random | |
| import sys | |
| import os | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") | |
| logger = logging.getLogger(__name__) | |
| from models import APITestAction, HTTPMethod | |
| from server.environment import APITestEnvironment | |
| from .prompts import ( | |
| PLAN_SYSTEM_PROMPT, format_plan_prompt, | |
| parse_action, parse_test_plan, | |
| ) | |
| from .agents import AGENTS | |
| def run_rollout( | |
| model, | |
| tokenizer, | |
| task_id: str = "basic_validation", | |
| seed: int = 42, | |
| max_steps: int | None = None, | |
| ) -> dict: | |
| """Run a single episode with a HuggingFace model. | |
| Uses PLAN mode: the model generates a full test plan (JSON array) in one shot, | |
| then all actions are executed sequentially. This matches how training works. | |
| Falls back to multi-turn mode if the model can't produce a valid plan. | |
| """ | |
| import torch | |
| import time as _time | |
| # Force GPU if available | |
| if torch.cuda.is_available(): | |
| device = torch.device("cuda") | |
| # Move model to GPU if it's on CPU | |
| if next(model.parameters()).device.type == "cpu": | |
| logger.info(" Moving model to GPU...") | |
| model = model.to(device) | |
| else: | |
| device = next(model.parameters()).device | |
| env = APITestEnvironment() | |
| obs = env.reset(seed=seed, task_id=task_id) | |
| actual_max = max_steps or obs.max_steps | |
| logger.info(f" Rollout: {task_id} | max_steps={actual_max} | device={device}") | |
| # --- Try plan mode first (matches training) --- | |
| plan_prompt = format_plan_prompt(obs) | |
| messages = [ | |
| {"role": "system", "content": PLAN_SYSTEM_PROMPT}, | |
| {"role": "user", "content": plan_prompt}, | |
| ] | |
| # Qwen3 thinking support | |
| chat_kwargs = {} | |
| if "qwen3" in str(getattr(model, "name_or_path", "") or "").lower(): | |
| chat_kwargs["enable_thinking"] = True | |
| prompt_text = tokenizer.apply_chat_template( | |
| messages, tokenize=False, add_generation_prompt=True, **chat_kwargs, | |
| ) | |
| inputs = tokenizer(prompt_text, return_tensors="pt").to(device) | |
| gen_start = _time.time() | |
| print(f" Generating test plan...", end="", flush=True) | |
| with torch.no_grad(): | |
| output = model.generate( | |
| **inputs, | |
| max_new_tokens=4096, # Match training max_completion_length | |
| temperature=0.7, | |
| do_sample=True, | |
| pad_token_id=tokenizer.pad_token_id or tokenizer.eos_token_id, | |
| ) | |
| completion = tokenizer.decode(output[0][inputs.input_ids.shape[1]:], skip_special_tokens=True) | |
| gen_time = _time.time() - gen_start | |
| print(f" done ({gen_time:.1f}s, {len(completion)} chars)") | |
| # Parse the plan | |
| actions = parse_test_plan(completion) | |
| if actions: | |
| logger.info(f" Plan generated: {len(actions)} actions") | |
| else: | |
| # Fallback: try single action parse | |
| single = parse_action(completion) | |
| if single: | |
| actions = [single] | |
| logger.info(" Plan parse failed, got 1 action from fallback") | |
| else: | |
| logger.warning(" Failed to parse any actions from model output") | |
| # Print first 500 chars of completion for debugging | |
| preview = completion[:500].replace("\n", " ") | |
| logger.warning(f" Model output preview: {preview}...") | |
| actions = [] | |
| # Limit to max_steps | |
| actions = actions[:actual_max] | |
| # Execute all actions | |
| total_reward = 0.0 | |
| for i, action in enumerate(actions): | |
| try: | |
| obs = env.step(action) | |
| total_reward += obs.reward or 0.0 | |
| method_str = action.method.value if hasattr(action.method, "value") else str(action.method) | |
| print(f" Step {i+1}/{len(actions)}: {method_str} {action.endpoint} -> " | |
| f"{obs.status_code} | reward={obs.reward:.3f} | bugs={obs.bugs_found_so_far}") | |
| except Exception as e: | |
| print(f" Step {i+1}/{len(actions)}: ERROR - {e}") | |
| # If no actions were generated, show that | |
| if not actions: | |
| print(" (no valid actions generated)") | |
| state = env.state | |
| return { | |
| "task_id": task_id, | |
| "seed": seed, | |
| "steps": len(actions), | |
| "total_reward": round(total_reward, 4), | |
| "bugs_found": state.bugs_found, | |
| "total_bugs": state.total_bugs, | |
| "coverage_pct": state.coverage_pct, | |
| "bugs_found_ids": state.bugs_found_ids, | |
| } | |
| def run_baseline_local( | |
| agent_name: str = "all", | |
| task_id: str = "all", | |
| seed: int = 42, | |
| ) -> list[dict]: | |
| """Run baseline agents against the local environment (no server needed). | |
| Args: | |
| agent_name: "random", "sequential", "smart", or "all" | |
| task_id: task ID or "all" | |
| seed: random seed | |
| Returns: | |
| List of result dicts with agent, task_id, total_reward, bugs_found, etc. | |
| """ | |
| tasks = ["basic_validation", "edge_cases", "security_workflows"] if task_id == "all" else [task_id] | |
| agents = list(AGENTS.items()) if agent_name == "all" else [(agent_name, AGENTS[agent_name])] | |
| results = [] | |
| for tid in tasks: | |
| for aname, agent_cls in agents: | |
| random.seed(seed) | |
| agent = agent_cls() | |
| env = APITestEnvironment() | |
| obs = env.reset(seed=seed, task_id=tid) | |
| total_reward = 0.0 | |
| step = 0 | |
| while not obs.done and step < obs.max_steps: | |
| obs_dict = { | |
| "status_code": obs.status_code, | |
| "response_body": obs.response_body, | |
| "feedback": obs.feedback, | |
| "bugs_found_so_far": obs.bugs_found_so_far, | |
| "coverage_summary": obs.coverage_summary, | |
| "known_resource_ids": obs.known_resource_ids, | |
| "auth_tokens": obs.auth_tokens, | |
| "steps_taken": obs.steps_taken, | |
| "max_steps": obs.max_steps, | |
| } | |
| action = agent.act(obs_dict) | |
| obs = env.step(action) | |
| total_reward += obs.reward or 0.0 | |
| step += 1 | |
| state = env.state | |
| result = { | |
| "agent": aname, | |
| "task_id": tid, | |
| "seed": seed, | |
| "steps": step, | |
| "total_reward": round(total_reward, 4), | |
| "bugs_found": state.bugs_found, | |
| "total_bugs": state.total_bugs, | |
| "coverage_pct": state.coverage_pct, | |
| "bugs_found_ids": state.bugs_found_ids, | |
| } | |
| results.append(result) | |
| logger.info( | |
| f" [{aname}] {tid}: reward={result['total_reward']:.4f}, " | |
| f"bugs={result['bugs_found']}/{result['total_bugs']}, " | |
| f"coverage={result['coverage_pct']:.1f}%" | |
| ) | |
| return results | |
| # ===================================================================== | |
| # Remote baseline runner (against server via WebSocket client) | |
| # ===================================================================== | |
| async def run_episode(url: str, task_id: str, agent_cls, seed: int = 42) -> dict: | |
| """Run one baseline episode against a remote server.""" | |
| from client import APITestEnv | |
| random.seed(seed) | |
| agent = agent_cls() | |
| async with APITestEnv(base_url=url) as env: | |
| result = await env.reset(task_id=task_id) | |
| obs = result.observation | |
| logger.info(f"Starting {agent.name} agent on task '{task_id}'") | |
| total_reward = 0.0 | |
| step = 0 | |
| while not result.done: | |
| obs_dict = { | |
| "status_code": obs.status_code, | |
| "response_body": obs.response_body, | |
| "feedback": obs.feedback, | |
| "bugs_found_so_far": obs.bugs_found_so_far, | |
| "coverage_summary": obs.coverage_summary, | |
| "known_resource_ids": obs.known_resource_ids, | |
| "auth_tokens": obs.auth_tokens, | |
| "steps_taken": obs.steps_taken, | |
| "max_steps": obs.max_steps, | |
| } | |
| action = agent.act(obs_dict) | |
| result = await env.step(action) | |
| obs = result.observation | |
| total_reward += result.reward or 0 | |
| step += 1 | |
| method = action.method.value if hasattr(action.method, "value") else str(action.method) | |
| logger.info( | |
| f" Step {step}: {method} {action.endpoint} -> " | |
| f"{obs.status_code} | reward={result.reward:.4f} | bugs={obs.bugs_found_so_far}" | |
| ) | |
| state = await env.state() | |
| return { | |
| "task_id": task_id, | |
| "agent": agent.name, | |
| "total_reward": round(total_reward, 4), | |
| "bugs_found": state.bugs_found, | |
| "total_bugs": state.total_bugs, | |
| "coverage_pct": state.coverage_pct, | |
| "steps": step, | |
| } | |
| async def main_async(args): | |
| tasks = ["basic_validation", "edge_cases", "security_workflows"] if args.task == "all" else [args.task] | |
| agents = list(AGENTS.values()) if args.agent == "all" else [AGENTS[args.agent]] | |
| results = [] | |
| for task_id in tasks: | |
| for agent_cls in agents: | |
| try: | |
| result = await run_episode(args.url, task_id, agent_cls, seed=args.seed) | |
| results.append(result) | |
| logger.info( | |
| f"\nRESULT: {result['agent']} on {result['task_id']}: " | |
| f"reward={result['total_reward']}, bugs={result['bugs_found']}/{result['total_bugs']}, " | |
| f"coverage={result['coverage_pct']:.1f}%" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error running {agent_cls.name} on {task_id}: {e}", exc_info=True) | |
| if results: | |
| print("\n" + "=" * 80) | |
| print("BASELINE RESULTS SUMMARY") | |
| print("=" * 80) | |
| print(f"{'Agent':<15} {'Task':<25} {'Score':<10} {'Bugs':<10} {'Coverage':<10}") | |
| print("-" * 80) | |
| for r in results: | |
| print( | |
| f"{r['agent']:<15} {r['task_id']:<25} " | |
| f"{r['total_reward']:<10.4f} " | |
| f"{r['bugs_found']}/{r['total_bugs']:<8} " | |
| f"{r['coverage_pct']:<10.1f}%" | |
| ) | |
| print("=" * 80) | |
| return results | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Baseline agents for API Testing Environment") | |
| parser.add_argument("--url", default="http://localhost:8000", help="Environment server URL") | |
| parser.add_argument("--task", default="all", | |
| choices=["basic_validation", "edge_cases", "security_workflows", "all"]) | |
| parser.add_argument("--agent", default="all", choices=["random", "sequential", "smart", "all"]) | |
| parser.add_argument("--seed", type=int, default=42) | |
| args = parser.parse_args() | |
| asyncio.run(main_async(args)) | |
| if __name__ == "__main__": | |
| main() | |