File size: 15,026 Bytes
ff9fcbd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
"""
Task definitions for the Code Review Environment.
"""
from __future__ import annotations
from typing import List, Dict, Any

def _issue(line: int, filename: str, itype: str, severity: str, desc: str, fix: str = "") -> dict:
    return {
        "line_number": line,
        "filename": filename,
        "issue_type": itype,
        "severity": severity,
        "description": desc,
        "fix_suggestion": fix,
    }


# Fixture source for the "bug-detection" task; it is handed to the reviewer as
# utils.py. The bugs inside it are intentional and must not be "fixed" here.
# TASK_BUG_DETECTION's ground-truth line numbers are 1-based positions inside
# this string (the backslash after the opening quotes suppresses a leading
# blank line), so inserting or removing a line requires renumbering them.
_UTILS_CODE = """\
def calculate_average(numbers):
    \"\"\"Calculate the average of a list of numbers.\"\"\"
    if not numbers:
        return 0
    total = 0
    for i in range(len(numbers) + 1):
        total += numbers[i]
    return total / len(numbers)


def binary_search(arr, target):
    \"\"\"Search for target in sorted array. Returns index or -1.\"\"\"
    left, right = 0, len(arr)
    while left <= right:
        mid = (left + right) // 2
        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1
        else:
            right = mid - 1
    return -1


def count_words(text):
    \"\"\"Count word frequency in a text string.\"\"\"
    words = text.lower().split()
    counts = {}
    for word in words:
        if word in counts:
            counts[word] += 1
        else:
            counts[word] = 0
    return counts


def reverse_string(s):
    \"\"\"Return the reversed version of a string (no bug here).\"\"\"
    return s[::-1]
"""

# Easy task: find the logic bugs planted in _UTILS_CODE (presented as utils.py).
# Each ground-truth line_number is a 1-based line inside _UTILS_CODE, not a
# line of this module.
TASK_BUG_DETECTION: Dict[str, Any] = {
    "task_id": "bug-detection",
    "difficulty": "easy",
    "description": (
        "Review this Python utility module for logical bugs and errors.\n"
        "The code contains several functions with subtle bugs that would cause\n"
        "incorrect results or crashes. Identify all issues with exact line numbers,\n"
        "issue type, severity, and a clear description of the problem.\n\n"
        "File to review: utils.py"
    ),
    "language": "python",
    "code_files": {
        "utils.py": _UTILS_CODE,
    },
    "ground_truth_issues": [
        # utils.py line 6: the for-loop header in calculate_average.
        _issue(
            6, "utils.py", "bug", "high",
            "Off-by-one error: range(len(numbers) + 1) iterates one past the end, "
            "causing IndexError on the last iteration.",
            "Change to: range(len(numbers))"
        ),
        # utils.py line 13: the initial left/right assignment in binary_search.
        _issue(
            13, "utils.py", "bug", "medium",
            "Binary search upper bound is wrong: right = len(arr) causes IndexError "
            "when accessing arr[mid] on a full array.",
            "Change to: right = len(arr) - 1"
        ),
        # utils.py line 33: the else-branch assignment in count_words.
        _issue(
            33, "utils.py", "bug", "low",
            "Word count initializes new entries to 0 instead of 1, so every word's "
            "count is underreported by 1.",
            "Change to: counts[word] = 1"
        ),
    ],
    "max_steps": 15,
    "hints": [
        "Look carefully at loop boundary conditions — are they off by one?",
        "The binary_search function has an issue with its initial right bound.",
        "Check how new keys are initialized in the word count dictionary.",
    ],
}


# Fixture source for the "security-audit" task; it is handed to the reviewer as
# app.py. The vulnerabilities inside it are intentional and must not be fixed
# here. TASK_SECURITY_AUDIT's ground-truth line numbers are 1-based positions
# inside this string (the backslash after the opening quotes suppresses a
# leading blank line), so inserting or removing a line requires renumbering.
_APP_CODE = """\
import sqlite3
import os
import subprocess
from flask import Flask, request, render_template_string

app = Flask(__name__)

SECRET_KEY = "hardcoded_secret_key_123"
DB_PASSWORD = "admin123"


def get_db():
    return sqlite3.connect('users.db')


@app.route('/user/<username>')
def get_user(username):
    db = get_db()
    query = f"SELECT * FROM users WHERE username = '{username}'"
    result = db.execute(query).fetchone()
    return str(result)


@app.route('/search')
def search():
    term = request.args.get('term', '')
    template = f"<h1>Results for: {term}</h1>"
    return render_template_string(template)


@app.route('/file')
def read_file():
    filename = request.args.get('name', '')
    filepath = os.path.join('/data', filename)
    with open(filepath, 'r') as f:
        return f.read()


@app.route('/admin/delete', methods=['POST'])
def admin_delete():
    user_id = request.form.get('user_id')
    db = get_db()
    db.execute(f"DELETE FROM users WHERE id = {user_id}")
    db.commit()
    return "Deleted"


@app.route('/ping')
def ping():
    host = request.args.get('host', '')
    result = subprocess.run(f"ping -c 1 {host}", shell=True, capture_output=True)
    return result.stdout.decode()
"""

# Medium task: security audit of the single-file Flask app in _APP_CODE
# (presented as app.py). Each ground-truth line_number is a 1-based line
# inside _APP_CODE, not a line of this module.
TASK_SECURITY_AUDIT: Dict[str, Any] = {
    "task_id": "security-audit",
    "difficulty": "medium",
    "description": (
        "Perform a security audit on this Flask web application.\n"
        "The code contains multiple OWASP Top-10 security vulnerabilities.\n"
        "Identify all security issues with their exact line numbers, severity ratings,\n"
        "and recommended fixes. Consider: injection attacks, broken authentication,\n"
        "sensitive data exposure, and improper input handling.\n\n"
        "File to review: app.py"
    ),
    "language": "python",
    "code_files": {
        "app.py": _APP_CODE,
    },
    "ground_truth_issues": [
        # app.py line 8: the SECRET_KEY assignment.
        _issue(
            8, "app.py", "security", "high",
            "Hardcoded SECRET_KEY in source code. Anyone with repo access can forge sessions.",
            "Use: SECRET_KEY = os.environ.get('SECRET_KEY') and set it as an env var."
        ),
        # app.py line 9: the DB_PASSWORD assignment.
        _issue(
            9, "app.py", "security", "high",
            "Hardcoded database password in source code. Credentials should never be in code.",
            "Use: DB_PASSWORD = os.environ.get('DB_PASSWORD')"
        ),
        # app.py line 19: the f-string SELECT in get_user.
        _issue(
            19, "app.py", "security", "critical",
            "SQL injection: username is interpolated directly into the query string. "
            "An attacker can supply username = ' OR 1=1 -- to dump the database.",
            "Use parameterized queries: db.execute('SELECT * FROM users WHERE username = ?', (username,))"
        ),
        # app.py line 27: the f-string template in search. The value becomes
        # part of the template *source*, so escaping alone is not a complete
        # fix; it has to be passed as a template variable.
        _issue(
            27, "app.py", "security", "high",
            "Cross-site scripting (XSS): user-supplied 'term' is rendered directly in an "
            "HTML template via render_template_string without escaping. Because 'term' "
            "becomes part of the template source, it also allows server-side template "
            "injection.",
            "Pass the value as a template variable instead of formatting it into the "
            "template: render_template_string('<h1>Results for: {{ term }}</h1>', term=term). "
            "If manual escaping is needed, use markupsafe.escape(term)."
        ),
        # app.py line 34: the os.path.join call in read_file.
        _issue(
            34, "app.py", "security", "high",
            "Path traversal: os.path.join('/data', filename) does not prevent filenames "
            "like '../etc/passwd' from escaping the /data directory.",
            "Use: filename = os.path.basename(filename) and validate against an allowlist."
        ),
        # app.py line 40: the admin_delete function definition.
        _issue(
            40, "app.py", "security", "critical",
            "Missing authentication: the /admin/delete endpoint has no access control. "
            "Any unauthenticated user can delete records.",
            "Add @login_required decorator and check that request.user.is_admin is True."
        ),
        # app.py line 43: the f-string DELETE in admin_delete. It is a second
        # SQL injection, distinct from the missing-auth finding on line 40.
        _issue(
            43, "app.py", "security", "critical",
            "SQL injection: user_id from the POST form is interpolated directly into the "
            "DELETE statement. An attacker can supply user_id = 1 OR 1=1 to delete every user.",
            "Use parameterized queries: db.execute('DELETE FROM users WHERE id = ?', (user_id,))"
        ),
        # app.py line 51: the subprocess.run call in ping.
        _issue(
            51, "app.py", "security", "critical",
            "Command injection: user-supplied 'host' is interpolated into a shell command "
            "with shell=True. Attacker can supply 'x; rm -rf /' to execute arbitrary commands.",
            "Use: subprocess.run(['ping', '-c', '1', host], shell=False) after validating host."
        ),
    ],
    "max_steps": 20,
    "hints": [
        "Look for hardcoded credentials and secrets at the top of the file.",
        "Check every place user input (request.args, request.form) touches a database query, "
        "template, file path, or shell command.",
        "The admin endpoint is missing an authorization check.",
    ],
}


# Fixture source for the "comprehensive-review" task; it is handed to the
# reviewer as views.py. The defects inside it are intentional and must not be
# fixed here. TASK_COMPREHENSIVE's views.py line numbers are 1-based positions
# inside this string (the backslash after the opening quotes suppresses a
# leading blank line), so inserting or removing a line requires renumbering.
_VIEWS_CODE = """\
import threading
from django.db import transaction
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse
from .models import Order, Product, Cart
import hashlib

_lock = threading.Lock()


@login_required
def place_order(request):
    user = request.user
    cart_items = Cart.objects.filter(user=user)

    if not cart_items.exists():
        return JsonResponse({'error': 'Cart is empty'}, status=400)

    total = 0
    for item in cart_items:
        product = Product.objects.get(id=item.product_id)
        total += product.price * item.quantity

    for item in cart_items:
        product = Product.objects.get(id=item.product_id)
        if product.stock < item.quantity:
            return JsonResponse({'error': f'Insufficient stock for {product.name}'}, status=400)

    order = Order.objects.create(
        user=user,
        total=total,
        status='pending'
    )

    for item in cart_items:
        product = Product.objects.get(id=item.product_id)
        product.stock -= item.quantity
        product.save()

    cart_items.delete()
    return JsonResponse({'order_id': order.id, 'total': float(total)})


@login_required
def get_order_history(request):
    page = int(request.GET.get('page', 1))
    per_page = int(request.GET.get('per_page', 10))

    orders = Order.objects.filter(user=request.user)[
        (page - 1) * per_page: page * per_page
    ]

    result = []
    for order in orders:
        result.append({
            'id': order.id,
            'total': order.total,
            'status': order.status,
        })

    return JsonResponse({'orders': result})


def verify_payment(order_id, payment_hash):
    order = Order.objects.get(id=order_id)
    expected = hashlib.md5(f"{order_id}{order.total}".encode()).hexdigest()
    return expected == payment_hash
"""

# Fixture source for the "comprehensive-review" task; it is handed to the
# reviewer as models.py. The defects inside it are intentional and must not be
# fixed here. TASK_COMPREHENSIVE's models.py line numbers are 1-based positions
# inside this string (the backslash after the opening quotes suppresses a
# leading blank line), so inserting or removing a line requires renumbering.
_MODELS_CODE = """\
from django.db import models
import pickle


class User(models.Model):
    username = models.CharField(max_length=150)
    email = models.CharField(max_length=255)
    password = models.CharField(max_length=255)

    class Meta:
        db_table = 'users'


class Product(models.Model):
    name = models.CharField(max_length=255)
    price = models.FloatField()
    stock = models.IntegerField(default=0)
    metadata = models.BinaryField()

    class Meta:
        db_table = 'products'


class Order(models.Model):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    total = models.FloatField()
    status = models.CharField(max_length=50)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'orders'


class Cart(models.Model):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    product_id = models.IntegerField()
    quantity = models.IntegerField()

    class Meta:
        db_table = 'cart'
"""

# Hard task: two-file review of the Django fixtures in _VIEWS_CODE (views.py)
# and _MODELS_CODE (models.py). Each ground-truth line_number is a 1-based line
# inside the corresponding fixture string, not a line of this module.
TASK_COMPREHENSIVE: Dict[str, Any] = {
    "task_id": "comprehensive-review",
    "difficulty": "hard",
    "description": (
        "Perform a comprehensive code review of this Django e-commerce API.\n"
        "The code spans two files and contains bugs, security vulnerabilities,\n"
        "performance issues, and data modeling problems.\n"
        "Find ALL issues across BOTH files. This is a hard task — look carefully\n"
        "for subtle architectural problems, not just surface-level issues.\n\n"
        "Files to review: views.py, models.py"
    ),
    "language": "python",
    "code_files": {
        "views.py": _VIEWS_CODE,
        "models.py": _MODELS_CODE,
    },
    "ground_truth_issues": [
        # views.py line 21: first Product.objects.get() inside a loop in
        # place_order (the same call recurs in the two later loops).
        _issue(
            21, "views.py", "performance", "high",
            "N+1 query: Product.objects.get() is called inside a loop, issuing one SQL "
            "query per cart item. With 100 items this means 100 DB roundtrips.",
            "Use: Product.objects.filter(id__in=[i.product_id for i in cart_items]) "
            "then build a dict for O(1) lookup."
        ),
        # views.py line 26: the `product.stock < item.quantity` check.
        _issue(
            26, "views.py", "bug", "critical",
            "Race condition: the stock check and stock decrement are not atomic. "
            "Two concurrent requests can both pass the check and oversell the product.",
            "Wrap in transaction.atomic() and use Product.objects.select_for_update() "
            "to lock rows during the check."
        ),
        # views.py line 29: the Order.objects.create(...) call.
        _issue(
            29, "views.py", "bug", "high",
            "Order is created outside a database transaction. If stock decrement fails "
            "after the order is created, the database is left in an inconsistent state.",
            "Wrap the entire order creation flow in: with transaction.atomic():"
        ),
        # views.py line 47: the per_page parsing in get_order_history.
        # NOTE(review): the query on views.py line 49 filters by request.user,
        # so "entire orders table" looks overstated — confirm the wording.
        _issue(
            47, "views.py", "security", "medium",
            "No maximum cap on per_page: an attacker can request per_page=1000000 "
            "to dump the entire orders table in one request, causing DoS or data leak.",
            "Add: per_page = min(int(request.GET.get('per_page', 10)), 100)"
        ),
        # views.py line 66: the hashlib.md5(...) call in verify_payment.
        _issue(
            66, "views.py", "security", "medium",
            "MD5 is a cryptographically broken hash function and should not be used "
            "for payment verification. Collisions can be manufactured.",
            "Use HMAC-SHA256: hmac.new(SECRET.encode(), payload.encode(), hashlib.sha256).hexdigest()"
        ),
        # views.py line 67: the `expected == payment_hash` comparison.
        _issue(
            67, "views.py", "security", "medium",
            "Timing attack: string comparison with == leaks timing information that "
            "allows an attacker to forge valid hashes byte-by-byte.",
            "Use: hmac.compare_digest(expected, payment_hash) for constant-time comparison."
        ),
        # models.py line 8: User.password declared as a plain CharField.
        _issue(
            8, "models.py", "security", "critical",
            "Plaintext password storage: passwords are stored as raw strings in the "
            "database. Any DB breach immediately exposes all user passwords.",
            "Use Django's built-in: from django.contrib.auth.hashers import make_password, check_password"
        ),
        # models.py line 16: Product.price declared as FloatField
        # (Order.total on models.py line 26 is a FloatField as well).
        _issue(
            16, "models.py", "bug", "medium",
            "FloatField for monetary values causes floating-point precision errors "
            "(e.g., 0.1 + 0.2 != 0.3). This will produce wrong totals over time.",
            "Use: DecimalField(max_digits=10, decimal_places=2) for all monetary fields."
        ),
        # models.py line 18: Product.metadata declared as BinaryField.
        # NOTE(review): the fixture imports pickle but never calls it, so the
        # "pickled data" premise rests on that import alone — confirm intent.
        _issue(
            18, "models.py", "security", "high",
            "BinaryField storing pickled data is dangerous: pickle.loads() on untrusted "
            "data can execute arbitrary code. Anyone who can write to this field can RCE.",
            "Use: JSONField() instead. If binary storage is required, validate/sign the data."
        ),
    ],
    "max_steps": 30,
    "hints": [
        "Look for database queries inside for loops — this is a classic N+1 problem.",
        "Check whether stock checks and order creation happen inside a database transaction.",
        "Look at models.py: how are passwords and monetary values stored?",
    ],
}


# Registry of every task definition, keyed by the task's own "task_id".
# Insertion order follows difficulty: easy, medium, hard.
ALL_TASKS: Dict[str, Dict[str, Any]] = {
    task["task_id"]: task
    for task in (TASK_BUG_DETECTION, TASK_SECURITY_AUDIT, TASK_COMPREHENSIVE)
}

# Task IDs in registry order.
TASK_IDS: List[str] = list(ALL_TASKS)


def get_task(task_id: str) -> Dict[str, Any]:
    """Look up a task definition in ``ALL_TASKS`` by its ID.

    Returns:
        The task dictionary registered under ``task_id``.

    Raises:
        KeyError: if ``task_id`` is not registered; the message lists the
            valid IDs from ``TASK_IDS``.
    """
    try:
        return ALL_TASKS[task_id]
    except KeyError:
        # Re-raise with the descriptive message; ``from None`` keeps the
        # traceback free of the bare lookup error.
        raise KeyError(f"Unknown task_id '{task_id}'. Valid: {TASK_IDS}") from None