| """ |
| Task 2: Vulnerability Remediation (Medium) |
| |
| Agent must propose correct fixes for identified vulnerabilities. |
| Scored on: correct fix version, valid replacement code, budget management. |
| """ |
| from __future__ import annotations |
|
|
| from typing import Any, Dict, Tuple |
|
|
| from .base import TaskHandler, apply_identification, apply_remediation |
| from env.models import Action, EngineState |
| from env.verification import task_completion_score |
|
|
|
|
| class Task2Handler(TaskHandler): |
| """ |
| Remediation task: agent proposes fixes for code vulnerabilities. |
| |
| Actions: |
| - identify: find vulns first (same as task 1) |
| - remediate: propose a fix for a specific CVE |
| - done: end episode |
| |
| Scoring: fraction of vulns correctly remediated, minus budget overruns. |
| """ |
|
|
| def execute(self, state: EngineState, action: Action) -> Tuple[float, Dict[str, Any]]: |
| if action.action_type == "done": |
| state.done = True |
| sc = task_completion_score(state, state.task_id) |
| return sc, {"sc": round(sc, 4)} |
|
|
| if action.action_type == "identify": |
| return self._handle_identify(state, action) |
|
|
| if action.action_type == "remediate": |
| return self._handle_remediate(state, action) |
|
|
| state.last_action_error = f"Unsupported: {action.action_type}" |
| return 0.0, {} |
|
|
| def _handle_identify(self, state: EngineState, action: Action) -> Tuple[float, Dict[str, Any]]: |
| if not action.findings: |
| state.last_action_error = "No findings" |
| return 0.0, {} |
|
|
| info = apply_identification(state, action.findings) |
| return task_completion_score(state, state.task_id), { |
| "identified": info["accepted"], |
| "partial": info["partial"], |
| "fp": info["fp"], |
| "evidence_gain": info["evidence_gain"], |
| } |
|
|
| def _handle_remediate(self, state: EngineState, action: Action) -> Tuple[float, Dict[str, Any]]: |
| if not action.remediation: |
| state.last_action_error = "No remediation provided" |
| return 0.0, {} |
|
|
| rem = action.remediation |
| cve_id = rem.cve_id |
|
|
| if cve_id not in state.ground_truth_vulns: |
| state.invalid_remediations += 1 |
| state.last_action_error = f"CVE {cve_id} not in ground truth" |
| return -0.15, {"error": "not_present"} |
|
|
| if cve_id in state.remediated_vulns: |
| state.last_action_error = "Already remediated" |
| return -0.05, {} |
|
|
| if state.budget_points < 2: |
| state.invalid_remediations += 1 |
| state.last_action_error = "Insufficient budget" |
| return -0.1, {} |
| state.budget_points -= 2 |
|
|
| info = {"cve": cve_id, **apply_remediation(state, rem)} |
|
|
| all_fixed = len(state.remediated_vulns) >= state.initial_vuln_count |
| if all_fixed: |
| state.done = True |
| sc = task_completion_score(state, state.task_id) |
| return sc, {**info, "sc": round(sc, 4)} |
|
|
| return task_completion_score(state, state.task_id), info |
|
|