""" Bug detection logic — checks if the agent's action/response pair reveals a planted bug. Each bug has: - A unique ID - A severity level (easy/medium/hard) - A detection function that checks action + response """ from dataclasses import dataclass from typing import Any, Callable, Optional import re @dataclass class Bug: id: str severity: str # "easy", "medium", "hard" description: str category: str # "status_code", "validation", "security", "data_integrity" owasp: str = "" # OWASP API Security Top 10 (2023) category recommendation: str = "" # Fix recommendation for bug bounty reports @dataclass class BugDetection: bug: Bug evidence: str # Human-readable explanation of how the bug was detected class BugDetector: """Detects planted bugs based on agent actions and API responses.""" def __init__(self, task_id: str): self.task_id = task_id self._build_bug_registry() def _build_bug_registry(self): """Define all bugs with their detection logic.""" self.bugs: dict[str, Bug] = {} self.detectors: dict[str, Callable] = {} # === EASY BUGS === self._register_bug( Bug("BUG_TASK_01", "easy", "GET /tasks/{id} returns 200 with null for non-existent task", "status_code", owasp="API8:2023 Security Misconfiguration", recommendation="Return 404 Not Found for non-existent resources"), self._detect_null_response_for_missing_task, ) self._register_bug( Bug("BUG_TASK_02", "easy", "POST /tasks with missing title returns 500 instead of 400/422", "validation", owasp="API8:2023 Security Misconfiguration", recommendation="Validate required fields and return 400/422 with descriptive error"), self._detect_missing_field_500, ) self._register_bug( Bug("BUG_TASK_03", "easy", "GET /tasks?page=-1 returns 200 instead of 400", "validation", owasp="API8:2023 Security Misconfiguration", recommendation="Validate pagination parameters: page >= 1, limit > 0"), self._detect_negative_page, ) # === MEDIUM BUGS === self._register_bug( Bug("BUG_TASK_04", "medium", "PUT /tasks/{id} accepts invalid email format for assignee_email", "validation", owasp="API8:2023 Security Misconfiguration", recommendation="Validate email format with regex before accepting"), self._detect_invalid_email_accepted, ) self._register_bug( Bug("BUG_TASK_05", "medium", "DELETE /tasks/{id} returns 200 for non-existent task", "status_code", owasp="API8:2023 Security Misconfiguration", recommendation="Check resource existence before deletion, return 404 if missing"), self._detect_delete_nonexistent_200, ) self._register_bug( Bug("BUG_TASK_06", "medium", "GET /tasks?limit=999999 has no pagination cap", "validation", owasp="API4:2023 Unrestricted Resource Consumption", recommendation="Cap pagination limit at 100, reject values above maximum"), self._detect_no_pagination_cap, ) self._register_bug( Bug("BUG_USER_01", "medium", "POST /users accepts invalid email format", "validation", owasp="API8:2023 Security Misconfiguration", recommendation="Validate email format server-side before creating user"), self._detect_user_invalid_email, ) self._register_bug( Bug("BUG_USER_02", "medium", "POST /users response exposes password hash", "security", owasp="API3:2023 Broken Object Property Level Authorization", recommendation="Never return sensitive fields (password_hash) in API responses"), self._detect_password_hash_exposed, ) self._register_bug( Bug("BUG_AUTH_02", "medium", "Login with empty password succeeds", "security", owasp="API2:2023 Broken Authentication", recommendation="Validate password is non-empty and verify against stored hash"), self._detect_empty_password_login, ) # === HARD BUGS === self._register_bug( Bug("BUG_TASK_07", "hard", "BOLA: User A can access User B's tasks without authorization check", "security", owasp="API1:2023 Broken Object Level Authorization", recommendation="Verify resource ownership: check task.owner_id matches authenticated user"), self._detect_bola, ) self._register_bug( Bug("BUG_TASK_08", "hard", "POST /tasks with very long title (>5000 chars) causes 500", "validation", owasp="API4:2023 Unrestricted Resource Consumption", recommendation="Add input length validation: title max 200 chars"), self._detect_long_input_crash, ) self._register_bug( Bug("BUG_TASK_09", "hard", "SQL injection payload in title is stored verbatim (content injection)", "security", owasp="API8:2023 Security Misconfiguration", recommendation="Sanitize user input before storage, escape HTML/SQL special characters"), self._detect_content_injection, ) self._register_bug( Bug("BUG_AUTH_01", "hard", "Auth tokens not user-scoped: User A's token can modify User B's tasks", "security", owasp="API1:2023 Broken Object Level Authorization", recommendation="Enforce ownership check on all write operations (PUT/DELETE)"), self._detect_broken_auth, ) def _register_bug(self, bug: Bug, detector: Callable): self.bugs[bug.id] = bug self.detectors[bug.id] = detector def get_bugs_for_task(self) -> list[Bug]: """Return bugs relevant to the current task.""" if self.task_id == "basic_validation": return [self.bugs[bid] for bid in ["BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03"]] elif self.task_id == "edge_cases": return [ self.bugs[bid] for bid in [ "BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03", "BUG_TASK_04", "BUG_TASK_05", "BUG_TASK_06", "BUG_USER_01", "BUG_USER_02", "BUG_AUTH_02", ] ] else: # security_workflows return list(self.bugs.values()) def check( self, method: str, endpoint: str, headers: dict, query_params: dict, body: Optional[dict], expected_status: Optional[int], response_status: int, response_body: Any, action_history: list[dict], found_bugs: set[str], ) -> Optional[BugDetection]: """Check if this action/response reveals a bug. Returns the first new bug detected, or None. """ ctx = { "method": method.upper(), "endpoint": endpoint, "headers": headers, "query_params": query_params, "body": body, "expected_status": expected_status, "response_status": response_status, "response_body": response_body, "action_history": action_history, } for bug_id, detector in self.detectors.items(): if bug_id in found_bugs: continue # Only check bugs relevant to this task task_bugs = {b.id for b in self.get_bugs_for_task()} if bug_id not in task_bugs: continue result = detector(ctx) if result: return BugDetection(bug=self.bugs[bug_id], evidence=result) return None # === DETECTION FUNCTIONS === def _detect_null_response_for_missing_task(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "GET" and re.match(r"^/tasks/\d+$", ctx["endpoint"]) and ctx["response_status"] == 200 and ctx["response_body"] is None ): task_id = ctx["endpoint"].split("/")[-1] return f"GET /tasks/{task_id} returned 200 with null body — should be 404" return None def _detect_missing_field_500(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "POST" and ctx["endpoint"] == "/tasks" and ctx["response_status"] == 500 and ctx["body"] is not None and "title" not in ctx["body"] ): return "POST /tasks with missing 'title' returned 500 — should be 400 or 422" return None def _detect_negative_page(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "GET" and ctx["endpoint"] == "/tasks" and ctx["query_params"].get("page") is not None ): page = ctx["query_params"]["page"] try: page = int(page) except (ValueError, TypeError): return None if page < 1 and ctx["response_status"] == 200: return f"GET /tasks?page={page} returned 200 — should be 400 for invalid page" return None def _detect_invalid_email_accepted(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "PUT" and re.match(r"^/tasks/\d+$", ctx["endpoint"]) and ctx["body"] and "assignee_email" in ctx["body"] and ctx["response_status"] in (200, 201) ): email = ctx["body"]["assignee_email"] if email and not re.match(r"^[^@]+@[^@]+\.[^@]+$", email): return f"PUT accepted invalid email '{email}' without validation" return None def _detect_delete_nonexistent_200(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "DELETE" and re.match(r"^/tasks/\d+$", ctx["endpoint"]) and ctx["response_status"] == 200 ): task_id = int(ctx["endpoint"].split("/")[-1]) # Check if this task was never created (ID > 1000 is a safe bet for non-existent) if task_id > 100: return f"DELETE /tasks/{task_id} returned 200 for non-existent task — should be 404" return None def _detect_no_pagination_cap(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "GET" and ctx["endpoint"] == "/tasks" and ctx["response_status"] == 200 ): limit = ctx["query_params"].get("limit") if limit is not None: try: limit = int(limit) except (ValueError, TypeError): return None if limit > 1000: return f"GET /tasks?limit={limit} accepted without pagination cap — potential DoS" return None def _detect_user_invalid_email(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "POST" and ctx["endpoint"] == "/users" and ctx["body"] and "email" in ctx["body"] and ctx["response_status"] == 201 ): email = ctx["body"]["email"] if email and not re.match(r"^[^@]+@[^@]+\.[^@]+$", email): return f"POST /users accepted invalid email '{email}'" return None def _detect_password_hash_exposed(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "POST" and ctx["endpoint"] == "/users" and ctx["response_status"] == 201 and isinstance(ctx["response_body"], dict) ): if "password_hash" in ctx["response_body"]: return "POST /users response exposes password_hash field — security vulnerability" return None def _detect_empty_password_login(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "POST" and ctx["endpoint"] == "/auth/login" and ctx["body"] and ctx["response_status"] == 200 ): password = ctx["body"].get("password", "NOTEMPTY") if password == "" or password is None: return "Login with empty password succeeded — authentication bypass" return None def _detect_bola(self, ctx: dict) -> Optional[str]: """Detect if agent tested cross-user resource access.""" if ( ctx["method"] == "GET" and re.match(r"^/tasks/\d+$", ctx["endpoint"]) and ctx["response_status"] == 200 and ctx["response_body"] is not None and isinstance(ctx["response_body"], dict) and ctx["headers"].get("Authorization") ): # Check if the agent logged in as a different user and accessed another's task for prev in reversed(ctx["action_history"]): if ( prev.get("method") == "POST" and prev.get("endpoint") == "/auth/login" and prev.get("response_status") == 200 and isinstance(prev.get("response_body"), dict) ): login_user_id = prev["response_body"].get("user_id") task_owner_id = ctx["response_body"].get("owner_id") if ( login_user_id is not None and task_owner_id is not None and login_user_id != task_owner_id ): return ( f"User {login_user_id} accessed task owned by user {task_owner_id} " f"— BOLA/IDOR vulnerability (no authorization check)" ) return None def _detect_long_input_crash(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "POST" and ctx["endpoint"] == "/tasks" and ctx["body"] and ctx["response_status"] == 500 ): title = ctx["body"].get("title", "") if isinstance(title, str) and len(title) > 5000: return f"POST /tasks with title length {len(title)} caused 500 — no input length validation" return None def _detect_content_injection(self, ctx: dict) -> Optional[str]: if ( ctx["method"] == "POST" and ctx["endpoint"] == "/tasks" and ctx["body"] and ctx["response_status"] == 201 and isinstance(ctx["response_body"], dict) ): title = ctx["body"].get("title", "") injection_patterns = [ "DROP TABLE", "DELETE FROM", "