"""Deterministic graders for the Cyber Analyst OpenEnv tasks."""
from __future__ import annotations
import json
from typing import Any
try:
from .tasks import SCENARIOS
except ImportError: # pragma: no cover - supports direct module execution
from tasks import SCENARIOS
MIN_SCORE = 0.01
MAX_SCORE = 0.99
def safe_reward(score: float | int | None) -> float:
"""Clamp validator-facing scores to the strict open interval (0, 1)."""
try:
value = float(score if score is not None else 0.0)
except (TypeError, ValueError):
value = 0.0
return max(MIN_SCORE, min(MAX_SCORE, value))
def _coerce_report(report: Any) -> dict[str, Any]:
if isinstance(report, dict):
return report
if isinstance(report, str):
try:
decoded = json.loads(report)
except json.JSONDecodeError:
return {"summary": report, "findings": []}
return decoded if isinstance(decoded, dict) else {"findings": []}
return {"findings": []}
def _text_contains_any(text: str, keywords: list[str]) -> bool:
lowered = text.lower()
return any(keyword.lower() in lowered for keyword in keywords)
def _report_findings(report: dict[str, Any]) -> list[dict[str, Any]]:
findings = report.get("findings", [])
if isinstance(findings, dict):
findings = [findings]
return [finding for finding in findings if isinstance(finding, dict)]
def score_report(
    task_id: str,
    report: Any,
    verified_findings: list[dict[str, Any]] | None = None,
    validation_attempted: bool = False,
) -> tuple[float, dict[str, Any]]:
    """Score a submitted report against one task's deterministic ground truth.

    Args:
        task_id: Key into SCENARIOS; an unknown id scores MIN_SCORE.
        report: The submitted report (dict, JSON string, or arbitrary value;
            coerced via _coerce_report).
        verified_findings: Findings confirmed by the environment's validator,
            used as ground truth for correctness and hallucination checks.
        validation_attempted: Whether the agent ran validation at all; skipping
            it incurs a flat penalty.

    Returns:
        A ``(final_score, breakdown)`` tuple where ``final_score`` is clamped
        to (0, 1) by safe_reward and ``breakdown`` itemizes each rubric
        component plus the unclamped ``raw_score``.
    """
    scenario = SCENARIOS.get(task_id)
    report_dict = _coerce_report(report)
    report_findings = _report_findings(report_dict)
    verified_findings = verified_findings or []
    if scenario is None:
        # Unknown task: return the floor score with a diagnostic breakdown.
        return MIN_SCORE, {"unknown_task": task_id}
    expected_type = scenario["finding_type"]
    # Evidence ids from either the required or supporting pools count as valid.
    expected_evidence = set(scenario.get("required_evidence", [])) | set(
        scenario.get("supporting_evidence", [])
    )
    # Validator-confirmed findings of the expected type (ground truth match).
    matching_verified = [
        finding
        for finding in verified_findings
        if finding.get("finding_type") == expected_type
    ]
    # Findings the agent actually reported with the expected type.
    matching_report = [
        finding for finding in report_findings if finding.get("finding_type") == expected_type
    ]
    # Start from a small participation base; rubric components add/subtract below.
    score = 0.05
    breakdown: dict[str, Any] = {
        "base": 0.05,
        "verified_correct": 0.0,
        "valid_evidence": 0.0,
        "actionable_remediation": 0.0,
        "hallucination_penalty": 0.0,
        "validation_penalty": 0.0,
    }
    # Bonuses require the finding to be BOTH verified and present in the report.
    if matching_verified and matching_report:
        impact_text = " ".join(
            str(finding.get("impact", "")) + " " + str(finding.get("description", ""))
            for finding in matching_report
        )
        # +0.60 for a correct finding whose impact text mentions a scenario keyword.
        if _text_contains_any(impact_text, scenario.get("impact_keywords", [])):
            score += 0.60
            breakdown["verified_correct"] = 0.60
        report_evidence: set[str] = set()
        for finding in matching_report:
            evidence_ids = finding.get("evidence_ids", [])
            # A lone string id is treated as a single-element list.
            if isinstance(evidence_ids, str):
                evidence_ids = [evidence_ids]
            report_evidence.update(str(evidence_id) for evidence_id in evidence_ids)
        # +0.15 when at least one cited evidence id is in the expected pool.
        if report_evidence & expected_evidence:
            score += 0.15
            breakdown["valid_evidence"] = 0.15
        remediation_text = " ".join(
            str(finding.get("remediation", "")) for finding in matching_report
        )
        # +0.15 for remediation text that mentions a scenario keyword.
        if _text_contains_any(remediation_text, scenario.get("remediation_keywords", [])):
            score += 0.15
            breakdown["actionable_remediation"] = 0.15
    # Any reported finding type absent from the verified set is a hallucination.
    verified_types = {finding.get("finding_type") for finding in verified_findings}
    hallucinated = [
        finding
        for finding in report_findings
        if finding.get("finding_type") not in verified_types
    ]
    if hallucinated:
        # -0.40 per hallucinated finding; this can drive the raw score negative.
        penalty = 0.40 * len(hallucinated)
        score -= penalty
        breakdown["hallucination_penalty"] = -penalty
    if not validation_attempted:
        # Flat penalty for never running the validator.
        score -= 0.20
        breakdown["validation_penalty"] = -0.20
    # Clamp to (0, 1) for the validator; keep the unclamped value for debugging.
    final_score = safe_reward(score)
    breakdown["raw_score"] = round(score, 4)
    breakdown["score"] = final_score
    return final_score, breakdown
def _payload_from_args(*args: Any, **kwargs: Any) -> dict[str, Any]:
if args and isinstance(args[0], dict):
payload = dict(args[0])
else:
payload = {}
payload.update(kwargs)
return payload
def grade_task(task_id: str, *args: Any, **kwargs: Any) -> float:
    """Manifest-friendly grader adapter.

    Accepts either a payload dict positionally or keyword arguments, pulls the
    report and validation context out of it, and returns only the final score.
    """
    payload = _payload_from_args(*args, **kwargs)
    # Fall back to treating the whole payload as the report when neither
    # "report" nor "report_json" is present (or both are falsy).
    report = payload.get("report") or payload.get("report_json") or payload
    verified = payload.get("verified_findings", [])
    attempted = bool(payload.get("validation_attempted", False))
    final_score, _breakdown = score_report(task_id, report, verified, attempted)
    return final_score
def grade_secret_exposure_easy(*args: Any, **kwargs: Any) -> float:
    """Grade the 'secret_exposure_easy' scenario (manifest entry point)."""
    return grade_task("secret_exposure_easy", *args, **kwargs)
def grade_missing_security_headers_medium(*args: Any, **kwargs: Any) -> float:
    """Grade the 'missing_security_headers_medium' scenario (manifest entry point)."""
    return grade_task("missing_security_headers_medium", *args, **kwargs)
def grade_authz_boundary_hard(*args: Any, **kwargs: Any) -> float:
    """Grade the 'authz_boundary_hard' scenario (manifest entry point)."""
    return grade_task("authz_boundary_hard", *args, **kwargs)