# File size: 5,226 Bytes
# f44f429
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Review Grader System.

Implements programmatic sub-scoring logic for evaluating agent
security actions against internal semantic criteria.
"""

from typing import Tuple, Dict, Any

# Maximum contribution of each scoring component. With an exact severity
# match the components sum to 1.0.
SCORE_BUG_IDENTIFIED = 0.20
SCORE_BUG_TYPE = 0.20
SCORE_BUG_LOCATION = 0.10
SCORE_DESC_QUALITY = 0.25
SCORE_FIX_QUALITY = 0.15
SCORE_SEV_EXACT = 0.10
SCORE_SEV_PARTIAL = 0.05

# Number of matched task keywords that earns full description credit.
KEYWORD_HIT_TARGET = 3.0
# A description whose unique-word ratio falls below this is treated as stuffed.
PENALTY_THRESHOLD = 0.5
# Factor applied to the reward and every component when stuffing is detected.
PENALTY_MULTIPLIER = 0.2


def _as_text(value: Any) -> str:
    """Return ``value`` as a lowercase string; ``None`` becomes ``""``."""
    return "" if value is None else str(value).lower()


def _normalize_label(value: Any) -> str:
    """Lowercase ``value``, treat ``-``/``_`` as spaces and collapse whitespace."""
    return " ".join(_as_text(value).replace("-", " ").replace("_", " ").split())


def _count_hits(needles: Any, haystack: str) -> int:
    """Count the non-empty ``needles`` that occur (case-insensitively) in ``haystack``."""
    return sum(1 for needle in map(_as_text, needles) if needle and needle in haystack)


def grade_action(action: Dict[str, Any], task: Dict[str, Any]) -> Tuple[float, Dict[str, float]]:
    """Evaluate an action against the task definition.

    Six components are scored (bug identified, bug type, bug location,
    description quality, fix quality, severity) and then scaled down by
    ``PENALTY_MULTIPLIER`` when the description looks keyword-stuffed.
    Missing or ``None`` action fields score zero for their component.

    Args:
        action: The structured payload proposed by the AI agent. Read keys:
            ``bug_identified``, ``bug_type``, ``bug_location``,
            ``bug_description``, ``suggested_fix``, ``severity``.
        task: The dictionary blueprint detailing the expected vulnerability.
            Required keys: ``bug_type``, ``bug_location``, ``keywords``,
            ``fix_patterns``, ``severity``. Optional:
            ``keyword_target_override`` (a positive number).

    Returns:
        A tuple of the aggregate reward, clamped to ``[0.0, 1.0]`` and rounded
        to four decimals, and the individual component breakdown.

    Raises:
        RuntimeError: If ``task`` lacks a key that the scoring reaches.
    """
    breakdown: Dict[str, float] = {}

    # Component 1: bug identified (0.20).
    # No bug found -> no partial credit for anything else.
    if not action.get("bug_identified"):
        breakdown["bug_identified"] = 0.00
        return 0.0, breakdown

    reward = SCORE_BUG_IDENTIFIED
    breakdown["bug_identified"] = SCORE_BUG_IDENTIFIED

    try:
        # Component 2: bug type match (0.20).
        action_type = _normalize_label(action.get("bug_type"))
        task_type = _normalize_label(task["bug_type"])
        # An empty string is a substring of everything, so both sides must be
        # non-empty or a blank answer would earn full credit.
        type_matches = bool(action_type and task_type) and (
            task_type in action_type or action_type in task_type
        )
        type_score = SCORE_BUG_TYPE if type_matches else 0.00
        reward += type_score
        breakdown["bug_type"] = type_score

        # Component 3: bug location (0.10), proportional to the share of
        # significant (longer than 3 characters) task words that were mentioned.
        action_location = _as_text(action.get("bug_location"))
        location_keywords = [w for w in _as_text(task["bug_location"]).split() if len(w) > 3]
        if location_keywords:
            matched = sum(1 for kw in location_keywords if kw in action_location)
            loc_score = round(SCORE_BUG_LOCATION * (matched / len(location_keywords)), 4)
        else:
            loc_score = 0.0
        reward += loc_score
        breakdown["bug_location"] = loc_score

        # Component 4: description quality (0.25).
        description = _as_text(action.get("bug_description"))
        desc_score = 0.0
        if len(description) >= 20:
            keyword_hits = _count_hits(task["keywords"], description)
            override = task.get("keyword_target_override", KEYWORD_HIT_TARGET)
            # A zero, negative or non-numeric override would divide by zero or
            # produce a negative score; fall back to the default target.
            if isinstance(override, (int, float)) and override > 0:
                target = override
            else:
                target = KEYWORD_HIT_TARGET
            desc_score = round(
                min(SCORE_DESC_QUALITY, SCORE_DESC_QUALITY * (keyword_hits / target)), 4
            )
        breakdown["description_quality"] = desc_score
        reward += desc_score

        # Component 5: fix quality (0.15); a single matching pattern earns full credit.
        fix = _as_text(action.get("suggested_fix"))
        fix_score = 0.0
        if len(fix) >= 10:
            fix_hits = _count_hits(task["fix_patterns"], fix)
            fix_score = round(min(SCORE_FIX_QUALITY, SCORE_FIX_QUALITY * fix_hits), 4)
        breakdown["fix_quality"] = fix_score
        reward += fix_score

        # Component 6: severity (0.10 exact, 0.05 when high/critical are confused).
        action_sev = _as_text(action.get("severity")).strip()
        task_sev = _as_text(task["severity"]).strip()
        if action_sev == task_sev:
            sev_score = SCORE_SEV_EXACT
        elif action_sev in ("high", "critical") and task_sev in ("high", "critical"):
            sev_score = SCORE_SEV_PARTIAL
        else:
            sev_score = 0.00
        breakdown["severity"] = sev_score
        reward += sev_score

        # Global penalty: keyword stuffing. A description made mostly of
        # repeated words scales down the reward and every component.
        words = description.split()
        unique_ratio = len(set(words)) / len(words) if words else 1.0
        if unique_ratio < PENALTY_THRESHOLD:
            reward *= PENALTY_MULTIPLIER
            for key in list(breakdown):
                breakdown[key] = round(breakdown[key] * PENALTY_MULTIPLIER, 4)
            breakdown["stuffing_penalty_multiplier"] = PENALTY_MULTIPLIER

        return max(0.0, min(1.0, round(reward, 4))), breakdown

    except KeyError as exc:
        raise RuntimeError(f"Missing mandatory schema key in task definition: {exc}") from exc