File size: 5,490 Bytes
51ec0be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""
Single source of truth for incident scenarios
"""
import json
import yaml
from pathlib import Path
from typing import Dict, Any, List, Optional
import logging
from functools import lru_cache

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class ScenarioRegistry:
    """Registry for incident scenarios with caching.

    All lookups go through classmethods, so callers normally never need an
    instance. Scenarios are loaded lazily on first access and kept in
    ``_scenarios`` until ``reload_scenarios()`` is called.
    """

    # Singleton instance handed out by __new__.
    _instance: Optional["ScenarioRegistry"] = None
    # Scenario name -> scenario dict; None until first lookup or after reload.
    _scenarios: Optional[Dict[str, Dict[str, Any]]] = None

    # Keys every scenario loaded from a config file must contain.
    _REQUIRED_FIELDS = (
        "description",
        "severity",
        "component",
        "metrics",
        "business_impact",
    )
    # Accepted values for a scenario's "severity" field.
    _VALID_SEVERITIES = ("LOW", "MEDIUM", "HIGH", "CRITICAL")

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    @lru_cache(maxsize=1)
    def load_scenarios(cls) -> Dict[str, Dict[str, Any]]:
        """
        Load scenarios from config files with caching.

        Priority:
        1. scenarios.json in config directory
        2. scenarios.yaml in config directory
        3. scenarios.yml in config directory
        4. Hardcoded scenarios from demo module

        The first existing file that parses and validates wins; invalid
        individual scenarios inside it are dropped. If reading, parsing or
        validating a file raises, the error is logged and the next candidate
        is tried.

        Returns:
            Mapping of scenario name to scenario dict. The hardcoded fallback
            is returned as-is, without validation.
        """
        from config.settings import settings

        config_dir = Path(settings.scenario_config_path)
        candidates = (
            config_dir / "scenarios.json",
            config_dir / "scenarios.yaml",
            config_dir / "scenarios.yml",
        )

        for file_path in candidates:
            if not file_path.exists():
                continue
            try:
                logger.info("Loading scenarios from %s", file_path)
                with open(file_path, "r", encoding="utf-8") as f:
                    if file_path.suffix == ".json":
                        raw = json.load(f)
                    else:
                        raw = yaml.safe_load(f)

                validated = cls._validate_scenarios(raw)
                logger.info(
                    "Loaded %d scenarios from %s", len(validated), file_path
                )
                return validated
            except Exception as e:
                # Deliberately broad: a broken config file must not prevent
                # falling back to the next source.
                logger.error(
                    "Failed to load scenarios from %s: %s", file_path, e
                )

        # Fallback to hardcoded scenarios
        logger.info("Loading hardcoded scenarios from demo module")
        from demo.scenarios import INCIDENT_SCENARIOS
        return INCIDENT_SCENARIOS

    @classmethod
    def _loaded(cls) -> Dict[str, Dict[str, Any]]:
        """Return the scenario mapping, loading it on first access."""
        if cls._scenarios is None:
            cls._scenarios = cls.load_scenarios()
        return cls._scenarios

    @classmethod
    def get_scenario(cls, name: str) -> Dict[str, Any]:
        """Get scenario by name.

        Returns:
            A deep copy of the scenario, so callers may freely mutate it
            (including nested dicts such as "metrics") without affecting the
            cached registry.

        Raises:
            ValueError: If no scenario with that name exists (or it is empty).
        """
        import copy  # local import: only this method needs it

        scenario = cls._loaded().get(name)
        if not scenario:
            raise ValueError(f"Scenario '{name}' not found")

        # A shallow copy would still share the nested dicts with the cache.
        return copy.deepcopy(scenario)

    @classmethod
    def get_all_scenario_names(cls) -> List[str]:
        """Get all scenario names, in the order they were loaded."""
        return list(cls._loaded())

    @classmethod
    def get_scenario_metrics(cls, scenario_name: str) -> Dict[str, Any]:
        """Get metrics for a specific scenario ({} if it has none).

        Raises:
            ValueError: If the scenario does not exist.
        """
        return cls.get_scenario(scenario_name).get("metrics", {})

    @classmethod
    def get_scenario_business_impact(cls, scenario_name: str) -> Dict[str, Any]:
        """Get business impact for a specific scenario ({} if it has none).

        Raises:
            ValueError: If the scenario does not exist.
        """
        return cls.get_scenario(scenario_name).get("business_impact", {})

    @classmethod
    def get_scenario_roi_data(cls, scenario_name: str) -> Dict[str, Any]:
        """Get ROI data for a specific scenario ({} if it has none).

        Raises:
            ValueError: If the scenario does not exist.
        """
        return cls.get_scenario(scenario_name).get("roi_data", {})

    @classmethod
    def _validate_scenario(cls, scenario: Dict[str, Any]) -> bool:
        """Validate single scenario.

        Checks that the scenario is a dict, has every required field, uses a
        known severity, has a non-empty "metrics" dict and a dict
        "business_impact". Each failure is logged.

        Returns:
            True if the scenario passes all checks, False otherwise.
        """
        # A config entry can parse to None/list/str (e.g. an empty YAML
        # value); reject it here instead of raising on the checks below.
        if not isinstance(scenario, dict):
            logger.error(
                "Scenario must be a dictionary, got %s",
                type(scenario).__name__,
            )
            return False

        # Check required fields
        for field in cls._REQUIRED_FIELDS:
            if field not in scenario:
                logger.error("Missing required field: %s", field)
                return False

        # Validate severity
        severity = scenario["severity"]
        if severity not in cls._VALID_SEVERITIES:
            logger.error("Invalid severity: %s", severity)
            return False

        # Validate metrics (at least one metric required)
        metrics = scenario["metrics"]
        if not isinstance(metrics, dict) or not metrics:
            logger.error("Metrics must be a non-empty dictionary")
            return False

        # Validate business impact
        if not isinstance(scenario["business_impact"], dict):
            logger.error("Business impact must be a dictionary")
            return False

        return True

    @classmethod
    def _validate_scenarios(cls, scenarios: Dict[str, Dict]) -> Dict[str, Dict]:
        """Validate all scenarios and return valid ones.

        Invalid scenarios are skipped with a warning.

        Raises:
            TypeError: If ``scenarios`` itself is not a dict (for example an
                empty file that parsed to None, or a top-level list).
        """
        if not isinstance(scenarios, dict):
            raise TypeError(
                "Scenario config must be a mapping of name to scenario, "
                f"got {type(scenarios).__name__}"
            )

        validated = {}
        for name, scenario in scenarios.items():
            if cls._validate_scenario(scenario):
                validated[name] = scenario
            else:
                logger.warning("Skipping invalid scenario: %s", name)

        return validated

    @classmethod
    def reload_scenarios(cls) -> None:
        """Clear both caches; scenarios are re-read on the next lookup."""
        cls.load_scenarios.cache_clear()
        cls._scenarios = None
        logger.info("Scenario cache cleared")