File size: 10,406 Bytes
8c391c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List, Literal

# Allowed values for ``TaskSpec.difficulty``.
Difficulty = Literal["easy", "medium", "hard"]
# Allowed values for ``VulnerabilitySpec.severity``, listed from least to most severe.
Severity = Literal["low", "medium", "high", "critical"]


@dataclass(frozen=True)
class VulnerabilitySpec:
    """One expected finding inside a task's repository.

    Attributes:
        id: Identifier of the finding (e.g. ``"E-01"``).
        filename: Key into the owning ``TaskSpec.repository`` naming the file
            that contains the flaw.
        line: 1-based line number, within that file's source text, of the
            statement that exhibits the flaw.
        vuln_type: Snake-case category label (e.g. ``"sql_injection"``).
        severity: One of the ``Severity`` values.
        title: Short human-readable summary of the finding.
        rationale: One-sentence explanation of why the code is vulnerable.
    """

    id: str
    filename: str
    line: int
    vuln_type: str
    severity: Severity
    title: str
    rationale: str


@dataclass(frozen=True)
class TaskSpec:
    """A single audit task: a small in-memory repository plus its expected findings.

    ``frozen=True`` only blocks attribute re-assignment; the ``repository`` dict
    and ``vulnerabilities`` list themselves remain mutable objects.

    Attributes:
        id: Identifier of the task; in ``TASKS`` it equals the dictionary key.
        title: Human-readable task name.
        difficulty: One of the ``Difficulty`` values.
        objective: Instruction text describing what the auditor should do.
        max_steps: Step budget for the task (how it is enforced is not
            visible in this module).
        repository: Maps a relative file path to that file's full source text.
        vulnerabilities: Expected findings for ``repository``.
    """

    id: str
    title: str
    difficulty: Difficulty
    objective: str
    max_steps: int
    repository: Dict[str, str]
    vulnerabilities: List[VulnerabilitySpec]


# Registry of all tasks, keyed by task id.
#
# The repository sources below are intentionally vulnerable fixtures stored as
# plain strings; nothing in this block executes them. Every
# ``VulnerabilitySpec.line`` is the 1-based line of the offending statement in
# the corresponding source string, so keep the two in sync when editing either.
TASKS: Dict[str, TaskSpec] = {
    "easy": TaskSpec(
        id="easy",
        title="Legacy Flask Patch Review",
        difficulty="easy",
        objective=(
            "Audit a small Flask service before release. Identify concrete "
            "security vulnerabilities with accurate location, type, and severity."
        ),
        max_steps=12,
        repository={
            "app/routes.py": """from flask import Blueprint, request, jsonify\nfrom app.db import db\n\nbp = Blueprint('api', __name__)\n\n@bp.get('/user')\ndef get_user():\n    user_id = request.args.get('id', '')\n    query = f\"SELECT id, email, role FROM users WHERE id = '{user_id}'\"\n    row = db.execute(query).fetchone()\n    return jsonify(dict(row) if row else {})\n\n@bp.post('/login')\ndef login():\n    payload = request.json or {}\n    if payload.get('token') == 'letmein':\n        return jsonify({'ok': True})\n    return jsonify({'ok': False}), 401\n""",
            "app/config.py": """import os\n\nclass Config:\n    DEBUG = os.getenv('DEBUG', '0') == '1'\n    SECRET_KEY = 'prod-secret-2026'\n    DB_URL = os.getenv('DB_URL', 'postgresql://localhost/app')\n""",
            "app/db.py": """import sqlite3\n\n_conn = sqlite3.connect(':memory:', check_same_thread=False)\n_conn.execute('CREATE TABLE IF NOT EXISTS users (id TEXT, email TEXT, role TEXT)')\n\ndef execute(query: str):\n    return _conn.execute(query)\n\nclass DB:\n    execute = staticmethod(execute)\n\ndb = DB()\n""",
        },
        vulnerabilities=[
            VulnerabilitySpec(
                id="E-01",
                filename="app/routes.py",
                line=9,  # the f-string that interpolates user_id into the SQL text
                vuln_type="sql_injection",
                severity="high",
                title="Unsanitized SQL query with user-controlled id",
                rationale="Direct string interpolation builds SQL using request input.",
            ),
            VulnerabilitySpec(
                id="E-02",
                filename="app/config.py",
                line=5,
                vuln_type="hardcoded_secret",
                severity="high",
                title="Hardcoded production secret key",
                rationale="Embedding secrets in code leaks credentials and breaks rotation.",
            ),
            VulnerabilitySpec(
                id="E-03",
                filename="app/routes.py",
                line=16,  # the comparison against the static 'letmein' token
                vuln_type="weak_authentication",
                severity="medium",
                title="Backdoor static token for login",
                rationale="Fixed token bypasses proper authentication controls.",
            ),
        ],
    ),
    "medium": TaskSpec(
        id="medium",
        title="Payment Webhook Service",
        difficulty="medium",
        objective=(
            "Review a webhook and export pipeline used by operations. Catch logic and "
            "input-handling flaws that can be exploited in production."
        ),
        max_steps=14,
        repository={
            "service/webhook.py": """import hmac\nimport hashlib\nfrom flask import request, jsonify\n\nSHARED_SECRET = 'whsec_test_123'\n\ndef verify_signature(body: bytes, signature: str) -> bool:\n    digest = hmac.new(SHARED_SECRET.encode(), body, hashlib.sha256).hexdigest()\n    return digest == signature\n\ndef process_event(event: dict):\n    return {'processed': event.get('id')}\n\ndef webhook_handler():\n    body = request.data\n    signature = request.headers.get('X-Signature', '')\n    if not verify_signature(body, signature):\n        return jsonify({'error': 'invalid signature'}), 401\n\n    event = request.json or {}\n    if event.get('debug'):\n        # temporary bypass while partner migrates\n        return jsonify({'ok': True, 'bypass': True})\n\n    return jsonify(process_event(event))\n""",
            "service/export.py": """import os\n\ndef build_export_cmd(job_name: str, output_dir: str) -> str:\n    return f\"tar -czf {output_dir}/{job_name}.tgz /srv/data/{job_name}\"\n\ndef run_export(job_name: str, output_dir: str):\n    cmd = build_export_cmd(job_name, output_dir)\n    os.system(cmd)\n    return {'status': 'ok', 'cmd': cmd}\n""",
            "service/serializers.py": """import yaml\n\ndef load_template(raw: str):\n    return yaml.load(raw, Loader=yaml.Loader)\n""",
        },
        vulnerabilities=[
            VulnerabilitySpec(
                id="M-01",
                filename="service/webhook.py",
                line=9,  # the `digest == signature` return statement
                vuln_type="weak_authentication",
                severity="medium",
                title="Timing-unsafe signature comparison",
                rationale="Using == for secrets can leak comparison timing signal.",
            ),
            VulnerabilitySpec(
                id="M-02",
                filename="service/webhook.py",
                line=21,  # the `if event.get('debug')` branch that opens the bypass
                vuln_type="weak_authentication",
                severity="high",
                title="Debug flag bypasses signature verification outcome",
                rationale="Attacker-controlled debug field returns success without processing guards.",
            ),
            VulnerabilitySpec(
                id="M-03",
                filename="service/export.py",
                line=8,
                vuln_type="command_injection",
                severity="critical",
                title="Unsanitized shell command execution",
                rationale="User-controlled job_name/output_dir flow into os.system command string.",
            ),
            VulnerabilitySpec(
                id="M-04",
                filename="service/serializers.py",
                line=4,
                vuln_type="insecure_deserialization",
                severity="high",
                title="Unsafe YAML loader",
                rationale="yaml.Loader can construct arbitrary Python objects from attacker input.",
            ),
        ],
    ),
    "hard": TaskSpec(
        id="hard",
        title="Enterprise Multi-Tenant API",
        difficulty="hard",
        objective=(
            "Audit an API gateway handling tenants, files, and callback fetches. "
            "Find high-impact vulnerabilities without flooding false positives."
        ),
        max_steps=16,
        repository={
            "api/auth.py": """import base64\nimport json\nimport jwt\n\ndef issue_token(user_id: str, tenant_id: str):\n    payload = {'sub': user_id, 'tenant': tenant_id, 'role': 'member'}\n    return jwt.encode(payload, 'dev-key', algorithm='HS256')\n\ndef parse_token(token: str):\n    header_b64 = token.split('.')[0] + '=='\n    header = json.loads(base64.urlsafe_b64decode(header_b64).decode())\n    if header.get('alg') == 'none':\n        return json.loads(base64.urlsafe_b64decode(token.split('.')[1] + '==').decode())\n    return jwt.decode(token, 'dev-key', algorithms=['HS256'])\n""",
            "api/files.py": """from flask import request, jsonify\n\nFILES = {\n    'tenant-a': {'1': 'a-private-doc'},\n    'tenant-b': {'2': 'b-private-doc'},\n}\n\ndef get_file(user):\n    file_id = request.args.get('file_id')\n    tenant = request.args.get('tenant')\n    data = FILES.get(tenant, {}).get(file_id)\n    if not data:\n        return jsonify({'error': 'not found'}), 404\n    return jsonify({'file': data, 'tenant': tenant, 'user': user['sub']})\n""",
            "api/fetcher.py": """import requests\n\ndef fetch_preview(url: str):\n    response = requests.get(url, timeout=3)\n    return {'status': response.status_code, 'body': response.text[:120]}\n""",
            "api/storage.py": """from pathlib import Path\n\nBASE = Path('/srv/uploads')\n\ndef read_attachment(path_fragment: str) -> bytes:\n    final_path = BASE / path_fragment\n    return final_path.read_bytes()\n""",
        },
        vulnerabilities=[
            VulnerabilitySpec(
                id="H-01",
                filename="api/auth.py",
                line=12,
                vuln_type="weak_authentication",
                severity="critical",
                title="Accepts unsigned JWT tokens when alg=none",
                rationale="Token parser trusts attacker-controlled header and bypasses signature checks.",
            ),
            VulnerabilitySpec(
                id="H-02",
                filename="api/files.py",
                line=11,
                vuln_type="weak_authentication",
                severity="high",
                title="Tenant access controlled by request parameter",
                rationale="Requester can switch tenant query parameter and read cross-tenant data (IDOR).",
            ),
            VulnerabilitySpec(
                id="H-03",
                filename="api/fetcher.py",
                line=4,
                vuln_type="ssrf",
                severity="high",
                title="Server-side fetch of arbitrary URL",
                rationale="Attacker can query internal metadata endpoints through backend network path.",
            ),
            VulnerabilitySpec(
                id="H-04",
                filename="api/storage.py",
                line=6,
                vuln_type="path_traversal",
                severity="critical",
                title="Unvalidated path join for file reads",
                rationale="Path fragments containing .. can escape upload directory.",
            ),
        ],
    ),
}

# Numeric weight for each severity label; the keys are exactly the values of
# the `Severity` literal and the weight grows by 1.0 per severity level.
# NOTE(review): presumably consumed by scoring code outside this module — confirm.
SEVERITY_WEIGHTS = {
    "low": 1.0,
    "medium": 2.0,
    "high": 3.0,
    "critical": 4.0,
}

# Confidence value associated with each severity label (same key set as
# SEVERITY_WEIGHTS); higher severities carry a higher value.
# NOTE(review): looks like the confidence a reported finding is expected to
# reach for that severity — how it is applied is not visible here, confirm.
TARGET_CONFIDENCE = {
    "low": 0.55,
    "medium": 0.65,
    "high": 0.8,
    "critical": 0.9,
}


def get_task(task_id: str) -> TaskSpec:
    """Look up a task in ``TASKS`` by its id.

    Args:
        task_id: Key of the wanted entry in ``TASKS``.

    Returns:
        The ``TaskSpec`` stored under ``task_id``.

    Raises:
        KeyError: If ``task_id`` is not a key of ``TASKS``. The message names
            the rejected id and lists the available ids in sorted order.
    """
    # Every value in TASKS is a TaskSpec, so None can only mean "missing key".
    spec = TASKS.get(task_id)
    if spec is not None:
        return spec

    available = ", ".join(sorted(TASKS))
    raise KeyError(f"Unknown task_id '{task_id}'. Available: {available}")


def list_task_ids() -> List[str]:
    """Return the keys of ``TASKS`` as a new list in ascending sorted order."""
    task_ids = list(TASKS)
    task_ids.sort()
    return task_ids