# Static registry of security-audit tasks.
#
# Each task bundles a small in-memory "repository" (file path -> source text)
# with the list of vulnerabilities planted in it, plus per-severity scoring
# tables and two lookup helpers.
#
# The embedded sources are deliberately vulnerable fixtures: they are data to
# be audited, never executed by this module. They are written as one string
# literal per embedded line so that the 1-based ``line`` values in the
# vulnerability specs can be checked by eye.

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List, Literal

# Closed sets of labels accepted for task difficulty and finding severity.
Difficulty = Literal["easy", "medium", "hard"]
Severity = Literal["low", "medium", "high", "critical"]


@dataclass(frozen=True)
class VulnerabilitySpec:
    """One planted vulnerability inside a task repository.

    ``filename`` is a key of the owning task's ``repository`` mapping and
    ``line`` is a line number inside that embedded source.
    """

    id: str
    filename: str
    line: int
    vuln_type: str
    severity: Severity
    title: str
    rationale: str


@dataclass(frozen=True)
class TaskSpec:
    """One audit task: metadata, embedded sources and expected findings.

    ``frozen=True`` only prevents rebinding attributes; the ``repository``
    dict and ``vulnerabilities`` list themselves remain mutable containers.
    """

    id: str
    title: str
    difficulty: Difficulty
    objective: str
    max_steps: int
    repository: Dict[str, str]
    vulnerabilities: List[VulnerabilitySpec]


# NOTE(review): E-01, E-03, M-01 and M-02 below point one line away from the
# statement their rationale describes (e.g. M-01 says line 10, which is blank
# in the fixture; the ``==`` comparison is on line 9). Confirm whether the
# consumer matches lines with a tolerance before changing these values.
TASKS: Dict[str, TaskSpec] = {
    "easy": TaskSpec(
        id="easy",
        title="Legacy Flask Patch Review",
        difficulty="easy",
        objective=(
            "Audit a small Flask service before release. Identify concrete "
            "security vulnerabilities with accurate location, type, and severity."
        ),
        max_steps=12,
        repository={
            "app/routes.py": (
                "from flask import Blueprint, request, jsonify\n"
                "from app.db import db\n"
                "\n"
                "bp = Blueprint('api', __name__)\n"
                "\n"
                "@bp.get('/user')\n"
                "def get_user():\n"
                " user_id = request.args.get('id', '')\n"
                " query = f\"SELECT id, email, role FROM users WHERE id = '{user_id}'\"\n"
                " row = db.execute(query).fetchone()\n"
                " return jsonify(dict(row) if row else {})\n"
                "\n"
                "@bp.post('/login')\n"
                "def login():\n"
                " payload = request.json or {}\n"
                " if payload.get('token') == 'letmein':\n"
                " return jsonify({'ok': True})\n"
                " return jsonify({'ok': False}), 401\n"
            ),
            "app/config.py": (
                "import os\n"
                "\n"
                "class Config:\n"
                " DEBUG = os.getenv('DEBUG', '0') == '1'\n"
                " SECRET_KEY = 'prod-secret-2026'\n"
                " DB_URL = os.getenv('DB_URL', 'postgresql://localhost/app')\n"
            ),
            "app/db.py": (
                "import sqlite3\n"
                "\n"
                "_conn = sqlite3.connect(':memory:', check_same_thread=False)\n"
                "_conn.execute('CREATE TABLE IF NOT EXISTS users (id TEXT, email TEXT, role TEXT)')\n"
                "\n"
                "def execute(query: str):\n"
                " return _conn.execute(query)\n"
                "\n"
                "class DB:\n"
                " execute = staticmethod(execute)\n"
                "\n"
                "db = DB()\n"
            ),
        },
        vulnerabilities=[
            VulnerabilitySpec(
                id="E-01",
                filename="app/routes.py",
                line=8,
                vuln_type="sql_injection",
                severity="high",
                title="Unsanitized SQL query with user-controlled id",
                rationale="Direct string interpolation builds SQL using request input.",
            ),
            VulnerabilitySpec(
                id="E-02",
                filename="app/config.py",
                line=5,
                vuln_type="hardcoded_secret",
                severity="high",
                title="Hardcoded production secret key",
                rationale="Embedding secrets in code leaks credentials and breaks rotation.",
            ),
            VulnerabilitySpec(
                id="E-03",
                filename="app/routes.py",
                line=15,
                vuln_type="weak_authentication",
                severity="medium",
                title="Backdoor static token for login",
                rationale="Fixed token bypasses proper authentication controls.",
            ),
        ],
    ),
    "medium": TaskSpec(
        id="medium",
        title="Payment Webhook Service",
        difficulty="medium",
        objective=(
            "Review a webhook and export pipeline used by operations. Catch logic and "
            "input-handling flaws that can be exploited in production."
        ),
        max_steps=14,
        repository={
            "service/webhook.py": (
                "import hmac\n"
                "import hashlib\n"
                "from flask import request, jsonify\n"
                "\n"
                "SHARED_SECRET = 'whsec_test_123'\n"
                "\n"
                "def verify_signature(body: bytes, signature: str) -> bool:\n"
                " digest = hmac.new(SHARED_SECRET.encode(), body, hashlib.sha256).hexdigest()\n"
                " return digest == signature\n"
                "\n"
                "def process_event(event: dict):\n"
                " return {'processed': event.get('id')}\n"
                "\n"
                "def webhook_handler():\n"
                " body = request.data\n"
                " signature = request.headers.get('X-Signature', '')\n"
                " if not verify_signature(body, signature):\n"
                " return jsonify({'error': 'invalid signature'}), 401\n"
                "\n"
                " event = request.json or {}\n"
                " if event.get('debug'):\n"
                " # temporary bypass while partner migrates\n"
                " return jsonify({'ok': True, 'bypass': True})\n"
                "\n"
                " return jsonify(process_event(event))\n"
            ),
            "service/export.py": (
                "import os\n"
                "\n"
                "def build_export_cmd(job_name: str, output_dir: str) -> str:\n"
                " return f\"tar -czf {output_dir}/{job_name}.tgz /srv/data/{job_name}\"\n"
                "\n"
                "def run_export(job_name: str, output_dir: str):\n"
                " cmd = build_export_cmd(job_name, output_dir)\n"
                " os.system(cmd)\n"
                " return {'status': 'ok', 'cmd': cmd}\n"
            ),
            "service/serializers.py": (
                "import yaml\n"
                "\n"
                "def load_template(raw: str):\n"
                " return yaml.load(raw, Loader=yaml.Loader)\n"
            ),
        },
        vulnerabilities=[
            VulnerabilitySpec(
                id="M-01",
                filename="service/webhook.py",
                line=10,
                vuln_type="weak_authentication",
                severity="medium",
                title="Timing-unsafe signature comparison",
                rationale="Using == for secrets can leak comparison timing signal.",
            ),
            VulnerabilitySpec(
                id="M-02",
                filename="service/webhook.py",
                line=22,
                vuln_type="weak_authentication",
                severity="high",
                title="Debug flag bypasses signature verification outcome",
                rationale="Attacker-controlled debug field returns success without processing guards.",
            ),
            VulnerabilitySpec(
                id="M-03",
                filename="service/export.py",
                line=8,
                vuln_type="command_injection",
                severity="critical",
                title="Unsanitized shell command execution",
                rationale="User-controlled job_name/output_dir flow into os.system command string.",
            ),
            VulnerabilitySpec(
                id="M-04",
                filename="service/serializers.py",
                line=4,
                vuln_type="insecure_deserialization",
                severity="high",
                title="Unsafe YAML loader",
                rationale="yaml.Loader can construct arbitrary Python objects from attacker input.",
            ),
        ],
    ),
    "hard": TaskSpec(
        id="hard",
        title="Enterprise Multi-Tenant API",
        difficulty="hard",
        objective=(
            "Audit an API gateway handling tenants, files, and callback fetches. "
            "Find high-impact vulnerabilities without flooding false positives."
        ),
        max_steps=16,
        repository={
            "api/auth.py": (
                "import base64\n"
                "import json\n"
                "import jwt\n"
                "\n"
                "def issue_token(user_id: str, tenant_id: str):\n"
                " payload = {'sub': user_id, 'tenant': tenant_id, 'role': 'member'}\n"
                " return jwt.encode(payload, 'dev-key', algorithm='HS256')\n"
                "\n"
                "def parse_token(token: str):\n"
                " header_b64 = token.split('.')[0] + '=='\n"
                " header = json.loads(base64.urlsafe_b64decode(header_b64).decode())\n"
                " if header.get('alg') == 'none':\n"
                " return json.loads(base64.urlsafe_b64decode(token.split('.')[1] + '==').decode())\n"
                " return jwt.decode(token, 'dev-key', algorithms=['HS256'])\n"
            ),
            "api/files.py": (
                "from flask import request, jsonify\n"
                "\n"
                "FILES = {\n"
                " 'tenant-a': {'1': 'a-private-doc'},\n"
                " 'tenant-b': {'2': 'b-private-doc'},\n"
                "}\n"
                "\n"
                "def get_file(user):\n"
                " file_id = request.args.get('file_id')\n"
                " tenant = request.args.get('tenant')\n"
                " data = FILES.get(tenant, {}).get(file_id)\n"
                " if not data:\n"
                " return jsonify({'error': 'not found'}), 404\n"
                " return jsonify({'file': data, 'tenant': tenant, 'user': user['sub']})\n"
            ),
            "api/fetcher.py": (
                "import requests\n"
                "\n"
                "def fetch_preview(url: str):\n"
                " response = requests.get(url, timeout=3)\n"
                " return {'status': response.status_code, 'body': response.text[:120]}\n"
            ),
            "api/storage.py": (
                "from pathlib import Path\n"
                "\n"
                "BASE = Path('/srv/uploads')\n"
                "\n"
                "def read_attachment(path_fragment: str) -> bytes:\n"
                " final_path = BASE / path_fragment\n"
                " return final_path.read_bytes()\n"
            ),
        },
        vulnerabilities=[
            VulnerabilitySpec(
                id="H-01",
                filename="api/auth.py",
                line=12,
                vuln_type="weak_authentication",
                severity="critical",
                title="Accepts unsigned JWT tokens when alg=none",
                rationale="Token parser trusts attacker-controlled header and bypasses signature checks.",
            ),
            VulnerabilitySpec(
                id="H-02",
                filename="api/files.py",
                line=11,
                vuln_type="weak_authentication",
                severity="high",
                title="Tenant access controlled by request parameter",
                rationale="Requester can switch tenant query parameter and read cross-tenant data (IDOR).",
            ),
            VulnerabilitySpec(
                id="H-03",
                filename="api/fetcher.py",
                line=4,
                vuln_type="ssrf",
                severity="high",
                title="Server-side fetch of arbitrary URL",
                rationale="Attacker can query internal metadata endpoints through backend network path.",
            ),
            VulnerabilitySpec(
                id="H-04",
                filename="api/storage.py",
                line=6,
                vuln_type="path_traversal",
                severity="critical",
                title="Unvalidated path join for file reads",
                rationale="Path fragments containing .. can escape upload directory.",
            ),
        ],
    ),
}

# Numeric weight attached to each severity label.
SEVERITY_WEIGHTS: Dict[str, float] = {
    "low": 1.0,
    "medium": 2.0,
    "high": 3.0,
    "critical": 4.0,
}

# Confidence value associated with each severity label.
# NOTE(review): how these are consumed is not visible in this module.
TARGET_CONFIDENCE: Dict[str, float] = {
    "low": 0.55,
    "medium": 0.65,
    "high": 0.8,
    "critical": 0.9,
}


def get_task(task_id: str) -> TaskSpec:
    """Return the task registered under *task_id*.

    Args:
        task_id: Key into ``TASKS``.

    Returns:
        The matching ``TaskSpec`` (the registry's own instance, not a copy).

    Raises:
        KeyError: If *task_id* is not registered; the message lists the
            available ids in sorted order.
    """
    try:
        return TASKS[task_id]
    except KeyError:
        available = ", ".join(sorted(TASKS))
        # ``from None`` keeps the traceback to the single descriptive error.
        raise KeyError(
            f"Unknown task_id '{task_id}'. Available: {available}"
        ) from None


def list_task_ids() -> List[str]:
    """Return every registered task id in sorted order."""
    return sorted(TASKS)