""" Single source of truth for incident scenarios """ import json import yaml from pathlib import Path from typing import Dict, Any, List, Optional import logging from functools import lru_cache logger = logging.getLogger(__name__) class ScenarioRegistry: """Registry for incident scenarios with caching""" _instance = None _scenarios = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance @classmethod @lru_cache(maxsize=1) def load_scenarios(cls) -> Dict[str, Dict[str, Any]]: """ Load scenarios from config files with caching Priority: 1. scenarios.json in config directory 2. scenarios.yaml in config directory 3. scenarios.yml in config directory 4. Hardcoded scenarios from demo module """ from config.settings import settings config_path = Path(settings.scenario_config_path) # Try to load from config directory config_files = [ config_path / "scenarios.json", config_path / "scenarios.yaml", config_path / "scenarios.yml", ] for file_path in config_files: if file_path.exists(): try: logger.info(f"Loading scenarios from {file_path}") with open(file_path, 'r', encoding='utf-8') as f: if file_path.suffix == '.json': scenarios = json.load(f) else: scenarios = yaml.safe_load(f) # Validate scenarios validated = cls._validate_scenarios(scenarios) logger.info(f"Loaded {len(validated)} scenarios from {file_path}") return validated except Exception as e: logger.error(f"Failed to load scenarios from {file_path}: {e}") # Fallback to hardcoded scenarios logger.info("Loading hardcoded scenarios from demo module") from demo.scenarios import INCIDENT_SCENARIOS return INCIDENT_SCENARIOS @classmethod def get_scenario(cls, name: str) -> Dict[str, Any]: """Get scenario by name""" if cls._scenarios is None: cls._scenarios = cls.load_scenarios() scenario = cls._scenarios.get(name) if not scenario: raise ValueError(f"Scenario '{name}' not found") return scenario.copy() # Return copy to prevent mutation @classmethod def get_all_scenario_names(cls) -> List[str]: """Get all scenario names""" if cls._scenarios is None: cls._scenarios = cls.load_scenarios() return list(cls._scenarios.keys()) @classmethod def get_scenario_metrics(cls, scenario_name: str) -> Dict[str, Any]: """Get metrics for a specific scenario""" scenario = cls.get_scenario(scenario_name) return scenario.get("metrics", {}) @classmethod def get_scenario_business_impact(cls, scenario_name: str) -> Dict[str, Any]: """Get business impact for a specific scenario""" scenario = cls.get_scenario(scenario_name) return scenario.get("business_impact", {}) @classmethod def get_scenario_roi_data(cls, scenario_name: str) -> Dict[str, Any]: """Get ROI data for a specific scenario""" scenario = cls.get_scenario(scenario_name) return scenario.get("roi_data", {}) @classmethod def _validate_scenario(cls, scenario: Dict[str, Any]) -> bool: """Validate single scenario""" required_fields = ["description", "severity", "component", "metrics", "business_impact"] # Check required fields for field in required_fields: if field not in scenario: logger.error(f"Missing required field: {field}") return False # Validate severity valid_severities = ["LOW", "MEDIUM", "HIGH", "CRITICAL"] if scenario["severity"] not in valid_severities: logger.error(f"Invalid severity: {scenario['severity']}") return False # Validate metrics (at least one metric required) if not isinstance(scenario["metrics"], dict) or not scenario["metrics"]: logger.error("Metrics must be a non-empty dictionary") return False # Validate business impact if not isinstance(scenario["business_impact"], dict): logger.error("Business impact must be a dictionary") return False return True @classmethod def _validate_scenarios(cls, scenarios: Dict[str, Dict]) -> Dict[str, Dict]: """Validate all scenarios and return valid ones""" validated = {} for name, scenario in scenarios.items(): if cls._validate_scenario(scenario): validated[name] = scenario else: logger.warning(f"Skipping invalid scenario: {name}") return validated @classmethod def reload_scenarios(cls) -> None: """Clear cache and reload scenarios""" cls.load_scenarios.cache_clear() cls._scenarios = None logger.info("Scenario cache cleared")