Spaces:
Running
Running
| """ | |
| Bug detection logic — checks if the agent's action/response pair reveals a planted bug. | |
| Each bug has: | |
| - A unique ID | |
| - A severity level (easy/medium/hard) | |
| - A detection function that checks action + response | |
| """ | |
| from dataclasses import dataclass | |
| from typing import Any, Callable, Optional | |
| import re | |
@dataclass
class Bug:
    """A planted API bug that the agent is expected to uncover.

    Instances are built positionally as
    ``Bug(id, severity, description, category)`` with ``owasp`` and
    ``recommendation`` passed as keywords, so the class has to be a
    dataclass: the decorator is what generates the matching ``__init__``.
    """

    id: str  # Unique bug identifier, e.g. "BUG_TASK_01"
    severity: str  # "easy", "medium", "hard"
    description: str  # One-line summary of the faulty behavior
    category: str  # "status_code", "validation", "security", "data_integrity"
    owasp: str = ""  # OWASP API Security Top 10 (2023) category
    recommendation: str = ""  # Fix recommendation for bug bounty reports
@dataclass
class BugDetection:
    """Result of a successful detection: the bug plus the evidence for it.

    Constructed with keywords (``BugDetection(bug=..., evidence=...)``), which
    requires the dataclass-generated ``__init__``.
    """

    # Quoted so the annotation is not resolved while the class body executes.
    bug: "Bug"  # The planted bug that was revealed
    evidence: str  # Human-readable explanation of how the bug was detected
class BugDetector:
    """Detects planted bugs based on agent actions and API responses.

    Every bug is registered together with a detector: a callable that takes
    the request/response context dict built by :meth:`check` and returns an
    evidence string when that single exchange reveals the bug, or ``None``
    otherwise.  Only bugs that belong to the configured ``task_id`` are ever
    reported.
    """

    # Single-task endpoint such as "/tasks/42"; group 1 captures the ID.
    _TASK_ITEM_RE = re.compile(r"^/tasks/(\d+)$")
    # Deliberately loose shape check: "<something>@<something>.<something>".
    _EMAIL_RE = re.compile(r"^[^@]+@[^@]+\.[^@]+$")
    # A DELETE on an ID above this is treated as targeting a task that was
    # never created (heuristic; the detector cannot see the server's data).
    _NONEXISTENT_TASK_ID_FLOOR = 100
    # A `limit` above this that is still answered with 200 counts as uncapped.
    _UNCAPPED_LIMIT_FLOOR = 1000
    # Titles longer than this that trigger a 500 count as a length-validation bug.
    _LONG_TITLE_FLOOR = 5000
    # Payload fragments (matched case-insensitively) that mark an injection probe.
    _INJECTION_PATTERNS = (
        "DROP TABLE", "DELETE FROM", "<script>", "javascript:",
        "'; --", "\" OR 1=1", "UNION SELECT",
    )
    # Bug IDs in scope per task; any task not listed here gets every bug.
    _TASK_BUG_IDS = {
        "basic_validation": ("BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03"),
        "edge_cases": (
            "BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03",
            "BUG_TASK_04", "BUG_TASK_05", "BUG_TASK_06",
            "BUG_USER_01", "BUG_USER_02", "BUG_AUTH_02",
        ),
    }

    def __init__(self, task_id: str):
        """Create a detector scoped to ``task_id`` and build the bug registry."""
        self.task_id = task_id
        self._build_bug_registry()

    def _build_bug_registry(self):
        """Define all bugs with their detection logic."""
        self.bugs: dict[str, Bug] = {}
        self.detectors: dict[str, Callable] = {}

        # === EASY BUGS ===
        self._register_bug(
            Bug("BUG_TASK_01", "easy",
                "GET /tasks/{id} returns 200 with null for non-existent task",
                "status_code",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Return 404 Not Found for non-existent resources"),
            self._detect_null_response_for_missing_task,
        )
        self._register_bug(
            Bug("BUG_TASK_02", "easy",
                "POST /tasks with missing title returns 500 instead of 400/422",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate required fields and return 400/422 with descriptive error"),
            self._detect_missing_field_500,
        )
        self._register_bug(
            Bug("BUG_TASK_03", "easy",
                "GET /tasks?page=-1 returns 200 instead of 400",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate pagination parameters: page >= 1, limit > 0"),
            self._detect_negative_page,
        )

        # === MEDIUM BUGS ===
        self._register_bug(
            Bug("BUG_TASK_04", "medium",
                "PUT /tasks/{id} accepts invalid email format for assignee_email",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate email format with regex before accepting"),
            self._detect_invalid_email_accepted,
        )
        self._register_bug(
            Bug("BUG_TASK_05", "medium",
                "DELETE /tasks/{id} returns 200 for non-existent task",
                "status_code",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Check resource existence before deletion, return 404 if missing"),
            self._detect_delete_nonexistent_200,
        )
        self._register_bug(
            Bug("BUG_TASK_06", "medium",
                "GET /tasks?limit=999999 has no pagination cap",
                "validation",
                owasp="API4:2023 Unrestricted Resource Consumption",
                recommendation="Cap pagination limit at 100, reject values above maximum"),
            self._detect_no_pagination_cap,
        )
        self._register_bug(
            Bug("BUG_USER_01", "medium",
                "POST /users accepts invalid email format",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate email format server-side before creating user"),
            self._detect_user_invalid_email,
        )
        self._register_bug(
            Bug("BUG_USER_02", "medium",
                "POST /users response exposes password hash",
                "security",
                owasp="API3:2023 Broken Object Property Level Authorization",
                recommendation="Never return sensitive fields (password_hash) in API responses"),
            self._detect_password_hash_exposed,
        )
        self._register_bug(
            Bug("BUG_AUTH_02", "medium",
                "Login with empty password succeeds",
                "security",
                owasp="API2:2023 Broken Authentication",
                recommendation="Validate password is non-empty and verify against stored hash"),
            self._detect_empty_password_login,
        )

        # === HARD BUGS ===
        self._register_bug(
            Bug("BUG_TASK_07", "hard",
                "BOLA: User A can access User B's tasks without authorization check",
                "security",
                owasp="API1:2023 Broken Object Level Authorization",
                recommendation="Verify resource ownership: check task.owner_id matches authenticated user"),
            self._detect_bola,
        )
        self._register_bug(
            Bug("BUG_TASK_08", "hard",
                "POST /tasks with very long title (>5000 chars) causes 500",
                "validation",
                owasp="API4:2023 Unrestricted Resource Consumption",
                recommendation="Add input length validation: title max 200 chars"),
            self._detect_long_input_crash,
        )
        self._register_bug(
            Bug("BUG_TASK_09", "hard",
                "SQL injection payload in title is stored verbatim (content injection)",
                "security",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Sanitize user input before storage, escape HTML/SQL special characters"),
            self._detect_content_injection,
        )
        self._register_bug(
            Bug("BUG_AUTH_01", "hard",
                "Auth tokens not user-scoped: User A's token can modify User B's tasks",
                "security",
                owasp="API1:2023 Broken Object Level Authorization",
                recommendation="Enforce ownership check on all write operations (PUT/DELETE)"),
            self._detect_broken_auth,
        )

    def _register_bug(self, bug: "Bug", detector: Callable):
        """Store ``bug`` and its ``detector`` under the bug's ID."""
        self.bugs[bug.id] = bug
        self.detectors[bug.id] = detector

    def get_bugs_for_task(self) -> "list[Bug]":
        """Return bugs relevant to the current task.

        ``basic_validation`` and ``edge_cases`` get fixed subsets; every other
        task ID (in practice ``security_workflows``) gets all registered bugs.
        """
        bug_ids = self._TASK_BUG_IDS.get(self.task_id)
        if bug_ids is None:  # security_workflows
            return list(self.bugs.values())
        return [self.bugs[bug_id] for bug_id in bug_ids]

    def check(
        self,
        method: str,
        endpoint: str,
        headers: dict,
        query_params: dict,
        body: Optional[dict],
        expected_status: Optional[int],
        response_status: int,
        response_body: Any,
        action_history: list[dict],
        found_bugs: set[str],
    ) -> "Optional[BugDetection]":
        """Check if this action/response reveals a bug.

        Detectors run in registration order; bugs already in ``found_bugs``
        and bugs outside the current task are skipped.

        Returns the first new bug detected, or None.
        """
        ctx = {
            "method": method.upper(),
            "endpoint": endpoint,
            # Missing collections are normalized so detectors can call
            # ``.get`` / iterate without guarding against ``None``.
            "headers": headers or {},
            "query_params": query_params or {},
            "body": body,
            "expected_status": expected_status,
            "response_status": response_status,
            "response_body": response_body,
            "action_history": action_history or [],
        }
        # Built once per call: the task's bug set does not change mid-loop.
        task_bug_ids = {bug.id for bug in self.get_bugs_for_task()}
        for bug_id, detector in self.detectors.items():
            if bug_id in found_bugs or bug_id not in task_bug_ids:
                continue
            evidence = detector(ctx)
            if evidence:
                return BugDetection(bug=self.bugs[bug_id], evidence=evidence)
        return None

    # === SHARED HELPERS ===

    @staticmethod
    def _has_authorization(headers: Any) -> bool:
        """Return True if ``headers`` carries a non-empty Authorization value.

        HTTP header names are case-insensitive, so ``authorization`` and
        ``Authorization`` are treated alike.
        """
        if not isinstance(headers, dict):
            return False
        return any(
            isinstance(name, str) and name.lower() == "authorization" and bool(value)
            for name, value in headers.items()
        )

    @staticmethod
    def _successful_logins(action_history: list):
        """Yield response bodies of successful logins, most recent first.

        A successful login is a recorded ``POST /auth/login`` answered with
        200 and a dict body.  The recorded method is compared
        case-insensitively, mirroring the upper-casing done in ``check``.
        """
        for prev in reversed(action_history):
            if (
                str(prev.get("method", "")).upper() == "POST"
                and prev.get("endpoint") == "/auth/login"
                and prev.get("response_status") == 200
                and isinstance(prev.get("response_body"), dict)
            ):
                yield prev["response_body"]

    def _is_invalid_email(self, value: Any) -> bool:
        """Return True for a truthy value that is not a plausible email.

        Non-string values (numbers, lists, ...) count as invalid rather than
        crashing the regex match; falsy values are not judged at all.
        """
        if not value:
            return False
        return not (isinstance(value, str) and self._EMAIL_RE.match(value))

    # === DETECTION FUNCTIONS ===

    def _detect_null_response_for_missing_task(self, ctx: dict) -> Optional[str]:
        """Flag ``GET /tasks/{id}`` answered with 200 and a null body."""
        match = self._TASK_ITEM_RE.match(ctx["endpoint"])
        if (
            ctx["method"] == "GET"
            and match
            and ctx["response_status"] == 200
            and ctx["response_body"] is None
        ):
            return f"GET /tasks/{match.group(1)} returned 200 with null body — should be 404"
        return None

    def _detect_missing_field_500(self, ctx: dict) -> Optional[str]:
        """Flag ``POST /tasks`` without a title that produced a 500."""
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/tasks"
            and ctx["response_status"] == 500
            # ``is not None`` (not truthiness): an empty body ``{}`` is the
            # canonical "missing title" request and must still be flagged.
            and ctx["body"] is not None
            and "title" not in ctx["body"]
        ):
            return "POST /tasks with missing 'title' returned 500 — should be 400 or 422"
        return None

    def _detect_negative_page(self, ctx: dict) -> Optional[str]:
        """Flag ``GET /tasks?page=N`` with N < 1 that was answered with 200."""
        if ctx["method"] != "GET" or ctx["endpoint"] != "/tasks":
            return None
        raw_page = ctx["query_params"].get("page")
        if raw_page is None:
            return None
        try:
            page = int(raw_page)
        except (ValueError, TypeError):
            # Non-numeric page values are outside this bug's scope.
            return None
        if page < 1 and ctx["response_status"] == 200:
            return f"GET /tasks?page={page} returned 200 — should be 400 for invalid page"
        return None

    def _detect_invalid_email_accepted(self, ctx: dict) -> Optional[str]:
        """Flag ``PUT /tasks/{id}`` that accepted a malformed assignee_email."""
        body = ctx["body"]
        if (
            ctx["method"] == "PUT"
            and self._TASK_ITEM_RE.match(ctx["endpoint"])
            and isinstance(body, dict)
            and "assignee_email" in body
            and ctx["response_status"] in (200, 201)
        ):
            email = body["assignee_email"]
            if self._is_invalid_email(email):
                return f"PUT accepted invalid email '{email}' without validation"
        return None

    def _detect_delete_nonexistent_200(self, ctx: dict) -> Optional[str]:
        """Flag ``DELETE /tasks/{id}`` answered with 200 for an implausible ID."""
        match = self._TASK_ITEM_RE.match(ctx["endpoint"])
        if ctx["method"] == "DELETE" and match and ctx["response_status"] == 200:
            task_id = int(match.group(1))
            # IDs above the floor are assumed never to have been created.
            if task_id > self._NONEXISTENT_TASK_ID_FLOOR:
                return f"DELETE /tasks/{task_id} returned 200 for non-existent task — should be 404"
        return None

    def _detect_no_pagination_cap(self, ctx: dict) -> Optional[str]:
        """Flag ``GET /tasks?limit=N`` with a huge N that was answered with 200."""
        if (
            ctx["method"] != "GET"
            or ctx["endpoint"] != "/tasks"
            or ctx["response_status"] != 200
        ):
            return None
        raw_limit = ctx["query_params"].get("limit")
        if raw_limit is None:
            return None
        try:
            limit = int(raw_limit)
        except (ValueError, TypeError):
            return None
        if limit > self._UNCAPPED_LIMIT_FLOOR:
            return f"GET /tasks?limit={limit} accepted without pagination cap — potential DoS"
        return None

    def _detect_user_invalid_email(self, ctx: dict) -> Optional[str]:
        """Flag ``POST /users`` that created a user with a malformed email."""
        body = ctx["body"]
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/users"
            and isinstance(body, dict)
            and "email" in body
            and ctx["response_status"] == 201
        ):
            email = body["email"]
            if self._is_invalid_email(email):
                return f"POST /users accepted invalid email '{email}'"
        return None

    def _detect_password_hash_exposed(self, ctx: dict) -> Optional[str]:
        """Flag a ``POST /users`` 201 response that contains ``password_hash``."""
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/users"
            and ctx["response_status"] == 201
            and isinstance(ctx["response_body"], dict)
            and "password_hash" in ctx["response_body"]
        ):
            return "POST /users response exposes password_hash field — security vulnerability"
        return None

    def _detect_empty_password_login(self, ctx: dict) -> Optional[str]:
        """Flag a successful login whose password was ``""`` or ``None``.

        A request that omits the ``password`` key entirely is not flagged.
        """
        body = ctx["body"]
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/auth/login"
            and isinstance(body, dict)
            and ctx["response_status"] == 200
            and "password" in body
        ):
            password = body["password"]
            if password == "" or password is None:
                return "Login with empty password succeeded — authentication bypass"
        return None

    def _detect_bola(self, ctx: dict) -> Optional[str]:
        """Detect if agent tested cross-user resource access.

        Fires on an authorized ``GET /tasks/{id}`` answered with 200 whose
        ``owner_id`` differs from the ``user_id`` of any earlier successful
        login in the action history (most recent login checked first).
        """
        task = ctx["response_body"]
        if not (
            ctx["method"] == "GET"
            and self._TASK_ITEM_RE.match(ctx["endpoint"])
            and ctx["response_status"] == 200
            and isinstance(task, dict)
            and self._has_authorization(ctx["headers"])
        ):
            return None
        task_owner_id = task.get("owner_id")
        if task_owner_id is None:
            return None
        for login in self._successful_logins(ctx["action_history"]):
            login_user_id = login.get("user_id")
            if login_user_id is not None and login_user_id != task_owner_id:
                return (
                    f"User {login_user_id} accessed task owned by user {task_owner_id} "
                    f"— BOLA/IDOR vulnerability (no authorization check)"
                )
        return None

    def _detect_long_input_crash(self, ctx: dict) -> Optional[str]:
        """Flag ``POST /tasks`` with an over-long title that produced a 500."""
        body = ctx["body"]
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/tasks"
            and isinstance(body, dict)
            and body
            and ctx["response_status"] == 500
        ):
            title = body.get("title", "")
            if isinstance(title, str) and len(title) > self._LONG_TITLE_FLOOR:
                return f"POST /tasks with title length {len(title)} caused 500 — no input length validation"
        return None

    def _detect_content_injection(self, ctx: dict) -> Optional[str]:
        """Flag an injection payload in a task title that was stored unchanged.

        Both the submitted title and the title echoed in the 201 response
        must contain the same known payload fragment (case-insensitive).
        """
        body = ctx["body"]
        created = ctx["response_body"]
        if not (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/tasks"
            and isinstance(body, dict)
            and body
            and ctx["response_status"] == 201
            and isinstance(created, dict)
        ):
            return None
        # Lower-case once instead of once per pattern.
        submitted_title = str(body.get("title", "")).lower()
        stored_title = str(created.get("title", "")).lower()
        for pattern in self._INJECTION_PATTERNS:
            needle = pattern.lower()
            if needle in submitted_title and needle in stored_title:
                return (
                    f"Injection payload '{pattern}' in title was stored verbatim "
                    f"— no input sanitization (content injection)"
                )
        return None

    def _detect_broken_auth(self, ctx: dict) -> Optional[str]:
        """Detect if agent successfully modified another user's task with their own token.

        Only the most recent successful login is considered: its ``user_id``
        is compared with the ``owner_id`` in the PUT/DELETE response body.
        """
        if not (
            ctx["method"] in ("PUT", "DELETE")
            and self._TASK_ITEM_RE.match(ctx["endpoint"])
            and ctx["response_status"] == 200
            and self._has_authorization(ctx["headers"])
        ):
            return None
        latest_login = next(self._successful_logins(ctx["action_history"]), None)
        task = ctx["response_body"]
        if latest_login is None or not isinstance(task, dict):
            return None
        login_user_id = latest_login.get("user_id")
        task_owner = task.get("owner_id")
        if (
            login_user_id is not None
            and task_owner is not None
            and login_user_id != task_owner
        ):
            return (
                f"User {login_user_id}'s token modified task owned by user {task_owner} "
                f"— broken authorization"
            )
        return None