import json import random from pathlib import Path from typing import Dict, List, Optional from app.models import ( ScenarioConfig, FileContent, GroundTruthFinding, TaskInfo, Difficulty, Severity ) SCENARIOS_DIR = Path(__file__).parent / "scenarios" EXTENSION_TO_LANGUAGE = { ".tf": "hcl", ".hcl": "hcl", ".yaml": "yaml", ".yml": "yaml", ".sql": "sql", ".py": "python", ".txt": "txt", ".json": "json", ".md": "markdown", } TASK_DEFINITIONS: Dict[str, TaskInfo] = { "dependency_review": TaskInfo( id="dependency_review", name="Dependency & Supply Chain Review", description="Review dependency files for hallucinated packages, typosquatting, and known vulnerabilities", difficulty=Difficulty.EASY, max_steps=15, ), "iac_review": TaskInfo( id="iac_review", name="Infrastructure-as-Code Security Review", description="Review Terraform/Kubernetes configurations for security misconfigurations", difficulty=Difficulty.MEDIUM, max_steps=25, ), "migration_review": TaskInfo( id="migration_review", name="Database Migration Safety Review", description="Review SQL migration scripts for backward-incompatibility, safety risks, and production impact", difficulty=Difficulty.HARD, max_steps=35, ), } TASK_SCENARIO_DIRS = { "dependency_review": "dependency", "iac_review": "iac", "migration_review": "migration", } def _detect_language(filename: str) -> str: suffix = Path(filename).suffix.lower() return EXTENSION_TO_LANGUAGE.get(suffix, "txt") def _load_file_content(filepath: Path) -> FileContent: content = filepath.read_text(encoding="utf-8") return FileContent( filename=filepath.name, content=content, language=_detect_language(filepath.name), ) class TaskRegistry: def __init__(self): self._scenarios: Dict[str, Dict[str, ScenarioConfig]] = {} self._load_all_scenarios() def _load_all_scenarios(self): for task_id, subdir in TASK_SCENARIO_DIRS.items(): self._scenarios[task_id] = {} task_dir = SCENARIOS_DIR / subdir if not task_dir.exists(): continue for scenario_dir in sorted(task_dir.iterdir()): if not scenario_dir.is_dir(): continue scenario = self._load_scenario(task_id, scenario_dir) if scenario: self._scenarios[task_id][scenario.scenario_id] = scenario def _load_scenario(self, task_id: str, scenario_dir: Path) -> Optional[ScenarioConfig]: gt_path = scenario_dir / "ground_truth.json" if not gt_path.exists(): return None with open(gt_path, "r", encoding="utf-8") as f: gt_data = json.load(f) # Load all non-ground-truth files in the directory all_files: Dict[str, FileContent] = {} for filepath in sorted(scenario_dir.iterdir()): if filepath.is_file() and filepath.name != "ground_truth.json": all_files[filepath.name] = _load_file_content(filepath) # Parse ground truth findings ground_truth = [] for gt in gt_data.get("ground_truth", []): ground_truth.append(GroundTruthFinding( file=gt["file"], line=gt.get("line"), rule_id=gt["rule_id"], severity=Severity(gt["severity"]), description=gt["description"], match_key=gt["match_key"], category=gt.get("category"), )) return ScenarioConfig( scenario_id=gt_data.get("scenario_id", scenario_dir.name), task_id=task_id, description=gt_data.get("description", ""), files=all_files, initial_files=gt_data.get("initial_files", list(all_files.keys())), available_files=gt_data.get("available_files", []), ground_truth=ground_truth, review_checklist=gt_data.get("review_checklist", []), ) def get_tasks(self) -> List[TaskInfo]: return list(TASK_DEFINITIONS.values()) def get_task_info(self, task_id: str) -> TaskInfo: if task_id not in TASK_DEFINITIONS: raise ValueError( f"Unknown task_id '{task_id}'. Valid: {list(TASK_DEFINITIONS.keys())}" ) return TASK_DEFINITIONS[task_id] def get_scenario(self, task_id: str, scenario_id: str) -> ScenarioConfig: if task_id not in self._scenarios: raise ValueError(f"Unknown task_id '{task_id}'") scenarios = self._scenarios[task_id] if scenario_id not in scenarios: raise ValueError( f"Unknown scenario_id '{scenario_id}' for task '{task_id}'. " f"Valid: {list(scenarios.keys())}" ) return scenarios[scenario_id] def get_random_scenario(self, task_id: str) -> ScenarioConfig: if task_id not in self._scenarios: raise ValueError(f"Unknown task_id '{task_id}'") scenarios = list(self._scenarios[task_id].values()) if not scenarios: raise ValueError(f"No scenarios found for task '{task_id}'") return random.choice(scenarios) def get_scenario_ids(self, task_id: str) -> List[str]: if task_id not in self._scenarios: raise ValueError(f"Unknown task_id '{task_id}'") return list(self._scenarios[task_id].keys())