Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import Dict, List, Literal | |
| Difficulty = Literal["easy", "medium", "hard"] | |
| Severity = Literal["low", "medium", "high", "critical"] | |
| class VulnerabilitySpec: | |
| id: str | |
| filename: str | |
| line: int | |
| vuln_type: str | |
| severity: Severity | |
| title: str | |
| rationale: str | |
| class TaskSpec: | |
| id: str | |
| title: str | |
| difficulty: Difficulty | |
| objective: str | |
| max_steps: int | |
| repository: Dict[str, str] | |
| vulnerabilities: List[VulnerabilitySpec] | |
| TASKS: Dict[str, TaskSpec] = { | |
| "easy": TaskSpec( | |
| id="easy", | |
| title="Legacy Flask Patch Review", | |
| difficulty="easy", | |
| objective=( | |
| "Audit a small Flask service before release. Identify concrete " | |
| "security vulnerabilities with accurate location, type, and severity." | |
| ), | |
| max_steps=12, | |
| repository={ | |
| "app/routes.py": """from flask import Blueprint, request, jsonify\nfrom app.db import db\n\nbp = Blueprint('api', __name__)\n\n@bp.get('/user')\ndef get_user():\n user_id = request.args.get('id', '')\n query = f\"SELECT id, email, role FROM users WHERE id = '{user_id}'\"\n row = db.execute(query).fetchone()\n return jsonify(dict(row) if row else {})\n\n@bp.post('/login')\ndef login():\n payload = request.json or {}\n if payload.get('token') == 'letmein':\n return jsonify({'ok': True})\n return jsonify({'ok': False}), 401\n""", | |
| "app/config.py": """import os\n\nclass Config:\n DEBUG = os.getenv('DEBUG', '0') == '1'\n SECRET_KEY = 'prod-secret-2026'\n DB_URL = os.getenv('DB_URL', 'postgresql://localhost/app')\n""", | |
| "app/db.py": """import sqlite3\n\n_conn = sqlite3.connect(':memory:', check_same_thread=False)\n_conn.execute('CREATE TABLE IF NOT EXISTS users (id TEXT, email TEXT, role TEXT)')\n\ndef execute(query: str):\n return _conn.execute(query)\n\nclass DB:\n execute = staticmethod(execute)\n\ndb = DB()\n""", | |
| }, | |
| vulnerabilities=[ | |
| VulnerabilitySpec( | |
| id="E-01", | |
| filename="app/routes.py", | |
| line=8, | |
| vuln_type="sql_injection", | |
| severity="high", | |
| title="Unsanitized SQL query with user-controlled id", | |
| rationale="Direct string interpolation builds SQL using request input.", | |
| ), | |
| VulnerabilitySpec( | |
| id="E-02", | |
| filename="app/config.py", | |
| line=5, | |
| vuln_type="hardcoded_secret", | |
| severity="high", | |
| title="Hardcoded production secret key", | |
| rationale="Embedding secrets in code leaks credentials and breaks rotation.", | |
| ), | |
| VulnerabilitySpec( | |
| id="E-03", | |
| filename="app/routes.py", | |
| line=15, | |
| vuln_type="weak_authentication", | |
| severity="medium", | |
| title="Backdoor static token for login", | |
| rationale="Fixed token bypasses proper authentication controls.", | |
| ), | |
| ], | |
| ), | |
| "medium": TaskSpec( | |
| id="medium", | |
| title="Payment Webhook Service", | |
| difficulty="medium", | |
| objective=( | |
| "Review a webhook and export pipeline used by operations. Catch logic and " | |
| "input-handling flaws that can be exploited in production." | |
| ), | |
| max_steps=14, | |
| repository={ | |
| "service/webhook.py": """import hmac\nimport hashlib\nfrom flask import request, jsonify\n\nSHARED_SECRET = 'whsec_test_123'\n\ndef verify_signature(body: bytes, signature: str) -> bool:\n digest = hmac.new(SHARED_SECRET.encode(), body, hashlib.sha256).hexdigest()\n return digest == signature\n\ndef process_event(event: dict):\n return {'processed': event.get('id')}\n\ndef webhook_handler():\n body = request.data\n signature = request.headers.get('X-Signature', '')\n if not verify_signature(body, signature):\n return jsonify({'error': 'invalid signature'}), 401\n\n event = request.json or {}\n if event.get('debug'):\n # temporary bypass while partner migrates\n return jsonify({'ok': True, 'bypass': True})\n\n return jsonify(process_event(event))\n""", | |
| "service/export.py": """import os\n\ndef build_export_cmd(job_name: str, output_dir: str) -> str:\n return f\"tar -czf {output_dir}/{job_name}.tgz /srv/data/{job_name}\"\n\ndef run_export(job_name: str, output_dir: str):\n cmd = build_export_cmd(job_name, output_dir)\n os.system(cmd)\n return {'status': 'ok', 'cmd': cmd}\n""", | |
| "service/serializers.py": """import yaml\n\ndef load_template(raw: str):\n return yaml.load(raw, Loader=yaml.Loader)\n""", | |
| }, | |
| vulnerabilities=[ | |
| VulnerabilitySpec( | |
| id="M-01", | |
| filename="service/webhook.py", | |
| line=10, | |
| vuln_type="weak_authentication", | |
| severity="medium", | |
| title="Timing-unsafe signature comparison", | |
| rationale="Using == for secrets can leak comparison timing signal.", | |
| ), | |
| VulnerabilitySpec( | |
| id="M-02", | |
| filename="service/webhook.py", | |
| line=22, | |
| vuln_type="weak_authentication", | |
| severity="high", | |
| title="Debug flag bypasses signature verification outcome", | |
| rationale="Attacker-controlled debug field returns success without processing guards.", | |
| ), | |
| VulnerabilitySpec( | |
| id="M-03", | |
| filename="service/export.py", | |
| line=8, | |
| vuln_type="command_injection", | |
| severity="critical", | |
| title="Unsanitized shell command execution", | |
| rationale="User-controlled job_name/output_dir flow into os.system command string.", | |
| ), | |
| VulnerabilitySpec( | |
| id="M-04", | |
| filename="service/serializers.py", | |
| line=4, | |
| vuln_type="insecure_deserialization", | |
| severity="high", | |
| title="Unsafe YAML loader", | |
| rationale="yaml.Loader can construct arbitrary Python objects from attacker input.", | |
| ), | |
| ], | |
| ), | |
| "hard": TaskSpec( | |
| id="hard", | |
| title="Enterprise Multi-Tenant API", | |
| difficulty="hard", | |
| objective=( | |
| "Audit an API gateway handling tenants, files, and callback fetches. " | |
| "Find high-impact vulnerabilities without flooding false positives." | |
| ), | |
| max_steps=16, | |
| repository={ | |
| "api/auth.py": """import base64\nimport json\nimport jwt\n\ndef issue_token(user_id: str, tenant_id: str):\n payload = {'sub': user_id, 'tenant': tenant_id, 'role': 'member'}\n return jwt.encode(payload, 'dev-key', algorithm='HS256')\n\ndef parse_token(token: str):\n header_b64 = token.split('.')[0] + '=='\n header = json.loads(base64.urlsafe_b64decode(header_b64).decode())\n if header.get('alg') == 'none':\n return json.loads(base64.urlsafe_b64decode(token.split('.')[1] + '==').decode())\n return jwt.decode(token, 'dev-key', algorithms=['HS256'])\n""", | |
| "api/files.py": """from flask import request, jsonify\n\nFILES = {\n 'tenant-a': {'1': 'a-private-doc'},\n 'tenant-b': {'2': 'b-private-doc'},\n}\n\ndef get_file(user):\n file_id = request.args.get('file_id')\n tenant = request.args.get('tenant')\n data = FILES.get(tenant, {}).get(file_id)\n if not data:\n return jsonify({'error': 'not found'}), 404\n return jsonify({'file': data, 'tenant': tenant, 'user': user['sub']})\n""", | |
| "api/fetcher.py": """import requests\n\ndef fetch_preview(url: str):\n response = requests.get(url, timeout=3)\n return {'status': response.status_code, 'body': response.text[:120]}\n""", | |
| "api/storage.py": """from pathlib import Path\n\nBASE = Path('/srv/uploads')\n\ndef read_attachment(path_fragment: str) -> bytes:\n final_path = BASE / path_fragment\n return final_path.read_bytes()\n""", | |
| }, | |
| vulnerabilities=[ | |
| VulnerabilitySpec( | |
| id="H-01", | |
| filename="api/auth.py", | |
| line=12, | |
| vuln_type="weak_authentication", | |
| severity="critical", | |
| title="Accepts unsigned JWT tokens when alg=none", | |
| rationale="Token parser trusts attacker-controlled header and bypasses signature checks.", | |
| ), | |
| VulnerabilitySpec( | |
| id="H-02", | |
| filename="api/files.py", | |
| line=11, | |
| vuln_type="weak_authentication", | |
| severity="high", | |
| title="Tenant access controlled by request parameter", | |
| rationale="Requester can switch tenant query parameter and read cross-tenant data (IDOR).", | |
| ), | |
| VulnerabilitySpec( | |
| id="H-03", | |
| filename="api/fetcher.py", | |
| line=4, | |
| vuln_type="ssrf", | |
| severity="high", | |
| title="Server-side fetch of arbitrary URL", | |
| rationale="Attacker can query internal metadata endpoints through backend network path.", | |
| ), | |
| VulnerabilitySpec( | |
| id="H-04", | |
| filename="api/storage.py", | |
| line=6, | |
| vuln_type="path_traversal", | |
| severity="critical", | |
| title="Unvalidated path join for file reads", | |
| rationale="Path fragments containing .. can escape upload directory.", | |
| ), | |
| ], | |
| ), | |
| } | |
| SEVERITY_WEIGHTS = { | |
| "low": 1.0, | |
| "medium": 2.0, | |
| "high": 3.0, | |
| "critical": 4.0, | |
| } | |
| TARGET_CONFIDENCE = { | |
| "low": 0.55, | |
| "medium": 0.65, | |
| "high": 0.8, | |
| "critical": 0.9, | |
| } | |
| def get_task(task_id: str) -> TaskSpec: | |
| if task_id not in TASKS: | |
| raise KeyError(f"Unknown task_id '{task_id}'. Available: {', '.join(sorted(TASKS))}") | |
| return TASKS[task_id] | |
| def list_task_ids() -> List[str]: | |
| return sorted(TASKS.keys()) | |