"""Deterministic graders for the Cyber Analyst OpenEnv tasks.""" from __future__ import annotations import json from typing import Any try: from .tasks import SCENARIOS except ImportError: # pragma: no cover - supports direct module execution from tasks import SCENARIOS MIN_SCORE = 0.01 MAX_SCORE = 0.99 def safe_reward(score: float | int | None) -> float: """Clamp validator-facing scores to the strict open interval (0, 1).""" try: value = float(score if score is not None else 0.0) except (TypeError, ValueError): value = 0.0 return max(MIN_SCORE, min(MAX_SCORE, value)) def _coerce_report(report: Any) -> dict[str, Any]: if isinstance(report, dict): return report if isinstance(report, str): try: decoded = json.loads(report) except json.JSONDecodeError: return {"summary": report, "findings": []} return decoded if isinstance(decoded, dict) else {"findings": []} return {"findings": []} def _text_contains_any(text: str, keywords: list[str]) -> bool: lowered = text.lower() return any(keyword.lower() in lowered for keyword in keywords) def _report_findings(report: dict[str, Any]) -> list[dict[str, Any]]: findings = report.get("findings", []) if isinstance(findings, dict): findings = [findings] return [finding for finding in findings if isinstance(finding, dict)] def score_report( task_id: str, report: Any, verified_findings: list[dict[str, Any]] | None = None, validation_attempted: bool = False, ) -> tuple[float, dict[str, Any]]: """Score a submitted report against one task's deterministic ground truth.""" scenario = SCENARIOS.get(task_id) report_dict = _coerce_report(report) report_findings = _report_findings(report_dict) verified_findings = verified_findings or [] if scenario is None: return MIN_SCORE, {"unknown_task": task_id} expected_type = scenario["finding_type"] expected_evidence = set(scenario.get("required_evidence", [])) | set( scenario.get("supporting_evidence", []) ) matching_verified = [ finding for finding in verified_findings if finding.get("finding_type") == expected_type ] matching_report = [ finding for finding in report_findings if finding.get("finding_type") == expected_type ] score = 0.05 breakdown: dict[str, Any] = { "base": 0.05, "verified_correct": 0.0, "valid_evidence": 0.0, "actionable_remediation": 0.0, "hallucination_penalty": 0.0, "validation_penalty": 0.0, } if matching_verified and matching_report: impact_text = " ".join( str(finding.get("impact", "")) + " " + str(finding.get("description", "")) for finding in matching_report ) if _text_contains_any(impact_text, scenario.get("impact_keywords", [])): score += 0.60 breakdown["verified_correct"] = 0.60 report_evidence: set[str] = set() for finding in matching_report: evidence_ids = finding.get("evidence_ids", []) if isinstance(evidence_ids, str): evidence_ids = [evidence_ids] report_evidence.update(str(evidence_id) for evidence_id in evidence_ids) if report_evidence & expected_evidence: score += 0.15 breakdown["valid_evidence"] = 0.15 remediation_text = " ".join( str(finding.get("remediation", "")) for finding in matching_report ) if _text_contains_any(remediation_text, scenario.get("remediation_keywords", [])): score += 0.15 breakdown["actionable_remediation"] = 0.15 verified_types = {finding.get("finding_type") for finding in verified_findings} hallucinated = [ finding for finding in report_findings if finding.get("finding_type") not in verified_types ] if hallucinated: penalty = 0.40 * len(hallucinated) score -= penalty breakdown["hallucination_penalty"] = -penalty if not validation_attempted: score -= 0.20 breakdown["validation_penalty"] = -0.20 final_score = safe_reward(score) breakdown["raw_score"] = round(score, 4) breakdown["score"] = final_score return final_score, breakdown def _payload_from_args(*args: Any, **kwargs: Any) -> dict[str, Any]: if args and isinstance(args[0], dict): payload = dict(args[0]) else: payload = {} payload.update(kwargs) return payload def grade_task(task_id: str, *args: Any, **kwargs: Any) -> float: """Manifest-friendly grader adapter.""" payload = _payload_from_args(*args, **kwargs) report = payload.get("report") or payload.get("report_json") or payload verified_findings = payload.get("verified_findings", []) validation_attempted = bool(payload.get("validation_attempted", False)) score, _ = score_report(task_id, report, verified_findings, validation_attempted) return score def grade_secret_exposure_easy(*args: Any, **kwargs: Any) -> float: return grade_task("secret_exposure_easy", *args, **kwargs) def grade_missing_security_headers_medium(*args: Any, **kwargs: Any) -> float: return grade_task("missing_security_headers_medium", *args, **kwargs) def grade_authz_boundary_hard(*args: Any, **kwargs: Any) -> float: return grade_task("authz_boundary_hard", *args, **kwargs)