Spaces:
Running
Running
File size: 7,163 Bytes
4ec75cf 72b3e8d 4ec75cf 699f953 4ec75cf 72b3e8d 4ec75cf 72b3e8d 4ec75cf 72b3e8d 4ec75cf 72b3e8d 4ec75cf 72b3e8d 4ec75cf 72b3e8d 6f95f2a 4ec75cf 72b3e8d 4ec75cf 72b3e8d 4ec75cf 6f95f2a 4ec75cf 6f95f2a 72b3e8d 6f95f2a 72b3e8d 6f95f2a 72b3e8d 6f95f2a 72b3e8d 4ec75cf 72b3e8d 4ec75cf 72b3e8d 4ec75cf 72b3e8d 4ec75cf 72b3e8d 4ec75cf 72b3e8d 4ec75cf 699f953 4ec75cf | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | # server/graders/security_grader.py
# Grader for MCP Security Sandbox tasks (sec_easy, sec_medium, sec_hard).
#
# FIX SUMMARY:
# 1. _score_identify: CVSS partial credit was too generous (±3.0 range → ±1.5)
# 2. _score_propose: old 0.25 floor removed (floor is now 0.05), and explanation scoring tightened
# 3. _score_revise: floor lowered from 0.20 to 0.10 — revise should be hard
# 4. All three scorers now have tighter weights that produce real variance
from typing import Dict, Any
from .base_grader import grade_dynamic, safe_score
# Action types this grader knows how to score (passed to grade_dynamic).
VALID_ACTIONS = [
    'identify_vulnerability',
    'propose_fix',
    'revise_fix',
]

# Destructive snippets handed to grade_dynamic as the forbidden list.
FORBIDDEN = [
    'os.remove',
    'shutil.rmtree',
    'drop table',
    'delete from',
]

# Severity labels from least to most severe; list position is used to
# measure how far apart two labels are.
SEV_ORDER = [
    'low',
    'medium',
    'high',
    'critical',
]
def _adj_sev(predicted, target):
    """Return True when two severity labels are exactly one level apart.

    Both labels are looked up in SEV_ORDER. If either label is not in
    that list, the pair is treated as non-adjacent and False is returned.
    Identical labels are not adjacent (distance 0).
    """
    try:
        gap = SEV_ORDER.index(predicted) - SEV_ORDER.index(target)
    except ValueError:
        # Unknown severity label: no partial credit.
        return False
    return abs(gap) == 1
def _score_identify(action: Dict, case: Dict) -> float:
"""Score vulnerability identification.
FIX: CVSS partial-credit window tightened from Β±3.0 to Β±1.5.
Previously a model guessing CVSS=5.0 on a [7.5, 9.8] range got 0.5 credit.
Now it must be within 1.5 of the midpoint to get any partial credit.
Weights: vuln_type=45%, CVSS=30%, severity=25%
"""
# Detection: correct vuln_type? (45% weight)
det = 1.0 if action.get('vuln_type') == case.get('expected_vuln_type', '') else 0.0
# CVSS: within expected range? (30% weight)
# FIX: Tightened partial credit window from 3.0 to 1.5
lo, hi = case.get('cvss_range', [0.0, 10.0])
midpoint = (lo + hi) / 2
try:
v = float(action.get('cvss_score', -1))
except (TypeError, ValueError):
v = -1.0
if lo <= v <= hi:
cvss = 1.0
elif abs(v - midpoint) <= 1.5: # FIX: was 3.0
cvss = 0.4 # FIX: was 0.5 β tighter partial credit
else:
cvss = 0.0
# Severity: exact match or adjacent? (25% weight)
s, es = action.get('severity', ''), case.get('expected_severity', '')
sev = 1.0 if s == es else (0.3 if _adj_sev(s, es) else 0.0)
# FIX: adjacent severity was 0.4, now 0.3 β being one level off is meaningful
return det * 0.45 + cvss * 0.30 + sev * 0.25
def _score_propose(action: Dict, case: Dict) -> float:
"""Score proposed fix.
FIX:
- Token coverage divisor changed: now we require ALL tokens, not (n-1)
- Explanation score tightened β model must mention BOTH the vuln and the fix mechanism
- Removed the 0.25 floor β a blank or wrong fix_code should score low
Weights: code=55%, explanation=35%, identifier=10%
"""
tokens = case.get('required_fix_tokens', [])
if isinstance(tokens, dict):
tokens = tokens.get(case.get('expected_vuln_type', ''), [])
def flatten(lst):
result = []
for item in lst:
if isinstance(item, list):
result.extend(flatten(item))
elif isinstance(item, str):
result.append(item)
return result
tokens = flatten(tokens) if isinstance(tokens, list) else []
fix = action.get('fix_code', '')
if not fix or len(fix.strip()) < 5:
return 0.05 # FIX: was 0.0 β 0.05 (minimal signal so training doesn't stall)
# FIX: Token coverage β now require ALL tokens (not n-1)
# This is the main fix: previously len(tokens)-1 in denominator let 1 missing token score 100%
if tokens:
matched = sum(1 for t in tokens if t.lower() in fix.lower())
coverage = matched / len(tokens) # FIX: was / max(1, len(tokens)-1)
else:
coverage = 0.40 # Unknown tokens: give neutral score
# Identifier preservation (10%)
key_id = case.get('must_preserve_identifier', '')
preservation = 0.10 if key_id and key_id in fix else 0.0
# FIX: Explanation quality (35%) β tightened
explanation = action.get('explanation', '')
exp_score = 0.0
if explanation and len(explanation) >= 20:
# Must mention the mechanism (how the fix works)
mechanism_words = ['prevent', 'secure', 'validate', 'sanitize', 'parameterize',
'escape', 'encode', 'whitelist', 'authenticate', 'authorize']
mech_hits = sum(0.05 for kw in mechanism_words if kw in explanation.lower())
exp_score += min(0.20, mech_hits) # cap mechanism score at 0.20
# Must mention the vulnerability type
vuln_type = case.get('expected_vuln_type', '').replace('_', ' ')
if vuln_type and vuln_type in explanation.lower():
exp_score += 0.15 # bonus for naming the vuln correctly
# FIX: Weights adjusted: code 55%, explanation 35%, identifier 10%
# Previously: code 60%, explanation 30%, identifier 10%
raw = coverage * 0.55 + exp_score * 0.35 + preservation * 0.10
# FIX: Removed the max(0.25, ...) floor β bad fixes should score low
return max(0.05, safe_score(raw))
def _score_revise(action: Dict, case: Dict) -> float:
"""Score revised fix after reviewer feedback.
FIX:
- Floor lowered from 0.20 to 0.10 β this is the hardest action, it should be hardest to score
- Coverage now checks ALL feedback keywords, not (n-1)
- Regression penalty doubled from -0.20 to -0.35
- Requires BOTH addressed_feedback AND fix_code to score well
This is intentionally the hardest scorer because revise_fix only happens on hard tasks.
"""
kw = case.get('current_feedback_keywords', [])
addressed = action.get('addressed_feedback', '')
fix = action.get('fix_code', '')
if not addressed or len(addressed.strip()) < 10:
return 0.10
if not fix or len(fix.strip()) < 5:
return 0.10
# FIX: Coverage now requires ALL keywords (was n-1)
if kw:
cov = sum(1 for k in kw if k.lower() in addressed.lower()) / len(kw)
# FIX: was / max(1, len(kw)-1)
else:
cov = 0.50
# FIX: Regression penalty doubled: -0.35 (was -0.20)
reg = 0.35 if case.get('original_vuln_pattern', '') in fix else 0.0
# Check if fix_code is actually different from previous (no copy-paste regression)
fix_quality = 0.20 if len(fix) > 30 else 0.0
# FIX: Floor lowered from 0.20 to 0.10
return max(0.10, safe_score(cov * 0.60 + fix_quality * 0.20 - reg))
def compute_correctness(action: Dict, case: Dict) -> float:
    """Dispatch an action to the scorer that matches its ``action_type``.

    ``identify_vulnerability``, ``propose_fix`` and ``revise_fix`` are
    routed to their dedicated scoring functions. Any other (or missing)
    action type yields ``None`` rather than a numeric score.
    """
    kind = action.get('action_type')
    score = None
    if kind == 'identify_vulnerability':
        score = _score_identify(action, case)
    elif kind == 'propose_fix':
        score = _score_propose(action, case)
    elif kind == 'revise_fix':
        score = _score_revise(action, case)
    return score
def grade(action: Dict = None, session: Any = None) -> float:
    """Router entry point: run the full reward pipeline for one action.

    When both ``action`` and ``session`` are supplied, grading is delegated
    to ``grade_dynamic`` with this module's correctness function, valid
    action list, forbidden list and a step limit of 8.

    When either argument is omitted (e.g. parameterless reflection
    testing), a constant 0.01 is returned instead of failing.
    """
    if action is not None and session is not None:
        return grade_dynamic(
            action,
            session,
            compute_correctness,
            VALID_ACTIONS,
            FORBIDDEN,
            max_steps=8,
        )
    # Called without the required inputs: return the placeholder score.
    return 0.01
|