Spaces:
Build error
Build error
| """ | |
| Usage Pattern Analyzer for Chain of Thought System | |
| Analyze usage patterns to improve system performance | |
| """ | |
| import time | |
| import json | |
| import os | |
| from typing import Dict, List, Any, Optional | |
| from collections import defaultdict, deque | |
| from datetime import datetime, timedelta | |
| import logging | |
| # Import the CoT system | |
| from src.core.optimized_chain_of_thought import ReasoningPath | |
| logger = logging.getLogger(__name__) | |
class UsagePatternAnalyzer:
    """Record query outcomes and derive optimization insights from them.

    Each call to :meth:`analyze_query` classifies the query into a coarse
    pattern, folds the result into per-pattern running statistics and appends
    a compact entry to a bounded history.  The reporting methods only read
    that state; they never modify it.
    """

    # Keyword table used by ``_extract_pattern``.  Matching is a plain
    # substring test on the lower-cased query and the FIRST category (in the
    # order written here) with any hit wins.  Consequently 'explain' always
    # resolves to 'definition', never to 'explanation'.
    _PATTERN_INDICATORS = {
        'definition': ['what is', 'define', 'meaning of', 'explain'],
        'comparison': ['compare', 'difference', 'versus', 'vs', 'contrast'],
        'explanation': ['how does', 'why', 'explain', 'describe'],
        'analysis': ['analyze', 'evaluate', 'assess', 'examine'],
        'calculation': ['calculate', 'compute', 'solve', 'find'],
        'prediction': ['predict', 'forecast', 'estimate', 'project'],
        'recommendation': ['recommend', 'suggest', 'advise', 'propose'],
        'troubleshooting': ['fix', 'debug', 'error', 'problem', 'issue'],
    }

    def __init__(self, max_history: int = 10000):
        """Create an empty analyzer.

        Args:
            max_history: Maximum number of entries kept in ``usage_history``;
                older entries are dropped once the limit is reached.

        Side effects:
            Creates the ``analytics`` directory in the current working
            directory if it does not exist.
        """
        # pattern name -> aggregated statistics (see ``_new_pattern_stats``).
        self.query_patterns = defaultdict(self._new_pattern_stats)
        # Not populated anywhere in this class.
        self.pattern_clusters = []
        # Bounded log of per-query entries, oldest first.
        self.usage_history = deque(maxlen=max_history)
        # session id -> entries recorded for that session.
        # NOTE: unlike ``usage_history`` this mapping has no size limit.
        self.session_data = defaultdict(list)
        # Default report location used by ``save_analytics_report``.
        os.makedirs('analytics', exist_ok=True)

    @staticmethod
    def _new_pattern_stats() -> Dict[str, Any]:
        """Return a fresh statistics record for a newly seen pattern."""
        return {
            'count': 0,
            'avg_complexity': 0,
            'avg_confidence': 0,
            'avg_time': 0,
            'common_templates': defaultdict(int),
            'time_distribution': defaultdict(int),  # keyed by hour of day
            'day_distribution': defaultdict(int),   # keyed by weekday (0 = Monday)
            'user_satisfaction': [],
            'error_count': 0,
            'last_used': None,
        }

    @staticmethod
    def _running_mean(previous_mean, n, value):
        """Return the mean of ``n`` samples given the mean of the first ``n - 1``."""
        return (previous_mean * (n - 1) + value) / n

    @staticmethod
    def _percent_change(baseline, current):
        """Return the relative change from ``baseline`` to ``current`` in percent.

        A zero baseline has no defined relative change: ``0.0`` is returned
        when ``current`` is also zero, otherwise ``None``.
        """
        if baseline == 0:
            return 0.0 if current == 0 else None
        return (current - baseline) / baseline * 100

    def analyze_query(self, query: str, result: "ReasoningPath",
                      timestamp: Optional[float] = None,
                      user_id: Optional[str] = None,
                      session_id: Optional[str] = None,
                      user_feedback: Optional[float] = None):
        """Record one query and its reasoning result.

        Args:
            query: Raw query text.
            result: Object exposing ``complexity_score``, ``total_confidence``,
                ``execution_time``, ``template_used`` and ``steps``.
            timestamp: POSIX timestamp of the query; defaults to now.
            user_id: Optional user identifier, stored verbatim.
            session_id: Optional session identifier; when truthy the entry is
                also filed under that session.
            user_feedback: Optional satisfaction score, stored verbatim.
        """
        if timestamp is None:
            timestamp = time.time()

        pattern = self._extract_pattern(query)

        stats = self.query_patterns[pattern]
        stats['count'] += 1
        n = stats['count']
        stats['avg_complexity'] = self._running_mean(
            stats['avg_complexity'], n, result.complexity_score)
        stats['avg_confidence'] = self._running_mean(
            stats['avg_confidence'], n, result.total_confidence)
        stats['avg_time'] = self._running_mean(
            stats['avg_time'], n, result.execution_time)

        if result.template_used:
            stats['common_templates'][result.template_used] += 1

        # Local-time hour/weekday buckets.
        moment = datetime.fromtimestamp(timestamp)
        stats['time_distribution'][moment.hour] += 1
        stats['day_distribution'][moment.weekday()] += 1
        stats['last_used'] = timestamp

        if user_feedback is not None:
            stats['user_satisfaction'].append(user_feedback)

        # A result with confidence below 0.3 is counted as an error.
        if result.total_confidence < 0.3:
            stats['error_count'] += 1

        usage_entry = {
            'timestamp': timestamp,
            'query': query,
            'pattern': pattern,
            'result': {
                'confidence': result.total_confidence,
                'execution_time': result.execution_time,
                'steps_count': len(result.steps),
                'template_used': result.template_used,
                'complexity_score': result.complexity_score,
            },
            'user_id': user_id,
            'session_id': session_id,
            'user_feedback': user_feedback,
        }
        self.usage_history.append(usage_entry)

        if session_id:
            self.session_data[session_id].append(usage_entry)

    def _extract_pattern(self, query: str) -> str:
        """Classify ``query`` into a pattern name, or ``'general'`` if none match.

        Uses case-insensitive substring matching against
        ``_PATTERN_INDICATORS``; the first category with a hit is returned.
        """
        query_lower = query.lower()
        for pattern_type, indicators in self._PATTERN_INDICATORS.items():
            if any(indicator in query_lower for indicator in indicators):
                return pattern_type
        return 'general'

    def get_optimization_insights(self) -> Dict[str, Any]:
        """Run every analysis and return the combined insights.

        Returns:
            Dict with keys ``popular_patterns``, ``problem_patterns``,
            ``time_based_insights``, ``template_effectiveness``,
            ``session_analysis``, ``performance_trends`` and
            ``recommendations`` (the latter derived from the others).
        """
        insights = {
            'popular_patterns': self._get_popular_patterns(),
            'problem_patterns': self._get_problem_patterns(),
            'time_based_insights': self._get_time_insights(),
            'template_effectiveness': self._analyze_template_effectiveness(),
            'session_analysis': self._analyze_sessions(),
            'performance_trends': self._analyze_performance_trends(),
            'recommendations': [],
        }
        insights['recommendations'] = self._generate_recommendations(insights)
        return insights

    def _get_popular_patterns(self):
        """Return summaries of the ten most frequent patterns, busiest first."""
        ranked = sorted(
            self.query_patterns.items(),
            key=lambda item: item[1]['count'],
            reverse=True,
        )
        return [
            {
                'pattern': pattern,
                'count': data['count'],
                'avg_complexity': data['avg_complexity'],
                'avg_confidence': data['avg_confidence'],
                'avg_time': data['avg_time'],
            }
            for pattern, data in ranked[:10]
        ]

    def _get_problem_patterns(self):
        """Return patterns with low confidence, many errors or slow execution.

        Patterns seen fewer than five times are skipped.  The result is
        ordered by number of issues, most first.
        """
        problem_patterns = []
        for pattern, data in self.query_patterns.items():
            if data['count'] < 5:
                continue  # not enough data to judge

            issues = []
            if data['avg_confidence'] < 0.6:
                issues.append(f"Low confidence ({data['avg_confidence']:.2f})")
            if data['error_count'] > data['count'] * 0.1:  # more than 10% errors
                issues.append(f"High error rate ({data['error_count']}/{data['count']})")
            if data['avg_time'] > 2.0:
                issues.append(f"Slow execution ({data['avg_time']:.2f}s)")

            if issues:
                problem_patterns.append({
                    'pattern': pattern,
                    'count': data['count'],
                    'issues': issues,
                    'avg_confidence': data['avg_confidence'],
                    'avg_time': data['avg_time'],
                })
        return sorted(problem_patterns, key=lambda item: len(item['issues']), reverse=True)

    def _get_time_insights(self):
        """Summarize when queries arrive (local-time hour and weekday).

        Returns:
            ``{'message': ...}`` when there is no history, otherwise a dict
            with the three busiest hours, the busiest weekday, both full
            distributions and the total query count.
        """
        if not self.usage_history:
            return {'message': 'No usage data available'}

        hour_counts = defaultdict(int)
        day_counts = defaultdict(int)
        for entry in self.usage_history:
            moment = datetime.fromtimestamp(entry['timestamp'])
            hour_counts[moment.hour] += 1
            day_counts[moment.weekday()] += 1

        busiest_hours = sorted(hour_counts.items(), key=lambda item: item[1], reverse=True)
        return {
            'peak_hours': [hour for hour, _ in busiest_hours[:3]],
            # Ties resolve to the weekday encountered first in the history.
            'peak_day': max(day_counts, key=day_counts.get),
            'hour_distribution': dict(hour_counts),
            'day_distribution': dict(day_counts),
            'total_queries': len(self.usage_history),
        }

    def _analyze_template_effectiveness(self):
        """Score each template used at least three times, best first.

        The score is ``0.4 * avg_confidence + 0.3 * speed + 0.3 * satisfaction``
        where ``speed = 1 / max(avg_time, 0.1)`` and satisfaction defaults to
        0.5 when no feedback was recorded.  Note that ``speed`` can be as
        large as 10, so it is not on the same scale as the other two terms.
        """
        template_stats = defaultdict(lambda: {
            'usage_count': 0,
            'avg_confidence': 0,
            'avg_time': 0,
            'user_satisfaction': [],
        })

        for entry in self.usage_history:
            template = entry['result']['template_used']
            if not template:
                continue
            stats = template_stats[template]
            stats['usage_count'] += 1
            n = stats['usage_count']
            stats['avg_confidence'] = self._running_mean(
                stats['avg_confidence'], n, entry['result']['confidence'])
            stats['avg_time'] = self._running_mean(
                stats['avg_time'], n, entry['result']['execution_time'])
            if entry['user_feedback'] is not None:
                stats['user_satisfaction'].append(entry['user_feedback'])

        effectiveness = []
        for template, stats in template_stats.items():
            if stats['usage_count'] < 3:
                continue

            confidence_score = stats['avg_confidence']
            speed_score = 1.0 / max(stats['avg_time'], 0.1)
            feedback = stats['user_satisfaction']
            satisfaction_score = sum(feedback) / len(feedback) if feedback else 0.5
            effectiveness_score = (
                confidence_score * 0.4 + speed_score * 0.3 + satisfaction_score * 0.3
            )
            effectiveness.append({
                'template': template,
                'usage_count': stats['usage_count'],
                'avg_confidence': stats['avg_confidence'],
                'avg_time': stats['avg_time'],
                'avg_satisfaction': satisfaction_score,
                'effectiveness_score': effectiveness_score,
            })
        return sorted(effectiveness, key=lambda item: item['effectiveness_score'], reverse=True)

    def _analyze_sessions(self):
        """Summarize every session that contains at least two queries.

        Returns:
            One dict per session with its id, query count, duration (last
            timestamp minus first), average confidence, average execution
            time and the distinct patterns used.
        """
        session_analysis = []
        for session_id, entries in self.session_data.items():
            if len(entries) < 2:
                continue

            # Work on a sorted copy so reporting never reorders stored data.
            ordered = sorted(entries, key=lambda entry: entry['timestamp'])
            count = len(ordered)
            session_analysis.append({
                'session_id': session_id,
                'query_count': count,
                'duration': ordered[-1]['timestamp'] - ordered[0]['timestamp'],
                'avg_confidence': sum(e['result']['confidence'] for e in ordered) / count,
                'avg_time': sum(e['result']['execution_time'] for e in ordered) / count,
                'patterns_used': list(set(e['pattern'] for e in ordered)),
            })
        return session_analysis

    def _analyze_performance_trends(self):
        """Compare the earliest and latest hourly buckets of the history.

        Returns:
            ``{'message': ...}`` when there are fewer than ten entries or
            fewer than two distinct hours.  Otherwise a dict with:

            * ``confidence_trend`` / ``time_trend``: percent change of the
              average confidence / execution time from the first hour to the
              last, or ``None`` when the first-hour average is zero and the
              change is therefore undefined.
            * ``total_periods``: number of distinct hourly buckets.
            * ``trend_direction``: ``'improving'``, ``'declining'`` or
              ``'stable'``, based on the sign of the confidence difference.
        """
        if len(self.usage_history) < 10:
            return {'message': 'Insufficient data for trend analysis'}

        hourly_stats = defaultdict(lambda: {
            'count': 0,
            'total_confidence': 0,
            'total_time': 0,
        })
        for entry in self.usage_history:
            moment = datetime.fromtimestamp(entry['timestamp'])
            hour_key = moment.replace(minute=0, second=0, microsecond=0)
            bucket = hourly_stats[hour_key]
            bucket['count'] += 1
            bucket['total_confidence'] += entry['result']['confidence']
            bucket['total_time'] += entry['result']['execution_time']

        sorted_hours = sorted(hourly_stats.items())
        if len(sorted_hours) < 2:
            return {'message': 'Insufficient time data for trend analysis'}

        first = sorted_hours[0][1]
        last = sorted_hours[-1][1]
        first_avg_confidence = first['total_confidence'] / first['count']
        last_avg_confidence = last['total_confidence'] / last['count']
        first_avg_time = first['total_time'] / first['count']
        last_avg_time = last['total_time'] / last['count']

        # Direction comes from the raw difference so that it stays defined
        # even when the percent change is not (zero baseline).
        confidence_delta = last_avg_confidence - first_avg_confidence
        if confidence_delta > 0:
            direction = 'improving'
        elif confidence_delta < 0:
            direction = 'declining'
        else:
            direction = 'stable'

        return {
            'confidence_trend': self._percent_change(first_avg_confidence, last_avg_confidence),
            'time_trend': self._percent_change(first_avg_time, last_avg_time),
            'total_periods': len(sorted_hours),
            'trend_direction': direction,
        }

    def _generate_recommendations(self, insights):
        """Turn the analysis results in ``insights`` into advice strings."""
        recommendations = []

        # Heavily used patterns may deserve a dedicated template.
        for pattern in insights['popular_patterns']:
            if pattern['count'] > 100:
                recommendations.append(
                    f"Create specialized template for '{pattern['pattern']}' "
                    f"queries (used {pattern['count']} times)"
                )

        for pattern in insights['problem_patterns']:
            recommendations.append(
                f"Review and improve handling of '{pattern['pattern']}' "
                f"queries: {', '.join(pattern['issues'])}"
            )

        peak_hours = insights['time_based_insights'].get('peak_hours')
        if peak_hours:
            recommendations.append(
                f"Consider pre-warming cache before peak hours: {peak_hours}"
            )

        # The list is sorted best-first, so the last element is the weakest.
        template_effectiveness = insights['template_effectiveness']
        if template_effectiveness:
            worst_template = template_effectiveness[-1]
            if worst_template['effectiveness_score'] < 0.5:
                recommendations.append(
                    f"Review and optimize template '{worst_template['template']}' "
                    f"(effectiveness: {worst_template['effectiveness_score']:.2f})"
                )

        if insights['performance_trends'].get('trend_direction') == 'declining':
            recommendations.append(
                "Performance is declining over time. Consider system optimization."
            )

        return recommendations

    def save_analytics_report(self, filename: Optional[str] = None):
        """Write the current insights and a summary to a JSON file.

        Args:
            filename: Destination path.  Defaults to
                ``analytics/usage_analysis_<YYYYmmdd_HHMMSS>.json``.

        Returns:
            The path the report was written to.
        """
        if filename is None:
            stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"analytics/usage_analysis_{stamp}.json"

        # A caller-supplied path may point into a directory that does not
        # exist yet; create it instead of failing on open().
        parent = os.path.dirname(filename)
        if parent:
            os.makedirs(parent, exist_ok=True)

        insights = self.get_optimization_insights()
        timestamps = [entry['timestamp'] for entry in self.usage_history]

        report = {
            'timestamp': datetime.now().isoformat(),
            'insights': insights,
            'summary': {
                'total_queries': len(self.usage_history),
                'unique_patterns': len(self.query_patterns),
                'total_sessions': len(self.session_data),
                'data_period': {
                    'start': min(timestamps) if timestamps else None,
                    'end': max(timestamps) if timestamps else None,
                },
            },
        }

        with open(filename, 'w', encoding='utf-8') as f:
            # default=str stringifies anything json cannot encode natively.
            json.dump(report, f, indent=2, default=str)

        logger.info("Analytics report saved to %s", filename)
        return filename

    def get_realtime_metrics(self) -> Dict[str, Any]:
        """Return usage metrics for the last hour.

        Returns:
            ``{'message': ...}`` when there is no history or nothing was
            recorded in the last 3600 seconds; otherwise the query count,
            average confidence, average execution time and most frequent
            pattern for that window.
        """
        if not self.usage_history:
            return {'message': 'No usage data available'}

        one_hour_ago = time.time() - 3600
        recent_queries = [
            entry for entry in self.usage_history
            if entry['timestamp'] > one_hour_ago
        ]
        if not recent_queries:
            return {'message': 'No recent usage data'}

        total = len(recent_queries)
        # Single counting pass; ties resolve to the pattern seen first.
        pattern_counts = defaultdict(int)
        for entry in recent_queries:
            pattern_counts[entry['pattern']] += 1

        return {
            'queries_last_hour': total,
            'avg_confidence_last_hour':
                sum(e['result']['confidence'] for e in recent_queries) / total,
            'avg_time_last_hour':
                sum(e['result']['execution_time'] for e in recent_queries) / total,
            'most_common_pattern': max(pattern_counts, key=pattern_counts.get),
        }
| # Example usage | |
def run_usage_analysis():
    """Demonstrate the analyzer on a handful of simulated queries.

    Feeds eight mock results into a fresh ``UsagePatternAnalyzer``, prints a
    short summary of the resulting insights, saves a report and returns the
    insights dict.
    """
    print("📊 Running Usage Pattern Analysis...")

    analyzer = UsagePatternAnalyzer()

    # Each sample is (query text, confidence, execution time in seconds).
    demo_samples = [
        ("What is machine learning?", 0.8, 1.2),
        ("Compare Python and Java", 0.7, 1.5),
        ("How does neural networks work?", 0.9, 2.1),
        ("What is 2+2?", 0.95, 0.3),
        ("Analyze the impact of AI", 0.6, 3.2),
        ("Define blockchain", 0.8, 1.1),
        ("Explain recursion", 0.7, 1.8),
        ("What causes climate change?", 0.8, 2.5)
    ]

    for index, sample in enumerate(demo_samples):
        text, score, seconds = sample

        # Stand-in for a reasoning result: an ad-hoc class carrying only the
        # attributes the analyzer reads.
        result_attrs = {
            'total_confidence': score,
            'execution_time': seconds,
            'steps': [type('MockStep', (), {'thought': 'Sample step'})() for _ in range(3)],
            'template_used': 'default',
            'complexity_score': 0.5 + (index * 0.1)
        }
        fake_result = type('MockResult', (), result_attrs)()

        analyzer.analyze_query(
            query=text,
            result=fake_result,
            timestamp=time.time() - (index * 3600),  # one sample per hour, going back
            user_id=f"user_{index % 3}",
            session_id=f"session_{index // 2}",
            user_feedback=score + 0.1  # feedback set slightly above confidence
        )

    insights = analyzer.get_optimization_insights()

    print("\n📈 Usage Analysis Results:")
    print(f"Total queries analyzed: {len(analyzer.usage_history)}")
    print(f"Unique patterns found: {len(analyzer.query_patterns)}")

    print("\n🔥 Popular Patterns:")
    for popular in insights['popular_patterns'][:3]:
        print(f" {popular['pattern']}: {popular['count']} uses")

    print("\n⚠️ Problem Patterns:")
    for troubled in insights['problem_patterns'][:3]:
        print(f" {troubled['pattern']}: {', '.join(troubled['issues'])}")

    print("\n💡 Recommendations:")
    for advice in insights['recommendations'][:5]:
        print(f" - {advice}")

    # Persist the full report alongside the console summary.
    saved_path = analyzer.save_analytics_report()
    print(f"\n📄 Report saved to: {saved_path}")

    return insights


if __name__ == "__main__":
    # Script entry point: run the demonstration.
    run_usage_analysis()