# Snapshot metadata from the hosting page: revision a4f74f3, file size 17,509 bytes.
"""
Bug detection logic — checks if the agent's action/response pair reveals a planted bug.

Each bug has:
- A unique ID
- A severity level (easy/medium/hard)
- A detection function that checks action + response
"""
from dataclasses import dataclass
from typing import Any, Callable, Optional
import re
@dataclass
class Bug:
    """Metadata describing one planted bug that the detector can recognize."""

    id: str  # Unique bug identifier, e.g. "BUG_TASK_01"
    severity: str  # "easy", "medium", "hard"
    description: str  # Human-readable summary of the faulty API behavior
    category: str  # "status_code", "validation", "security", "data_integrity"
    owasp: str = ""  # OWASP API Security Top 10 (2023) category
    recommendation: str = ""  # Fix recommendation for bug bounty reports
@dataclass
class BugDetection:
    """Pairs a detected bug with the evidence that triggered the detection."""

    bug: "Bug"  # The planted bug that the action/response pair revealed
    evidence: str  # Human-readable explanation of how the bug was detected
class BugDetector:
    """Detects planted bugs based on agent actions and API responses.

    Every registered bug pairs a ``Bug`` record with a detector callable.
    A detector receives a context dict describing one request/response
    exchange (built by ``check``) and returns a human-readable evidence
    string when that exchange reveals its bug, or ``None`` otherwise.
    """

    # Bug IDs in scope per task; any other task_id gets every registered bug.
    _BASIC_VALIDATION_BUG_IDS = ("BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03")
    _EDGE_CASES_BUG_IDS = _BASIC_VALIDATION_BUG_IDS + (
        "BUG_TASK_04", "BUG_TASK_05", "BUG_TASK_06",
        "BUG_USER_01", "BUG_USER_02", "BUG_AUTH_02",
    )

    # Matches a single-task endpoint such as "/tasks/42".
    _TASK_ITEM_RE = re.compile(r"^/tasks/\d+$")
    # Deliberately loose shape check: something@something.something
    _EMAIL_RE = re.compile(r"^[^@]+@[^@]+\.[^@]+$")

    # Task IDs strictly above this value are assumed to have never been created.
    _NONEXISTENT_TASK_ID_THRESHOLD = 100
    # A `limit` strictly above this value counts as "no pagination cap".
    _PAGINATION_LIMIT_THRESHOLD = 1000
    # A title strictly longer than this counts as "very long".
    _LONG_TITLE_THRESHOLD = 5000

    # Payload fragments checked case-insensitively, in this order.
    _INJECTION_PATTERNS = (
        "DROP TABLE", "DELETE FROM", "<script>", "javascript:",
        "'; --", "\" OR 1=1", "UNION SELECT",
    )

    def __init__(self, task_id: str):
        """Create a detector for one task.

        Args:
            task_id: "basic_validation" or "edge_cases" restrict the bugs in
                scope; any other value puts every registered bug in scope.
        """
        self.task_id = task_id
        self._build_bug_registry()

    def _build_bug_registry(self):
        """Define all bugs with their detection logic."""
        self.bugs: dict[str, Bug] = {}
        self.detectors: dict[str, Callable] = {}

        # === EASY BUGS ===
        self._register_bug(
            Bug("BUG_TASK_01", "easy",
                "GET /tasks/{id} returns 200 with null for non-existent task",
                "status_code",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Return 404 Not Found for non-existent resources"),
            self._detect_null_response_for_missing_task,
        )
        self._register_bug(
            Bug("BUG_TASK_02", "easy",
                "POST /tasks with missing title returns 500 instead of 400/422",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate required fields and return 400/422 with descriptive error"),
            self._detect_missing_field_500,
        )
        self._register_bug(
            Bug("BUG_TASK_03", "easy",
                "GET /tasks?page=-1 returns 200 instead of 400",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate pagination parameters: page >= 1, limit > 0"),
            self._detect_negative_page,
        )

        # === MEDIUM BUGS ===
        self._register_bug(
            Bug("BUG_TASK_04", "medium",
                "PUT /tasks/{id} accepts invalid email format for assignee_email",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate email format with regex before accepting"),
            self._detect_invalid_email_accepted,
        )
        self._register_bug(
            Bug("BUG_TASK_05", "medium",
                "DELETE /tasks/{id} returns 200 for non-existent task",
                "status_code",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Check resource existence before deletion, return 404 if missing"),
            self._detect_delete_nonexistent_200,
        )
        self._register_bug(
            Bug("BUG_TASK_06", "medium",
                "GET /tasks?limit=999999 has no pagination cap",
                "validation",
                owasp="API4:2023 Unrestricted Resource Consumption",
                recommendation="Cap pagination limit at 100, reject values above maximum"),
            self._detect_no_pagination_cap,
        )
        self._register_bug(
            Bug("BUG_USER_01", "medium",
                "POST /users accepts invalid email format",
                "validation",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Validate email format server-side before creating user"),
            self._detect_user_invalid_email,
        )
        self._register_bug(
            Bug("BUG_USER_02", "medium",
                "POST /users response exposes password hash",
                "security",
                owasp="API3:2023 Broken Object Property Level Authorization",
                recommendation="Never return sensitive fields (password_hash) in API responses"),
            self._detect_password_hash_exposed,
        )
        self._register_bug(
            Bug("BUG_AUTH_02", "medium",
                "Login with empty password succeeds",
                "security",
                owasp="API2:2023 Broken Authentication",
                recommendation="Validate password is non-empty and verify against stored hash"),
            self._detect_empty_password_login,
        )

        # === HARD BUGS ===
        self._register_bug(
            Bug("BUG_TASK_07", "hard",
                "BOLA: User A can access User B's tasks without authorization check",
                "security",
                owasp="API1:2023 Broken Object Level Authorization",
                recommendation="Verify resource ownership: check task.owner_id matches authenticated user"),
            self._detect_bola,
        )
        self._register_bug(
            Bug("BUG_TASK_08", "hard",
                "POST /tasks with very long title (>5000 chars) causes 500",
                "validation",
                owasp="API4:2023 Unrestricted Resource Consumption",
                recommendation="Add input length validation: title max 200 chars"),
            self._detect_long_input_crash,
        )
        self._register_bug(
            Bug("BUG_TASK_09", "hard",
                "SQL injection payload in title is stored verbatim (content injection)",
                "security",
                owasp="API8:2023 Security Misconfiguration",
                recommendation="Sanitize user input before storage, escape HTML/SQL special characters"),
            self._detect_content_injection,
        )
        self._register_bug(
            Bug("BUG_AUTH_01", "hard",
                "Auth tokens not user-scoped: User A's token can modify User B's tasks",
                "security",
                owasp="API1:2023 Broken Object Level Authorization",
                recommendation="Enforce ownership check on all write operations (PUT/DELETE)"),
            self._detect_broken_auth,
        )

    def _register_bug(self, bug: "Bug", detector: Callable):
        """Store ``bug`` and its ``detector`` under the bug's ID."""
        self.bugs[bug.id] = bug
        self.detectors[bug.id] = detector

    def get_bugs_for_task(self) -> "list[Bug]":
        """Return bugs relevant to the current task.

        "basic_validation" yields the three easy bugs, "edge_cases" yields
        the easy and medium bugs, and any other task ID (security_workflows)
        yields every registered bug.
        """
        if self.task_id == "basic_validation":
            return [self.bugs[bid] for bid in self._BASIC_VALIDATION_BUG_IDS]
        if self.task_id == "edge_cases":
            return [self.bugs[bid] for bid in self._EDGE_CASES_BUG_IDS]
        return list(self.bugs.values())  # security_workflows

    def check(
        self,
        method: str,
        endpoint: str,
        headers: dict,
        query_params: dict,
        body: Optional[dict],
        expected_status: Optional[int],
        response_status: int,
        response_body: Any,
        action_history: list[dict],
        found_bugs: set[str],
    ) -> "Optional[BugDetection]":
        """Check if this action/response reveals a bug.

        Detectors run in registration order; bugs already in ``found_bugs``
        and bugs outside the current task's scope are skipped.

        Args:
            method: HTTP method of the request (upper-cased before matching).
            endpoint: Request path, e.g. "/tasks/3".
            headers: Request headers; ``None`` is treated as empty.
            query_params: Request query parameters; ``None`` is treated as empty.
            body: Request body, if any.
            expected_status: Status the agent expected; passed through to
                detectors in the context dict.
            response_status: Status code the API returned.
            response_body: Decoded response payload.
            action_history: Earlier actions as dicts; the detectors read the
                keys "method", "endpoint", "response_status" and
                "response_body". ``None`` is treated as empty.
            found_bugs: IDs of bugs that were already reported.

        Returns:
            The first new bug detected, or None.
        """
        ctx = {
            "method": method.upper(),
            "endpoint": endpoint,
            "headers": headers or {},
            "query_params": query_params or {},
            "body": body,
            "expected_status": expected_status,
            "response_status": response_status,
            "response_body": response_body,
            "action_history": action_history or [],
        }
        # Only check bugs relevant to this task; the set does not change
        # while iterating, so build it once.
        task_bug_ids = {bug.id for bug in self.get_bugs_for_task()}
        for bug_id, detector in self.detectors.items():
            if bug_id in found_bugs or bug_id not in task_bug_ids:
                continue
            evidence = detector(ctx)
            if evidence:
                return BugDetection(bug=self.bugs[bug_id], evidence=evidence)
        return None

    # === SHARED HELPERS ===

    @classmethod
    def _is_malformed_email(cls, email: Any) -> bool:
        """Return True when a non-empty ``email`` value is not email-shaped.

        Empty or missing values return False (they are not this bug).
        Non-string values count as malformed rather than being handed to the
        regex, which would raise ``TypeError``.
        """
        if not email:
            return False
        if not isinstance(email, str):
            return True
        return cls._EMAIL_RE.match(email) is None

    @staticmethod
    def _latest_login_user_id(action_history: list[dict]) -> Any:
        """Return ``user_id`` from the most recent successful login, if any.

        A successful login is a history entry for POST /auth/login with
        status 200 and a dict response body. Only the newest such entry is
        consulted; earlier logins are ignored.
        """
        for prev in reversed(action_history):
            if (
                prev.get("method") == "POST"
                and prev.get("endpoint") == "/auth/login"
                and prev.get("response_status") == 200
                and isinstance(prev.get("response_body"), dict)
            ):
                return prev["response_body"].get("user_id")
        return None

    # === DETECTION FUNCTIONS ===

    def _detect_null_response_for_missing_task(self, ctx: dict) -> Optional[str]:
        """Flag GET /tasks/{id} answering 200 with a null body."""
        if (
            ctx["method"] == "GET"
            and self._TASK_ITEM_RE.match(ctx["endpoint"])
            and ctx["response_status"] == 200
            and ctx["response_body"] is None
        ):
            task_id = ctx["endpoint"].split("/")[-1]
            return f"GET /tasks/{task_id} returned 200 with null body — should be 404"
        return None

    def _detect_missing_field_500(self, ctx: dict) -> Optional[str]:
        """Flag POST /tasks without a 'title' key answering 500."""
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/tasks"
            and ctx["response_status"] == 500
            and ctx["body"] is not None
            and "title" not in ctx["body"]
        ):
            return "POST /tasks with missing 'title' returned 500 — should be 400 or 422"
        return None

    def _detect_negative_page(self, ctx: dict) -> Optional[str]:
        """Flag GET /tasks with an integer page below 1 answering 200."""
        if ctx["method"] != "GET" or ctx["endpoint"] != "/tasks":
            return None
        raw_page = ctx["query_params"].get("page")
        if raw_page is None:
            return None
        try:
            page = int(raw_page)
        except (ValueError, TypeError):
            # A non-numeric page is a different kind of input; not this bug.
            return None
        if page < 1 and ctx["response_status"] == 200:
            return f"GET /tasks?page={page} returned 200 — should be 400 for invalid page"
        return None

    def _detect_invalid_email_accepted(self, ctx: dict) -> Optional[str]:
        """Flag PUT /tasks/{id} accepting a malformed assignee_email."""
        if (
            ctx["method"] == "PUT"
            and self._TASK_ITEM_RE.match(ctx["endpoint"])
            and ctx["body"]
            and "assignee_email" in ctx["body"]
            and ctx["response_status"] in (200, 201)
        ):
            email = ctx["body"]["assignee_email"]
            if self._is_malformed_email(email):
                return f"PUT accepted invalid email '{email}' without validation"
        return None

    def _detect_delete_nonexistent_200(self, ctx: dict) -> Optional[str]:
        """Flag DELETE /tasks/{id} answering 200 for an ID assumed not to exist."""
        if (
            ctx["method"] == "DELETE"
            and self._TASK_ITEM_RE.match(ctx["endpoint"])
            and ctx["response_status"] == 200
        ):
            task_id = int(ctx["endpoint"].split("/")[-1])
            # Heuristic: existence is not tracked here, so an ID above the
            # threshold (100) is taken as a safe bet for a non-existent task.
            if task_id > self._NONEXISTENT_TASK_ID_THRESHOLD:
                return f"DELETE /tasks/{task_id} returned 200 for non-existent task — should be 404"
        return None

    def _detect_no_pagination_cap(self, ctx: dict) -> Optional[str]:
        """Flag GET /tasks answering 200 for an integer limit above 1000."""
        if (
            ctx["method"] != "GET"
            or ctx["endpoint"] != "/tasks"
            or ctx["response_status"] != 200
        ):
            return None
        raw_limit = ctx["query_params"].get("limit")
        if raw_limit is None:
            return None
        try:
            limit = int(raw_limit)
        except (ValueError, TypeError):
            return None
        if limit > self._PAGINATION_LIMIT_THRESHOLD:
            return f"GET /tasks?limit={limit} accepted without pagination cap — potential DoS"
        return None

    def _detect_user_invalid_email(self, ctx: dict) -> Optional[str]:
        """Flag POST /users answering 201 for a malformed email."""
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/users"
            and ctx["body"]
            and "email" in ctx["body"]
            and ctx["response_status"] == 201
        ):
            email = ctx["body"]["email"]
            if self._is_malformed_email(email):
                return f"POST /users accepted invalid email '{email}'"
        return None

    def _detect_password_hash_exposed(self, ctx: dict) -> Optional[str]:
        """Flag a 201 POST /users response whose body has a password_hash key."""
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/users"
            and ctx["response_status"] == 201
            and isinstance(ctx["response_body"], dict)
            and "password_hash" in ctx["response_body"]
        ):
            return "POST /users response exposes password_hash field — security vulnerability"
        return None

    def _detect_empty_password_login(self, ctx: dict) -> Optional[str]:
        """Flag POST /auth/login answering 200 for an empty or null password."""
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/auth/login"
            and ctx["body"]
            and ctx["response_status"] == 200
        ):
            # The non-empty default means a body with no "password" key at
            # all is not reported by this detector.
            password = ctx["body"].get("password", "NOTEMPTY")
            if password == "" or password is None:
                return "Login with empty password succeeded — authentication bypass"
        return None

    def _detect_bola(self, ctx: dict) -> Optional[str]:
        """Detect if agent tested cross-user resource access.

        Fires on an authorized GET /tasks/{id} answering 200 with a dict body
        whose ``owner_id`` differs from the ``user_id`` of the most recent
        successful login. Earlier logins are ignored so that a user reading
        their own task is not reported just because someone else logged in
        before them.
        """
        if (
            ctx["method"] == "GET"
            and self._TASK_ITEM_RE.match(ctx["endpoint"])
            and ctx["response_status"] == 200
            and isinstance(ctx["response_body"], dict)
            and ctx["headers"].get("Authorization")
        ):
            login_user_id = self._latest_login_user_id(ctx["action_history"])
            task_owner_id = ctx["response_body"].get("owner_id")
            if (
                login_user_id is not None
                and task_owner_id is not None
                and login_user_id != task_owner_id
            ):
                return (
                    f"User {login_user_id} accessed task owned by user {task_owner_id} "
                    f"— BOLA/IDOR vulnerability (no authorization check)"
                )
        return None

    def _detect_long_input_crash(self, ctx: dict) -> Optional[str]:
        """Flag POST /tasks answering 500 for a title longer than 5000 chars."""
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/tasks"
            and ctx["body"]
            and ctx["response_status"] == 500
        ):
            title = ctx["body"].get("title", "")
            if isinstance(title, str) and len(title) > self._LONG_TITLE_THRESHOLD:
                return f"POST /tasks with title length {len(title)} caused 500 — no input length validation"
        return None

    def _detect_content_injection(self, ctx: dict) -> Optional[str]:
        """Flag POST /tasks answering 201 that echoes an injection payload.

        The first pattern found (case-insensitively) in both the submitted
        title and the title in the response body is reported.
        """
        if (
            ctx["method"] == "POST"
            and ctx["endpoint"] == "/tasks"
            and ctx["body"]
            and ctx["response_status"] == 201
            and isinstance(ctx["response_body"], dict)
        ):
            submitted_title = str(ctx["body"].get("title", "")).lower()
            stored_title = str(ctx["response_body"].get("title", "")).lower()
            for pattern in self._INJECTION_PATTERNS:
                needle = pattern.lower()
                if needle in submitted_title and needle in stored_title:
                    return (
                        f"Injection payload '{pattern}' in title was stored verbatim "
                        f"— no input sanitization (content injection)"
                    )
        return None

    def _detect_broken_auth(self, ctx: dict) -> Optional[str]:
        """Detect if agent successfully modified another user's task with their own token.

        Fires on an authorized PUT or DELETE of /tasks/{id} answering 200
        with a dict body whose ``owner_id`` differs from the ``user_id`` of
        the most recent successful login.
        """
        if (
            ctx["method"] in ("PUT", "DELETE")
            and self._TASK_ITEM_RE.match(ctx["endpoint"])
            and ctx["response_status"] == 200
            and ctx["headers"].get("Authorization")
            and isinstance(ctx["response_body"], dict)
        ):
            login_user_id = self._latest_login_user_id(ctx["action_history"])
            # Check if the task belonged to a different user
            task_owner = ctx["response_body"].get("owner_id")
            if (
                login_user_id is not None
                and task_owner is not None
                and login_user_id != task_owner
            ):
                return (
                    f"User {login_user_id}'s token modified task owned by user {task_owner} "
                    f"— broken authorization"
                )
        return None
# End of bug detection module.