File size: 6,578 Bytes
92e5c18
 
d1cfa81
 
 
 
92e5c18
 
 
 
 
 
 
 
 
 
 
 
 
0bbb422
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92e5c18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0bbb422
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92e5c18
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from dataclasses import dataclass

try:
    from ..models import ISSUE_TAXONOMY
except ImportError:
    from models import ISSUE_TAXONOMY


@dataclass(frozen=True)
class Task:
    """Description of a single code-review task.

    Bundles a code snippet with its reviewer instructions and the issue tags
    planted in it. ``frozen=True`` blocks attribute reassignment, but the
    ``planted_issues`` list can still be mutated in place.
    """

    task_id: str  # identifier; matches the task's key in TASKS
    difficulty: str  # difficulty label such as "easy" or "hard"
    description: str  # reviewer instructions, including the allowed tags
    file_name: str  # file name associated with the snippet
    code: str  # source text of the snippet under review
    planted_issues: list[str]  # issue tags planted in `code` (presumably ISSUE_TAXONOMY members)


# Registry of review tasks keyed by task id, listed from easiest to hardest.
# Each entry's `code` is a deliberately flawed snippet that is only stored as a
# string here; `planted_issues` names the issue tags present in it. The flaws
# inside the snippets are the data and must not be "fixed".
TASKS: dict[str, Task] = {
    # Two helpers that index one past the end of a list.
    "task_extra_easy": Task(
        task_id="task_extra_easy",
        difficulty="extra_easy",
        description=(
            "Review this Python function for a single common bug. "
            f"Allowed tags: {', '.join(ISSUE_TAXONOMY)}."
        ),
        file_name="data_utils.py",
        code=(
            "def get_last_element(items):\n"
            "    \"\"\"Return the last element of a list.\"\"\"\n"
            "    # BUG: does not check if list is empty first\n"
            "    last = items[len(items)]  # off-by-one: should be len(items) - 1\n"
            "    return last\n"
            "\n"
            "\n"
            "def compute_average(scores):\n"
            "    \"\"\"Compute average of a list of scores.\"\"\"\n"
            "    total = 0\n"
            "    for i in range(len(scores) + 1):  # iterates one past the end\n"
            "        total += scores[i]\n"
            "    return total / len(scores)\n"
        ),
        planted_issues=["index_out_of_bounds"],
    ),
    # `birthdate` may be None when the key is absent, and nothing is returned.
    "task_easy": Task(
        task_id="task_easy",
        difficulty="easy",
        description=(
            "Review this Python function and identify issues using only taxonomy tags. "
            f"Allowed tags: {', '.join(ISSUE_TAXONOMY)}."
        ),
        file_name="user_service.py",
        code=(
            "def get_user_age(user):\n"
            "    # Returns age in years from user profile dict\n"
            "    birthdate = user.get(\"birthdate\")\n"
            "    if user.get(\"is_active\"):\n"
            "        account_label = f\"active:{user.get('id')}\"\n"
            "    else:\n"
            "        account_label = \"inactive\"\n"
            "\n"
            "    age = (datetime.now() - birthdate).days // 365\n"
            "    profile = {\"label\": account_label, \"age\": age}\n"
            "    # TODO: return something\n"
        ),
        planted_issues=["null_pointer", "missing_return"],
    ),
    # SQL assembled by f-string interpolation, plus a hard-coded signing key.
    "task_medium": Task(
        task_id="task_medium",
        difficulty="medium",
        description=(
            "Review this authentication module and return security-relevant tags from the taxonomy only. "
            f"Allowed tags: {', '.join(ISSUE_TAXONOMY)}."
        ),
        file_name="auth.py",
        code=(
            "SECRET_KEY = \"supersecret123\"   # used for JWT signing\n"
            "\n"
            "def authenticate_user(db_conn, username, password):\n"
            "    query = f\"SELECT * FROM users WHERE username='{username}' AND password='{password}'\"\n"
            "    result = db_conn.execute(query)\n"
            "    user = result.fetchone()\n"
            "\n"
            "    if user:\n"
            "        audit_line = f\"auth ok for {username}\"\n"
            "        token = jwt.encode({\"user_id\": user.id}, SECRET_KEY)\n"
            "        return token\n"
        ),
        planted_issues=["sql_injection", "hardcoded_secret"],
    ),
    # Non-atomic balance check/update, a bare `except: pass` around the gateway
    # call, and an `==` comparison of token hashes.
    "task_hard": Task(
        task_id="task_hard",
        difficulty="hard",
        description=(
            "Review this payment processing code for subtle concurrency, error-handling, and security flaws. "
            f"Use only taxonomy tags: {', '.join(ISSUE_TAXONOMY)}."
        ),
        file_name="payments.py",
        code=(
            "def process_payment(user_id, amount, card_token):\n"
            "    user = db.get_user(user_id)\n"
            "    if user.balance >= amount:\n"
            "        user.balance -= amount    # checked and modified non-atomically\n"
            "        db.save_user(user)\n"
            "\n"
            "        try:\n"
            "            charge_result = payment_gateway.charge(card_token, amount)\n"
            "        except:\n"
            "            pass  # silently swallow all payment errors\n"
            "\n"
            "        expected = db.get_token_hash(card_token)\n"
            "        actual = hash(card_token)\n"
            "        if expected == actual:    # non-constant-time comparison\n"
            "            return {\"status\": \"success\", \"charge\": charge_result}\n"
        ),
        planted_issues=["race_condition", "improper_error_handling", "timing_attack"],
    ),
    # User-supplied path joined without checks, unvalidated int() parsing of the
    # size header, and unchecked size arithmetic.
    "task_expert": Task(
        task_id="task_expert",
        difficulty="expert",
        description=(
            "Review this file-processing pipeline for security, type-safety, and input-validation flaws. "
            "This task requires identifying multiple subtle interacting issues. "
            f"Use only taxonomy tags: {', '.join(ISSUE_TAXONOMY)}."
        ),
        file_name="file_processor.py",
        code=(
            "import os\n"
            "\n"
            "MAX_FILE_SIZE = 2 ** 31  # 2 GB limit\n"
            "\n"
            "def process_upload(user_input_path, file_size_str, content):\n"
            "    \"\"\"Process an uploaded file from the user.\"\"\"\n"
            "    # Construct output path directly from user input\n"
            "    output_path = os.path.join('/var/data/uploads', user_input_path)\n"
            "    # No check: user_input_path could be '../../etc/passwd'\n"
            "\n"
            "    # Parse file size from string header without validation\n"
            "    file_size = int(file_size_str)  # crashes on non-numeric input\n"
            "\n"
            "    # Integer overflow: if file_size_str is very large, wraps around\n"
            "    remaining_quota = MAX_FILE_SIZE - file_size  # can go negative\n"
            "    if remaining_quota > 0:\n"
            "        # No validation on content type or structure\n"
            "        with open(output_path, 'wb') as f:\n"
            "            f.write(content)  # writes arbitrary bytes without sanitization\n"
            "\n"
            "    total_written = file_size + len(content)  # may overflow for huge files\n"
            "    return {'path': output_path, 'bytes_written': total_written}\n"
        ),
        planted_issues=["path_traversal", "integer_overflow", "missing_input_validation", "type_error"],
    ),
}


def get_task(task_id: str) -> Task:
    """Look up a task by its identifier.

    Args:
        task_id: Key of the wanted entry in ``TASKS``.

    Returns:
        The matching task, or the ``task_easy`` entry when ``task_id`` is not
        a registered key (unknown ids never raise).
    """
    # Resolve the fallback up front, mirroring an eagerly evaluated default.
    fallback = TASKS["task_easy"]
    if task_id in TASKS:
        return TASKS[task_id]
    return fallback