"""
Usage Pattern Analyzer for Chain of Thought System
Analyze usage patterns to improve system performance
"""

import time
import json
import os
from typing import Dict, Any, Optional
from collections import Counter, defaultdict, deque
from datetime import datetime
import logging

# Import the CoT system
from src.core.optimized_chain_of_thought import ReasoningPath
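# ReasoningPath is assumed to expose total_confidence, execution_time, steps,
# template_used and complexity_score: the attributes this module reads from results.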

logger = logging.getLogger(__name__)


class UsagePatternAnalyzer:
    """Analyze usage patterns to improve system performance"""
    
    def __init__(self, max_history: int = 10000):
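        # Per-pattern aggregate statistics; entries are created lazily on first use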
        self.query_patterns = defaultdict(lambda: {
            'count': 0,
            'avg_complexity': 0,
            'avg_confidence': 0,
            'avg_time': 0,
            'common_templates': defaultdict(int),
            'time_distribution': defaultdict(int),  # hour of day
            'day_distribution': defaultdict(int),   # day of week
            'user_satisfaction': [],
            'error_count': 0,
            'last_used': None
        })
        self.pattern_clusters = []  # Reserved for future pattern clustering (not yet populated)
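        # Bounded history: deque(maxlen=...) evicts the oldest entries automatically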
        self.usage_history = deque(maxlen=max_history)
        self.session_data = defaultdict(list)
        
        # Create analytics directory if it doesn't exist
        os.makedirs('analytics', exist_ok=True)
    
    def analyze_query(self, query: str, result: ReasoningPath, 
                     timestamp: Optional[float] = None,
                     user_id: Optional[str] = None,
                     session_id: Optional[str] = None,
                     user_feedback: Optional[float] = None):
        """Analyze a query and its result"""
        if timestamp is None:
            timestamp = time.time()
        
        # Extract query pattern
        pattern = self._extract_pattern(query)
        
        # Update pattern statistics
        data = self.query_patterns[pattern]
        data['count'] += 1
        
        # Update running averages
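        # Incremental mean: avg_new = (avg_old * (n - 1) + x) / n keeps the
        # averages exact without storing per-query samples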
        n = data['count']
        data['avg_complexity'] = (
            (data['avg_complexity'] * (n - 1) + result.complexity_score) / n
        )
        data['avg_confidence'] = (
            (data['avg_confidence'] * (n - 1) + result.total_confidence) / n
        )
        data['avg_time'] = (
            (data['avg_time'] * (n - 1) + result.execution_time) / n
        )
        
        # Track template usage
        if result.template_used:
            data['common_templates'][result.template_used] += 1
        
        # Track time distribution
        dt = datetime.fromtimestamp(timestamp)
        data['time_distribution'][dt.hour] += 1
        data['day_distribution'][dt.weekday()] += 1
        data['last_used'] = timestamp
        
        # Track user satisfaction
        if user_feedback is not None:
            data['user_satisfaction'].append(user_feedback)
        
        # Track errors: treat very low confidence (< 0.3) as a failed response
        if result.total_confidence < 0.3:
            data['error_count'] += 1
        
        # Store in usage history
        usage_entry = {
            'timestamp': timestamp,
            'query': query,
            'pattern': pattern,
            'result': {
                'confidence': result.total_confidence,
                'execution_time': result.execution_time,
                'steps_count': len(result.steps),
                'template_used': result.template_used,
                'complexity_score': result.complexity_score
            },
            'user_id': user_id,
            'session_id': session_id,
            'user_feedback': user_feedback
        }
        self.usage_history.append(usage_entry)
        
        # Track session data
        if session_id:
            self.session_data[session_id].append(usage_entry)
    
    def _extract_pattern(self, query: str) -> str:
        """Extract pattern from query"""
        # Simple pattern extraction - can be enhanced with NLP
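        # Categories are checked in insertion order, so a query matching several
        # indicator lists (e.g. 'explain') lands in the first matching category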
        patterns = {
            'definition': ['what is', 'define', 'meaning of', 'explain'],
            'comparison': ['compare', 'difference', 'versus', 'vs', 'contrast'],
            'explanation': ['how does', 'why', 'explain', 'describe'],
            'analysis': ['analyze', 'evaluate', 'assess', 'examine'],
            'calculation': ['calculate', 'compute', 'solve', 'find'],
            'prediction': ['predict', 'forecast', 'estimate', 'project'],
            'recommendation': ['recommend', 'suggest', 'advise', 'propose'],
            'troubleshooting': ['fix', 'debug', 'error', 'problem', 'issue']
        }
        
        query_lower = query.lower()
        
        for pattern_type, indicators in patterns.items():
            if any(ind in query_lower for ind in indicators):
                return pattern_type
        
        return 'general'
    
    def get_optimization_insights(self) -> Dict[str, Any]:
        """Generate insights for system optimization"""
        insights = {
            'popular_patterns': self._get_popular_patterns(),
            'problem_patterns': self._get_problem_patterns(),
            'time_based_insights': self._get_time_insights(),
            'template_effectiveness': self._analyze_template_effectiveness(),
            'session_analysis': self._analyze_sessions(),
            'performance_trends': self._analyze_performance_trends(),
            'recommendations': []
        }
        
        # Generate recommendations
        insights['recommendations'] = self._generate_recommendations(insights)
        
        return insights
    
    def _get_popular_patterns(self):
        """Identify most common query patterns"""
        sorted_patterns = sorted(
            self.query_patterns.items(),
            key=lambda x: x[1]['count'],
            reverse=True
        )
        
        return [
            {
                'pattern': pattern,
                'count': data['count'],
                'avg_complexity': data['avg_complexity'],
                'avg_confidence': data['avg_confidence'],
                'avg_time': data['avg_time']
            }
            for pattern, data in sorted_patterns[:10]
        ]
    
    def _get_problem_patterns(self):
        """Identify patterns with low confidence or high errors"""
        problem_patterns = []
        
        for pattern, data in self.query_patterns.items():
            if data['count'] < 5:
                continue  # Not enough data
            
            issues = []
            
            if data['avg_confidence'] < 0.6:
                issues.append(f"Low confidence ({data['avg_confidence']:.2f})")
            
            if data['error_count'] > data['count'] * 0.1:  # More than 10% errors
                issues.append(f"High error rate ({data['error_count']}/{data['count']})")
            
            if data['avg_time'] > 2.0:  # Very slow
                issues.append(f"Slow execution ({data['avg_time']:.2f}s)")
            
            if issues:
                problem_patterns.append({
                    'pattern': pattern,
                    'count': data['count'],
                    'issues': issues,
                    'avg_confidence': data['avg_confidence'],
                    'avg_time': data['avg_time']
                })
        
        return sorted(problem_patterns, key=lambda x: len(x['issues']), reverse=True)
    
    def _get_time_insights(self):
        """Analyze time-based usage patterns"""
        if not self.usage_history:
            return {'message': 'No usage data available'}
        
        # Analyze peak hours
        hour_counts = defaultdict(int)
        day_counts = defaultdict(int)
        
        for entry in self.usage_history:
            dt = datetime.fromtimestamp(entry['timestamp'])
            hour_counts[dt.hour] += 1
            day_counts[dt.weekday()] += 1
        
        peak_day = max(day_counts.items(), key=lambda x: x[1])[0] if day_counts else None
        
        return {
            'peak_hours': [hour for hour, _ in sorted(hour_counts.items(), key=lambda x: x[1], reverse=True)[:3]],
            'peak_day': peak_day,
            'hour_distribution': dict(hour_counts),
            'day_distribution': dict(day_counts),
            'total_queries': len(self.usage_history)
        }
    
    def _analyze_template_effectiveness(self):
        """Analyze template effectiveness"""
        template_stats = defaultdict(lambda: {
            'usage_count': 0,
            'avg_confidence': 0,
            'avg_time': 0,
            'user_satisfaction': []
        })
        
        for entry in self.usage_history:
            template = entry['result']['template_used']
            if template:
                stats = template_stats[template]
                stats['usage_count'] += 1
                
                # Update running averages
                n = stats['usage_count']
                stats['avg_confidence'] = (
                    (stats['avg_confidence'] * (n - 1) + entry['result']['confidence']) / n
                )
                stats['avg_time'] = (
                    (stats['avg_time'] * (n - 1) + entry['result']['execution_time']) / n
                )
                
                if entry['user_feedback'] is not None:
                    stats['user_satisfaction'].append(entry['user_feedback'])
        
        # Calculate effectiveness scores
        effectiveness = []
        for template, stats in template_stats.items():
            if stats['usage_count'] < 3:
                continue
            
            # Weighted effectiveness score: confidence 40%, speed 30%, satisfaction 30%.
            # Speed is clamped to [0, 1] so very fast templates cannot dominate the total.
            confidence_score = stats['avg_confidence']
            speed_score = min(1.0, 1.0 / max(stats['avg_time'], 0.1))
            satisfaction_score = (
                sum(stats['user_satisfaction']) / len(stats['user_satisfaction'])
                if stats['user_satisfaction'] else 0.5
            )
            
            effectiveness_score = (
                confidence_score * 0.4 + speed_score * 0.3 + satisfaction_score * 0.3
            )
            
            effectiveness.append({
                'template': template,
                'usage_count': stats['usage_count'],
                'avg_confidence': stats['avg_confidence'],
                'avg_time': stats['avg_time'],
                'avg_satisfaction': satisfaction_score,
                'effectiveness_score': effectiveness_score
            })
        
        return sorted(effectiveness, key=lambda x: x['effectiveness_score'], reverse=True)
    
    def _analyze_sessions(self):
        """Analyze user session patterns"""
        session_analysis = []
        
        for session_id, entries in self.session_data.items():
            if len(entries) < 2:
                continue
            
            # Sort entries by timestamp
            entries.sort(key=lambda x: x['timestamp'])
            
            session_duration = entries[-1]['timestamp'] - entries[0]['timestamp']
            avg_confidence = sum(e['result']['confidence'] for e in entries) / len(entries)
            avg_time = sum(e['result']['execution_time'] for e in entries) / len(entries)
            
            session_analysis.append({
                'session_id': session_id,
                'query_count': len(entries),
                'duration': session_duration,
                'avg_confidence': avg_confidence,
                'avg_time': avg_time,
                'patterns_used': list(set(e['pattern'] for e in entries))
            })
        
        return session_analysis
    
    def _analyze_performance_trends(self):
        """Analyze performance trends over time"""
        if len(self.usage_history) < 10:
            return {'message': 'Insufficient data for trend analysis'}
        
        # Group by time periods (e.g., hourly)
        hourly_stats = defaultdict(lambda: {
            'count': 0,
            'total_confidence': 0,
            'total_time': 0
        })
        
        for entry in self.usage_history:
            dt = datetime.fromtimestamp(entry['timestamp'])
            hour_key = dt.replace(minute=0, second=0, microsecond=0)
            
            stats = hourly_stats[hour_key]
            stats['count'] += 1
            stats['total_confidence'] += entry['result']['confidence']
            stats['total_time'] += entry['result']['execution_time']
        
        # Calculate trends
        sorted_hours = sorted(hourly_stats.items())
        
        if len(sorted_hours) < 2:
            return {'message': 'Insufficient time data for trend analysis'}
        
        # Simple trend calculation
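        # (first vs. last bucket only: cheap, but sensitive to outliers in either bucket)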
        first_hour = sorted_hours[0]
        last_hour = sorted_hours[-1]
        
        first_avg_confidence = first_hour[1]['total_confidence'] / first_hour[1]['count']
        last_avg_confidence = last_hour[1]['total_confidence'] / last_hour[1]['count']
        
        first_avg_time = first_hour[1]['total_time'] / first_hour[1]['count']
        last_avg_time = last_hour[1]['total_time'] / last_hour[1]['count']
        
        # Percentage change relative to the first bucket; guard against a zero baseline
        confidence_trend = (
            (last_avg_confidence - first_avg_confidence) / first_avg_confidence * 100
            if first_avg_confidence else 0.0
        )
        time_trend = (
            (last_avg_time - first_avg_time) / first_avg_time * 100
            if first_avg_time else 0.0
        )
        
        return {
            'confidence_trend': confidence_trend,
            'time_trend': time_trend,
            'total_periods': len(sorted_hours),
            'trend_direction': (
                'improving' if confidence_trend > 0
                else 'declining' if confidence_trend < 0
                else 'stable'
            )
        }
    
    def _generate_recommendations(self, insights):
        """Generate optimization recommendations"""
        recommendations = []
        
        # Check for popular patterns that could benefit from specialized templates
        for pattern in insights['popular_patterns']:
            if pattern['count'] > 100:
                recommendations.append(
                    f"Create specialized template for '{pattern['pattern']}' "
                    f"queries (used {pattern['count']} times)"
                )
        
        # Check for problem patterns
        for pattern in insights['problem_patterns']:
            recommendations.append(
                f"Review and improve handling of '{pattern['pattern']}' "
                f"queries: {', '.join(pattern['issues'])}"
            )
        
        # Time-based recommendations
        time_insights = insights['time_based_insights']
        if 'peak_hours' in time_insights and time_insights['peak_hours']:
            peak_hours = time_insights['peak_hours']
            recommendations.append(
                f"Consider pre-warming cache before peak hours: {peak_hours}"
            )
        
        # Template effectiveness recommendations
        template_effectiveness = insights['template_effectiveness']
        if template_effectiveness:
            worst_template = template_effectiveness[-1]
            if worst_template['effectiveness_score'] < 0.5:
                recommendations.append(
                    f"Review and optimize template '{worst_template['template']}' "
                    f"(effectiveness: {worst_template['effectiveness_score']:.2f})"
                )
        
        # Performance trend recommendations
        performance_trends = insights['performance_trends']
        if 'trend_direction' in performance_trends:
            if performance_trends['trend_direction'] == 'declining':
                recommendations.append(
                    "Performance is declining over time. Consider system optimization."
                )
        
        return recommendations
    
    def save_analytics_report(self, filename: Optional[str] = None):
        """Save analytics report to file"""
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"analytics/usage_analysis_{timestamp}.json"
        
        insights = self.get_optimization_insights()
        
        report = {
            'timestamp': datetime.now().isoformat(),
            'insights': insights,
            'summary': {
                'total_queries': len(self.usage_history),
                'unique_patterns': len(self.query_patterns),
                'total_sessions': len(self.session_data),
                'data_period': {
                    'start': min(e['timestamp'] for e in self.usage_history) if self.usage_history else None,
                    'end': max(e['timestamp'] for e in self.usage_history) if self.usage_history else None
                }
            }
        }
        
        with open(filename, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        
        logger.info(f"Analytics report saved to {filename}")
        return filename
    
    def get_realtime_metrics(self) -> Dict[str, Any]:
        """Get real-time usage metrics"""
        if not self.usage_history:
            return {'message': 'No usage data available'}
        
        # Last hour metrics
        one_hour_ago = time.time() - 3600
        recent_queries = [
            entry for entry in self.usage_history
            if entry['timestamp'] > one_hour_ago
        ]
        
        if not recent_queries:
            return {'message': 'No recent usage data'}
        
        recent_confidence = sum(e['result']['confidence'] for e in recent_queries) / len(recent_queries)
        recent_time = sum(e['result']['execution_time'] for e in recent_queries) / len(recent_queries)
        
        return {
            'queries_last_hour': len(recent_queries),
            'avg_confidence_last_hour': recent_confidence,
            'avg_time_last_hour': recent_time,
            # recent_queries is non-empty here, so most_common(1) is safe
            'most_common_pattern': Counter(
                e['pattern'] for e in recent_queries
            ).most_common(1)[0][0]
        }


# Example usage
def run_usage_analysis():
    """Example of running usage analysis"""
    print("📊 Running Usage Pattern Analysis...")
    
    # Create analyzer
    analyzer = UsagePatternAnalyzer()
    
    # Simulate some usage data
    sample_queries = [
        ("What is machine learning?", 0.8, 1.2),
        ("Compare Python and Java", 0.7, 1.5),
        ("How does neural networks work?", 0.9, 2.1),
        ("What is 2+2?", 0.95, 0.3),
        ("Analyze the impact of AI", 0.6, 3.2),
        ("Define blockchain", 0.8, 1.1),
        ("Explain recursion", 0.7, 1.8),
        ("What causes climate change?", 0.8, 2.5)
    ]
    
    # Add sample data
    for i, (query, confidence, exec_time) in enumerate(sample_queries):
        # Create a lightweight stand-in exposing the ReasoningPath attributes the analyzer reads
        mock_result = type('MockResult', (), {
            'total_confidence': confidence,
            'execution_time': exec_time,
            'steps': [type('MockStep', (), {'thought': 'Sample step'})() for _ in range(3)],
            'template_used': 'default',
            'complexity_score': 0.5 + (i * 0.1)
        })()
        
        analyzer.analyze_query(
            query=query,
            result=mock_result,
            timestamp=time.time() - (i * 3600),  # Spread over hours
            user_id=f"user_{i % 3}",
            session_id=f"session_{i // 2}",
            user_feedback=min(1.0, confidence + 0.1)  # Slightly above confidence, capped at 1.0
        )
    
    # Get insights
    insights = analyzer.get_optimization_insights()
    
    print("\n📈 Usage Analysis Results:")
    print(f"Total queries analyzed: {len(analyzer.usage_history)}")
    print(f"Unique patterns found: {len(analyzer.query_patterns)}")
    
    print("\n🔥 Popular Patterns:")
    for pattern in insights['popular_patterns'][:3]:
        print(f"  {pattern['pattern']}: {pattern['count']} uses")
    
    print("\n⚠️ Problem Patterns:")
    for pattern in insights['problem_patterns'][:3]:
        print(f"  {pattern['pattern']}: {', '.join(pattern['issues'])}")
    
    print("\n💡 Recommendations:")
    for rec in insights['recommendations'][:5]:
        print(f"  - {rec}")
    
    # Save report
    report_file = analyzer.save_analytics_report()
    print(f"\n📄 Report saved to: {report_file}")
    
    return insights


if __name__ == "__main__":
    # Run usage analysis
    run_usage_analysis()