CodeSecure / server /tasks.py
Drac0528's picture
Upload 35 files
8c391c7 verified
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Literal
# Allowed difficulty tiers for a task.
Difficulty = Literal["easy", "medium", "hard"]
# Allowed severity ratings for a vulnerability, written from lowest to highest.
Severity = Literal["low", "medium", "high", "critical"]
@dataclass(frozen=True)
class VulnerabilitySpec:
    """Immutable record describing one known vulnerability in a task repository.

    The dataclass is frozen: assigning to a field after construction raises
    ``dataclasses.FrozenInstanceError``.
    """

    # Short identifier of the finding (the catalogue uses forms like "E-01").
    id: str
    # Path of the affected file inside the task's repository mapping.
    filename: str
    # Line number within that file where the flaw is located.
    line: int
    # Category label such as "sql_injection" or "ssrf".
    vuln_type: str
    # One of "low", "medium", "high", "critical".
    severity: Severity
    # One-line human-readable summary of the issue.
    title: str
    # Short explanation of why the code is vulnerable.
    rationale: str
@dataclass(frozen=True)
class TaskSpec:
    """Immutable definition of one audit task.

    The dataclass is frozen, so fields cannot be rebound after construction.
    The ``repository`` dict and ``vulnerabilities`` list themselves remain
    ordinary mutable containers.
    """

    # Unique key of the task.
    id: str
    # Human-readable task name.
    title: str
    # One of "easy", "medium", "hard".
    difficulty: Difficulty
    # Instruction text describing what the auditor should accomplish.
    objective: str
    # Step budget for the task.
    max_steps: int
    # Maps a file path to the full text content of that file.
    repository: Dict[str, str]
    # Known vulnerabilities present in ``repository``.
    vulnerabilities: List[VulnerabilitySpec]
# Catalogue of audit tasks keyed by task id (each id here equals its difficulty
# tier). Every TaskSpec bundles a miniature repository (path -> file text) with
# the ground-truth vulnerabilities planted in it. Each `line` value is a
# 1-indexed line number counted inside the corresponding embedded file text and
# points at the statement that exhibits the flaw.
TASKS: Dict[str, TaskSpec] = {
    "easy": TaskSpec(
        id="easy",
        title="Legacy Flask Patch Review",
        difficulty="easy",
        objective=(
            "Audit a small Flask service before release. Identify concrete "
            "security vulnerabilities with accurate location, type, and severity."
        ),
        max_steps=12,
        repository={
            "app/routes.py": """from flask import Blueprint, request, jsonify\nfrom app.db import db\n\nbp = Blueprint('api', __name__)\n\n@bp.get('/user')\ndef get_user():\n user_id = request.args.get('id', '')\n query = f\"SELECT id, email, role FROM users WHERE id = '{user_id}'\"\n row = db.execute(query).fetchone()\n return jsonify(dict(row) if row else {})\n\n@bp.post('/login')\ndef login():\n payload = request.json or {}\n if payload.get('token') == 'letmein':\n return jsonify({'ok': True})\n return jsonify({'ok': False}), 401\n""",
            "app/config.py": """import os\n\nclass Config:\n DEBUG = os.getenv('DEBUG', '0') == '1'\n SECRET_KEY = 'prod-secret-2026'\n DB_URL = os.getenv('DB_URL', 'postgresql://localhost/app')\n""",
            "app/db.py": """import sqlite3\n\n_conn = sqlite3.connect(':memory:', check_same_thread=False)\n_conn.execute('CREATE TABLE IF NOT EXISTS users (id TEXT, email TEXT, role TEXT)')\n\ndef execute(query: str):\n return _conn.execute(query)\n\nclass DB:\n execute = staticmethod(execute)\n\ndb = DB()\n""",
        },
        vulnerabilities=[
            VulnerabilitySpec(
                id="E-01",
                filename="app/routes.py",
                # Line 9 is the f-string that interpolates user_id into the SQL
                # text (line 8 only reads the request argument).
                line=9,
                vuln_type="sql_injection",
                severity="high",
                title="Unsanitized SQL query with user-controlled id",
                rationale="Direct string interpolation builds SQL using request input.",
            ),
            VulnerabilitySpec(
                id="E-02",
                filename="app/config.py",
                line=5,
                vuln_type="hardcoded_secret",
                severity="high",
                title="Hardcoded production secret key",
                rationale="Embedding secrets in code leaks credentials and breaks rotation.",
            ),
            VulnerabilitySpec(
                id="E-03",
                filename="app/routes.py",
                # Line 16 is the comparison against the static 'letmein' token
                # (line 15 only parses the JSON payload).
                line=16,
                vuln_type="weak_authentication",
                severity="medium",
                title="Backdoor static token for login",
                rationale="Fixed token bypasses proper authentication controls.",
            ),
        ],
    ),
    "medium": TaskSpec(
        id="medium",
        title="Payment Webhook Service",
        difficulty="medium",
        objective=(
            "Review a webhook and export pipeline used by operations. Catch logic and "
            "input-handling flaws that can be exploited in production."
        ),
        max_steps=14,
        repository={
            "service/webhook.py": """import hmac\nimport hashlib\nfrom flask import request, jsonify\n\nSHARED_SECRET = 'whsec_test_123'\n\ndef verify_signature(body: bytes, signature: str) -> bool:\n digest = hmac.new(SHARED_SECRET.encode(), body, hashlib.sha256).hexdigest()\n return digest == signature\n\ndef process_event(event: dict):\n return {'processed': event.get('id')}\n\ndef webhook_handler():\n body = request.data\n signature = request.headers.get('X-Signature', '')\n if not verify_signature(body, signature):\n return jsonify({'error': 'invalid signature'}), 401\n\n event = request.json or {}\n if event.get('debug'):\n # temporary bypass while partner migrates\n return jsonify({'ok': True, 'bypass': True})\n\n return jsonify(process_event(event))\n""",
            "service/export.py": """import os\n\ndef build_export_cmd(job_name: str, output_dir: str) -> str:\n return f\"tar -czf {output_dir}/{job_name}.tgz /srv/data/{job_name}\"\n\ndef run_export(job_name: str, output_dir: str):\n cmd = build_export_cmd(job_name, output_dir)\n os.system(cmd)\n return {'status': 'ok', 'cmd': cmd}\n""",
            "service/serializers.py": """import yaml\n\ndef load_template(raw: str):\n return yaml.load(raw, Loader=yaml.Loader)\n""",
        },
        vulnerabilities=[
            VulnerabilitySpec(
                id="M-01",
                filename="service/webhook.py",
                # Line 9 holds the `digest == signature` comparison; line 10 of
                # the embedded file is blank.
                line=9,
                vuln_type="weak_authentication",
                severity="medium",
                title="Timing-unsafe signature comparison",
                rationale="Using == for secrets can leak comparison timing signal.",
            ),
            VulnerabilitySpec(
                id="M-02",
                filename="service/webhook.py",
                # Line 21 is the `if event.get('debug')` guard that opens the
                # bypass branch; line 22 is only a comment.
                line=21,
                vuln_type="weak_authentication",
                severity="high",
                title="Debug flag bypasses signature verification outcome",
                rationale="Attacker-controlled debug field returns success without processing guards.",
            ),
            VulnerabilitySpec(
                id="M-03",
                filename="service/export.py",
                line=8,
                vuln_type="command_injection",
                severity="critical",
                title="Unsanitized shell command execution",
                rationale="User-controlled job_name/output_dir flow into os.system command string.",
            ),
            VulnerabilitySpec(
                id="M-04",
                filename="service/serializers.py",
                line=4,
                vuln_type="insecure_deserialization",
                severity="high",
                title="Unsafe YAML loader",
                rationale="yaml.Loader can construct arbitrary Python objects from attacker input.",
            ),
        ],
    ),
    "hard": TaskSpec(
        id="hard",
        title="Enterprise Multi-Tenant API",
        difficulty="hard",
        objective=(
            "Audit an API gateway handling tenants, files, and callback fetches. "
            "Find high-impact vulnerabilities without flooding false positives."
        ),
        max_steps=16,
        repository={
            "api/auth.py": """import base64\nimport json\nimport jwt\n\ndef issue_token(user_id: str, tenant_id: str):\n payload = {'sub': user_id, 'tenant': tenant_id, 'role': 'member'}\n return jwt.encode(payload, 'dev-key', algorithm='HS256')\n\ndef parse_token(token: str):\n header_b64 = token.split('.')[0] + '=='\n header = json.loads(base64.urlsafe_b64decode(header_b64).decode())\n if header.get('alg') == 'none':\n return json.loads(base64.urlsafe_b64decode(token.split('.')[1] + '==').decode())\n return jwt.decode(token, 'dev-key', algorithms=['HS256'])\n""",
            "api/files.py": """from flask import request, jsonify\n\nFILES = {\n 'tenant-a': {'1': 'a-private-doc'},\n 'tenant-b': {'2': 'b-private-doc'},\n}\n\ndef get_file(user):\n file_id = request.args.get('file_id')\n tenant = request.args.get('tenant')\n data = FILES.get(tenant, {}).get(file_id)\n if not data:\n return jsonify({'error': 'not found'}), 404\n return jsonify({'file': data, 'tenant': tenant, 'user': user['sub']})\n""",
            "api/fetcher.py": """import requests\n\ndef fetch_preview(url: str):\n response = requests.get(url, timeout=3)\n return {'status': response.status_code, 'body': response.text[:120]}\n""",
            "api/storage.py": """from pathlib import Path\n\nBASE = Path('/srv/uploads')\n\ndef read_attachment(path_fragment: str) -> bytes:\n final_path = BASE / path_fragment\n return final_path.read_bytes()\n""",
        },
        vulnerabilities=[
            VulnerabilitySpec(
                id="H-01",
                filename="api/auth.py",
                line=12,
                vuln_type="weak_authentication",
                severity="critical",
                title="Accepts unsigned JWT tokens when alg=none",
                rationale="Token parser trusts attacker-controlled header and bypasses signature checks.",
            ),
            VulnerabilitySpec(
                id="H-02",
                filename="api/files.py",
                line=11,
                vuln_type="weak_authentication",
                severity="high",
                title="Tenant access controlled by request parameter",
                rationale="Requester can switch tenant query parameter and read cross-tenant data (IDOR).",
            ),
            VulnerabilitySpec(
                id="H-03",
                filename="api/fetcher.py",
                line=4,
                vuln_type="ssrf",
                severity="high",
                title="Server-side fetch of arbitrary URL",
                rationale="Attacker can query internal metadata endpoints through backend network path.",
            ),
            VulnerabilitySpec(
                id="H-04",
                filename="api/storage.py",
                line=6,
                vuln_type="path_traversal",
                severity="critical",
                title="Unvalidated path join for file reads",
                rationale="Path fragments containing .. can escape upload directory.",
            ),
        ],
    ),
}
# Numeric weight assigned to each severity label; the weight rises by 1.0 per
# level, from 1.0 for "low" to 4.0 for "critical".
# NOTE(review): no consumer is visible in this module — presumably used when
# scoring findings; confirm against the caller.
SEVERITY_WEIGHTS = {
    "low": 1.0,
    "medium": 2.0,
    "high": 3.0,
    "critical": 4.0,
}
# Confidence value associated with each severity label; higher severities map
# to higher values (0.55 for "low" up to 0.9 for "critical").
# NOTE(review): no consumer is visible in this module — presumably the
# confidence a report is expected to state for a finding of that severity;
# confirm against the caller.
TARGET_CONFIDENCE = {
    "low": 0.55,
    "medium": 0.65,
    "high": 0.8,
    "critical": 0.9,
}
def get_task(task_id: str) -> TaskSpec:
    """Look up the task registered under ``task_id``.

    Args:
        task_id: Key into the module-level ``TASKS`` catalogue.

    Returns:
        The ``TaskSpec`` stored under ``task_id``.

    Raises:
        KeyError: If ``task_id`` is not a key of ``TASKS``. The message lists
            the available ids in sorted order.
    """
    if task_id in TASKS:
        return TASKS[task_id]

    available = ", ".join(sorted(TASKS))
    raise KeyError(f"Unknown task_id '{task_id}'. Available: {available}")
def list_task_ids() -> List[str]:
    """Return the keys of ``TASKS`` as a new list sorted in ascending order."""
    # Iterating a dict yields its keys, so no explicit .keys() view is needed.
    return sorted(TASKS)