| import asyncio |
| import json |
| import os |
| import re |
| import sys |
| from datetime import datetime, timezone |
| from typing import Dict, List, Optional, Tuple |
|
|
| from openai import OpenAI |
|
|
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) |
|
|
| try: |
| from server.environment import RedTeamPentestEnvironment |
| except Exception: |
| from .server.environment import RedTeamPentestEnvironment |
|
|
| try: |
| from models import RedTeamAction |
| except Exception: |
| from .models import RedTeamAction |
|
|
|
|
| API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:11434/v1") |
| MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini") |
| API_KEY = os.getenv("API_KEY", os.getenv("HF_TOKEN", "ollama")) |
|
|
| BENCHMARK = "redteam_pentest" |
| TASK_TOKENS = ["alpha", "bravo", "charlie"] |
| STEP_TOKENS = [ |
| "one", |
| "two", |
| "three", |
| "four", |
| "five", |
| "six", |
| "seven", |
| "eight", |
| "nine", |
| "ten", |
| "eleven", |
| "twelve", |
| "thirteen", |
| ] |
|
|
| TASKS: List[Dict[str, object]] = [ |
| {"index": 0, "required_steps": ["scan", "enumerate", "exploit"]}, |
| {"index": 1, "required_steps": ["scan", "enumerate", "exploit", "escalate"]}, |
| {"index": 2, "required_steps": ["scan", "enumerate", "exploit", "escalate", "c2", "cleanup"]}, |
| ] |
|
|
|
|
| def _normalize_reward(value: object) -> float: |
| try: |
| reward = float(value) |
| except (TypeError, ValueError): |
| return 0.10 |
| if reward != reward: |
| return 0.10 |
| return max(0.10, min(0.90, reward)) |
|
|
|
|
| def _normalize_error(error: Optional[str]) -> str: |
| if not error: |
| return "null" |
| return "_".join(str(error).strip().split()) or "null" |
|
|
|
|
| def log_start(task_id: str, env_name: str, model_name: str) -> None: |
| print(f"[START] task={task_id} env={env_name} model={model_name}", flush=True) |
|
|
|
|
| def log_step(step_label: str, action: str, reward: float, done: bool, error: Optional[str] = None) -> None: |
| err = _normalize_error(error) |
| print( |
| f"[STEP] step={step_label} action={action} reward={_normalize_reward(reward):.2f} " |
| f"done={str(done).lower()} error={err}", |
| flush=True, |
| ) |
|
|
|
|
| def log_end(success: bool, rewards: List[float]) -> None: |
| safe_rewards = rewards if rewards else [0.10] |
| rewards_str = ",".join(f"{_normalize_reward(r):.2f}" for r in safe_rewards) |
| print(f"[END] success={str(success).lower()} rewards={rewards_str}", flush=True) |
|
|
|
|
| async def run_task( |
| client: Optional[OpenAI], |
| env: RedTeamPentestEnvironment, |
| task_meta: Dict[str, object], |
| global_step: int, |
| ) -> Tuple[List[float], int, bool, Dict[str, object]]: |
| task_id = TASK_TOKENS[int(task_meta["index"])] |
| episode_id = f"episode-{task_id}" |
| log_start(task_id, BENCHMARK, MODEL_NAME) |
|
|
| task_rewards: List[float] = [] |
| task_success = False |
| actions_taken: List[str] = [] |
| states_seen: List[str] = [] |
| flags_found: List[str] = [] |
|
|
| try: |
| env.task_index = int(task_meta["index"]) |
| env.reset(task_index=int(task_meta["index"]), episode_id=episode_id) |
| completed_steps: List[str] = [] |
| required_steps = list(task_meta["required_steps"]) |
| max_steps = len(required_steps) + 2 |
|
|
| for _ in range(max_steps): |
| remaining = [a for a in required_steps if a not in completed_steps] |
| if not remaining: |
| task_success = True |
| break |
|
|
| action_str = remaining[0] |
|
|
| if client is not None: |
| try: |
| user_prompt = f"Next pentest phase from {remaining}. Reply with one word only." |
| client.chat.completions.create( |
| model=MODEL_NAME, |
| messages=[ |
| { |
| "role": "system", |
| "content": "You are a penetration tester. Reply with one action word only.", |
| }, |
| {"role": "user", "content": user_prompt}, |
| ], |
| temperature=0.0, |
| max_tokens=16, |
| timeout=8, |
| ) |
| except Exception: |
| pass |
|
|
| action_str = remaining[0] |
| obs = env.step(RedTeamAction(action=action_str), episode_id=episode_id) |
|
|
| reward = 0.10 |
| try: |
| if getattr(obs, "reward", None) is not None: |
| reward = float(obs.reward) |
| reward = max(0.10, min(0.90, reward)) |
| except (TypeError, ValueError): |
| reward = 0.10 |
|
|
| done = bool(getattr(obs, "done", False)) |
| current_state = str(getattr(obs, "current_state", "")) |
| output_text = str(getattr(obs, "output", "")) |
|
|
| for flag in re.findall(r"FLAG\{[^\}]+\}", output_text): |
| if flag not in flags_found: |
| flags_found.append(flag) |
|
|
| if current_state not in ("INVALID", "ORDER_VIOLATION", "REPEAT") and action_str not in completed_steps: |
| completed_steps.append(action_str) |
| actions_taken.append(action_str) |
| states_seen.append(current_state) |
|
|
| step_label = STEP_TOKENS[min(global_step - 1, len(STEP_TOKENS) - 1)] |
| log_step(step_label, action_str, reward, done) |
| task_rewards.append(_normalize_reward(reward)) |
| global_step += 1 |
|
|
| if done: |
| task_success = True |
| break |
|
|
| except Exception as e: |
| print(f"# task error: {e}", flush=True) |
|
|
| log_end(task_success, task_rewards if task_rewards else [0.10]) |
| task_report = { |
| "task_id": task_id, |
| "episode_id": episode_id, |
| "required_steps": required_steps if "required_steps" in locals() else [], |
| "actions_taken": actions_taken, |
| "states_seen": states_seen, |
| "rewards": task_rewards if task_rewards else [0.10], |
| "success": task_success, |
| "ctf_solved": len(flags_found) > 0, |
| "flags_found": flags_found, |
| } |
| return task_rewards if task_rewards else [0.10], global_step, task_success, task_report |
|
|
|
|
| async def main() -> None: |
| client: Optional[OpenAI] |
| try: |
| client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY, timeout=15) |
| except Exception: |
| client = None |
|
|
| env = RedTeamPentestEnvironment() |
| global_step = 1 |
| report_tasks: List[Dict[str, object]] = [] |
|
|
| for task_meta in TASKS: |
| try: |
| _, global_step, _, task_report = await run_task(client, env, task_meta, global_step) |
| report_tasks.append(task_report) |
| except Exception as e: |
| task_idx = int(task_meta.get("index", 0)) |
| fallback_task_id = TASK_TOKENS[task_idx] |
| log_start(fallback_task_id, BENCHMARK, MODEL_NAME) |
| print(f"# task wrapper error: {e}", flush=True) |
| log_end(False, [0.10]) |
| report_tasks.append( |
| { |
| "task_id": fallback_task_id, |
| "episode_id": f"episode-{fallback_task_id}", |
| "required_steps": list(task_meta.get("required_steps", [])), |
| "actions_taken": [], |
| "states_seen": [], |
| "rewards": [0.10], |
| "success": False, |
| "ctf_solved": False, |
| "flags_found": [], |
| } |
| ) |
|
|
| summary = { |
| "environment": "redteampentestlab", |
| "benchmark": BENCHMARK, |
| "model": MODEL_NAME, |
| "generated_at": datetime.now(timezone.utc).isoformat(), |
| "tasks": report_tasks, |
| "overall": { |
| "tasks_total": len(report_tasks), |
| "tasks_success": sum(1 for t in report_tasks if t.get("success") is True), |
| "ctf_solved": sum(1 for t in report_tasks if t.get("ctf_solved") is True), |
| "total_reward": round(sum(sum(float(r) for r in t.get("rewards", [])) for t in report_tasks), 4), |
| }, |
| } |
|
|
| with open("pentest_report.json", "w", encoding="utf-8") as f: |
| json.dump(summary, f, indent=2) |
|
|
|
|
| if __name__ == "__main__": |
| asyncio.run(main()) |