File size: 17,509 Bytes
a4f74f3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
"""
Bug detection logic β€” checks if the agent's action/response pair reveals a planted bug.

Each bug has:
- A unique ID
- A severity level (easy/medium/hard)
- A detection function that checks action + response
"""

from dataclasses import dataclass
from typing import Any, Callable, Optional
import re


@dataclass
class Bug:
    id: str
    severity: str  # "easy", "medium", "hard"
    description: str
    category: str  # "status_code", "validation", "security", "data_integrity"
    owasp: str = ""  # OWASP API Security Top 10 (2023) category
    recommendation: str = ""  # Fix recommendation for bug bounty reports


@dataclass
class BugDetection:
    bug: Bug
    evidence: str  # Human-readable explanation of how the bug was detected


class BugDetector:
    """Detects planted bugs based on agent actions and API responses."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self._build_bug_registry()

    def _build_bug_registry(self):
        """Define all bugs with their detection logic."""
        self.bugs: dict[str, Bug] = {}
        self.detectors: dict[str, Callable] = {}

        # === EASY BUGS ===

        self._register_bug(
            Bug("BUG_TASK_01", "easy",
                "GET /tasks/{id} returns 200 with null for non-existent task",
                "status_code",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Return 404 Not Found for non-existent resources"),
            self._detect_null_response_for_missing_task,
        )
        self._register_bug(
            Bug("BUG_TASK_02", "easy",
                "POST /tasks with missing title returns 500 instead of 400/422",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate required fields and return 400/422 with descriptive error"),
            self._detect_missing_field_500,
        )
        self._register_bug(
            Bug("BUG_TASK_03", "easy",
                "GET /tasks?page=-1 returns 200 instead of 400",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate pagination parameters: page >= 1, limit > 0"),
            self._detect_negative_page,
        )

        # === MEDIUM BUGS ===

        self._register_bug(
            Bug("BUG_TASK_04", "medium",
                "PUT /tasks/{id} accepts invalid email format for assignee_email",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate email format with regex before accepting"),
            self._detect_invalid_email_accepted,
        )
        self._register_bug(
            Bug("BUG_TASK_05", "medium",
                "DELETE /tasks/{id} returns 200 for non-existent task",
                "status_code",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Check resource existence before deletion, return 404 if missing"),
            self._detect_delete_nonexistent_200,
        )
        self._register_bug(
            Bug("BUG_TASK_06", "medium",
                "GET /tasks?limit=999999 has no pagination cap",
                "validation",
                owasp="API4:2023 Unrestricted Resource Consumption",
                recommendation="Cap pagination limit at 100, reject values above maximum"),
            self._detect_no_pagination_cap,
        )
        self._register_bug(
            Bug("BUG_USER_01", "medium",
                "POST /users accepts invalid email format",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate email format server-side before creating user"),
            self._detect_user_invalid_email,
        )
        self._register_bug(
            Bug("BUG_USER_02", "medium",
                "POST /users response exposes password hash",
                "security",
                owasp="API3:2023 Broken Object Property Level Authorization",
                recommendation="Never return sensitive fields (password_hash) in API responses"),
            self._detect_password_hash_exposed,
        )
        self._register_bug(
            Bug("BUG_AUTH_02", "medium",
                "Login with empty password succeeds",
                "security",
                owasp="API2:2023 Broken Authentication",
                recommendation="Validate password is non-empty and verify against stored hash"),
            self._detect_empty_password_login,
        )

        # === HARD BUGS ===

        self._register_bug(
            Bug("BUG_TASK_07", "hard",
                "BOLA: User A can access User B's tasks without authorization check",
                "security",
                owasp="API1:2023 Broken Object Level Authorization",
                recommendation="Verify resource ownership: check task.owner_id matches authenticated user"),
            self._detect_bola,
        )
        self._register_bug(
            Bug("BUG_TASK_08", "hard",
                "POST /tasks with very long title (>5000 chars) causes 500",
                "validation",
                owasp="API4:2023 Unrestricted Resource Consumption",
                recommendation="Add input length validation: title max 200 chars"),
            self._detect_long_input_crash,
        )
        self._register_bug(
            Bug("BUG_TASK_09", "hard",
                "SQL injection payload in title is stored verbatim (content injection)",
                "security",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Sanitize user input before storage, escape HTML/SQL special characters"),
            self._detect_content_injection,
        )
        self._register_bug(
            Bug("BUG_AUTH_01", "hard",
                "Auth tokens not user-scoped: User A's token can modify User B's tasks",
                "security",
                owasp="API1:2023 Broken Object Level Authorization",
                recommendation="Enforce ownership check on all write operations (PUT/DELETE)"),
            self._detect_broken_auth,
        )

    def _register_bug(self, bug: Bug, detector: Callable):
        self.bugs[bug.id] = bug
        self.detectors[bug.id] = detector

    def get_bugs_for_task(self) -> list[Bug]:
        """Return bugs relevant to the current task."""
        if self.task_id == "basic_validation":
            return [self.bugs[bid] for bid in ["BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03"]]
        elif self.task_id == "edge_cases":
            return [
                self.bugs[bid]
                for bid in [
                    "BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03",
                    "BUG_TASK_04", "BUG_TASK_05", "BUG_TASK_06",
                    "BUG_USER_01", "BUG_USER_02", "BUG_AUTH_02",
                ]
            ]
        else:  # security_workflows
            return list(self.bugs.values())

    def check(
        self,
        method: str,
        endpoint: str,
        headers: dict,
        query_params: dict,
        body: Optional[dict],
        expected_status: Optional[int],
        response_status: int,
        response_body: Any,
        action_history: list[dict],
        found_bugs: set[str],
    ) -> Optional[BugDetection]:
        """Check if this action/response reveals a bug.

        Returns the first new bug detected, or None.
        """
        ctx = {
            "method": method.upper(),
            "endpoint": endpoint,
            "headers": headers,
            "query_params": query_params,
            "body": body,
            "expected_status": expected_status,
            "response_status": response_status,
            "response_body": response_body,
            "action_history": action_history,
        }

        for bug_id, detector in self.detectors.items():
            if bug_id in found_bugs:
                continue
            # Only check bugs relevant to this task
            task_bugs = {b.id for b in self.get_bugs_for_task()}
            if bug_id not in task_bugs:
                continue
            result = detector(ctx)
            if result:
                return BugDetection(bug=self.bugs[bug_id], evidence=result)

        return None

    # === DETECTION FUNCTIONS ===

    def _detect_null_response_for_missing_task(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "GET"
            and re.match(r"^/tasks/\d+$", ctx["endpoint"])
            and ctx["response_status"] == 200
            and ctx["response_body"] is None
        ):
            task_id = ctx["endpoint"].split("/")[-1]
            return f"GET /tasks/{task_id} returned 200 with null body β€” should be 404"
        return None

    def _detect_missing_field_500(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/tasks"
            and ctx["response_status"] == 500
            and ctx["body"] is not None
            and "title" not in ctx["body"]
        ):
            return "POST /tasks with missing 'title' returned 500 β€” should be 400 or 422"
        return None

    def _detect_negative_page(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "GET"
            and ctx["endpoint"] == "/tasks"
            and ctx["query_params"].get("page") is not None
        ):
            page = ctx["query_params"]["page"]
            try:
                page = int(page)
            except (ValueError, TypeError):
                return None
            if page < 1 and ctx["response_status"] == 200:
                return f"GET /tasks?page={page} returned 200 β€” should be 400 for invalid page"
        return None

    def _detect_invalid_email_accepted(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "PUT"
            and re.match(r"^/tasks/\d+$", ctx["endpoint"])
            and ctx["body"]
            and "assignee_email" in ctx["body"]
            and ctx["response_status"] in (200, 201)
        ):
            email = ctx["body"]["assignee_email"]
            if email and not re.match(r"^[^@]+@[^@]+\.[^@]+$", email):
                return f"PUT accepted invalid email '{email}' without validation"
        return None

    def _detect_delete_nonexistent_200(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "DELETE"
            and re.match(r"^/tasks/\d+$", ctx["endpoint"])
            and ctx["response_status"] == 200
        ):
            task_id = int(ctx["endpoint"].split("/")[-1])
            # Check if this task was never created (ID > 1000 is a safe bet for non-existent)
            if task_id > 100:
                return f"DELETE /tasks/{task_id} returned 200 for non-existent task β€” should be 404"
        return None

    def _detect_no_pagination_cap(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "GET"
            and ctx["endpoint"] == "/tasks"
            and ctx["response_status"] == 200
        ):
            limit = ctx["query_params"].get("limit")
            if limit is not None:
                try:
                    limit = int(limit)
                except (ValueError, TypeError):
                    return None
                if limit > 1000:
                    return f"GET /tasks?limit={limit} accepted without pagination cap β€” potential DoS"
        return None

    def _detect_user_invalid_email(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/users"
            and ctx["body"]
            and "email" in ctx["body"]
            and ctx["response_status"] == 201
        ):
            email = ctx["body"]["email"]
            if email and not re.match(r"^[^@]+@[^@]+\.[^@]+$", email):
                return f"POST /users accepted invalid email '{email}'"
        return None

    def _detect_password_hash_exposed(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/users"
            and ctx["response_status"] == 201
            and isinstance(ctx["response_body"], dict)
        ):
            if "password_hash" in ctx["response_body"]:
                return "POST /users response exposes password_hash field β€” security vulnerability"
        return None

    def _detect_empty_password_login(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/auth/login"
            and ctx["body"]
            and ctx["response_status"] == 200
        ):
            password = ctx["body"].get("password", "NOTEMPTY")
            if password == "" or password is None:
                return "Login with empty password succeeded β€” authentication bypass"
        return None

    def _detect_bola(self, ctx: dict) -> Optional[str]:
        """Detect if agent tested cross-user resource access."""
        if (
            ctx["method"] == "GET"
            and re.match(r"^/tasks/\d+$", ctx["endpoint"])
            and ctx["response_status"] == 200
            and ctx["response_body"] is not None
            and isinstance(ctx["response_body"], dict)
            and ctx["headers"].get("Authorization")
        ):
            # Check if the agent logged in as a different user and accessed another's task
            for prev in reversed(ctx["action_history"]):
                if (
                    prev.get("method") == "POST"
                    and prev.get("endpoint") == "/auth/login"
                    and prev.get("response_status") == 200
                    and isinstance(prev.get("response_body"), dict)
                ):
                    login_user_id = prev["response_body"].get("user_id")
                    task_owner_id = ctx["response_body"].get("owner_id")
                    if (
                        login_user_id is not None
                        and task_owner_id is not None
                        and login_user_id != task_owner_id
                    ):
                        return (
                            f"User {login_user_id} accessed task owned by user {task_owner_id} "
                            f"β€” BOLA/IDOR vulnerability (no authorization check)"
                        )
        return None

    def _detect_long_input_crash(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/tasks"
            and ctx["body"]
            and ctx["response_status"] == 500
        ):
            title = ctx["body"].get("title", "")
            if isinstance(title, str) and len(title) > 5000:
                return f"POST /tasks with title length {len(title)} caused 500 β€” no input length validation"
        return None

    def _detect_content_injection(self, ctx: dict) -> Optional[str]:
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/tasks"
            and ctx["body"]
            and ctx["response_status"] == 201
            and isinstance(ctx["response_body"], dict)
        ):
            title = ctx["body"].get("title", "")
            injection_patterns = [
                "DROP TABLE", "DELETE FROM", "<script>", "javascript:",
                "'; --", "\" OR 1=1", "UNION SELECT",
            ]
            for pattern in injection_patterns:
                if pattern.lower() in str(title).lower():
                    stored_title = ctx["response_body"].get("title", "")
                    if pattern.lower() in str(stored_title).lower():
                        return (
                            f"Injection payload '{pattern}' in title was stored verbatim "
                            f"β€” no input sanitization (content injection)"
                        )
        return None

    def _detect_broken_auth(self, ctx: dict) -> Optional[str]:
        """Detect if agent successfully modified another user's task with their own token."""
        if (
            ctx["method"] in ("PUT", "DELETE")
            and re.match(r"^/tasks/\d+$", ctx["endpoint"])
            and ctx["response_status"] == 200
            and ctx["headers"].get("Authorization")
        ):
            for prev in reversed(ctx["action_history"]):
                if (
                    prev.get("method") == "POST"
                    and prev.get("endpoint") == "/auth/login"
                    and prev.get("response_status") == 200
                    and isinstance(prev.get("response_body"), dict)
                ):
                    login_user_id = prev["response_body"].get("user_id")
                    # Check if the task belonged to a different user
                    task_id = int(ctx["endpoint"].split("/")[-1])
                    if isinstance(ctx["response_body"], dict):
                        task_owner = ctx["response_body"].get("owner_id")
                        if (
                            login_user_id is not None
                            and task_owner is not None
                            and login_user_id != task_owner
                        ):
                            return (
                                f"User {login_user_id}'s token modified task owned by user {task_owner} "
                                f"β€” broken authorization"
                            )
                    break
        return None