File size: 7,163 Bytes
4ec75cf
 
72b3e8d
 
 
 
 
 
4ec75cf
699f953
4ec75cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72b3e8d
 
 
 
 
 
 
 
4ec75cf
 
 
 
72b3e8d
4ec75cf
72b3e8d
 
 
 
 
 
 
 
 
 
 
 
4ec75cf
 
 
72b3e8d
 
4ec75cf
 
 
 
 
72b3e8d
 
 
 
 
 
 
 
 
4ec75cf
 
 
72b3e8d
6f95f2a
 
 
 
 
 
 
 
 
 
4ec75cf
 
72b3e8d
 
4ec75cf
72b3e8d
 
 
 
 
 
 
4ec75cf
6f95f2a
4ec75cf
6f95f2a
 
72b3e8d
6f95f2a
 
72b3e8d
 
 
 
 
 
 
 
6f95f2a
72b3e8d
 
6f95f2a
72b3e8d
 
 
 
 
4ec75cf
 
 
72b3e8d
 
 
 
 
 
 
 
 
 
4ec75cf
 
 
 
72b3e8d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ec75cf
72b3e8d
 
4ec75cf
72b3e8d
 
4ec75cf
 
 
 
 
 
 
 
 
 
 
72b3e8d
4ec75cf
 
699f953
 
 
 
 
 
4ec75cf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# server/graders/security_grader.py
# Grader for MCP Security Sandbox tasks (sec_easy, sec_medium, sec_hard).
#
# FIX SUMMARY:
# 1. _score_identify: CVSS partial credit was too generous (Β±3.0 range β†’ Β±1.5)
# 2. _score_propose: floor raised from 0.0 to 0.15, but explanation scoring tightened
# 3. _score_revise: floor raised from 0.20 to 0.10 β€” revise should be hard
# 4. All three scorers now have tighter weights that produce real variance

from typing import Dict, Any
from .base_grader import grade_dynamic, safe_score

VALID_ACTIONS = ['identify_vulnerability', 'propose_fix', 'revise_fix']
FORBIDDEN = ['os.remove', 'shutil.rmtree', 'drop table', 'delete from']
SEV_ORDER = ['low', 'medium', 'high', 'critical']


def _adj_sev(predicted, target):
    """Check if severity is off by exactly one level (partial credit)."""
    try:
        return abs(SEV_ORDER.index(predicted) - SEV_ORDER.index(target)) == 1
    except ValueError:
        return False


def _score_identify(action: Dict, case: Dict) -> float:
    """Score vulnerability identification.
    
    FIX: CVSS partial-credit window tightened from Β±3.0 to Β±1.5.
    Previously a model guessing CVSS=5.0 on a [7.5, 9.8] range got 0.5 credit.
    Now it must be within 1.5 of the midpoint to get any partial credit.
    
    Weights: vuln_type=45%, CVSS=30%, severity=25%
    """
    # Detection: correct vuln_type? (45% weight)
    det = 1.0 if action.get('vuln_type') == case.get('expected_vuln_type', '') else 0.0

    # CVSS: within expected range? (30% weight)
    # FIX: Tightened partial credit window from 3.0 to 1.5
    lo, hi = case.get('cvss_range', [0.0, 10.0])
    midpoint = (lo + hi) / 2
    try:
        v = float(action.get('cvss_score', -1))
    except (TypeError, ValueError):
        v = -1.0

    if lo <= v <= hi:
        cvss = 1.0
    elif abs(v - midpoint) <= 1.5:  # FIX: was 3.0
        cvss = 0.4  # FIX: was 0.5 β€” tighter partial credit
    else:
        cvss = 0.0

    # Severity: exact match or adjacent? (25% weight)
    s, es = action.get('severity', ''), case.get('expected_severity', '')
    sev = 1.0 if s == es else (0.3 if _adj_sev(s, es) else 0.0)
    # FIX: adjacent severity was 0.4, now 0.3 β€” being one level off is meaningful

    return det * 0.45 + cvss * 0.30 + sev * 0.25


def _score_propose(action: Dict, case: Dict) -> float:
    """Score proposed fix.
    
    FIX: 
    - Token coverage divisor changed: now we require ALL tokens, not (n-1)
    - Explanation score tightened β€” model must mention BOTH the vuln and the fix mechanism
    - Removed the 0.25 floor β€” a blank or wrong fix_code should score low
    
    Weights: code=55%, explanation=35%, identifier=10%
    """
    tokens = case.get('required_fix_tokens', [])
    if isinstance(tokens, dict):
        tokens = tokens.get(case.get('expected_vuln_type', ''), [])

    def flatten(lst):
        result = []
        for item in lst:
            if isinstance(item, list):
                result.extend(flatten(item))
            elif isinstance(item, str):
                result.append(item)
        return result

    tokens = flatten(tokens) if isinstance(tokens, list) else []

    fix = action.get('fix_code', '')
    if not fix or len(fix.strip()) < 5:
        return 0.05  # FIX: was 0.0 β†’ 0.05 (minimal signal so training doesn't stall)

    # FIX: Token coverage β€” now require ALL tokens (not n-1)
    # This is the main fix: previously len(tokens)-1 in denominator let 1 missing token score 100%
    if tokens:
        matched = sum(1 for t in tokens if t.lower() in fix.lower())
        coverage = matched / len(tokens)  # FIX: was / max(1, len(tokens)-1)
    else:
        coverage = 0.40  # Unknown tokens: give neutral score

    # Identifier preservation (10%)
    key_id = case.get('must_preserve_identifier', '')
    preservation = 0.10 if key_id and key_id in fix else 0.0

    # FIX: Explanation quality (35%) β€” tightened
    explanation = action.get('explanation', '')
    exp_score = 0.0
    if explanation and len(explanation) >= 20:
        # Must mention the mechanism (how the fix works)
        mechanism_words = ['prevent', 'secure', 'validate', 'sanitize', 'parameterize',
                          'escape', 'encode', 'whitelist', 'authenticate', 'authorize']
        mech_hits = sum(0.05 for kw in mechanism_words if kw in explanation.lower())
        exp_score += min(0.20, mech_hits)  # cap mechanism score at 0.20

        # Must mention the vulnerability type
        vuln_type = case.get('expected_vuln_type', '').replace('_', ' ')
        if vuln_type and vuln_type in explanation.lower():
            exp_score += 0.15  # bonus for naming the vuln correctly

    # FIX: Weights adjusted: code 55%, explanation 35%, identifier 10%
    # Previously: code 60%, explanation 30%, identifier 10%
    raw = coverage * 0.55 + exp_score * 0.35 + preservation * 0.10
    # FIX: Removed the max(0.25, ...) floor β€” bad fixes should score low
    return max(0.05, safe_score(raw))


def _score_revise(action: Dict, case: Dict) -> float:
    """Score revised fix after reviewer feedback.
    
    FIX:
    - Floor lowered from 0.20 to 0.10 β€” this is the hardest action, it should be hardest to score
    - Coverage now checks ALL feedback keywords, not (n-1)
    - Regression penalty doubled from -0.20 to -0.35
    - Requires BOTH addressed_feedback AND fix_code to score well
    
    This is intentionally the hardest scorer because revise_fix only happens on hard tasks.
    """
    kw = case.get('current_feedback_keywords', [])
    addressed = action.get('addressed_feedback', '')
    fix = action.get('fix_code', '')

    if not addressed or len(addressed.strip()) < 10:
        return 0.10

    if not fix or len(fix.strip()) < 5:
        return 0.10

    # FIX: Coverage now requires ALL keywords (was n-1)
    if kw:
        cov = sum(1 for k in kw if k.lower() in addressed.lower()) / len(kw)
        # FIX: was / max(1, len(kw)-1)
    else:
        cov = 0.50

    # FIX: Regression penalty doubled: -0.35 (was -0.20)
    reg = 0.35 if case.get('original_vuln_pattern', '') in fix else 0.0

    # Check if fix_code is actually different from previous (no copy-paste regression)
    fix_quality = 0.20 if len(fix) > 30 else 0.0

    # FIX: Floor lowered from 0.20 to 0.10
    return max(0.10, safe_score(cov * 0.60 + fix_quality * 0.20 - reg))


def compute_correctness(action: Dict, case: Dict) -> float:
    """Route to correct scoring function based on action_type."""
    atype = action.get('action_type')
    if atype == 'identify_vulnerability':
        return _score_identify(action, case)
    if atype == 'propose_fix':
        return _score_propose(action, case)
    if atype == 'revise_fix':
        return _score_revise(action, case)
    return None


def grade(action: Dict = None, session: Any = None) -> float:
    """Entry point called by router. Runs full reward pipeline.
    Survives parameterless reflection testing by returning 0.01.
    """
    if action is None or session is None:
        return 0.01
    return grade_dynamic(action, session, compute_correctness, VALID_ACTIONS, FORBIDDEN, max_steps=8)