Spaces:
Sleeping
Sleeping
| import random | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from app.models import ( | |
| Action, ActionType, Finding, Observation, ReviewContext, | |
| FileContent, Reward, TaskInfo, ScenarioConfig | |
| ) | |
| from app.tasks.task_registry import TaskRegistry | |
| from app.graders.dependency_grader import DependencyGrader | |
| from app.graders.iac_grader import IaCGrader | |
| from app.graders.migration_grader import MigrationGrader | |
| from app.graders.base import BaseGrader | |
# One grader instance per task identifier; looked up by task_id when an
# episode finishes and needs a final score.
GRADER_MAP: Dict[str, BaseGrader] = dict(
    dependency_review=DependencyGrader(),
    iac_review=IaCGrader(),
    migration_review=MigrationGrader(),
)
# Adaptive task selection keys off the rolling average of recent scores.
# Each (upper_bound, task_id) pair is tried in order and the first bound that
# exceeds the average wins:
#   avg < 0.30 -> dependency_review (easy)
#   avg < 0.60 -> iac_review        (medium)
#   otherwise  -> migration_review  (hard, the fallback)
_ADAPTIVE_FALLBACK_TASK = "migration_review"
_ADAPTIVE_THRESHOLDS = [(0.30, "dependency_review"), (0.60, "iac_review")]

_SCORE_HISTORY_MAX = 20  # total scores retained
_SCORE_HISTORY_WINDOW = 5  # scores used for rolling average
class EpisodeState:
    """Mutable bookkeeping for one review episode.

    Holds the task/scenario being reviewed, the step counter and budget, the
    findings reported so far, the latest feedback message, the final reward
    (once graded) and the subset of scenario files currently revealed.
    """

    def __init__(
        self,
        task_id: str,
        task_info: TaskInfo,
        scenario: ScenarioConfig,
    ):
        # What is being reviewed.
        self.task_id = task_id
        self.task_info = task_info
        self.scenario = scenario

        # Step accounting; the budget is taken from the task definition.
        self.max_steps = task_info.max_steps
        self.current_step = 0
        self.done = False

        # Results accumulated during the episode.
        self.findings: List[Finding] = []
        self.last_feedback: Optional[str] = None
        self.final_reward: Optional[Reward] = None

        # Files visible from the start: every name listed in
        # ``scenario.initial_files`` that actually exists in ``scenario.files``.
        # Names without a matching file are silently skipped.
        available = scenario.files
        self.revealed_files: Dict[str, FileContent] = {
            name: available[name]
            for name in scenario.initial_files
            if name in available
        }
class SecureReviewEnvironment:
    """Step-based review environment with an adaptive task curriculum.

    One episode is active at a time. ``reset`` starts an episode, ``step``
    applies a single agent action, and the episode ends either when the agent
    marks the review complete or when the step budget is exhausted; at that
    point the task's grader produces the final score, which is also appended
    to a bounded score history used for adaptive task/scenario selection.
    """

    # Scenario files that must never be listed for, or loaded by, the agent.
    # The same set is applied to the observation's available files, the
    # file-list action and the request-context action so they cannot disagree.
    _HIDDEN_FILES = frozenset({"ground_truth.json"})

    def __init__(self):
        self.registry = TaskRegistry()
        self._state: Optional[EpisodeState] = None
        self._score_history: List[float] = []

    def get_tasks(self) -> List[TaskInfo]:
        """Return the tasks known to the registry."""
        return self.registry.get_tasks()

    def reset(
        self,
        task_id: Optional[str] = None,
        scenario_id: Optional[str] = None,
        adaptive: bool = False,
    ) -> Tuple[Observation, Dict[str, Any]]:
        """Start a new episode and return its first observation plus metadata.

        Args:
            task_id: Task to run. When ``None`` it defaults to
                ``"dependency_review"``, or to the curriculum's recommended
                task when ``adaptive`` is true.
            scenario_id: Specific scenario to load. When not given (``None``
                or empty) a scenario is chosen at random; in adaptive mode the
                choice is restricted to the tier matching the score history
                when such scenarios exist.
            adaptive: Select task and scenario tier from the score history.

        Returns:
            ``(observation, info)`` where ``info`` carries the task id,
            scenario id, difficulty, step budget and the adaptive flag.
        """
        if task_id is None:
            task_id = self._get_adaptive_task_id() if adaptive else "dependency_review"

        # An empty scenario_id means "not specified" in both modes.
        if scenario_id:
            scenario = self.registry.get_scenario(task_id, scenario_id)
        elif adaptive:
            scenario = self._get_scenario_for_tier(task_id, self._get_adaptive_tier())
        else:
            scenario = self.registry.get_random_scenario(task_id)

        task_info = self.registry.get_task_info(task_id)
        self._state = EpisodeState(
            task_id=task_id,
            task_info=task_info,
            scenario=scenario,
        )
        observation = self._build_observation()
        info = {
            "task_id": task_id,
            "scenario_id": scenario.scenario_id,
            "difficulty": task_info.difficulty.value,
            "max_steps": task_info.max_steps,
            "adaptive": adaptive,
        }
        return observation, info

    def step(self, action: Action) -> Tuple[Observation, float, bool, Dict[str, Any]]:
        """Apply one agent action to the active episode.

        Every call consumes one step, including malformed actions.

        Returns:
            ``(observation, reward, done, info)``. When the episode finishes
            on this step the reward is the grader's final score.

        Raises:
            RuntimeError: If no episode is active or it has already finished.
        """
        if self._state is None:
            raise RuntimeError("No active episode. Call /reset first.")
        if self._state.done:
            raise RuntimeError("Episode already complete. Call /reset to start a new one.")

        state = self._state
        state.current_step += 1
        reward = 0.0
        info: Dict[str, Any] = {"step": state.current_step}

        kind = action.action_type
        if kind == ActionType.REPORT_FINDING:
            reward, state.last_feedback = self._handle_report_finding(action, state)
        elif kind == ActionType.REQUEST_CONTEXT:
            state.last_feedback = self._handle_request_context(action, state)
        elif kind == ActionType.REQUEST_FILE_LIST:
            # Hidden files (the grading key) are never advertised.
            visible = [
                fname for fname in state.scenario.files
                if fname not in self._HIDDEN_FILES
            ]
            state.last_feedback = f"Available files: {', '.join(visible)}"
        elif kind == ActionType.MARK_COMPLETE:
            return self._finish_episode(state, info, "Agent marked review complete.")

        # Check step budget
        if state.current_step >= state.max_steps and not state.done:
            return self._finish_episode(state, info, "Step budget exhausted.")

        return self._build_observation(), reward, state.done, info

    def get_state(self) -> Dict[str, Any]:
        """Return a summary of the active episode.

        Raises:
            RuntimeError: If no episode has been started.
        """
        if self._state is None:
            raise RuntimeError("No active episode. Call /reset first.")
        state = self._state
        return {
            "task_id": state.task_id,
            "scenario_id": state.scenario.scenario_id,
            "current_step": state.current_step,
            "max_steps": state.max_steps,
            "done": state.done,
            "findings_count": len(state.findings),
            "revealed_files": list(state.revealed_files.keys()),
            "final_score": state.final_reward.score if state.final_reward else None,
        }

    def get_curriculum(self) -> Dict[str, Any]:
        """Return the adaptive curriculum state for the /curriculum endpoint."""
        recent = self._recent_scores()
        rolling_avg = sum(recent) / len(recent) if recent else 0.0
        task_id = self._get_adaptive_task_id()

        # Determine human-readable skill level and next threshold.
        if rolling_avg < 0.30:
            skill_level = "easy"
            next_threshold = 0.30
        elif rolling_avg < 0.60:
            skill_level = "medium"
            next_threshold = 0.60
        else:
            skill_level = "hard"
            next_threshold = None

        # Progress through the current band, as a percentage capped at 100.
        progress_pct: Optional[int] = None
        if next_threshold is not None:
            lower = 0.0 if skill_level == "easy" else 0.30
            span = next_threshold - lower
            progress_pct = min(100, int((rolling_avg - lower) / span * 100)) if span > 0 else 0

        return {
            "episodes_completed": len(self._score_history),
            "rolling_average": round(rolling_avg, 3),
            "current_skill_level": skill_level,
            "recommended_task": task_id,
            "recent_scores": [round(s, 3) for s in recent],
            "next_tier_threshold": next_threshold,
            "progress_pct": progress_pct,
        }

    # ------------------------------------------------------------------
    # Adaptive curriculum helpers
    # ------------------------------------------------------------------
    def _recent_scores(self) -> List[float]:
        """Return the most recent scores that feed the rolling average."""
        return self._score_history[-_SCORE_HISTORY_WINDOW:]

    def _rolling_average(self) -> Optional[float]:
        """Return the mean of the recent scores, or ``None`` with no history."""
        recent = self._recent_scores()
        return sum(recent) / len(recent) if recent else None

    def _get_adaptive_task_id(self) -> str:
        """Return task_id based on rolling average of recent scores."""
        avg = self._rolling_average()
        if avg is None:
            return "dependency_review"
        for threshold, task_id in _ADAPTIVE_THRESHOLDS:
            if avg < threshold:
                return task_id
        return _ADAPTIVE_FALLBACK_TASK

    def _get_adaptive_tier(self) -> int:
        """Return scenario difficulty tier (1/2/3) based on rolling average."""
        avg = self._rolling_average()
        if avg is None or avg < 0.30:
            return 1
        if avg < 0.60:
            return 2
        return 3

    @staticmethod
    def _get_scenario_tier(scenario: ScenarioConfig) -> int:
        """Compute scenario difficulty tier from number of ground-truth findings.

        Declared static because it takes no ``self`` yet is invoked through
        the instance; without the decorator that call passes two arguments
        and raises ``TypeError``.
        """
        n = len(scenario.ground_truth)
        if n <= 3:
            return 1
        if n <= 5:
            return 2
        return 3

    def _get_scenario_for_tier(self, task_id: str, target_tier: int) -> ScenarioConfig:
        """Pick a scenario matching target_tier; fall back to random if none match."""
        # NOTE(review): reads the registry's private ``_scenarios`` mapping;
        # an unknown task_id surfaces as KeyError here.
        scenarios = list(self.registry._scenarios[task_id].values())
        matching = [s for s in scenarios if self._get_scenario_tier(s) == target_tier]
        pool = matching if matching else scenarios
        return random.choice(pool)

    # ------------------------------------------------------------------
    # Episode internals
    # ------------------------------------------------------------------
    def _handle_report_finding(
        self, action: Action, state: EpisodeState
    ) -> Tuple[float, str]:
        """Record a finding and return ``(step_reward, feedback)``.

        The step reward is a small shaping signal only: +0.02 when the
        finding targets a revealed file with no line or an in-range line,
        -0.01 for an out-of-range line or a missing ``finding`` payload, and
        0.0 when the file is not in the review context.
        """
        if action.finding is None:
            return -0.01, "Error: report_finding requires a 'finding' field."

        finding = action.finding
        state.findings.append(finding)
        remaining = state.max_steps - state.current_step

        # Small step reward: check if finding references a known file
        if finding.file not in state.revealed_files:
            return 0.0, (
                f"Finding recorded for '{finding.file}' (file not yet in review context). "
                f"{remaining} steps remaining."
            )

        line_count = len(state.revealed_files[finding.file].content.splitlines())
        if finding.line is None or 1 <= finding.line <= line_count:
            return 0.02, (
                f"Finding recorded for '{finding.file}'. "
                f"{remaining} steps remaining."
            )
        return -0.01, (
            f"Finding recorded but line {finding.line} is outside file range "
            f"(1-{line_count}). {remaining} steps remaining."
        )

    def _handle_request_context(self, action: Action, state: EpisodeState) -> str:
        """Reveal a scenario file to the agent and return a feedback message.

        Hidden files are reported as unavailable, exactly like files that do
        not exist, so the grading key cannot be pulled into the context.
        """
        if not action.filename:
            return "Error: request_context requires a 'filename' field."

        fname = action.filename
        if fname in state.revealed_files:
            return f"File '{fname}' is already in your review context."
        if fname in state.scenario.files and fname not in self._HIDDEN_FILES:
            state.revealed_files[fname] = state.scenario.files[fname]
            return f"File '{fname}' loaded into review context."
        return (
            f"File '{fname}' not available. "
            f"Use request_file_list to see available files."
        )

    def _finish_episode(
        self, state: EpisodeState, info: Dict[str, Any], reason: str
    ) -> Tuple[Observation, float, bool, Dict[str, Any]]:
        """Grade the episode, record its score and build the terminal step result.

        Raises:
            RuntimeError: If no grader is registered for the episode's task.
        """
        state.done = True
        grader = GRADER_MAP.get(state.task_id)
        if grader is None:
            raise RuntimeError(f"No grader found for task '{state.task_id}'")

        reward_result = grader.grade(
            agent_findings=state.findings,
            ground_truth=state.scenario.ground_truth,
            steps_used=state.current_step,
            max_steps=state.max_steps,
        )
        state.final_reward = reward_result
        state.last_feedback = f"{reason} Final score: {reward_result.score}"

        # Record score for adaptive curriculum, keeping only the newest entries.
        self._score_history.append(reward_result.score)
        if len(self._score_history) > _SCORE_HISTORY_MAX:
            self._score_history = self._score_history[-_SCORE_HISTORY_MAX:]

        info["reward_breakdown"] = reward_result.breakdown
        info["final_score"] = reward_result.score
        observation = self._build_observation()
        return observation, reward_result.score, True, info

    def _build_observation(self) -> Observation:
        """Assemble the agent-facing observation for the active episode.

        Raises:
            RuntimeError: If no episode is active.
        """
        state = self._state
        if state is None:
            raise RuntimeError("No active episode.")

        # Build list of available files the agent can still request
        unrevealed = [
            fname for fname in state.scenario.files
            if fname not in state.revealed_files
            and fname not in self._HIDDEN_FILES
        ]
        context = ReviewContext(
            task_id=state.task_id,
            task_description=state.scenario.description,
            difficulty=state.task_info.difficulty,
            files=list(state.revealed_files.values()),
            available_files=unrevealed,
            review_checklist=state.scenario.review_checklist,
            max_steps=state.max_steps,
            current_step=state.current_step,
        )
        findings_serialized = [f.model_dump() for f in state.findings]
        return Observation(
            context=context,
            findings_so_far=findings_serialized,
            feedback=state.last_feedback,
        )