Spaces:
Sleeping
Sleeping
File size: 10,406 Bytes
8c391c7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 | from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Literal
Difficulty = Literal["easy", "medium", "hard"]
Severity = Literal["low", "medium", "high", "critical"]
@dataclass(frozen=True)
class VulnerabilitySpec:
id: str
filename: str
line: int
vuln_type: str
severity: Severity
title: str
rationale: str
@dataclass(frozen=True)
class TaskSpec:
id: str
title: str
difficulty: Difficulty
objective: str
max_steps: int
repository: Dict[str, str]
vulnerabilities: List[VulnerabilitySpec]
TASKS: Dict[str, TaskSpec] = {
"easy": TaskSpec(
id="easy",
title="Legacy Flask Patch Review",
difficulty="easy",
objective=(
"Audit a small Flask service before release. Identify concrete "
"security vulnerabilities with accurate location, type, and severity."
),
max_steps=12,
repository={
"app/routes.py": """from flask import Blueprint, request, jsonify\nfrom app.db import db\n\nbp = Blueprint('api', __name__)\n\n@bp.get('/user')\ndef get_user():\n user_id = request.args.get('id', '')\n query = f\"SELECT id, email, role FROM users WHERE id = '{user_id}'\"\n row = db.execute(query).fetchone()\n return jsonify(dict(row) if row else {})\n\n@bp.post('/login')\ndef login():\n payload = request.json or {}\n if payload.get('token') == 'letmein':\n return jsonify({'ok': True})\n return jsonify({'ok': False}), 401\n""",
"app/config.py": """import os\n\nclass Config:\n DEBUG = os.getenv('DEBUG', '0') == '1'\n SECRET_KEY = 'prod-secret-2026'\n DB_URL = os.getenv('DB_URL', 'postgresql://localhost/app')\n""",
"app/db.py": """import sqlite3\n\n_conn = sqlite3.connect(':memory:', check_same_thread=False)\n_conn.execute('CREATE TABLE IF NOT EXISTS users (id TEXT, email TEXT, role TEXT)')\n\ndef execute(query: str):\n return _conn.execute(query)\n\nclass DB:\n execute = staticmethod(execute)\n\ndb = DB()\n""",
},
vulnerabilities=[
VulnerabilitySpec(
id="E-01",
filename="app/routes.py",
line=8,
vuln_type="sql_injection",
severity="high",
title="Unsanitized SQL query with user-controlled id",
rationale="Direct string interpolation builds SQL using request input.",
),
VulnerabilitySpec(
id="E-02",
filename="app/config.py",
line=5,
vuln_type="hardcoded_secret",
severity="high",
title="Hardcoded production secret key",
rationale="Embedding secrets in code leaks credentials and breaks rotation.",
),
VulnerabilitySpec(
id="E-03",
filename="app/routes.py",
line=15,
vuln_type="weak_authentication",
severity="medium",
title="Backdoor static token for login",
rationale="Fixed token bypasses proper authentication controls.",
),
],
),
"medium": TaskSpec(
id="medium",
title="Payment Webhook Service",
difficulty="medium",
objective=(
"Review a webhook and export pipeline used by operations. Catch logic and "
"input-handling flaws that can be exploited in production."
),
max_steps=14,
repository={
"service/webhook.py": """import hmac\nimport hashlib\nfrom flask import request, jsonify\n\nSHARED_SECRET = 'whsec_test_123'\n\ndef verify_signature(body: bytes, signature: str) -> bool:\n digest = hmac.new(SHARED_SECRET.encode(), body, hashlib.sha256).hexdigest()\n return digest == signature\n\ndef process_event(event: dict):\n return {'processed': event.get('id')}\n\ndef webhook_handler():\n body = request.data\n signature = request.headers.get('X-Signature', '')\n if not verify_signature(body, signature):\n return jsonify({'error': 'invalid signature'}), 401\n\n event = request.json or {}\n if event.get('debug'):\n # temporary bypass while partner migrates\n return jsonify({'ok': True, 'bypass': True})\n\n return jsonify(process_event(event))\n""",
"service/export.py": """import os\n\ndef build_export_cmd(job_name: str, output_dir: str) -> str:\n return f\"tar -czf {output_dir}/{job_name}.tgz /srv/data/{job_name}\"\n\ndef run_export(job_name: str, output_dir: str):\n cmd = build_export_cmd(job_name, output_dir)\n os.system(cmd)\n return {'status': 'ok', 'cmd': cmd}\n""",
"service/serializers.py": """import yaml\n\ndef load_template(raw: str):\n return yaml.load(raw, Loader=yaml.Loader)\n""",
},
vulnerabilities=[
VulnerabilitySpec(
id="M-01",
filename="service/webhook.py",
line=10,
vuln_type="weak_authentication",
severity="medium",
title="Timing-unsafe signature comparison",
rationale="Using == for secrets can leak comparison timing signal.",
),
VulnerabilitySpec(
id="M-02",
filename="service/webhook.py",
line=22,
vuln_type="weak_authentication",
severity="high",
title="Debug flag bypasses signature verification outcome",
rationale="Attacker-controlled debug field returns success without processing guards.",
),
VulnerabilitySpec(
id="M-03",
filename="service/export.py",
line=8,
vuln_type="command_injection",
severity="critical",
title="Unsanitized shell command execution",
rationale="User-controlled job_name/output_dir flow into os.system command string.",
),
VulnerabilitySpec(
id="M-04",
filename="service/serializers.py",
line=4,
vuln_type="insecure_deserialization",
severity="high",
title="Unsafe YAML loader",
rationale="yaml.Loader can construct arbitrary Python objects from attacker input.",
),
],
),
"hard": TaskSpec(
id="hard",
title="Enterprise Multi-Tenant API",
difficulty="hard",
objective=(
"Audit an API gateway handling tenants, files, and callback fetches. "
"Find high-impact vulnerabilities without flooding false positives."
),
max_steps=16,
repository={
"api/auth.py": """import base64\nimport json\nimport jwt\n\ndef issue_token(user_id: str, tenant_id: str):\n payload = {'sub': user_id, 'tenant': tenant_id, 'role': 'member'}\n return jwt.encode(payload, 'dev-key', algorithm='HS256')\n\ndef parse_token(token: str):\n header_b64 = token.split('.')[0] + '=='\n header = json.loads(base64.urlsafe_b64decode(header_b64).decode())\n if header.get('alg') == 'none':\n return json.loads(base64.urlsafe_b64decode(token.split('.')[1] + '==').decode())\n return jwt.decode(token, 'dev-key', algorithms=['HS256'])\n""",
"api/files.py": """from flask import request, jsonify\n\nFILES = {\n 'tenant-a': {'1': 'a-private-doc'},\n 'tenant-b': {'2': 'b-private-doc'},\n}\n\ndef get_file(user):\n file_id = request.args.get('file_id')\n tenant = request.args.get('tenant')\n data = FILES.get(tenant, {}).get(file_id)\n if not data:\n return jsonify({'error': 'not found'}), 404\n return jsonify({'file': data, 'tenant': tenant, 'user': user['sub']})\n""",
"api/fetcher.py": """import requests\n\ndef fetch_preview(url: str):\n response = requests.get(url, timeout=3)\n return {'status': response.status_code, 'body': response.text[:120]}\n""",
"api/storage.py": """from pathlib import Path\n\nBASE = Path('/srv/uploads')\n\ndef read_attachment(path_fragment: str) -> bytes:\n final_path = BASE / path_fragment\n return final_path.read_bytes()\n""",
},
vulnerabilities=[
VulnerabilitySpec(
id="H-01",
filename="api/auth.py",
line=12,
vuln_type="weak_authentication",
severity="critical",
title="Accepts unsigned JWT tokens when alg=none",
rationale="Token parser trusts attacker-controlled header and bypasses signature checks.",
),
VulnerabilitySpec(
id="H-02",
filename="api/files.py",
line=11,
vuln_type="weak_authentication",
severity="high",
title="Tenant access controlled by request parameter",
rationale="Requester can switch tenant query parameter and read cross-tenant data (IDOR).",
),
VulnerabilitySpec(
id="H-03",
filename="api/fetcher.py",
line=4,
vuln_type="ssrf",
severity="high",
title="Server-side fetch of arbitrary URL",
rationale="Attacker can query internal metadata endpoints through backend network path.",
),
VulnerabilitySpec(
id="H-04",
filename="api/storage.py",
line=6,
vuln_type="path_traversal",
severity="critical",
title="Unvalidated path join for file reads",
rationale="Path fragments containing .. can escape upload directory.",
),
],
),
}
SEVERITY_WEIGHTS = {
"low": 1.0,
"medium": 2.0,
"high": 3.0,
"critical": 4.0,
}
TARGET_CONFIDENCE = {
"low": 0.55,
"medium": 0.65,
"high": 0.8,
"critical": 0.9,
}
def get_task(task_id: str) -> TaskSpec:
if task_id not in TASKS:
raise KeyError(f"Unknown task_id '{task_id}'. Available: {', '.join(sorted(TASKS))}")
return TASKS[task_id]
def list_task_ids() -> List[str]:
return sorted(TASKS.keys())
|