| """ |
| Single source of truth for incident scenarios |
| """ |
| import json |
| import yaml |
| from pathlib import Path |
| from typing import Dict, Any, List, Optional |
| import logging |
| from functools import lru_cache |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class ScenarioRegistry:
    """Registry for incident scenarios with caching.

    All accessors are classmethods, so the registry is normally used without
    instantiation; constructing it always yields the same singleton object.

    Scenarios are loaded lazily on first access and kept in two places:
    the ``lru_cache`` on :meth:`load_scenarios` and the ``_scenarios`` class
    attribute. :meth:`reload_scenarios` clears both.
    """

    _instance = None
    _scenarios = None

    # Keys every scenario loaded from a config file must define.
    _REQUIRED_FIELDS = (
        "description",
        "severity",
        "component",
        "metrics",
        "business_impact",
    )
    # Kept as a tuple (not a set) so that an unhashable severity value read
    # from a config file is rejected cleanly instead of raising TypeError.
    _VALID_SEVERITIES = ("LOW", "MEDIUM", "HIGH", "CRITICAL")

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    @lru_cache(maxsize=1)
    def load_scenarios(cls) -> Dict[str, Dict[str, Any]]:
        """
        Load scenarios from config files with caching

        Priority:
        1. scenarios.json in config directory
        2. scenarios.yaml in config directory
        3. scenarios.yml in config directory
        4. Hardcoded scenarios from demo module

        The first existing file that can be parsed and validated wins. A file
        that fails to parse (or whose top level is not a mapping) is logged
        and the next source is tried. Individual invalid scenarios inside an
        otherwise readable file are skipped, not fatal.

        Returns:
            Mapping of scenario name to scenario definition. The hardcoded
            fallback is returned as-is, without validation.
        """
        # Imported lazily, as in the original code, so that importing this
        # module does not require the settings module to be importable.
        from config.settings import settings

        config_dir = Path(settings.scenario_config_path)
        candidates = (
            config_dir / "scenarios.json",
            config_dir / "scenarios.yaml",
            config_dir / "scenarios.yml",
        )

        for file_path in candidates:
            if not file_path.exists():
                continue

            logger.info(f"Loading scenarios from {file_path}")
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    if file_path.suffix == '.json':
                        raw = json.load(f)
                    else:
                        # safe_load: never construct arbitrary Python objects
                        # from a config file.
                        raw = yaml.safe_load(f)
                validated = cls._validate_scenarios(raw)
            except Exception as e:
                # Deliberate best-effort boundary: any failure with this file
                # falls through to the next source. logger.exception keeps
                # the traceback so the broken file can be diagnosed.
                logger.exception(f"Failed to load scenarios from {file_path}: {e}")
                continue

            logger.info(f"Loaded {len(validated)} scenarios from {file_path}")
            return validated

        logger.info("Loading hardcoded scenarios from demo module")
        from demo.scenarios import INCIDENT_SCENARIOS
        return INCIDENT_SCENARIOS

    @classmethod
    def get_scenario(cls, name: str) -> Dict[str, Any]:
        """Get scenario by name.

        Args:
            name: Key of the scenario in the registry.

        Returns:
            An independent deep copy of the scenario, so callers may mutate
            the result (including nested dicts such as ``metrics``) without
            corrupting the cached registry data.

        Raises:
            ValueError: If no scenario is stored under ``name`` (an empty
                scenario definition is treated as not found).
        """
        # Function-scope import keeps this block self-contained; it is a
        # cheap sys.modules lookup after the first call.
        import copy

        if cls._scenarios is None:
            cls._scenarios = cls.load_scenarios()

        scenario = cls._scenarios.get(name)
        if not scenario:
            raise ValueError(f"Scenario '{name}' not found")

        return copy.deepcopy(scenario)

    @classmethod
    def get_all_scenario_names(cls) -> List[str]:
        """Get all scenario names, in the registry's iteration order."""
        if cls._scenarios is None:
            cls._scenarios = cls.load_scenarios()

        return list(cls._scenarios)

    @classmethod
    def get_scenario_metrics(cls, scenario_name: str) -> Dict[str, Any]:
        """Get metrics for a specific scenario ({} if it defines none).

        Raises:
            ValueError: If the scenario does not exist.
        """
        return cls.get_scenario(scenario_name).get("metrics", {})

    @classmethod
    def get_scenario_business_impact(cls, scenario_name: str) -> Dict[str, Any]:
        """Get business impact for a specific scenario ({} if it defines none).

        Raises:
            ValueError: If the scenario does not exist.
        """
        return cls.get_scenario(scenario_name).get("business_impact", {})

    @classmethod
    def get_scenario_roi_data(cls, scenario_name: str) -> Dict[str, Any]:
        """Get ROI data for a specific scenario ({} if it defines none).

        Raises:
            ValueError: If the scenario does not exist.
        """
        return cls.get_scenario(scenario_name).get("roi_data", {})

    @classmethod
    def _validate_scenario(cls, scenario: Dict[str, Any]) -> bool:
        """Validate single scenario.

        A scenario is valid when it is a dict that contains every required
        field, has a recognised severity, a non-empty ``metrics`` dict and a
        ``business_impact`` dict. The first problem found is logged.

        Returns:
            True if the scenario is valid, False otherwise. Never raises for
            malformed input, so one bad entry cannot abort a whole file.
        """
        # Config files are untyped: an entry may be None, a string, a list...
        # Without this guard the membership tests below would raise on None
        # or silently do substring matching on a str.
        if not isinstance(scenario, dict):
            logger.error(
                f"Scenario must be a dictionary, got {type(scenario).__name__}"
            )
            return False

        for field in cls._REQUIRED_FIELDS:
            if field not in scenario:
                logger.error(f"Missing required field: {field}")
                return False

        if scenario["severity"] not in cls._VALID_SEVERITIES:
            logger.error(f"Invalid severity: {scenario['severity']}")
            return False

        metrics = scenario["metrics"]
        if not isinstance(metrics, dict) or not metrics:
            logger.error("Metrics must be a non-empty dictionary")
            return False

        if not isinstance(scenario["business_impact"], dict):
            logger.error("Business impact must be a dictionary")
            return False

        return True

    @classmethod
    def _validate_scenarios(cls, scenarios: Dict[str, Dict]) -> Dict[str, Dict]:
        """Validate all scenarios and return valid ones.

        Args:
            scenarios: Mapping of scenario name to scenario definition, as
                parsed from a config file.

        Returns:
            A new dict containing only the entries that pass
            :meth:`_validate_scenario`; invalid entries are logged and
            skipped.

        Raises:
            TypeError: If ``scenarios`` itself is not a dict (for example an
                empty YAML document parses to None).
        """
        if not isinstance(scenarios, dict):
            raise TypeError(
                "Scenario config must be a mapping of scenario name to "
                f"definition, got {type(scenarios).__name__}"
            )

        validated = {}
        for name, scenario in scenarios.items():
            if cls._validate_scenario(scenario):
                validated[name] = scenario
            else:
                logger.warning(f"Skipping invalid scenario: {name}")

        return validated

    @classmethod
    def reload_scenarios(cls) -> None:
        """Clear cached scenarios.

        Nothing is loaded here: both the ``lru_cache`` on
        :meth:`load_scenarios` and the ``_scenarios`` attribute are reset, and
        the scenarios are loaded again lazily on the next access.
        """
        cls.load_scenarios.cache_clear()
        cls._scenarios = None
        logger.info("Scenario cache cleared")