Spaces:
Running
Running
| """ | |
| Task-specific grading logic and bug bounty report generation. | |
| Each task has a grader that computes a final score (0.0 - 1.0) | |
| based on what the agent accomplished during the episode. | |
| """ | |
| from dataclasses import dataclass | |
@dataclass
class GradeResult:
    """Result of grading one episode for a single task.

    BUG FIX: the ``@dataclass`` decorator was missing even though
    ``dataclass`` is imported at the top of the file and every call site
    constructs this class with keyword arguments (``GradeResult(score=...,
    breakdown=..., feedback=...)``). Without the decorator those calls raise
    TypeError and ``report`` is a shared class attribute, not a field.
    """

    score: float                 # final score in [0.0, 1.0]
    breakdown: dict[str, float]  # per-criterion sub-scores
    feedback: str                # short human-readable summary
    report: str = ""             # Bug bounty report (markdown)
def generate_bug_report(bugs_found_ids: list[str], action_history: list[dict]) -> str:
    """Generate a structured markdown bug bounty report for discovered bugs.

    Args:
        bugs_found_ids: IDs of the bugs triggered during the episode.
        action_history: Chronological list of HTTP actions the agent took;
            each entry is a dict expected to carry "method" and "endpoint".

    Returns:
        A markdown report with a severity summary followed by one section per
        known bug, ordered hard -> medium -> easy.
    """
    # Trivial path first: no detector needed when nothing was found.
    # (The original built the detector before this check.)
    if not bugs_found_ids:
        return "## API Security Assessment Report\n\nNo vulnerabilities discovered."

    from .bug_detector import BugDetector

    detector = BugDetector("security_workflows")

    severity_order = {"hard": 0, "medium": 1, "easy": 2}

    def _severity_rank(bug_id: str) -> int:
        """Sort key: unknown bugs and unknown severities sort last (rank 2)."""
        bug = detector.bugs.get(bug_id)
        severity = bug.severity if bug else "easy"
        return severity_order.get(severity, 2)

    sorted_bugs = sorted(bugs_found_ids, key=_severity_rank)

    sections = ["## API Security Assessment Report", ""]
    sections.append(f"**Vulnerabilities Found:** {len(bugs_found_ids)}")

    # Tally by severity; .get() keeps an unexpected severity from raising.
    counts = {"easy": 0, "medium": 0, "hard": 0}
    for bid in bugs_found_ids:
        bug = detector.bugs.get(bid)
        if bug:
            counts[bug.severity] = counts.get(bug.severity, 0) + 1
    sections.append(
        f"**Critical/Hard:** {counts['hard']} | **Medium:** {counts['medium']} | **Low/Easy:** {counts['easy']}"
    )
    sections.append("")

    for bid in sorted_bugs:
        bug = detector.bugs.get(bid)
        if not bug:
            continue  # ID the detector doesn't know: skip the whole section
        sev_label = {"easy": "LOW", "medium": "MEDIUM", "hard": "HIGH"}.get(bug.severity, "INFO")
        owasp = bug.owasp if bug.owasp else "Uncategorized"
        sections.append(f"### {sev_label}: {bug.description}")
        sections.append(f"- **ID:** {bid}")
        sections.append(f"- **OWASP:** {owasp}")
        sections.append(f"- **Category:** {bug.category}")
        # BUG FIX: the original appended "" (a stray blank line in the report)
        # when a bug had no recommendation; only emit the line when present.
        if bug.recommendation:
            sections.append(f"- **Recommendation:** {bug.recommendation}")
        # NOTE(review): this credits every bug to the FIRST action in the
        # history that has a method and endpoint, not the action that actually
        # triggered this particular bug -- TODO: have the detector record the
        # triggering action per bug and use it here.
        for h in action_history:
            if h.get("method") and h.get("endpoint"):
                sections.append(f"- **Triggered by:** {h['method']} {h['endpoint']}")
                break
        sections.append("")
    return "\n".join(sections)
class TaskGrader:
    """Computes final scores for each task based on episode performance.

    The grader is stateless, so every method is a ``@staticmethod``.
    BUG FIX: the original methods took no ``self`` but also had no
    ``@staticmethod`` decorator; they are dispatched as
    ``TaskGrader._grade_basic(...)`` and would break on any instance call.
    """

    @staticmethod
    def _safe_int(value, default: int) -> int:
        """Parse an int from a query-param value; fall back to ``default``.

        ROBUSTNESS FIX: non-numeric page/limit params used to raise
        ValueError via ``int(str(...))`` and abort grading entirely.
        """
        try:
            return int(str(value))
        except (TypeError, ValueError):
            return default

    @staticmethod
    def grade(
        task_id: str,
        bugs_found: set[str],
        coverage_pct: float,
        endpoints_tested: int,
        total_endpoints: int,
        method_endpoint_pairs: int,
        status_codes_seen: set[int],
        action_history: list[dict],
        created_resources: dict[str, list],
    ) -> GradeResult:
        """Dispatch to the task-specific grader; unknown task ids score 0."""
        if task_id == "basic_validation":
            return TaskGrader._grade_basic(
                bugs_found, coverage_pct, endpoints_tested, total_endpoints,
                method_endpoint_pairs, status_codes_seen, action_history, created_resources,
            )
        if task_id == "edge_cases":
            return TaskGrader._grade_edge_cases(
                bugs_found, coverage_pct, endpoints_tested, method_endpoint_pairs,
                status_codes_seen, action_history, created_resources,
            )
        if task_id == "security_workflows":
            return TaskGrader._grade_security(
                bugs_found, coverage_pct, action_history, created_resources,
            )
        return GradeResult(score=0.0, breakdown={}, feedback="Unknown task")

    @staticmethod
    def _grade_basic(
        bugs_found, coverage_pct, endpoints_tested, total_endpoints,
        method_endpoint_pairs, status_codes_seen, action_history, created_resources,
    ) -> GradeResult:
        """Grade basic_validation: CRUD coverage plus the easy bugs.

        Score is the sum of five weighted criteria (weights total 1.0);
        each sub-score is rounded to 3 places before summing, as before.
        """
        breakdown = {}
        # 0.25: GET endpoint coverage -- 4 distinct GET endpoints = full credit.
        get_endpoints = {
            h.get("endpoint") for h in action_history
            if h.get("method", "").upper() == "GET"
        }
        get_score = min(len(get_endpoints) / 4, 1.0) * 0.25
        breakdown["get_coverage"] = round(get_score, 3)
        # 0.20: POSTs that actually created a resource (HTTP 201); 2 = full credit.
        post_success = sum(
            1 for h in action_history
            if h.get("method", "").upper() == "POST" and h.get("response_status") == 201
        )
        post_score = min(post_success / 2, 1.0) * 0.20
        breakdown["post_testing"] = round(post_score, 3)
        # 0.15: PUT/DELETE merely attempted (response status not required).
        put_delete = sum(
            1 for h in action_history
            if h.get("method", "").upper() in ("PUT", "DELETE")
        )
        pd_score = min(put_delete / 2, 1.0) * 0.15
        breakdown["put_delete"] = round(pd_score, 3)
        # 0.20: Easy-bug discovery; finding 2 of the 3 earns full credit.
        easy_bugs = {"BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03"}
        found_easy = len(bugs_found & easy_bugs)
        bug_score = min(found_easy / 2, 1.0) * 0.20
        breakdown["bugs_found"] = round(bug_score, 3)
        # 0.20: Schema-validation proxy: variety of distinct status codes seen.
        schema_score = min(len(status_codes_seen) / 4, 1.0) * 0.20
        breakdown["schema_validation"] = round(schema_score, 3)

        score = sum(breakdown.values())
        feedback_parts = []
        if get_score > 0:
            feedback_parts.append(f"GET coverage: {len(get_endpoints)} endpoints")
        if post_success > 0:
            feedback_parts.append(f"POST success: {post_success}")
        if found_easy > 0:
            feedback_parts.append(f"Bugs found: {found_easy}/{len(easy_bugs)}")
        return GradeResult(
            score=round(min(score, 1.0), 4),
            breakdown=breakdown,
            feedback="; ".join(feedback_parts) if feedback_parts else "No significant progress",
        )

    @staticmethod
    def _grade_edge_cases(
        bugs_found, coverage_pct, endpoints_tested, method_endpoint_pairs,
        status_codes_seen, action_history, created_resources,
    ) -> GradeResult:
        """Grade edge_cases: malformed input, boundaries, and CRUD chaining."""
        breakdown = {}
        # 0.15: POSTs that omit (or blank out) the required "title" field.
        missing_field_tests = sum(
            1 for h in action_history
            if h.get("method", "").upper() == "POST"
            and h.get("body") is not None
            and isinstance(h.get("body"), dict)
            and not h["body"].get("title")
        )
        breakdown["missing_fields"] = round(min(missing_field_tests / 2, 1.0) * 0.15, 3)
        # 0.15: Bodies carrying wrong-typed values (list/bool) or empty strings.
        invalid_tests = sum(
            1 for h in action_history
            if h.get("body") and isinstance(h.get("body"), dict)
            and any(
                isinstance(v, (list, bool)) or v == ""
                for v in h["body"].values()
            )
        )
        breakdown["invalid_types"] = round(min(invalid_tests / 2, 1.0) * 0.15, 3)
        # 0.15: Boundary values: negative pages, oversized limits.
        # NOTE(review): the original comment also mentioned "long strings",
        # but no such check exists -- confirm whether one was intended.
        boundary_tests = 0
        for h in action_history:
            # "or {}" also guards a present-but-None query_params value,
            # which the original dict default did not.
            qp = h.get("query_params") or {}
            if qp.get("page") is not None and TaskGrader._safe_int(qp.get("page", 1), 1) < 1:
                boundary_tests += 1
            if qp.get("limit") is not None and TaskGrader._safe_int(qp.get("limit", 10), 10) > 100:
                boundary_tests += 1
        breakdown["boundary_values"] = round(min(boundary_tests / 2, 1.0) * 0.15, 3)
        # 0.15: Probing ids that should not exist (the /999... convention).
        nonexistent_tests = sum(
            1 for h in action_history
            if h.get("method", "").upper() in ("GET", "DELETE", "PUT")
            and "/999" in h.get("endpoint", "")
        )
        breakdown["nonexistent_resources"] = round(min(nonexistent_tests / 2, 1.0) * 0.15, 3)
        # 0.20: Bug discovery -- medium bugs plus the easy ones; 3 = full credit.
        medium_bugs = {
            "BUG_TASK_04", "BUG_TASK_05", "BUG_TASK_06",
            "BUG_USER_01", "BUG_USER_02", "BUG_AUTH_02",
        }
        all_relevant = medium_bugs | {"BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03"}
        found_relevant = len(bugs_found & all_relevant)
        breakdown["bugs_found"] = round(min(found_relevant / 3, 1.0) * 0.20, 3)
        # 0.20: Dependency chaining (create -> read -> update -> delete).
        # Each step of one complete chain is worth 0.25 of this criterion.
        chain_score = 0.0
        if any(h.get("method") == "POST" and h.get("response_status") == 201 for h in action_history):
            chain_score += 0.25
        if created_resources.get("tasks"):
            for tid in created_resources["tasks"]:
                endpoint = f"/tasks/{tid}"
                gets = [h for h in action_history if h.get("endpoint") == endpoint and h.get("method") == "GET"]
                puts = [h for h in action_history if h.get("endpoint") == endpoint and h.get("method") == "PUT"]
                deletes = [h for h in action_history if h.get("endpoint") == endpoint and h.get("method") == "DELETE"]
                if gets:
                    chain_score += 0.25
                if puts:
                    chain_score += 0.25
                if deletes:
                    chain_score += 0.25
                break  # Only the first created task needs a complete chain
        breakdown["dependency_chaining"] = round(min(chain_score, 1.0) * 0.20, 3)

        score = sum(breakdown.values())
        return GradeResult(
            score=round(min(score, 1.0), 4),
            breakdown=breakdown,
            feedback=f"Edge cases: {found_relevant} bugs found, chain score {chain_score:.0%}",
        )

    @staticmethod
    def _grade_security(
        bugs_found, coverage_pct, action_history, created_resources,
    ) -> GradeResult:
        """Grade security_workflows: authz, injection, consistency, coverage."""
        breakdown = {}
        # 0.20: Cross-user authorization testing -- credited when at least two
        # distinct usernames logged in successfully during the episode.
        login_users = set()
        for h in action_history:
            if h.get("endpoint") == "/auth/login" and h.get("response_status") == 200:
                body = h.get("body", {})
                if body:
                    login_users.add(body.get("username"))
        cross_user = len(login_users) >= 2
        breakdown["cross_user_auth"] = 0.20 if cross_user else 0.0
        # 0.20: Injection-pattern testing; 2 attempts = full credit.
        # Matching is case-insensitive over the stringified body.
        injection_attempted = sum(
            1 for h in action_history
            if h.get("body") and isinstance(h.get("body"), dict)
            and any(
                pattern.lower() in str(h["body"]).lower()
                for pattern in ["DROP TABLE", "<script>", "OR 1=1", "UNION SELECT", "'; --"]
            )
        )
        breakdown["injection_testing"] = round(min(injection_attempted / 2, 1.0) * 0.20, 3)
        # 0.20: Multi-step state consistency -- did the agent DELETE a task and
        # then GET the same endpoint afterwards (stale-read probe)?
        consistency_tests = 0
        for i, h in enumerate(action_history):
            if h.get("method") == "DELETE" and "/tasks/" in h.get("endpoint", ""):
                deleted_endpoint = h["endpoint"]
                for j in range(i + 1, len(action_history)):
                    later = action_history[j]
                    if later.get("endpoint") == deleted_endpoint and later.get("method") == "GET":
                        consistency_tests += 1
                        break
        breakdown["state_consistency"] = round(min(consistency_tests, 1.0) * 0.20, 3)
        # 0.20: Security bug discovery; 2 of 4 = full credit.
        security_bugs = {"BUG_TASK_07", "BUG_AUTH_01", "BUG_TASK_08", "BUG_TASK_09"}
        found_security = len(bugs_found & security_bugs)
        breakdown["security_bugs"] = round(min(found_security / 2, 1.0) * 0.20, 3)
        # 0.20: Workflow coverage -- 80% endpoint coverage earns full score.
        workflow_coverage = min(coverage_pct / 80, 1.0)
        breakdown["workflow_coverage"] = round(workflow_coverage * 0.20, 3)

        score = sum(breakdown.values())
        return GradeResult(
            score=round(min(score, 1.0), 4),
            breakdown=breakdown,
            feedback=f"Security: {found_security} security bugs, {len(login_users)} users tested, {injection_attempted} injection attempts",
        )