""" Pattern recognition and analysis for feedback system. Implements automated improvement suggestion generation and feedback aggregation. """ import json from collections import Counter, defaultdict from datetime import datetime, timedelta from typing import List, Dict, Optional, Any, Tuple from pathlib import Path from .data_models import ( ErrorPattern, ClassificationError, QuestionIssue, ReferralProblem, ErrorType, ErrorSubcategory, ScenarioType ) class PatternRecognizer: """ Advanced pattern recognition for identifying common error types and generating automated improvement suggestions based on feedback data analysis. Provides functionality to: - Identify recurring error patterns across different dimensions - Generate data-driven improvement suggestions - Analyze temporal trends in feedback data - Provide aggregated reporting for system optimization """ def __init__(self, min_pattern_frequency: int = 3, confidence_threshold: float = 0.7): """ Initialize the pattern recognizer. Args: min_pattern_frequency: Minimum frequency for a pattern to be considered significant confidence_threshold: Minimum confidence level for pattern suggestions """ self.min_pattern_frequency = min_pattern_frequency self.confidence_threshold = confidence_threshold # Pattern analysis strategies (for future expansion) self.analysis_strategies = { 'error_type_clustering': 'analyze_error_type_patterns', 'subcategory_analysis': 'analyze_subcategory_patterns', 'temporal_trends': 'analyze_temporal_patterns', 'confidence_correlation': 'analyze_confidence_patterns', 'message_content_analysis': 'analyze_message_content_patterns', 'cross_category_analysis': 'analyze_cross_category_patterns' } # Improvement suggestion templates self.suggestion_templates = { 'wrong_classification': { 'high_frequency': "Review classification criteria for {category_pair} transitions - {frequency} occurrences detected", 'confidence_pattern': "Low confidence in {category} classifications suggests need for clearer decision boundaries", 'content_pattern': "Common phrases in misclassified messages: {phrases} - consider training data expansion" }, 'severity_misjudgment': { 'underestimation': "Severity assessment appears to underestimate distress in {context} scenarios", 'overestimation': "Sensitivity may be too high for {context} expressions - consider calibration", 'temporal': "Severity misjudgments increased {trend} over time - review recent changes" }, 'missed_indicators': { 'category_specific': "Frequently missed {indicator_category} indicators - enhance detection algorithms", 'subtle_cues': "Missing subtle distress cues in {scenario_type} scenarios", 'context_dependent': "Indicators missed when {context_condition} - improve context awareness" }, 'question_targeting': { 'scenario_mismatch': "Questions not well-targeted for {scenario_type} scenarios - {frequency} issues", 'sensitivity': "Question sensitivity issues in {context} - review language patterns", 'effectiveness': "Low effectiveness scores for {question_type} questions - consider alternatives" } } def analyze_comprehensive_patterns(self, errors: List[Dict[str, Any]], questions: List[Dict[str, Any]], referrals: List[Dict[str, Any]]) -> List[ErrorPattern]: """ Perform comprehensive pattern analysis across all feedback types. Args: errors: List of classification error records questions: List of question issue records referrals: List of referral problem records Returns: List[ErrorPattern]: Identified patterns with improvement suggestions """ all_patterns = [] # Analyze classification error patterns if errors: error_patterns = self._analyze_classification_error_patterns(errors) all_patterns.extend(error_patterns) # Analyze question issue patterns if questions: question_patterns = self._analyze_question_issue_patterns(questions) all_patterns.extend(question_patterns) # Analyze referral problem patterns if referrals: referral_patterns = self._analyze_referral_problem_patterns(referrals) all_patterns.extend(referral_patterns) # Cross-analysis patterns (relationships between different feedback types) if errors and questions: cross_patterns = self._analyze_cross_feedback_patterns(errors, questions, referrals) all_patterns.extend(cross_patterns) # Sort patterns by significance (frequency * confidence) all_patterns.sort(key=lambda p: p.frequency * p.confidence_score, reverse=True) return all_patterns def _analyze_classification_error_patterns(self, errors: List[Dict[str, Any]]) -> List[ErrorPattern]: """Analyze patterns in classification errors.""" patterns = [] # Error type frequency analysis error_type_counts = Counter(error['error_type'] for error in errors) for error_type, frequency in error_type_counts.items(): if frequency >= self.min_pattern_frequency: related_errors = [e for e in errors if e['error_type'] == error_type] pattern = ErrorPattern( pattern_id=f"error_type_{error_type}_{frequency}", pattern_type=f"error_type_{error_type}", description=f"Frequent {error_type.replace('_', ' ')} errors ({frequency} occurrences)", frequency=frequency, affected_scenarios=self._extract_scenarios_from_errors(related_errors), suggested_improvements=self._generate_error_type_suggestions(error_type, related_errors), confidence_score=min(frequency / 10.0, 1.0) ) patterns.append(pattern) # Subcategory analysis subcategory_counts = Counter(error['subcategory'] for error in errors) for subcategory, frequency in subcategory_counts.items(): if frequency >= self.min_pattern_frequency: related_errors = [e for e in errors if e['subcategory'] == subcategory] pattern = ErrorPattern( pattern_id=f"subcategory_{subcategory}_{frequency}", pattern_type=f"subcategory_{subcategory}", description=f"Frequent {subcategory.replace('_', ' ')} errors ({frequency} occurrences)", frequency=frequency, affected_scenarios=self._extract_scenarios_from_errors(related_errors), suggested_improvements=self._generate_subcategory_suggestions(subcategory, related_errors), confidence_score=min(frequency / 8.0, 1.0) ) patterns.append(pattern) # Category transition analysis transitions = Counter(f"{error['actual_category']}_to_{error['expected_category']}" for error in errors) for transition, frequency in transitions.items(): if frequency >= self.min_pattern_frequency: actual, expected = transition.split('_to_') related_errors = [e for e in errors if e['actual_category'] == actual and e['expected_category'] == expected] pattern = ErrorPattern( pattern_id=f"transition_{transition}_{frequency}", pattern_type=f"category_transition_{transition}", description=f"Frequent {actual} → {expected} misclassifications ({frequency} occurrences)", frequency=frequency, affected_scenarios=self._extract_scenarios_from_errors(related_errors), suggested_improvements=self._generate_transition_suggestions(actual, expected, related_errors), confidence_score=min(frequency / 6.0, 1.0) ) patterns.append(pattern) # Confidence level analysis low_confidence_errors = [e for e in errors if e['confidence_level'] < self.confidence_threshold] if len(low_confidence_errors) >= self.min_pattern_frequency: pattern = ErrorPattern( pattern_id=f"low_confidence_{len(low_confidence_errors)}", pattern_type="low_confidence_pattern", description=f"High number of low-confidence error reports ({len(low_confidence_errors)} occurrences)", frequency=len(low_confidence_errors), affected_scenarios=self._extract_scenarios_from_errors(low_confidence_errors), suggested_improvements=self._generate_confidence_suggestions(low_confidence_errors), confidence_score=0.8 ) patterns.append(pattern) return patterns def _analyze_question_issue_patterns(self, questions: List[Dict[str, Any]]) -> List[ErrorPattern]: """Analyze patterns in question issues.""" patterns = [] # Issue type frequency analysis issue_type_counts = Counter(question['issue_type'] for question in questions) for issue_type, frequency in issue_type_counts.items(): if frequency >= self.min_pattern_frequency: related_questions = [q for q in questions if q['issue_type'] == issue_type] pattern = ErrorPattern( pattern_id=f"question_issue_{issue_type}_{frequency}", pattern_type=f"question_issue_{issue_type}", description=f"Frequent {issue_type.replace('_', ' ')} issues ({frequency} occurrences)", frequency=frequency, affected_scenarios=[ScenarioType(q['scenario_type']) for q in related_questions], suggested_improvements=self._generate_question_issue_suggestions(issue_type, related_questions), confidence_score=min(frequency / 5.0, 1.0) ) patterns.append(pattern) # Scenario-specific question issues scenario_issue_combinations = Counter( f"{question['scenario_type']}_{question['issue_type']}" for question in questions ) for combination, frequency in scenario_issue_combinations.items(): if frequency >= self.min_pattern_frequency: scenario_str, issue = combination.split('_', 1) related_questions = [q for q in questions if q['scenario_type'] == scenario_str and q['issue_type'] == issue] # Try to create ScenarioType, skip if invalid try: scenario_enum = ScenarioType(scenario_str) affected_scenarios = [scenario_enum] except ValueError: affected_scenarios = [] pattern = ErrorPattern( pattern_id=f"scenario_issue_{combination}_{frequency}", pattern_type=f"scenario_specific_{combination}", description=f"Frequent {issue.replace('_', ' ')} issues in {scenario_str.replace('_', ' ')} scenarios ({frequency} occurrences)", frequency=frequency, affected_scenarios=affected_scenarios, suggested_improvements=self._generate_scenario_specific_suggestions(scenario_str, issue, related_questions), confidence_score=min(frequency / 4.0, 1.0) ) patterns.append(pattern) return patterns def _analyze_referral_problem_patterns(self, referrals: List[Dict[str, Any]]) -> List[ErrorPattern]: """Analyze patterns in referral problems.""" patterns = [] # Problem type frequency analysis problem_type_counts = Counter(referral['problem_type'] for referral in referrals) for problem_type, frequency in problem_type_counts.items(): if frequency >= self.min_pattern_frequency: related_referrals = [r for r in referrals if r['problem_type'] == problem_type] pattern = ErrorPattern( pattern_id=f"referral_problem_{problem_type}_{frequency}", pattern_type=f"referral_problem_{problem_type}", description=f"Frequent {problem_type.replace('_', ' ')} problems ({frequency} occurrences)", frequency=frequency, affected_scenarios=[], # Referrals don't have scenarios suggested_improvements=self._generate_referral_problem_suggestions(problem_type, related_referrals), confidence_score=min(frequency / 4.0, 1.0) ) patterns.append(pattern) # Missing fields analysis all_missing_fields = [] for referral in referrals: all_missing_fields.extend(referral.get('missing_fields', [])) missing_field_counts = Counter(all_missing_fields) for field, frequency in missing_field_counts.items(): if frequency >= self.min_pattern_frequency: pattern = ErrorPattern( pattern_id=f"missing_field_{field}_{frequency}", pattern_type=f"missing_field_{field}", description=f"Frequently missing field: {field} ({frequency} occurrences)", frequency=frequency, affected_scenarios=[], suggested_improvements=[f"Improve {field} capture in referral generation", f"Add validation for {field} field", f"Enhance {field} extraction from conversation context"], confidence_score=min(frequency / 3.0, 1.0) ) patterns.append(pattern) return patterns def _analyze_cross_feedback_patterns(self, errors: List[Dict[str, Any]], questions: List[Dict[str, Any]], referrals: List[Dict[str, Any]]) -> List[ErrorPattern]: """Analyze patterns across different feedback types.""" patterns = [] # Correlation between classification errors and question issues error_sessions = {error.get('session_id') for error in errors if error.get('session_id')} question_sessions = {question.get('session_id') for question in questions if question.get('session_id')} common_sessions = error_sessions.intersection(question_sessions) if len(common_sessions) >= self.min_pattern_frequency: pattern = ErrorPattern( pattern_id=f"error_question_correlation_{len(common_sessions)}", pattern_type="error_question_correlation", description=f"Sessions with both classification errors and question issues ({len(common_sessions)} sessions)", frequency=len(common_sessions), affected_scenarios=[], suggested_improvements=[ "Review sessions with multiple issue types for systemic problems", "Investigate correlation between classification accuracy and question quality", "Consider integrated training for both classification and question generation" ], confidence_score=0.7 ) patterns.append(pattern) return patterns def _extract_scenarios_from_errors(self, errors: List[Dict[str, Any]]) -> List[ScenarioType]: """Extract scenario types from error additional context.""" scenarios = set() for error in errors: context = error.get('additional_context', {}) if 'scenario_type' in context: try: scenarios.add(ScenarioType(context['scenario_type'])) except ValueError: pass return list(scenarios) def _generate_error_type_suggestions(self, error_type: str, related_errors: List[Dict]) -> List[str]: """Generate improvement suggestions for specific error types.""" suggestions = [] if error_type == "wrong_classification": # Analyze common misclassification patterns transitions = Counter(f"{e['actual_category']}_to_{e['expected_category']}" for e in related_errors) most_common = transitions.most_common(1) if most_common: transition = most_common[0][0] suggestions.append(f"Review classification criteria for {transition.replace('_to_', ' → ')} transitions") suggestions.extend([ "Add more training examples for edge cases", "Refine decision boundaries between categories", "Implement additional validation checks for ambiguous cases" ]) elif error_type == "severity_misjudgment": # Analyze severity patterns underestimated = sum(1 for e in related_errors if e.get('subcategory') == 'underestimated_distress') overestimated = sum(1 for e in related_errors if e.get('subcategory') == 'overestimated_distress') if underestimated > overestimated: suggestions.append("Increase sensitivity to subtle distress indicators") elif overestimated > underestimated: suggestions.append("Reduce false positive triggers for normal expressions") suggestions.extend([ "Calibrate severity assessment algorithms", "Add contextual weighting for distress indicators", "Improve training data balance for severity levels" ]) elif error_type == "missed_indicators": suggestions.extend([ "Expand indicator recognition patterns", "Improve natural language processing for subtle cues", "Add more comprehensive indicator training data", "Enhance context-aware indicator detection" ]) elif error_type == "context_misunderstanding": suggestions.extend([ "Enhance conversation history integration", "Improve defensive response detection algorithms", "Add contextual reasoning capabilities", "Strengthen temporal context awareness" ]) return suggestions def _generate_subcategory_suggestions(self, subcategory: str, related_errors: List[Dict]) -> List[str]: """Generate improvement suggestions for specific error subcategories.""" suggestions = [] # Analyze common words in error messages common_words = self._extract_common_words([e['message_content'] for e in related_errors]) if subcategory in ["green_to_yellow", "green_to_red"]: suggestions.extend([ f"Reduce sensitivity to phrases like: {', '.join(common_words[:3]) if common_words else 'common expressions'}", "Add negative examples to training data", "Strengthen criteria for non-distress expressions" ]) elif subcategory in ["yellow_to_green", "red_to_green"]: suggestions.extend([ f"Increase sensitivity to phrases like: {', '.join(common_words[:3]) if common_words else 'distress indicators'}", "Strengthen distress indicator detection", "Add more positive examples of distress expressions" ]) elif subcategory in ["underestimated_distress", "overestimated_distress"]: suggestions.extend([ f"Calibrate severity assessment for {subcategory.replace('_', ' ')} patterns", "Review severity thresholds and criteria", "Add contextual weighting for severity indicators" ]) # Default suggestions if none matched if not suggestions: suggestions.extend([ f"Review {subcategory.replace('_', ' ')} error patterns", f"Improve detection accuracy for {subcategory.replace('_', ' ')} cases", "Add more training data for this error type" ]) return suggestions def _generate_transition_suggestions(self, actual: str, expected: str, related_errors: List[Dict]) -> List[str]: """Generate suggestions for specific category transitions.""" suggestions = [] transition_name = f"{actual} → {expected}" suggestions.append(f"Review decision criteria for {transition_name} boundary") # Analyze confidence levels for this transition avg_confidence = sum(e['confidence_level'] for e in related_errors) / len(related_errors) if avg_confidence < 0.7: suggestions.append(f"Low reviewer confidence ({avg_confidence:.2f}) suggests unclear criteria for {transition_name}") # Common phrases analysis common_words = self._extract_common_words([e['message_content'] for e in related_errors]) if common_words: suggestions.append(f"Common phrases in {transition_name} errors: {', '.join(common_words[:3])}") return suggestions def _generate_confidence_suggestions(self, low_confidence_errors: List[Dict]) -> List[str]: """Generate suggestions for low confidence patterns.""" return [ "Review feedback guidelines to improve reviewer confidence", "Provide additional training for edge case identification", "Consider adding confidence calibration exercises", "Implement inter-reviewer agreement checks" ] def _generate_question_issue_suggestions(self, issue_type: str, related_questions: List[Dict]) -> List[str]: """Generate suggestions for question issues.""" suggestions = [] if issue_type == "inappropriate_question": suggestions.extend([ "Review question appropriateness guidelines", "Add sensitivity training for question generation", "Implement question validation checks" ]) elif issue_type == "wrong_scenario_targeting": scenarios = Counter(q['scenario_type'] for q in related_questions) most_common_scenario = scenarios.most_common(1)[0][0] if scenarios else "unknown" suggestions.extend([ f"Improve question targeting for {most_common_scenario.replace('_', ' ')} scenarios", "Enhance scenario detection accuracy", "Add scenario-specific question validation" ]) return suggestions def _generate_scenario_specific_suggestions(self, scenario: str, issue: str, related_questions: List[Dict]) -> List[str]: """Generate suggestions for scenario-specific issues.""" return [ f"Review {issue.replace('_', ' ')} patterns in {scenario.replace('_', ' ')} scenarios", f"Enhance question templates for {scenario.replace('_', ' ')} situations", f"Add specialized training for {scenario.replace('_', ' ')} question generation" ] def _generate_referral_problem_suggestions(self, problem_type: str, related_referrals: List[Dict]) -> List[str]: """Generate suggestions for referral problems.""" suggestions = [] if problem_type == "incomplete_summary": suggestions.extend([ "Enhance summary generation completeness checks", "Add required field validation for summaries", "Improve context extraction for referral summaries" ]) elif problem_type == "missing_contact_info": suggestions.extend([ "Implement contact information validation", "Add contact info extraction from conversation", "Enhance referral template completeness" ]) return suggestions def _extract_common_words(self, messages: List[str]) -> List[str]: """Extract common words from error messages.""" if not messages: return [] # Simple word frequency analysis word_counts = Counter() for message in messages: words = message.lower().split() # Filter out common stop words and short words filtered_words = [ w for w in words if len(w) > 3 and w not in ['the', 'and', 'that', 'this', 'with', 'have', 'will', 'been', 'they', 'their', 'from', 'were', 'said', 'each', 'which', 'what', 'about'] ] word_counts.update(filtered_words) return [word for word, count in word_counts.most_common(5)] def generate_optimization_report(self, patterns: List[ErrorPattern]) -> Dict[str, Any]: """ Generate a comprehensive optimization report based on identified patterns. Args: patterns: List of identified error patterns Returns: Dict[str, Any]: Comprehensive optimization report """ if not patterns: return { "summary": "No significant patterns identified", "total_patterns": 0, "recommendations": ["Continue monitoring for patterns"], "priority_actions": [], "confidence_score": 0.0 } # Sort patterns by priority (frequency * confidence) sorted_patterns = sorted(patterns, key=lambda p: p.frequency * p.confidence_score, reverse=True) # Extract top recommendations all_suggestions = [] for pattern in sorted_patterns[:10]: # Top 10 patterns all_suggestions.extend(pattern.suggested_improvements) # Remove duplicates while preserving order unique_suggestions = [] seen = set() for suggestion in all_suggestions: if suggestion not in seen: unique_suggestions.append(suggestion) seen.add(suggestion) # Categorize patterns pattern_categories = defaultdict(list) for pattern in patterns: category = pattern.pattern_type.split('_')[0] pattern_categories[category].append(pattern) # Calculate overall confidence overall_confidence = sum(p.confidence_score for p in patterns) / len(patterns) # Generate priority actions priority_actions = [] for pattern in sorted_patterns[:5]: # Top 5 patterns if pattern.frequency >= 5 and pattern.confidence_score >= 0.7: priority_actions.append({ "pattern": pattern.description, "frequency": pattern.frequency, "confidence": pattern.confidence_score, "top_suggestion": pattern.suggested_improvements[0] if pattern.suggested_improvements else "Review pattern manually" }) return { "summary": f"Identified {len(patterns)} significant patterns across feedback data", "total_patterns": len(patterns), "pattern_categories": {cat: len(pats) for cat, pats in pattern_categories.items()}, "recommendations": unique_suggestions[:15], # Top 15 recommendations "priority_actions": priority_actions, "confidence_score": overall_confidence, "most_frequent_pattern": { "description": sorted_patterns[0].description, "frequency": sorted_patterns[0].frequency, "suggestions": sorted_patterns[0].suggested_improvements[:3] } if sorted_patterns else None, "affected_scenarios": list(set( scenario.value for pattern in patterns for scenario in pattern.affected_scenarios )), "report_generated": datetime.now().isoformat() }