# Provenance (from upstream commit page, kept as a comment so the file parses):
#   author: DocUA
#   commit: 24214fc — "feat: Complete prompt optimization system implementation"
"""
Pattern recognition and analysis for feedback system.
Implements automated improvement suggestion generation and feedback aggregation.
"""
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any, Tuple
from pathlib import Path
from .data_models import (
ErrorPattern, ClassificationError, QuestionIssue, ReferralProblem,
ErrorType, ErrorSubcategory, ScenarioType
)
class PatternRecognizer:
    """
    Advanced pattern recognition for identifying common error types and generating
    automated improvement suggestions based on feedback data analysis.

    Provides functionality to:
    - Identify recurring error patterns across different dimensions
    - Generate data-driven improvement suggestions
    - Analyze temporal trends in feedback data
    - Provide aggregated reporting for system optimization
    """

    def __init__(self, min_pattern_frequency: int = 3, confidence_threshold: float = 0.7):
        """
        Initialize the pattern recognizer.

        Args:
            min_pattern_frequency: Minimum frequency for a pattern to be
                considered significant; groups below this count are ignored
                by the analysis methods.
            confidence_threshold: Minimum confidence level for pattern
                suggestions; error reports below it are treated as
                "low confidence" in the classification analysis.
        """
        self.min_pattern_frequency = min_pattern_frequency
        self.confidence_threshold = confidence_threshold
        # Pattern analysis strategies (for future expansion)
        # NOTE(review): maps strategy names to method-name strings, but nothing
        # in this class dispatches through it yet — confirm before removing.
        self.analysis_strategies = {
            'error_type_clustering': 'analyze_error_type_patterns',
            'subcategory_analysis': 'analyze_subcategory_patterns',
            'temporal_trends': 'analyze_temporal_patterns',
            'confidence_correlation': 'analyze_confidence_patterns',
            'message_content_analysis': 'analyze_message_content_patterns',
            'cross_category_analysis': 'analyze_cross_category_patterns'
        }
        # Improvement suggestion templates, keyed by error type and then by the
        # kind of evidence observed (frequency, confidence, content, ...).
        # NOTE(review): also not referenced by the _generate_* methods below yet.
        self.suggestion_templates = {
            'wrong_classification': {
                'high_frequency': "Review classification criteria for {category_pair} transitions - {frequency} occurrences detected",
                'confidence_pattern': "Low confidence in {category} classifications suggests need for clearer decision boundaries",
                'content_pattern': "Common phrases in misclassified messages: {phrases} - consider training data expansion"
            },
            'severity_misjudgment': {
                'underestimation': "Severity assessment appears to underestimate distress in {context} scenarios",
                'overestimation': "Sensitivity may be too high for {context} expressions - consider calibration",
                'temporal': "Severity misjudgments increased {trend} over time - review recent changes"
            },
            'missed_indicators': {
                'category_specific': "Frequently missed {indicator_category} indicators - enhance detection algorithms",
                'subtle_cues': "Missing subtle distress cues in {scenario_type} scenarios",
                'context_dependent': "Indicators missed when {context_condition} - improve context awareness"
            },
            'question_targeting': {
                'scenario_mismatch': "Questions not well-targeted for {scenario_type} scenarios - {frequency} issues",
                'sensitivity': "Question sensitivity issues in {context} - review language patterns",
                'effectiveness': "Low effectiveness scores for {question_type} questions - consider alternatives"
            }
        }
def analyze_comprehensive_patterns(self,
errors: List[Dict[str, Any]],
questions: List[Dict[str, Any]],
referrals: List[Dict[str, Any]]) -> List[ErrorPattern]:
"""
Perform comprehensive pattern analysis across all feedback types.
Args:
errors: List of classification error records
questions: List of question issue records
referrals: List of referral problem records
Returns:
List[ErrorPattern]: Identified patterns with improvement suggestions
"""
all_patterns = []
# Analyze classification error patterns
if errors:
error_patterns = self._analyze_classification_error_patterns(errors)
all_patterns.extend(error_patterns)
# Analyze question issue patterns
if questions:
question_patterns = self._analyze_question_issue_patterns(questions)
all_patterns.extend(question_patterns)
# Analyze referral problem patterns
if referrals:
referral_patterns = self._analyze_referral_problem_patterns(referrals)
all_patterns.extend(referral_patterns)
# Cross-analysis patterns (relationships between different feedback types)
if errors and questions:
cross_patterns = self._analyze_cross_feedback_patterns(errors, questions, referrals)
all_patterns.extend(cross_patterns)
# Sort patterns by significance (frequency * confidence)
all_patterns.sort(key=lambda p: p.frequency * p.confidence_score, reverse=True)
return all_patterns
def _analyze_classification_error_patterns(self, errors: List[Dict[str, Any]]) -> List[ErrorPattern]:
"""Analyze patterns in classification errors."""
patterns = []
# Error type frequency analysis
error_type_counts = Counter(error['error_type'] for error in errors)
for error_type, frequency in error_type_counts.items():
if frequency >= self.min_pattern_frequency:
related_errors = [e for e in errors if e['error_type'] == error_type]
pattern = ErrorPattern(
pattern_id=f"error_type_{error_type}_{frequency}",
pattern_type=f"error_type_{error_type}",
description=f"Frequent {error_type.replace('_', ' ')} errors ({frequency} occurrences)",
frequency=frequency,
affected_scenarios=self._extract_scenarios_from_errors(related_errors),
suggested_improvements=self._generate_error_type_suggestions(error_type, related_errors),
confidence_score=min(frequency / 10.0, 1.0)
)
patterns.append(pattern)
# Subcategory analysis
subcategory_counts = Counter(error['subcategory'] for error in errors)
for subcategory, frequency in subcategory_counts.items():
if frequency >= self.min_pattern_frequency:
related_errors = [e for e in errors if e['subcategory'] == subcategory]
pattern = ErrorPattern(
pattern_id=f"subcategory_{subcategory}_{frequency}",
pattern_type=f"subcategory_{subcategory}",
description=f"Frequent {subcategory.replace('_', ' ')} errors ({frequency} occurrences)",
frequency=frequency,
affected_scenarios=self._extract_scenarios_from_errors(related_errors),
suggested_improvements=self._generate_subcategory_suggestions(subcategory, related_errors),
confidence_score=min(frequency / 8.0, 1.0)
)
patterns.append(pattern)
# Category transition analysis
transitions = Counter(f"{error['actual_category']}_to_{error['expected_category']}" for error in errors)
for transition, frequency in transitions.items():
if frequency >= self.min_pattern_frequency:
actual, expected = transition.split('_to_')
related_errors = [e for e in errors if e['actual_category'] == actual and e['expected_category'] == expected]
pattern = ErrorPattern(
pattern_id=f"transition_{transition}_{frequency}",
pattern_type=f"category_transition_{transition}",
description=f"Frequent {actual}{expected} misclassifications ({frequency} occurrences)",
frequency=frequency,
affected_scenarios=self._extract_scenarios_from_errors(related_errors),
suggested_improvements=self._generate_transition_suggestions(actual, expected, related_errors),
confidence_score=min(frequency / 6.0, 1.0)
)
patterns.append(pattern)
# Confidence level analysis
low_confidence_errors = [e for e in errors if e['confidence_level'] < self.confidence_threshold]
if len(low_confidence_errors) >= self.min_pattern_frequency:
pattern = ErrorPattern(
pattern_id=f"low_confidence_{len(low_confidence_errors)}",
pattern_type="low_confidence_pattern",
description=f"High number of low-confidence error reports ({len(low_confidence_errors)} occurrences)",
frequency=len(low_confidence_errors),
affected_scenarios=self._extract_scenarios_from_errors(low_confidence_errors),
suggested_improvements=self._generate_confidence_suggestions(low_confidence_errors),
confidence_score=0.8
)
patterns.append(pattern)
return patterns
def _analyze_question_issue_patterns(self, questions: List[Dict[str, Any]]) -> List[ErrorPattern]:
"""Analyze patterns in question issues."""
patterns = []
# Issue type frequency analysis
issue_type_counts = Counter(question['issue_type'] for question in questions)
for issue_type, frequency in issue_type_counts.items():
if frequency >= self.min_pattern_frequency:
related_questions = [q for q in questions if q['issue_type'] == issue_type]
pattern = ErrorPattern(
pattern_id=f"question_issue_{issue_type}_{frequency}",
pattern_type=f"question_issue_{issue_type}",
description=f"Frequent {issue_type.replace('_', ' ')} issues ({frequency} occurrences)",
frequency=frequency,
affected_scenarios=[ScenarioType(q['scenario_type']) for q in related_questions],
suggested_improvements=self._generate_question_issue_suggestions(issue_type, related_questions),
confidence_score=min(frequency / 5.0, 1.0)
)
patterns.append(pattern)
# Scenario-specific question issues
scenario_issue_combinations = Counter(
f"{question['scenario_type']}_{question['issue_type']}" for question in questions
)
for combination, frequency in scenario_issue_combinations.items():
if frequency >= self.min_pattern_frequency:
scenario_str, issue = combination.split('_', 1)
related_questions = [q for q in questions if q['scenario_type'] == scenario_str and q['issue_type'] == issue]
# Try to create ScenarioType, skip if invalid
try:
scenario_enum = ScenarioType(scenario_str)
affected_scenarios = [scenario_enum]
except ValueError:
affected_scenarios = []
pattern = ErrorPattern(
pattern_id=f"scenario_issue_{combination}_{frequency}",
pattern_type=f"scenario_specific_{combination}",
description=f"Frequent {issue.replace('_', ' ')} issues in {scenario_str.replace('_', ' ')} scenarios ({frequency} occurrences)",
frequency=frequency,
affected_scenarios=affected_scenarios,
suggested_improvements=self._generate_scenario_specific_suggestions(scenario_str, issue, related_questions),
confidence_score=min(frequency / 4.0, 1.0)
)
patterns.append(pattern)
return patterns
def _analyze_referral_problem_patterns(self, referrals: List[Dict[str, Any]]) -> List[ErrorPattern]:
"""Analyze patterns in referral problems."""
patterns = []
# Problem type frequency analysis
problem_type_counts = Counter(referral['problem_type'] for referral in referrals)
for problem_type, frequency in problem_type_counts.items():
if frequency >= self.min_pattern_frequency:
related_referrals = [r for r in referrals if r['problem_type'] == problem_type]
pattern = ErrorPattern(
pattern_id=f"referral_problem_{problem_type}_{frequency}",
pattern_type=f"referral_problem_{problem_type}",
description=f"Frequent {problem_type.replace('_', ' ')} problems ({frequency} occurrences)",
frequency=frequency,
affected_scenarios=[], # Referrals don't have scenarios
suggested_improvements=self._generate_referral_problem_suggestions(problem_type, related_referrals),
confidence_score=min(frequency / 4.0, 1.0)
)
patterns.append(pattern)
# Missing fields analysis
all_missing_fields = []
for referral in referrals:
all_missing_fields.extend(referral.get('missing_fields', []))
missing_field_counts = Counter(all_missing_fields)
for field, frequency in missing_field_counts.items():
if frequency >= self.min_pattern_frequency:
pattern = ErrorPattern(
pattern_id=f"missing_field_{field}_{frequency}",
pattern_type=f"missing_field_{field}",
description=f"Frequently missing field: {field} ({frequency} occurrences)",
frequency=frequency,
affected_scenarios=[],
suggested_improvements=[f"Improve {field} capture in referral generation",
f"Add validation for {field} field",
f"Enhance {field} extraction from conversation context"],
confidence_score=min(frequency / 3.0, 1.0)
)
patterns.append(pattern)
return patterns
def _analyze_cross_feedback_patterns(self,
errors: List[Dict[str, Any]],
questions: List[Dict[str, Any]],
referrals: List[Dict[str, Any]]) -> List[ErrorPattern]:
"""Analyze patterns across different feedback types."""
patterns = []
# Correlation between classification errors and question issues
error_sessions = {error.get('session_id') for error in errors if error.get('session_id')}
question_sessions = {question.get('session_id') for question in questions if question.get('session_id')}
common_sessions = error_sessions.intersection(question_sessions)
if len(common_sessions) >= self.min_pattern_frequency:
pattern = ErrorPattern(
pattern_id=f"error_question_correlation_{len(common_sessions)}",
pattern_type="error_question_correlation",
description=f"Sessions with both classification errors and question issues ({len(common_sessions)} sessions)",
frequency=len(common_sessions),
affected_scenarios=[],
suggested_improvements=[
"Review sessions with multiple issue types for systemic problems",
"Investigate correlation between classification accuracy and question quality",
"Consider integrated training for both classification and question generation"
],
confidence_score=0.7
)
patterns.append(pattern)
return patterns
def _extract_scenarios_from_errors(self, errors: List[Dict[str, Any]]) -> List[ScenarioType]:
"""Extract scenario types from error additional context."""
scenarios = set()
for error in errors:
context = error.get('additional_context', {})
if 'scenario_type' in context:
try:
scenarios.add(ScenarioType(context['scenario_type']))
except ValueError:
pass
return list(scenarios)
def _generate_error_type_suggestions(self, error_type: str, related_errors: List[Dict]) -> List[str]:
"""Generate improvement suggestions for specific error types."""
suggestions = []
if error_type == "wrong_classification":
# Analyze common misclassification patterns
transitions = Counter(f"{e['actual_category']}_to_{e['expected_category']}" for e in related_errors)
most_common = transitions.most_common(1)
if most_common:
transition = most_common[0][0]
suggestions.append(f"Review classification criteria for {transition.replace('_to_', ' → ')} transitions")
suggestions.extend([
"Add more training examples for edge cases",
"Refine decision boundaries between categories",
"Implement additional validation checks for ambiguous cases"
])
elif error_type == "severity_misjudgment":
# Analyze severity patterns
underestimated = sum(1 for e in related_errors if e.get('subcategory') == 'underestimated_distress')
overestimated = sum(1 for e in related_errors if e.get('subcategory') == 'overestimated_distress')
if underestimated > overestimated:
suggestions.append("Increase sensitivity to subtle distress indicators")
elif overestimated > underestimated:
suggestions.append("Reduce false positive triggers for normal expressions")
suggestions.extend([
"Calibrate severity assessment algorithms",
"Add contextual weighting for distress indicators",
"Improve training data balance for severity levels"
])
elif error_type == "missed_indicators":
suggestions.extend([
"Expand indicator recognition patterns",
"Improve natural language processing for subtle cues",
"Add more comprehensive indicator training data",
"Enhance context-aware indicator detection"
])
elif error_type == "context_misunderstanding":
suggestions.extend([
"Enhance conversation history integration",
"Improve defensive response detection algorithms",
"Add contextual reasoning capabilities",
"Strengthen temporal context awareness"
])
return suggestions
def _generate_subcategory_suggestions(self, subcategory: str, related_errors: List[Dict]) -> List[str]:
"""Generate improvement suggestions for specific error subcategories."""
suggestions = []
# Analyze common words in error messages
common_words = self._extract_common_words([e['message_content'] for e in related_errors])
if subcategory in ["green_to_yellow", "green_to_red"]:
suggestions.extend([
f"Reduce sensitivity to phrases like: {', '.join(common_words[:3]) if common_words else 'common expressions'}",
"Add negative examples to training data",
"Strengthen criteria for non-distress expressions"
])
elif subcategory in ["yellow_to_green", "red_to_green"]:
suggestions.extend([
f"Increase sensitivity to phrases like: {', '.join(common_words[:3]) if common_words else 'distress indicators'}",
"Strengthen distress indicator detection",
"Add more positive examples of distress expressions"
])
elif subcategory in ["underestimated_distress", "overestimated_distress"]:
suggestions.extend([
f"Calibrate severity assessment for {subcategory.replace('_', ' ')} patterns",
"Review severity thresholds and criteria",
"Add contextual weighting for severity indicators"
])
# Default suggestions if none matched
if not suggestions:
suggestions.extend([
f"Review {subcategory.replace('_', ' ')} error patterns",
f"Improve detection accuracy for {subcategory.replace('_', ' ')} cases",
"Add more training data for this error type"
])
return suggestions
def _generate_transition_suggestions(self, actual: str, expected: str, related_errors: List[Dict]) -> List[str]:
"""Generate suggestions for specific category transitions."""
suggestions = []
transition_name = f"{actual}{expected}"
suggestions.append(f"Review decision criteria for {transition_name} boundary")
# Analyze confidence levels for this transition
avg_confidence = sum(e['confidence_level'] for e in related_errors) / len(related_errors)
if avg_confidence < 0.7:
suggestions.append(f"Low reviewer confidence ({avg_confidence:.2f}) suggests unclear criteria for {transition_name}")
# Common phrases analysis
common_words = self._extract_common_words([e['message_content'] for e in related_errors])
if common_words:
suggestions.append(f"Common phrases in {transition_name} errors: {', '.join(common_words[:3])}")
return suggestions
def _generate_confidence_suggestions(self, low_confidence_errors: List[Dict]) -> List[str]:
"""Generate suggestions for low confidence patterns."""
return [
"Review feedback guidelines to improve reviewer confidence",
"Provide additional training for edge case identification",
"Consider adding confidence calibration exercises",
"Implement inter-reviewer agreement checks"
]
def _generate_question_issue_suggestions(self, issue_type: str, related_questions: List[Dict]) -> List[str]:
"""Generate suggestions for question issues."""
suggestions = []
if issue_type == "inappropriate_question":
suggestions.extend([
"Review question appropriateness guidelines",
"Add sensitivity training for question generation",
"Implement question validation checks"
])
elif issue_type == "wrong_scenario_targeting":
scenarios = Counter(q['scenario_type'] for q in related_questions)
most_common_scenario = scenarios.most_common(1)[0][0] if scenarios else "unknown"
suggestions.extend([
f"Improve question targeting for {most_common_scenario.replace('_', ' ')} scenarios",
"Enhance scenario detection accuracy",
"Add scenario-specific question validation"
])
return suggestions
def _generate_scenario_specific_suggestions(self, scenario: str, issue: str, related_questions: List[Dict]) -> List[str]:
"""Generate suggestions for scenario-specific issues."""
return [
f"Review {issue.replace('_', ' ')} patterns in {scenario.replace('_', ' ')} scenarios",
f"Enhance question templates for {scenario.replace('_', ' ')} situations",
f"Add specialized training for {scenario.replace('_', ' ')} question generation"
]
def _generate_referral_problem_suggestions(self, problem_type: str, related_referrals: List[Dict]) -> List[str]:
"""Generate suggestions for referral problems."""
suggestions = []
if problem_type == "incomplete_summary":
suggestions.extend([
"Enhance summary generation completeness checks",
"Add required field validation for summaries",
"Improve context extraction for referral summaries"
])
elif problem_type == "missing_contact_info":
suggestions.extend([
"Implement contact information validation",
"Add contact info extraction from conversation",
"Enhance referral template completeness"
])
return suggestions
def _extract_common_words(self, messages: List[str]) -> List[str]:
"""Extract common words from error messages."""
if not messages:
return []
# Simple word frequency analysis
word_counts = Counter()
for message in messages:
words = message.lower().split()
# Filter out common stop words and short words
filtered_words = [
w for w in words
if len(w) > 3 and w not in ['the', 'and', 'that', 'this', 'with', 'have', 'will', 'been', 'they', 'their', 'from', 'were', 'said', 'each', 'which', 'what', 'about']
]
word_counts.update(filtered_words)
return [word for word, count in word_counts.most_common(5)]
def generate_optimization_report(self, patterns: List[ErrorPattern]) -> Dict[str, Any]:
"""
Generate a comprehensive optimization report based on identified patterns.
Args:
patterns: List of identified error patterns
Returns:
Dict[str, Any]: Comprehensive optimization report
"""
if not patterns:
return {
"summary": "No significant patterns identified",
"total_patterns": 0,
"recommendations": ["Continue monitoring for patterns"],
"priority_actions": [],
"confidence_score": 0.0
}
# Sort patterns by priority (frequency * confidence)
sorted_patterns = sorted(patterns, key=lambda p: p.frequency * p.confidence_score, reverse=True)
# Extract top recommendations
all_suggestions = []
for pattern in sorted_patterns[:10]: # Top 10 patterns
all_suggestions.extend(pattern.suggested_improvements)
# Remove duplicates while preserving order
unique_suggestions = []
seen = set()
for suggestion in all_suggestions:
if suggestion not in seen:
unique_suggestions.append(suggestion)
seen.add(suggestion)
# Categorize patterns
pattern_categories = defaultdict(list)
for pattern in patterns:
category = pattern.pattern_type.split('_')[0]
pattern_categories[category].append(pattern)
# Calculate overall confidence
overall_confidence = sum(p.confidence_score for p in patterns) / len(patterns)
# Generate priority actions
priority_actions = []
for pattern in sorted_patterns[:5]: # Top 5 patterns
if pattern.frequency >= 5 and pattern.confidence_score >= 0.7:
priority_actions.append({
"pattern": pattern.description,
"frequency": pattern.frequency,
"confidence": pattern.confidence_score,
"top_suggestion": pattern.suggested_improvements[0] if pattern.suggested_improvements else "Review pattern manually"
})
return {
"summary": f"Identified {len(patterns)} significant patterns across feedback data",
"total_patterns": len(patterns),
"pattern_categories": {cat: len(pats) for cat, pats in pattern_categories.items()},
"recommendations": unique_suggestions[:15], # Top 15 recommendations
"priority_actions": priority_actions,
"confidence_score": overall_confidence,
"most_frequent_pattern": {
"description": sorted_patterns[0].description,
"frequency": sorted_patterns[0].frequency,
"suggestions": sorted_patterns[0].suggested_improvements[:3]
} if sorted_patterns else None,
"affected_scenarios": list(set(
scenario.value for pattern in patterns
for scenario in pattern.affected_scenarios
)),
"report_generated": datetime.now().isoformat()
}