Spaces:
Sleeping
Sleeping
| """ | |
| Pattern recognition and analysis for feedback system. | |
| Implements automated improvement suggestion generation and feedback aggregation. | |
| """ | |
| import json | |
| from collections import Counter, defaultdict | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Optional, Any, Tuple | |
| from pathlib import Path | |
| from .data_models import ( | |
| ErrorPattern, ClassificationError, QuestionIssue, ReferralProblem, | |
| ErrorType, ErrorSubcategory, ScenarioType | |
| ) | |
| class PatternRecognizer: | |
| """ | |
| Advanced pattern recognition for identifying common error types and generating | |
| automated improvement suggestions based on feedback data analysis. | |
| Provides functionality to: | |
| - Identify recurring error patterns across different dimensions | |
| - Generate data-driven improvement suggestions | |
| - Analyze temporal trends in feedback data | |
| - Provide aggregated reporting for system optimization | |
| """ | |
    def __init__(self, min_pattern_frequency: int = 3, confidence_threshold: float = 0.7):
        """
        Initialize the pattern recognizer.

        Args:
            min_pattern_frequency: Minimum frequency for a pattern to be considered significant
            confidence_threshold: Minimum confidence level for pattern suggestions
        """
        # Thresholds used by every _analyze_* pass below.
        self.min_pattern_frequency = min_pattern_frequency
        self.confidence_threshold = confidence_threshold
        # Pattern analysis strategies (for future expansion)
        # NOTE(review): values are method-name strings, not callables, and nothing
        # in this class dispatches through this map yet — confirm intended use.
        self.analysis_strategies = {
            'error_type_clustering': 'analyze_error_type_patterns',
            'subcategory_analysis': 'analyze_subcategory_patterns',
            'temporal_trends': 'analyze_temporal_patterns',
            'confidence_correlation': 'analyze_confidence_patterns',
            'message_content_analysis': 'analyze_message_content_patterns',
            'cross_category_analysis': 'analyze_cross_category_patterns'
        }
        # Improvement suggestion templates, keyed by error type then pattern flavor.
        # The {placeholders} are str.format-style fields; no method in this block
        # fills them in — presumably consumed by external reporting (TODO confirm).
        self.suggestion_templates = {
            'wrong_classification': {
                'high_frequency': "Review classification criteria for {category_pair} transitions - {frequency} occurrences detected",
                'confidence_pattern': "Low confidence in {category} classifications suggests need for clearer decision boundaries",
                'content_pattern': "Common phrases in misclassified messages: {phrases} - consider training data expansion"
            },
            'severity_misjudgment': {
                'underestimation': "Severity assessment appears to underestimate distress in {context} scenarios",
                'overestimation': "Sensitivity may be too high for {context} expressions - consider calibration",
                'temporal': "Severity misjudgments increased {trend} over time - review recent changes"
            },
            'missed_indicators': {
                'category_specific': "Frequently missed {indicator_category} indicators - enhance detection algorithms",
                'subtle_cues': "Missing subtle distress cues in {scenario_type} scenarios",
                'context_dependent': "Indicators missed when {context_condition} - improve context awareness"
            },
            'question_targeting': {
                'scenario_mismatch': "Questions not well-targeted for {scenario_type} scenarios - {frequency} issues",
                'sensitivity': "Question sensitivity issues in {context} - review language patterns",
                'effectiveness': "Low effectiveness scores for {question_type} questions - consider alternatives"
            }
        }
| def analyze_comprehensive_patterns(self, | |
| errors: List[Dict[str, Any]], | |
| questions: List[Dict[str, Any]], | |
| referrals: List[Dict[str, Any]]) -> List[ErrorPattern]: | |
| """ | |
| Perform comprehensive pattern analysis across all feedback types. | |
| Args: | |
| errors: List of classification error records | |
| questions: List of question issue records | |
| referrals: List of referral problem records | |
| Returns: | |
| List[ErrorPattern]: Identified patterns with improvement suggestions | |
| """ | |
| all_patterns = [] | |
| # Analyze classification error patterns | |
| if errors: | |
| error_patterns = self._analyze_classification_error_patterns(errors) | |
| all_patterns.extend(error_patterns) | |
| # Analyze question issue patterns | |
| if questions: | |
| question_patterns = self._analyze_question_issue_patterns(questions) | |
| all_patterns.extend(question_patterns) | |
| # Analyze referral problem patterns | |
| if referrals: | |
| referral_patterns = self._analyze_referral_problem_patterns(referrals) | |
| all_patterns.extend(referral_patterns) | |
| # Cross-analysis patterns (relationships between different feedback types) | |
| if errors and questions: | |
| cross_patterns = self._analyze_cross_feedback_patterns(errors, questions, referrals) | |
| all_patterns.extend(cross_patterns) | |
| # Sort patterns by significance (frequency * confidence) | |
| all_patterns.sort(key=lambda p: p.frequency * p.confidence_score, reverse=True) | |
| return all_patterns | |
    def _analyze_classification_error_patterns(self, errors: List[Dict[str, Any]]) -> List[ErrorPattern]:
        """Analyze patterns in classification errors.

        Makes four independent passes over *errors*, each emitting an
        ErrorPattern when a group reaches ``self.min_pattern_frequency``:

        1. frequency of each 'error_type' value;
        2. frequency of each 'subcategory' value;
        3. frequency of each actual->expected category transition;
        4. one aggregate pattern if enough reports have low reviewer confidence.

        Args:
            errors: Error records; each needs 'error_type', 'subcategory',
                'actual_category', 'expected_category' and 'confidence_level'.

        Returns:
            List[ErrorPattern]: Patterns for all sufficiently frequent groups.
        """
        patterns = []
        # Error type frequency analysis
        error_type_counts = Counter(error['error_type'] for error in errors)
        for error_type, frequency in error_type_counts.items():
            if frequency >= self.min_pattern_frequency:
                related_errors = [e for e in errors if e['error_type'] == error_type]
                pattern = ErrorPattern(
                    pattern_id=f"error_type_{error_type}_{frequency}",
                    pattern_type=f"error_type_{error_type}",
                    description=f"Frequent {error_type.replace('_', ' ')} errors ({frequency} occurrences)",
                    frequency=frequency,
                    affected_scenarios=self._extract_scenarios_from_errors(related_errors),
                    suggested_improvements=self._generate_error_type_suggestions(error_type, related_errors),
                    # Heuristic: confidence grows linearly, saturating at 10 occurrences.
                    confidence_score=min(frequency / 10.0, 1.0)
                )
                patterns.append(pattern)
        # Subcategory analysis (finer-grained than error type; saturates at 8).
        subcategory_counts = Counter(error['subcategory'] for error in errors)
        for subcategory, frequency in subcategory_counts.items():
            if frequency >= self.min_pattern_frequency:
                related_errors = [e for e in errors if e['subcategory'] == subcategory]
                pattern = ErrorPattern(
                    pattern_id=f"subcategory_{subcategory}_{frequency}",
                    pattern_type=f"subcategory_{subcategory}",
                    description=f"Frequent {subcategory.replace('_', ' ')} errors ({frequency} occurrences)",
                    frequency=frequency,
                    affected_scenarios=self._extract_scenarios_from_errors(related_errors),
                    suggested_improvements=self._generate_subcategory_suggestions(subcategory, related_errors),
                    confidence_score=min(frequency / 8.0, 1.0)
                )
                patterns.append(pattern)
        # Category transition analysis: which actual->expected corrections recur.
        # NOTE(review): the '_to_' join/split round-trip assumes category names
        # never contain the substring '_to_' — confirm against the category enum.
        transitions = Counter(f"{error['actual_category']}_to_{error['expected_category']}" for error in errors)
        for transition, frequency in transitions.items():
            if frequency >= self.min_pattern_frequency:
                actual, expected = transition.split('_to_')
                related_errors = [e for e in errors if e['actual_category'] == actual and e['expected_category'] == expected]
                pattern = ErrorPattern(
                    pattern_id=f"transition_{transition}_{frequency}",
                    pattern_type=f"category_transition_{transition}",
                    description=f"Frequent {actual} → {expected} misclassifications ({frequency} occurrences)",
                    frequency=frequency,
                    affected_scenarios=self._extract_scenarios_from_errors(related_errors),
                    suggested_improvements=self._generate_transition_suggestions(actual, expected, related_errors),
                    confidence_score=min(frequency / 6.0, 1.0)
                )
                patterns.append(pattern)
        # Confidence level analysis: a single aggregate pattern when many
        # reviewers were unsure of their own reports (fixed 0.8 confidence).
        low_confidence_errors = [e for e in errors if e['confidence_level'] < self.confidence_threshold]
        if len(low_confidence_errors) >= self.min_pattern_frequency:
            pattern = ErrorPattern(
                pattern_id=f"low_confidence_{len(low_confidence_errors)}",
                pattern_type="low_confidence_pattern",
                description=f"High number of low-confidence error reports ({len(low_confidence_errors)} occurrences)",
                frequency=len(low_confidence_errors),
                affected_scenarios=self._extract_scenarios_from_errors(low_confidence_errors),
                suggested_improvements=self._generate_confidence_suggestions(low_confidence_errors),
                confidence_score=0.8
            )
            patterns.append(pattern)
        return patterns
| def _analyze_question_issue_patterns(self, questions: List[Dict[str, Any]]) -> List[ErrorPattern]: | |
| """Analyze patterns in question issues.""" | |
| patterns = [] | |
| # Issue type frequency analysis | |
| issue_type_counts = Counter(question['issue_type'] for question in questions) | |
| for issue_type, frequency in issue_type_counts.items(): | |
| if frequency >= self.min_pattern_frequency: | |
| related_questions = [q for q in questions if q['issue_type'] == issue_type] | |
| pattern = ErrorPattern( | |
| pattern_id=f"question_issue_{issue_type}_{frequency}", | |
| pattern_type=f"question_issue_{issue_type}", | |
| description=f"Frequent {issue_type.replace('_', ' ')} issues ({frequency} occurrences)", | |
| frequency=frequency, | |
| affected_scenarios=[ScenarioType(q['scenario_type']) for q in related_questions], | |
| suggested_improvements=self._generate_question_issue_suggestions(issue_type, related_questions), | |
| confidence_score=min(frequency / 5.0, 1.0) | |
| ) | |
| patterns.append(pattern) | |
| # Scenario-specific question issues | |
| scenario_issue_combinations = Counter( | |
| f"{question['scenario_type']}_{question['issue_type']}" for question in questions | |
| ) | |
| for combination, frequency in scenario_issue_combinations.items(): | |
| if frequency >= self.min_pattern_frequency: | |
| scenario_str, issue = combination.split('_', 1) | |
| related_questions = [q for q in questions if q['scenario_type'] == scenario_str and q['issue_type'] == issue] | |
| # Try to create ScenarioType, skip if invalid | |
| try: | |
| scenario_enum = ScenarioType(scenario_str) | |
| affected_scenarios = [scenario_enum] | |
| except ValueError: | |
| affected_scenarios = [] | |
| pattern = ErrorPattern( | |
| pattern_id=f"scenario_issue_{combination}_{frequency}", | |
| pattern_type=f"scenario_specific_{combination}", | |
| description=f"Frequent {issue.replace('_', ' ')} issues in {scenario_str.replace('_', ' ')} scenarios ({frequency} occurrences)", | |
| frequency=frequency, | |
| affected_scenarios=affected_scenarios, | |
| suggested_improvements=self._generate_scenario_specific_suggestions(scenario_str, issue, related_questions), | |
| confidence_score=min(frequency / 4.0, 1.0) | |
| ) | |
| patterns.append(pattern) | |
| return patterns | |
    def _analyze_referral_problem_patterns(self, referrals: List[Dict[str, Any]]) -> List[ErrorPattern]:
        """Analyze patterns in referral problems.

        Two passes: frequency of each 'problem_type' value, then frequency of
        each individually missing field, aggregated across every record's
        'missing_fields' list.

        Args:
            referrals: Referral problem records; 'problem_type' is required,
                'missing_fields' is optional (treated as empty when absent).

        Returns:
            List[ErrorPattern]: Patterns for groups at/above the minimum frequency.
        """
        patterns = []
        # Problem type frequency analysis
        problem_type_counts = Counter(referral['problem_type'] for referral in referrals)
        for problem_type, frequency in problem_type_counts.items():
            if frequency >= self.min_pattern_frequency:
                related_referrals = [r for r in referrals if r['problem_type'] == problem_type]
                pattern = ErrorPattern(
                    pattern_id=f"referral_problem_{problem_type}_{frequency}",
                    pattern_type=f"referral_problem_{problem_type}",
                    description=f"Frequent {problem_type.replace('_', ' ')} problems ({frequency} occurrences)",
                    frequency=frequency,
                    affected_scenarios=[],  # Referrals don't have scenarios
                    suggested_improvements=self._generate_referral_problem_suggestions(problem_type, related_referrals),
                    confidence_score=min(frequency / 4.0, 1.0)
                )
                patterns.append(pattern)
        # Missing fields analysis: flatten all records' missing_fields lists,
        # then count how often each field name recurs.
        all_missing_fields = []
        for referral in referrals:
            all_missing_fields.extend(referral.get('missing_fields', []))
        missing_field_counts = Counter(all_missing_fields)
        for field, frequency in missing_field_counts.items():
            if frequency >= self.min_pattern_frequency:
                pattern = ErrorPattern(
                    pattern_id=f"missing_field_{field}_{frequency}",
                    pattern_type=f"missing_field_{field}",
                    description=f"Frequently missing field: {field} ({frequency} occurrences)",
                    frequency=frequency,
                    affected_scenarios=[],
                    # Fixed suggestion templates; confidence saturates at only 3 hits.
                    suggested_improvements=[f"Improve {field} capture in referral generation",
                                            f"Add validation for {field} field",
                                            f"Enhance {field} extraction from conversation context"],
                    confidence_score=min(frequency / 3.0, 1.0)
                )
                patterns.append(pattern)
        return patterns
| def _analyze_cross_feedback_patterns(self, | |
| errors: List[Dict[str, Any]], | |
| questions: List[Dict[str, Any]], | |
| referrals: List[Dict[str, Any]]) -> List[ErrorPattern]: | |
| """Analyze patterns across different feedback types.""" | |
| patterns = [] | |
| # Correlation between classification errors and question issues | |
| error_sessions = {error.get('session_id') for error in errors if error.get('session_id')} | |
| question_sessions = {question.get('session_id') for question in questions if question.get('session_id')} | |
| common_sessions = error_sessions.intersection(question_sessions) | |
| if len(common_sessions) >= self.min_pattern_frequency: | |
| pattern = ErrorPattern( | |
| pattern_id=f"error_question_correlation_{len(common_sessions)}", | |
| pattern_type="error_question_correlation", | |
| description=f"Sessions with both classification errors and question issues ({len(common_sessions)} sessions)", | |
| frequency=len(common_sessions), | |
| affected_scenarios=[], | |
| suggested_improvements=[ | |
| "Review sessions with multiple issue types for systemic problems", | |
| "Investigate correlation between classification accuracy and question quality", | |
| "Consider integrated training for both classification and question generation" | |
| ], | |
| confidence_score=0.7 | |
| ) | |
| patterns.append(pattern) | |
| return patterns | |
| def _extract_scenarios_from_errors(self, errors: List[Dict[str, Any]]) -> List[ScenarioType]: | |
| """Extract scenario types from error additional context.""" | |
| scenarios = set() | |
| for error in errors: | |
| context = error.get('additional_context', {}) | |
| if 'scenario_type' in context: | |
| try: | |
| scenarios.add(ScenarioType(context['scenario_type'])) | |
| except ValueError: | |
| pass | |
| return list(scenarios) | |
| def _generate_error_type_suggestions(self, error_type: str, related_errors: List[Dict]) -> List[str]: | |
| """Generate improvement suggestions for specific error types.""" | |
| suggestions = [] | |
| if error_type == "wrong_classification": | |
| # Analyze common misclassification patterns | |
| transitions = Counter(f"{e['actual_category']}_to_{e['expected_category']}" for e in related_errors) | |
| most_common = transitions.most_common(1) | |
| if most_common: | |
| transition = most_common[0][0] | |
| suggestions.append(f"Review classification criteria for {transition.replace('_to_', ' → ')} transitions") | |
| suggestions.extend([ | |
| "Add more training examples for edge cases", | |
| "Refine decision boundaries between categories", | |
| "Implement additional validation checks for ambiguous cases" | |
| ]) | |
| elif error_type == "severity_misjudgment": | |
| # Analyze severity patterns | |
| underestimated = sum(1 for e in related_errors if e.get('subcategory') == 'underestimated_distress') | |
| overestimated = sum(1 for e in related_errors if e.get('subcategory') == 'overestimated_distress') | |
| if underestimated > overestimated: | |
| suggestions.append("Increase sensitivity to subtle distress indicators") | |
| elif overestimated > underestimated: | |
| suggestions.append("Reduce false positive triggers for normal expressions") | |
| suggestions.extend([ | |
| "Calibrate severity assessment algorithms", | |
| "Add contextual weighting for distress indicators", | |
| "Improve training data balance for severity levels" | |
| ]) | |
| elif error_type == "missed_indicators": | |
| suggestions.extend([ | |
| "Expand indicator recognition patterns", | |
| "Improve natural language processing for subtle cues", | |
| "Add more comprehensive indicator training data", | |
| "Enhance context-aware indicator detection" | |
| ]) | |
| elif error_type == "context_misunderstanding": | |
| suggestions.extend([ | |
| "Enhance conversation history integration", | |
| "Improve defensive response detection algorithms", | |
| "Add contextual reasoning capabilities", | |
| "Strengthen temporal context awareness" | |
| ]) | |
| return suggestions | |
| def _generate_subcategory_suggestions(self, subcategory: str, related_errors: List[Dict]) -> List[str]: | |
| """Generate improvement suggestions for specific error subcategories.""" | |
| suggestions = [] | |
| # Analyze common words in error messages | |
| common_words = self._extract_common_words([e['message_content'] for e in related_errors]) | |
| if subcategory in ["green_to_yellow", "green_to_red"]: | |
| suggestions.extend([ | |
| f"Reduce sensitivity to phrases like: {', '.join(common_words[:3]) if common_words else 'common expressions'}", | |
| "Add negative examples to training data", | |
| "Strengthen criteria for non-distress expressions" | |
| ]) | |
| elif subcategory in ["yellow_to_green", "red_to_green"]: | |
| suggestions.extend([ | |
| f"Increase sensitivity to phrases like: {', '.join(common_words[:3]) if common_words else 'distress indicators'}", | |
| "Strengthen distress indicator detection", | |
| "Add more positive examples of distress expressions" | |
| ]) | |
| elif subcategory in ["underestimated_distress", "overestimated_distress"]: | |
| suggestions.extend([ | |
| f"Calibrate severity assessment for {subcategory.replace('_', ' ')} patterns", | |
| "Review severity thresholds and criteria", | |
| "Add contextual weighting for severity indicators" | |
| ]) | |
| # Default suggestions if none matched | |
| if not suggestions: | |
| suggestions.extend([ | |
| f"Review {subcategory.replace('_', ' ')} error patterns", | |
| f"Improve detection accuracy for {subcategory.replace('_', ' ')} cases", | |
| "Add more training data for this error type" | |
| ]) | |
| return suggestions | |
| def _generate_transition_suggestions(self, actual: str, expected: str, related_errors: List[Dict]) -> List[str]: | |
| """Generate suggestions for specific category transitions.""" | |
| suggestions = [] | |
| transition_name = f"{actual} → {expected}" | |
| suggestions.append(f"Review decision criteria for {transition_name} boundary") | |
| # Analyze confidence levels for this transition | |
| avg_confidence = sum(e['confidence_level'] for e in related_errors) / len(related_errors) | |
| if avg_confidence < 0.7: | |
| suggestions.append(f"Low reviewer confidence ({avg_confidence:.2f}) suggests unclear criteria for {transition_name}") | |
| # Common phrases analysis | |
| common_words = self._extract_common_words([e['message_content'] for e in related_errors]) | |
| if common_words: | |
| suggestions.append(f"Common phrases in {transition_name} errors: {', '.join(common_words[:3])}") | |
| return suggestions | |
| def _generate_confidence_suggestions(self, low_confidence_errors: List[Dict]) -> List[str]: | |
| """Generate suggestions for low confidence patterns.""" | |
| return [ | |
| "Review feedback guidelines to improve reviewer confidence", | |
| "Provide additional training for edge case identification", | |
| "Consider adding confidence calibration exercises", | |
| "Implement inter-reviewer agreement checks" | |
| ] | |
| def _generate_question_issue_suggestions(self, issue_type: str, related_questions: List[Dict]) -> List[str]: | |
| """Generate suggestions for question issues.""" | |
| suggestions = [] | |
| if issue_type == "inappropriate_question": | |
| suggestions.extend([ | |
| "Review question appropriateness guidelines", | |
| "Add sensitivity training for question generation", | |
| "Implement question validation checks" | |
| ]) | |
| elif issue_type == "wrong_scenario_targeting": | |
| scenarios = Counter(q['scenario_type'] for q in related_questions) | |
| most_common_scenario = scenarios.most_common(1)[0][0] if scenarios else "unknown" | |
| suggestions.extend([ | |
| f"Improve question targeting for {most_common_scenario.replace('_', ' ')} scenarios", | |
| "Enhance scenario detection accuracy", | |
| "Add scenario-specific question validation" | |
| ]) | |
| return suggestions | |
| def _generate_scenario_specific_suggestions(self, scenario: str, issue: str, related_questions: List[Dict]) -> List[str]: | |
| """Generate suggestions for scenario-specific issues.""" | |
| return [ | |
| f"Review {issue.replace('_', ' ')} patterns in {scenario.replace('_', ' ')} scenarios", | |
| f"Enhance question templates for {scenario.replace('_', ' ')} situations", | |
| f"Add specialized training for {scenario.replace('_', ' ')} question generation" | |
| ] | |
| def _generate_referral_problem_suggestions(self, problem_type: str, related_referrals: List[Dict]) -> List[str]: | |
| """Generate suggestions for referral problems.""" | |
| suggestions = [] | |
| if problem_type == "incomplete_summary": | |
| suggestions.extend([ | |
| "Enhance summary generation completeness checks", | |
| "Add required field validation for summaries", | |
| "Improve context extraction for referral summaries" | |
| ]) | |
| elif problem_type == "missing_contact_info": | |
| suggestions.extend([ | |
| "Implement contact information validation", | |
| "Add contact info extraction from conversation", | |
| "Enhance referral template completeness" | |
| ]) | |
| return suggestions | |
| def _extract_common_words(self, messages: List[str]) -> List[str]: | |
| """Extract common words from error messages.""" | |
| if not messages: | |
| return [] | |
| # Simple word frequency analysis | |
| word_counts = Counter() | |
| for message in messages: | |
| words = message.lower().split() | |
| # Filter out common stop words and short words | |
| filtered_words = [ | |
| w for w in words | |
| if len(w) > 3 and w not in ['the', 'and', 'that', 'this', 'with', 'have', 'will', 'been', 'they', 'their', 'from', 'were', 'said', 'each', 'which', 'what', 'about'] | |
| ] | |
| word_counts.update(filtered_words) | |
| return [word for word, count in word_counts.most_common(5)] | |
    def generate_optimization_report(self, patterns: List[ErrorPattern]) -> Dict[str, Any]:
        """
        Generate a comprehensive optimization report based on identified patterns.

        Aggregates the patterns into: deduplicated top recommendations (from the
        10 highest-priority patterns), priority actions (top-5 patterns passing
        fixed frequency/confidence gates), per-category counts, the single most
        frequent pattern, the union of affected scenarios, and a mean confidence.

        Args:
            patterns: List of identified error patterns

        Returns:
            Dict[str, Any]: Comprehensive optimization report
        """
        # Empty input gets a fixed placeholder report rather than zero-divisions below.
        if not patterns:
            return {
                "summary": "No significant patterns identified",
                "total_patterns": 0,
                "recommendations": ["Continue monitoring for patterns"],
                "priority_actions": [],
                "confidence_score": 0.0
            }
        # Sort patterns by priority (frequency * confidence)
        sorted_patterns = sorted(patterns, key=lambda p: p.frequency * p.confidence_score, reverse=True)
        # Extract top recommendations
        all_suggestions = []
        for pattern in sorted_patterns[:10]:  # Top 10 patterns
            all_suggestions.extend(pattern.suggested_improvements)
        # Remove duplicates while preserving order
        unique_suggestions = []
        seen = set()
        for suggestion in all_suggestions:
            if suggestion not in seen:
                unique_suggestions.append(suggestion)
                seen.add(suggestion)
        # Categorize patterns by the first '_'-separated token of pattern_type
        # (e.g. 'error_type_x' and 'error_question_correlation' both land in 'error').
        pattern_categories = defaultdict(list)
        for pattern in patterns:
            category = pattern.pattern_type.split('_')[0]
            pattern_categories[category].append(pattern)
        # Calculate overall confidence (unweighted mean; patterns is non-empty here).
        overall_confidence = sum(p.confidence_score for p in patterns) / len(patterns)
        # Generate priority actions: top-5 patterns that also clear fixed
        # absolute gates (frequency >= 5 and confidence >= 0.7).
        priority_actions = []
        for pattern in sorted_patterns[:5]:  # Top 5 patterns
            if pattern.frequency >= 5 and pattern.confidence_score >= 0.7:
                priority_actions.append({
                    "pattern": pattern.description,
                    "frequency": pattern.frequency,
                    "confidence": pattern.confidence_score,
                    "top_suggestion": pattern.suggested_improvements[0] if pattern.suggested_improvements else "Review pattern manually"
                })
        return {
            "summary": f"Identified {len(patterns)} significant patterns across feedback data",
            "total_patterns": len(patterns),
            "pattern_categories": {cat: len(pats) for cat, pats in pattern_categories.items()},
            "recommendations": unique_suggestions[:15],  # Top 15 recommendations
            "priority_actions": priority_actions,
            "confidence_score": overall_confidence,
            "most_frequent_pattern": {
                "description": sorted_patterns[0].description,
                "frequency": sorted_patterns[0].frequency,
                "suggestions": sorted_patterns[0].suggested_improvements[:3]
            } if sorted_patterns else None,
            # Union of scenario enum values across all patterns (set order is arbitrary).
            "affected_scenarios": list(set(
                scenario.value for pattern in patterns
                for scenario in pattern.affected_scenarios
            )),
            # NOTE(review): naive local timestamp; consider datetime.now(tz=...) if
            # reports are compared across hosts — confirm requirements.
            "report_generated": datetime.now().isoformat()
        }