petter2025 commited on
Commit
51ec0be
·
verified ·
1 Parent(s): 9846bab

Create scenario_registry.py

Browse files
Files changed (1) hide show
  1. config/scenario_registry.py +155 -0
config/scenario_registry.py ADDED
@@ -0,0 +1,155 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Single source of truth for incident scenarios
3
+ """
4
+ import json
5
+ import yaml
6
+ from pathlib import Path
7
+ from typing import Dict, Any, List, Optional
8
+ import logging
9
+ from functools import lru_cache
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+
14
+ class ScenarioRegistry:
15
+ """Registry for incident scenarios with caching"""
16
+
17
+ _instance = None
18
+ _scenarios = None
19
+
20
+ def __new__(cls):
21
+ if cls._instance is None:
22
+ cls._instance = super().__new__(cls)
23
+ return cls._instance
24
+
25
+ @classmethod
26
+ @lru_cache(maxsize=1)
27
+ def load_scenarios(cls) -> Dict[str, Dict[str, Any]]:
28
+ """
29
+ Load scenarios from config files with caching
30
+
31
+ Priority:
32
+ 1. scenarios.json in config directory
33
+ 2. scenarios.yaml in config directory
34
+ 3. scenarios.yml in config directory
35
+ 4. Hardcoded scenarios from demo module
36
+ """
37
+ from config.settings import settings
38
+
39
+ config_path = Path(settings.scenario_config_path)
40
+
41
+ # Try to load from config directory
42
+ config_files = [
43
+ config_path / "scenarios.json",
44
+ config_path / "scenarios.yaml",
45
+ config_path / "scenarios.yml",
46
+ ]
47
+
48
+ for file_path in config_files:
49
+ if file_path.exists():
50
+ try:
51
+ logger.info(f"Loading scenarios from {file_path}")
52
+ with open(file_path, 'r', encoding='utf-8') as f:
53
+ if file_path.suffix == '.json':
54
+ scenarios = json.load(f)
55
+ else:
56
+ scenarios = yaml.safe_load(f)
57
+
58
+ # Validate scenarios
59
+ validated = cls._validate_scenarios(scenarios)
60
+ logger.info(f"Loaded {len(validated)} scenarios from {file_path}")
61
+ return validated
62
+ except Exception as e:
63
+ logger.error(f"Failed to load scenarios from {file_path}: {e}")
64
+
65
+ # Fallback to hardcoded scenarios
66
+ logger.info("Loading hardcoded scenarios from demo module")
67
+ from demo.scenarios import INCIDENT_SCENARIOS
68
+ return INCIDENT_SCENARIOS
69
+
70
+ @classmethod
71
+ def get_scenario(cls, name: str) -> Dict[str, Any]:
72
+ """Get scenario by name"""
73
+ if cls._scenarios is None:
74
+ cls._scenarios = cls.load_scenarios()
75
+
76
+ scenario = cls._scenarios.get(name)
77
+ if not scenario:
78
+ raise ValueError(f"Scenario '{name}' not found")
79
+
80
+ return scenario.copy() # Return copy to prevent mutation
81
+
82
+ @classmethod
83
+ def get_all_scenario_names(cls) -> List[str]:
84
+ """Get all scenario names"""
85
+ if cls._scenarios is None:
86
+ cls._scenarios = cls.load_scenarios()
87
+
88
+ return list(cls._scenarios.keys())
89
+
90
+ @classmethod
91
+ def get_scenario_metrics(cls, scenario_name: str) -> Dict[str, Any]:
92
+ """Get metrics for a specific scenario"""
93
+ scenario = cls.get_scenario(scenario_name)
94
+ return scenario.get("metrics", {})
95
+
96
+ @classmethod
97
+ def get_scenario_business_impact(cls, scenario_name: str) -> Dict[str, Any]:
98
+ """Get business impact for a specific scenario"""
99
+ scenario = cls.get_scenario(scenario_name)
100
+ return scenario.get("business_impact", {})
101
+
102
+ @classmethod
103
+ def get_scenario_roi_data(cls, scenario_name: str) -> Dict[str, Any]:
104
+ """Get ROI data for a specific scenario"""
105
+ scenario = cls.get_scenario(scenario_name)
106
+ return scenario.get("roi_data", {})
107
+
108
+ @classmethod
109
+ def _validate_scenario(cls, scenario: Dict[str, Any]) -> bool:
110
+ """Validate single scenario"""
111
+ required_fields = ["description", "severity", "component", "metrics", "business_impact"]
112
+
113
+ # Check required fields
114
+ for field in required_fields:
115
+ if field not in scenario:
116
+ logger.error(f"Missing required field: {field}")
117
+ return False
118
+
119
+ # Validate severity
120
+ valid_severities = ["LOW", "MEDIUM", "HIGH", "CRITICAL"]
121
+ if scenario["severity"] not in valid_severities:
122
+ logger.error(f"Invalid severity: {scenario['severity']}")
123
+ return False
124
+
125
+ # Validate metrics (at least one metric required)
126
+ if not isinstance(scenario["metrics"], dict) or not scenario["metrics"]:
127
+ logger.error("Metrics must be a non-empty dictionary")
128
+ return False
129
+
130
+ # Validate business impact
131
+ if not isinstance(scenario["business_impact"], dict):
132
+ logger.error("Business impact must be a dictionary")
133
+ return False
134
+
135
+ return True
136
+
137
+ @classmethod
138
+ def _validate_scenarios(cls, scenarios: Dict[str, Dict]) -> Dict[str, Dict]:
139
+ """Validate all scenarios and return valid ones"""
140
+ validated = {}
141
+
142
+ for name, scenario in scenarios.items():
143
+ if cls._validate_scenario(scenario):
144
+ validated[name] = scenario
145
+ else:
146
+ logger.warning(f"Skipping invalid scenario: {name}")
147
+
148
+ return validated
149
+
150
+ @classmethod
151
+ def reload_scenarios(cls) -> None:
152
+ """Clear cache and reload scenarios"""
153
+ cls.load_scenarios.cache_clear()
154
+ cls._scenarios = None
155
+ logger.info("Scenario cache cleared")