"""
Task-specific grading logic and bug bounty report generation.

Each task has a grader that computes a final score (0.0 - 1.0)
based on what the agent accomplished during the episode.
"""

from dataclasses import dataclass


@dataclass
class GradeResult:
    """Result of grading one episode."""

    score: float  # Final score, clamped to at most 1.0 by the graders
    breakdown: dict[str, float]  # Weighted score contributed by each criterion
    feedback: str  # Short human-readable summary
    report: str = ""  # Bug bounty report (markdown)


def _http_method(entry: dict) -> str:
    """Return the upper-cased HTTP method of a history entry ("" if missing/None)."""
    return str(entry.get("method") or "").upper()


def _parse_int(value):
    """Return ``int(str(value))``, or None when the value is not integer-like."""
    try:
        return int(str(value))
    except ValueError:
        return None


def generate_bug_report(bugs_found_ids: list[str], action_history: list[dict]) -> str:
    """Generate a structured bug bounty report for discovered bugs.

    Args:
        bugs_found_ids: IDs of the bugs discovered during the episode.
        action_history: Recorded actions; entries are read for their
            ``method`` and ``endpoint`` keys.

    Returns:
        A markdown report. Bugs are listed from hard to easy severity; IDs
        unknown to the detector are counted in the total but get no section.
    """
    # Nothing to look up for an empty finding list, so skip building the detector.
    if not bugs_found_ids:
        return "## API Security Assessment Report\n\nNo vulnerabilities discovered."

    from .bug_detector import BugDetector

    detector = BugDetector("security_workflows")

    severity_order = {"hard": 0, "medium": 1, "easy": 2}

    def severity_rank(bug_id: str) -> int:
        # Unknown bugs and bugs without a severity sort together with "easy".
        bug = detector.bugs.get(bug_id)
        severity = bug.severity if bug and bug.severity else "easy"
        return severity_order.get(severity, 2)

    sorted_bugs = sorted(bugs_found_ids, key=severity_rank)

    sections = ["## API Security Assessment Report", ""]
    sections.append(f"**Vulnerabilities Found:** {len(bugs_found_ids)}")

    # Count by severity
    counts = {"easy": 0, "medium": 0, "hard": 0}
    for bid in bugs_found_ids:
        bug = detector.bugs.get(bid)
        if bug:
            counts[bug.severity] = counts.get(bug.severity, 0) + 1
    sections.append(
        f"**Critical/Hard:** {counts['hard']} | **Medium:** {counts['medium']} "
        f"| **Low/Easy:** {counts['easy']}"
    )
    sections.append("")

    # NOTE(review): the history entries read here carry no per-bug attribution,
    # so every bug is reported against the first recorded request that has both
    # a method and an endpoint. Computed once instead of once per bug.
    first_request = next(
        (h for h in action_history if h.get("method") and h.get("endpoint")),
        None,
    )

    severity_labels = {"easy": "LOW", "medium": "MEDIUM", "hard": "HIGH"}
    for bid in sorted_bugs:
        bug = detector.bugs.get(bid)
        if not bug:
            continue
        sev_label = severity_labels.get(bug.severity, "INFO")
        owasp = bug.owasp if bug.owasp else "Uncategorized"
        sections.append(f"### {sev_label}: {bug.description}")
        sections.append(f"- **ID:** {bid}")
        sections.append(f"- **OWASP:** {owasp}")
        sections.append(f"- **Category:** {bug.category}")
        # Only emit the bullet when there is a recommendation; an empty line
        # here would split the markdown list in two.
        if bug.recommendation:
            sections.append(f"- **Recommendation:** {bug.recommendation}")
        if first_request is not None:
            sections.append(
                f"- **Triggered by:** {first_request['method']} {first_request['endpoint']}"
            )
        sections.append("")

    return "\n".join(sections)


class TaskGrader:
    """Computes final scores for each task based on episode performance."""

    @staticmethod
    def grade(
        task_id: str,
        bugs_found: set[str],
        coverage_pct: float,
        endpoints_tested: int,
        total_endpoints: int,
        method_endpoint_pairs: int,
        status_codes_seen: set[int],
        action_history: list[dict],
        created_resources: dict[str, list],
    ) -> GradeResult:
        """Dispatch to the grader for ``task_id``.

        Returns a zero-score ``GradeResult`` with feedback "Unknown task" when
        ``task_id`` is not one of the three known tasks.
        """
        if task_id == "basic_validation":
            return TaskGrader._grade_basic(
                bugs_found,
                coverage_pct,
                endpoints_tested,
                total_endpoints,
                method_endpoint_pairs,
                status_codes_seen,
                action_history,
                created_resources,
            )
        elif task_id == "edge_cases":
            return TaskGrader._grade_edge_cases(
                bugs_found,
                coverage_pct,
                endpoints_tested,
                method_endpoint_pairs,
                status_codes_seen,
                action_history,
                created_resources,
            )
        elif task_id == "security_workflows":
            return TaskGrader._grade_security(
                bugs_found,
                coverage_pct,
                action_history,
                created_resources,
            )
        return GradeResult(score=0.0, breakdown={}, feedback="Unknown task")

    @staticmethod
    def _grade_basic(
        bugs_found,
        coverage_pct,
        endpoints_tested,
        total_endpoints,
        method_endpoint_pairs,
        status_codes_seen,
        action_history,
        created_resources,
    ) -> GradeResult:
        """Grade the "basic_validation" task.

        Five weighted criteria: GET coverage (0.25), successful POSTs (0.20),
        PUT/DELETE usage (0.15), easy-bug discovery (0.20) and status-code
        variety (0.20). HTTP methods are compared case-insensitively.
        """
        breakdown = {}

        # 0.25: Test all GET endpoints (entries without an endpoint do not count)
        get_endpoints = {
            h.get("endpoint")
            for h in action_history
            if _http_method(h) == "GET" and h.get("endpoint")
        }
        get_score = min(len(get_endpoints) / 4, 1.0) * 0.25
        breakdown["get_coverage"] = round(get_score, 3)

        # 0.20: Test POST with valid data
        post_success = sum(
            1
            for h in action_history
            if _http_method(h) == "POST" and h.get("response_status") == 201
        )
        post_score = min(post_success / 2, 1.0) * 0.20
        breakdown["post_testing"] = round(post_score, 3)

        # 0.15: Test PUT/DELETE
        put_delete = sum(
            1 for h in action_history if _http_method(h) in ("PUT", "DELETE")
        )
        pd_score = min(put_delete / 2, 1.0) * 0.15
        breakdown["put_delete"] = round(pd_score, 3)

        # 0.20: Bug discovery (easy bugs: TASK_01, TASK_02, TASK_03)
        easy_bugs = {"BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03"}
        found_easy = len(easy_bugs.intersection(bugs_found))
        bug_score = min(found_easy / 2, 1.0) * 0.20
        breakdown["bugs_found"] = round(bug_score, 3)

        # 0.20: Response schema validation (status codes variety)
        schema_score = min(len(status_codes_seen) / 4, 1.0) * 0.20
        breakdown["schema_validation"] = round(schema_score, 3)

        score = sum(breakdown.values())

        feedback_parts = []
        if get_score > 0:
            feedback_parts.append(f"GET coverage: {len(get_endpoints)} endpoints")
        if post_success > 0:
            feedback_parts.append(f"POST success: {post_success}")
        if found_easy > 0:
            feedback_parts.append(f"Bugs found: {found_easy}/{len(easy_bugs)}")

        return GradeResult(
            score=round(min(score, 1.0), 4),
            breakdown=breakdown,
            feedback="; ".join(feedback_parts) if feedback_parts else "No significant progress",
        )

    @staticmethod
    def _grade_edge_cases(
        bugs_found,
        coverage_pct,
        endpoints_tested,
        method_endpoint_pairs,
        status_codes_seen,
        action_history,
        created_resources,
    ) -> GradeResult:
        """Grade the "edge_cases" task.

        Six weighted criteria: missing fields (0.15), invalid types (0.15),
        boundary values (0.15), non-existent resources (0.15), bug discovery
        (0.20) and dependency chaining (0.20). HTTP methods are compared
        case-insensitively.
        """
        breakdown = {}

        # 0.15: Missing required fields testing
        missing_field_tests = sum(
            1
            for h in action_history
            if _http_method(h) == "POST"
            and isinstance(h.get("body"), dict)
            and not h["body"].get("title")
        )
        breakdown["missing_fields"] = round(min(missing_field_tests / 2, 1.0) * 0.15, 3)

        # 0.15: Invalid data type testing
        invalid_tests = sum(
            1
            for h in action_history
            if h.get("body")
            and isinstance(h.get("body"), dict)
            and any(
                isinstance(v, (list, bool)) or v == ""
                for v in h["body"].values()
            )
        )
        breakdown["invalid_types"] = round(min(invalid_tests / 2, 1.0) * 0.15, 3)

        # 0.15: Boundary value testing (negative pages, huge limits, long strings)
        boundary_tests = 0
        for h in action_history:
            qp = h.get("query_params") or {}
            if not isinstance(qp, dict):
                continue
            # Non-numeric values (e.g. page="abc") are legitimate probes by the
            # agent; they must not crash the grader, they just do not count here.
            page = _parse_int(qp.get("page"))
            if page is not None and page < 1:
                boundary_tests += 1
            limit = _parse_int(qp.get("limit"))
            if limit is not None and limit > 100:
                boundary_tests += 1
        breakdown["boundary_values"] = round(min(boundary_tests / 2, 1.0) * 0.15, 3)

        # 0.15: Non-existent resource testing
        nonexistent_tests = sum(
            1
            for h in action_history
            if _http_method(h) in ("GET", "DELETE", "PUT")
            and "/999" in str(h.get("endpoint") or "")
        )
        breakdown["nonexistent_resources"] = round(min(nonexistent_tests / 2, 1.0) * 0.15, 3)

        # 0.20: Bug discovery (medium bugs)
        medium_bugs = {
            "BUG_TASK_04", "BUG_TASK_05", "BUG_TASK_06",
            "BUG_USER_01", "BUG_USER_02", "BUG_AUTH_02",
        }
        all_relevant = medium_bugs | {"BUG_TASK_01", "BUG_TASK_02", "BUG_TASK_03"}
        found_relevant = len(all_relevant.intersection(bugs_found))
        breakdown["bugs_found"] = round(min(found_relevant / 3, 1.0) * 0.20, 3)

        # 0.20: Dependency chaining (create → read → update → delete)
        chain_score = 0.0
        if any(
            _http_method(h) == "POST" and h.get("response_status") == 201
            for h in action_history
        ):
            chain_score += 0.25
        # Credit the single created task with the most follow-up steps, so the
        # score reflects one chain rather than whichever task was created first
        # or a sum of partial chains spread over several tasks.
        follow_up_methods = {"GET", "PUT", "DELETE"}
        best_follow_ups = 0
        for tid in (created_resources or {}).get("tasks") or []:
            path = f"/tasks/{tid}"
            methods_used = {
                _http_method(h) for h in action_history if h.get("endpoint") == path
            }
            best_follow_ups = max(best_follow_ups, len(methods_used & follow_up_methods))
        chain_score += 0.25 * best_follow_ups
        breakdown["dependency_chaining"] = round(min(chain_score, 1.0) * 0.20, 3)

        score = sum(breakdown.values())
        return GradeResult(
            score=round(min(score, 1.0), 4),
            breakdown=breakdown,
            feedback=f"Edge cases: {found_relevant} bugs found, chain score {chain_score:.0%}",
        )
login_users.add(body.get("username")) cross_user = len(login_users) >= 2 breakdown["cross_user_auth"] = 0.20 if cross_user else 0.0 # 0.20: Injection pattern testing injection_attempted = sum( 1 for h in action_history if h.get("body") and isinstance(h.get("body"), dict) and any( pattern.lower() in str(h["body"]).lower() for pattern in ["DROP TABLE", "