# Agentic-Reliability-Framework-API / config / scenario_registry.py
# Hugging Face file-viewer header captured with the source:
#   petter2025's picture
#   Create scenario_registry.py
#   51ec0be verified
#   raw / history / blame
#   5.49 kB
"""
Single source of truth for incident scenarios
"""
import copy
import json
import logging
from functools import lru_cache
from pathlib import Path
from typing import Dict, Any, List, Optional

import yaml
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ScenarioRegistry:
    """Singleton registry of incident scenarios with class-level caching.

    Scenarios are held as a mapping of scenario name to a scenario
    dictionary. Every accessor is a classmethod, so the registry can be
    used without instantiating it; instantiating it always yields the same
    object.
    """

    # The single shared instance handed out by __new__.
    _instance = None
    # Scenarios returned by load_scenarios(); None until the first lookup
    # and again after reload_scenarios().
    _scenarios = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    @lru_cache(maxsize=1)
    def load_scenarios(cls) -> Dict[str, Dict[str, Any]]:
        """Load scenarios from config files, caching the result.

        Priority:
        1. scenarios.json in config directory
        2. scenarios.yaml in config directory
        3. scenarios.yml in config directory
        4. Hardcoded scenarios from demo module

        The first existing file that loads and validates without raising
        wins; a file that fails is logged and the next source is tried.
        The hardcoded fallback is returned as-is, without validation.

        Returns:
            Mapping of scenario name to scenario dictionary. Because of
            ``lru_cache`` the same dict object is returned on every call
            until ``reload_scenarios()`` clears the cache.
        """
        # Imported lazily inside the method (presumably to avoid an import
        # cycle with the settings module -- confirm before hoisting).
        from config.settings import settings

        config_path = Path(settings.scenario_config_path)

        # Candidate files, in priority order.
        config_files = [
            config_path / "scenarios.json",
            config_path / "scenarios.yaml",
            config_path / "scenarios.yml",
        ]

        for file_path in config_files:
            if not file_path.exists():
                continue
            try:
                logger.info(f"Loading scenarios from {file_path}")
                with open(file_path, 'r', encoding='utf-8') as f:
                    if file_path.suffix == '.json':
                        scenarios = json.load(f)
                    else:
                        scenarios = yaml.safe_load(f)

                # Keep only the scenarios that pass validation.
                validated = cls._validate_scenarios(scenarios)
                logger.info(f"Loaded {len(validated)} scenarios from {file_path}")
                return validated
            except Exception as e:
                # Deliberately best-effort: a broken file must not prevent
                # the lower-priority sources from being used.
                logger.error(f"Failed to load scenarios from {file_path}: {e}")

        # Fallback to hardcoded scenarios
        logger.info("Loading hardcoded scenarios from demo module")
        from demo.scenarios import INCIDENT_SCENARIOS
        return INCIDENT_SCENARIOS

    @classmethod
    def get_scenario(cls, name: str) -> Dict[str, Any]:
        """Return an independent copy of the scenario called ``name``.

        Args:
            name: Key of the scenario in the registry.

        Returns:
            A deep copy of the scenario dictionary, so callers may mutate
            it (including nested dictionaries such as ``metrics``) without
            affecting the cached registry data.

        Raises:
            ValueError: If no scenario is registered under ``name`` (or the
                registered value is empty).
        """
        if cls._scenarios is None:
            cls._scenarios = cls.load_scenarios()

        scenario = cls._scenarios.get(name)
        if not scenario:
            raise ValueError(f"Scenario '{name}' not found")

        # A shallow copy would still share the nested dicts with the
        # registry, so mutation protection requires a deep copy.
        return copy.deepcopy(scenario)

    @classmethod
    def get_all_scenario_names(cls) -> List[str]:
        """Return the names of all registered scenarios as a new list."""
        if cls._scenarios is None:
            cls._scenarios = cls.load_scenarios()
        return list(cls._scenarios.keys())

    @classmethod
    def get_scenario_metrics(cls, scenario_name: str) -> Dict[str, Any]:
        """Return the ``metrics`` entry of a scenario, or ``{}`` if absent.

        Raises:
            ValueError: If the scenario does not exist.
        """
        scenario = cls.get_scenario(scenario_name)
        return scenario.get("metrics", {})

    @classmethod
    def get_scenario_business_impact(cls, scenario_name: str) -> Dict[str, Any]:
        """Return the ``business_impact`` entry of a scenario, or ``{}``.

        Raises:
            ValueError: If the scenario does not exist.
        """
        scenario = cls.get_scenario(scenario_name)
        return scenario.get("business_impact", {})

    @classmethod
    def get_scenario_roi_data(cls, scenario_name: str) -> Dict[str, Any]:
        """Return the ``roi_data`` entry of a scenario, or ``{}`` if absent.

        Raises:
            ValueError: If the scenario does not exist.
        """
        scenario = cls.get_scenario(scenario_name)
        return scenario.get("roi_data", {})

    @classmethod
    def _validate_scenario(cls, scenario: Dict[str, Any]) -> bool:
        """Check a single scenario; log the first problem found.

        A scenario is valid when it is a dictionary that contains every
        required field, has a recognised severity, a non-empty ``metrics``
        dictionary and a ``business_impact`` dictionary.

        Returns:
            True if the scenario is valid, False otherwise.
        """
        # Guard first: a None/scalar entry would otherwise raise on the
        # membership tests below and abort validation of the whole file.
        if not isinstance(scenario, dict):
            logger.error("Scenario must be a dictionary")
            return False

        required_fields = ["description", "severity", "component", "metrics", "business_impact"]

        # Check required fields
        for field in required_fields:
            if field not in scenario:
                logger.error(f"Missing required field: {field}")
                return False

        # Validate severity
        valid_severities = ["LOW", "MEDIUM", "HIGH", "CRITICAL"]
        if scenario["severity"] not in valid_severities:
            logger.error(f"Invalid severity: {scenario['severity']}")
            return False

        # Validate metrics (at least one metric required)
        if not isinstance(scenario["metrics"], dict) or not scenario["metrics"]:
            logger.error("Metrics must be a non-empty dictionary")
            return False

        # Validate business impact
        if not isinstance(scenario["business_impact"], dict):
            logger.error("Business impact must be a dictionary")
            return False

        return True

    @classmethod
    def _validate_scenarios(cls, scenarios: Dict[str, Dict]) -> Dict[str, Dict]:
        """Validate all scenarios and return only the valid ones.

        Args:
            scenarios: Mapping of scenario name to scenario dictionary, as
                parsed from a config file.

        Returns:
            A new dict containing the entries that passed
            ``_validate_scenario``; invalid entries are logged and skipped.

        Raises:
            TypeError: If ``scenarios`` is not a mapping at all (e.g. an
                empty YAML document parses to None). ``load_scenarios``
                catches this and moves on to the next source.
        """
        if not isinstance(scenarios, dict):
            raise TypeError(
                "Scenario config must be a mapping of name to scenario, "
                f"got {type(scenarios).__name__}"
            )

        validated = {}
        for name, scenario in scenarios.items():
            if cls._validate_scenario(scenario):
                validated[name] = scenario
            else:
                logger.warning(f"Skipping invalid scenario: {name}")
        return validated

    @classmethod
    def reload_scenarios(cls) -> None:
        """Clear both caches so scenarios are re-read on the next access.

        Nothing is loaded here; the next accessor call (or an explicit
        ``load_scenarios()``) performs the actual reload.
        """
        cls.load_scenarios.cache_clear()
        cls._scenarios = None
        logger.info("Scenario cache cleared")