EntropyEnv / server /graders /security_grader.py
immortalindeed's picture
Fix Phase 2 OpenEnv validation traps: add grader paths to openenv.yaml and safe parameterless defaults
699f953
# server/graders/security_grader.py
# Grader for MCP Security Sandbox tasks (sec_easy, sec_medium, sec_hard).
#
# FIX SUMMARY:
# 1. _score_identify: CVSS partial credit was too generous (±3.0 range → ±1.5)
# 2. _score_propose: floor raised from 0.0 to 0.05, but explanation scoring tightened
# 3. _score_revise: floor lowered from 0.20 to 0.10 — revise should be hard
# 4. All three scorers now have tighter weights that produce real variance
from typing import Dict, Any
from .base_grader import grade_dynamic, safe_score
# Action types this grader knows how to score; compute_correctness routes on
# exactly these values and the list is handed to grade_dynamic by grade().
VALID_ACTIONS = [
    "identify_vulnerability",
    "propose_fix",
    "revise_fix",
]

# Destructive snippets handed to grade_dynamic by grade().
# NOTE(review): how grade_dynamic applies this list is not visible here.
FORBIDDEN = [
    "os.remove",
    "shutil.rmtree",
    "drop table",
    "delete from",
]

# Severity labels from least to most severe; _adj_sev relies on this ordering
# to decide whether two labels are exactly one level apart.
SEV_ORDER = [
    "low",
    "medium",
    "high",
    "critical",
]
def _adj_sev(predicted, target):
    """Tell whether two severity labels are neighbours in SEV_ORDER.

    Used to hand out partial credit when the predicted severity is exactly one
    level away from the expected one.

    Args:
        predicted: Severity label supplied by the agent.
        target: Severity label expected by the case.

    Returns:
        True when both labels appear in SEV_ORDER and their positions differ
        by exactly one; False otherwise, including when either label is not
        in SEV_ORDER.
    """
    try:
        predicted_rank = SEV_ORDER.index(predicted)
        target_rank = SEV_ORDER.index(target)
    except ValueError:
        # list.index raises ValueError for a label that is not in SEV_ORDER.
        return False
    return abs(predicted_rank - target_rank) == 1
def _score_identify(action: Dict, case: Dict) -> float:
    """Score an `identify_vulnerability` action against the expected case.

    Three components are combined with fixed weights:
      * vulnerability type (45%): 1.0 on an exact match, otherwise 0.0.
      * CVSS score (30%): 1.0 when the value lies inside the case's
        `cvss_range`; 0.4 when it is outside the range but within 1.5 of the
        range midpoint; otherwise 0.0.
      * severity (25%): 1.0 on an exact match, 0.3 when one level off
        according to `_adj_sev`, otherwise 0.0.

    Args:
        action: Agent action; reads `vuln_type`, `cvss_score`, `severity`.
        case: Ground truth; reads `expected_vuln_type`, `cvss_range`
            (defaults to [0.0, 10.0]) and `expected_severity`.

    Returns:
        The weighted sum of the three components.
    """
    # --- Vulnerability type (45%) ---
    type_matches = action.get('vuln_type') == case.get('expected_vuln_type', '')
    detection = 1.0 if type_matches else 0.0

    # --- CVSS (30%) ---
    range_low, range_high = case.get('cvss_range', [0.0, 10.0])
    centre = (range_low + range_high) / 2
    try:
        guess = float(action.get('cvss_score', -1))
    except (TypeError, ValueError):
        # A missing or unparsable score is treated like the -1 default.
        guess = -1.0

    if range_low <= guess <= range_high:
        cvss_credit = 1.0
    elif abs(guess - centre) <= 1.5:
        # Near miss: outside the range but close to its midpoint.
        cvss_credit = 0.4
    else:
        cvss_credit = 0.0

    # --- Severity (25%) ---
    given = action.get('severity', '')
    wanted = case.get('expected_severity', '')
    if given == wanted:
        severity_credit = 1.0
    elif _adj_sev(given, wanted):
        # One level off still earns a small amount of credit.
        severity_credit = 0.3
    else:
        severity_credit = 0.0

    return detection * 0.45 + cvss_credit * 0.30 + severity_credit * 0.25
def _score_propose(action: Dict, case: Dict) -> float:
    """Score a `propose_fix` action.

    The raw score is the sum of three contributions that add up to 1.0 for a
    perfect answer:
      * code (up to 0.55): fraction of the case's required fix tokens that
        occur in `fix_code` (case-insensitive substring match). When the case
        lists no usable tokens a neutral coverage of 0.40 is used.
      * explanation (up to 0.35): 0.05 per distinct mechanism keyword found,
        capped at 0.20, plus 0.15 when the expected vulnerability type (with
        underscores replaced by spaces) is named. Explanations shorter than
        20 characters earn nothing.
      * identifier (0.10): awarded when `must_preserve_identifier` appears
        verbatim in `fix_code`.

    The explanation and identifier contributions are already expressed in
    final score units, so they are added directly. Multiplying them by their
    weights a second time would cap a perfect answer at roughly 0.68 and make
    the identifier worth 0.01 instead of 0.10.

    Args:
        action: Agent action; reads `fix_code` and `explanation`.
        case: Ground truth; reads `required_fix_tokens` (a possibly nested
            list, or a dict keyed by vulnerability type),
            `expected_vuln_type` and `must_preserve_identifier`.

    Returns:
        0.05 when `fix_code` is missing, not a string, or shorter than five
        non-blank characters; otherwise the raw score passed through
        `safe_score`, with a floor of 0.05.
    """
    tokens = case.get('required_fix_tokens', [])
    if isinstance(tokens, dict):
        tokens = tokens.get(case.get('expected_vuln_type', ''), [])

    def flatten(lst):
        """Collect the non-empty strings of an arbitrarily nested list."""
        result = []
        for item in lst:
            if isinstance(item, list):
                result.extend(flatten(item))
            elif isinstance(item, str) and item:
                # Empty tokens are dropped: '' is a substring of every string
                # and would otherwise count as an automatic match.
                result.append(item)
        return result

    tokens = flatten(tokens) if isinstance(tokens, list) else []

    fix = action.get('fix_code', '')
    if not isinstance(fix, str) or len(fix.strip()) < 5:
        # Minimal non-zero signal for a blank or unusable fix.
        return 0.05

    # Code coverage: every required token counts, none is forgiven.
    fix_lower = fix.lower()
    if tokens:
        matched = sum(1 for t in tokens if t.lower() in fix_lower)
        coverage = matched / len(tokens)
    else:
        coverage = 0.40  # No tokens known for this case: neutral score.

    # Identifier preservation: contributes 0.10 to the final score.
    key_id = case.get('must_preserve_identifier', '')
    preserved = isinstance(key_id, str) and bool(key_id) and key_id in fix
    preservation = 0.10 if preserved else 0.0

    # Explanation quality: contributes up to 0.35 to the final score.
    explanation = action.get('explanation', '')
    exp_score = 0.0
    if isinstance(explanation, str) and len(explanation) >= 20:
        text = explanation.lower()

        # Mechanism: how the fix works. 0.05 per keyword, capped at 0.20.
        mechanism_words = ('prevent', 'secure', 'validate', 'sanitize', 'parameterize',
                           'escape', 'encode', 'whitelist', 'authenticate', 'authorize')
        mech_hits = sum(1 for kw in mechanism_words if kw in text)
        exp_score += min(0.20, mech_hits * 0.05)

        # Naming the vulnerability. Both sides are lower-cased so a
        # mixed-case expected type can still match.
        vuln_type = case.get('expected_vuln_type', '')
        vuln_phrase = vuln_type.replace('_', ' ').lower() if isinstance(vuln_type, str) else ''
        if vuln_phrase and vuln_phrase in text:
            exp_score += 0.15

    # coverage is a 0..1 fraction and needs its 0.55 weight; the other two
    # terms are already weighted contributions (max 0.35 and 0.10).
    raw = coverage * 0.55 + exp_score + preservation

    # No generous floor: bad fixes should score low, but never below 0.05.
    return max(0.05, safe_score(raw))
def _score_revise(action: Dict, case: Dict) -> float:
    """Score a `revise_fix` action submitted after reviewer feedback.

    Scoring:
      * feedback coverage (weight 0.60): fraction of the case's
        `current_feedback_keywords` that occur in `addressed_feedback`
        (case-insensitive substring match); 0.50 when the case lists none.
      * fix size: `fix_code` longer than 30 characters adds 0.20 * 0.20 = 0.04.
        NOTE(review): a 0.20 contribution may have been intended — confirm
        before changing, since it shifts every revise score.
      * regression penalty: 0.35 is subtracted when the case's
        `original_vuln_pattern` still appears verbatim in `fix_code`. The
        penalty is skipped when the case defines no pattern, because an empty
        string is a substring of every string and would otherwise penalise
        every submission.

    Args:
        action: Agent action; reads `addressed_feedback` and `fix_code`.
        case: Ground truth; reads `current_feedback_keywords` and
            `original_vuln_pattern`.

    Returns:
        0.10 when `addressed_feedback` is missing, not a string, or shorter
        than 10 non-blank characters, or when `fix_code` is missing, not a
        string, or shorter than 5 non-blank characters. Otherwise the combined
        score passed through `safe_score`, with a floor of 0.10.
    """
    addressed = action.get('addressed_feedback', '')
    fix = action.get('fix_code', '')

    # Both the feedback response and the revised code are required.
    if not isinstance(addressed, str) or len(addressed.strip()) < 10:
        return 0.10
    if not isinstance(fix, str) or len(fix.strip()) < 5:
        return 0.10

    # Feedback coverage: every keyword counts, none is forgiven.
    kw = case.get('current_feedback_keywords', [])
    keywords = [k for k in kw if isinstance(k, str)] if kw else []
    if keywords:
        addressed_lower = addressed.lower()
        cov = sum(1 for k in keywords if k.lower() in addressed_lower) / len(keywords)
    else:
        cov = 0.50

    # Regression: only meaningful when the case actually names a pattern.
    pattern = case.get('original_vuln_pattern', '')
    regressed = isinstance(pattern, str) and bool(pattern) and pattern in fix
    reg = 0.35 if regressed else 0.0

    # Length heuristic only; the fix is not compared with any earlier attempt.
    fix_quality = 0.20 if len(fix) > 30 else 0.0

    return max(0.10, safe_score(cov * 0.60 + fix_quality * 0.20 - reg))
def compute_correctness(action: Dict, case: Dict) -> float:
    """Dispatch an action to the scorer that matches its `action_type`.

    Args:
        action: Agent action; `action_type` selects the scorer.
        case: Ground-truth case forwarded unchanged to the scorer.

    Returns:
        The selected scorer's result, or None when `action_type` is not
        'identify_vulnerability', 'propose_fix' or 'revise_fix'.
    """
    kind = action.get('action_type')
    if kind == 'identify_vulnerability':
        scorer = _score_identify
    elif kind == 'propose_fix':
        scorer = _score_propose
    elif kind == 'revise_fix':
        scorer = _score_revise
    else:
        # Unknown or missing action type: no correctness score is produced.
        return None
    return scorer(action, case)
def grade(action: Dict = None, session: Any = None) -> float:
    """Grade one action; this is the entry point invoked by the router.

    Both parameters default to None so the function can be called without
    arguments (e.g. by reflection-based validation); in that case, or whenever
    either argument is None, a fixed 0.01 is returned.

    Args:
        action: Agent action to grade.
        session: Session object forwarded to `grade_dynamic`.

    Returns:
        0.01 when `action` or `session` is None; otherwise whatever
        `grade_dynamic` returns for this module's correctness function,
        valid-action list and forbidden list with `max_steps=8`.
    """
    has_inputs = action is not None and session is not None
    if has_inputs:
        return grade_dynamic(
            action,
            session,
            compute_correctness,
            VALID_ACTIONS,
            FORBIDDEN,
            max_steps=8,
        )
    # Safe default for parameterless or partially specified calls.
    return 0.01