"""
Built-in Analytics Engine for AegisLM SaaS Backend.

Production-ready usage analytics with pattern detection, trend analysis,
and business intelligence.
"""

import asyncio
import json
import logging
import statistics
from collections import Counter, defaultdict, deque
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple

from sqlalchemy import text, func, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession

from .database import async_engine, get_redis
from .performance_monitor import performance_monitor
from .query_optimizer import query_optimizer
from .config import settings

logger = logging.getLogger(__name__)

# Redis keys used by this module.
_REDIS_METRICS_KEY = "analytics_metrics"
_REDIS_REPORT_KEY = "business_intelligence_report"

# Number of most recent in-memory snapshots inspected by the pattern analyzers.
_PATTERN_SAMPLE_SIZE = 7
# Pattern detection is skipped until this many snapshots are held in memory.
_MIN_POINTS_FOR_PATTERNS = 10
# Upper bound on the number of recommendations returned in a report.
_MAX_RECOMMENDATIONS = 10
# Sort rank for recommendation priorities (lower sorts first).
_PRIORITY_RANK = {"high": 0, "medium": 1}


class UsagePattern:
    """Usage pattern definition.

    Plain value object; reports serialize it through ``__dict__``.
    """

    def __init__(self, pattern_type: str, description: str, frequency: float,
                 impact_score: float, recommendations: List[str]):
        self.pattern_type = pattern_type
        self.description = description
        self.frequency = frequency
        self.impact_score = impact_score
        self.recommendations = recommendations


class TrendAnalysis:
    """Trend analysis results.

    Plain value object; reports serialize it through ``__dict__``.
    """

    def __init__(self, metric: str, trend: str, confidence: float,
                 change_rate: float, forecast: List[Dict[str, Any]]):
        self.metric = metric
        self.trend = trend  # 'increasing', 'decreasing', 'stable'
        self.confidence = confidence
        self.change_rate = change_rate
        self.forecast = forecast


class AnalyticsEngine:
    """Built-in analytics engine for usage patterns and business intelligence."""

    def __init__(self):
        self.redis_client = None
        self.analytics_retention_days = getattr(settings, 'ANALYTICS_RETENTION_DAYS', 90)
        self.pattern_detection_window = getattr(settings, 'PATTERN_DETECTION_WINDOW', 7)  # days

        # Analytics data storage
        self.usage_history = deque(maxlen=1000)  # Last 1000 data points
        self.query_patterns = defaultdict(int)
        self.user_activity = defaultdict(list)

    async def get_redis(self):
        """Get Redis client, creating and caching it on first use."""
        if self.redis_client is None:
            self.redis_client = await get_redis()
        return self.redis_client

    async def collect_usage_metrics(self):
        """Collect current usage metrics.

        Queries the ``users``, ``evaluations`` and ``api_keys`` tables, adds the
        performance monitor summary, appends the snapshot to the in-memory
        history and pushes it onto the Redis metrics list.

        Returns:
            The metrics dict, or ``None`` if any step failed (the failure is
            logged).
        """
        try:
            async with async_engine.begin() as conn:
                # Get user activity metrics
                result = await conn.execute(text("""
                    SELECT
                        COUNT(*) as total_users,
                        COUNT(*) FILTER (WHERE is_active = true) as active_users,
                        COUNT(*) FILTER (WHERE is_verified = true) as verified_users,
                        COUNT(*) FILTER (WHERE created_at >= NOW() - INTERVAL '24 hours') as new_users_24h,
                        COUNT(*) FILTER (WHERE last_login_at >= NOW() - INTERVAL '24 hours') as active_users_24h
                    FROM users
                """))
                user_stats = result.fetchone()

                # Get evaluation metrics.
                # NOTE: every evaluation figure, including "total", is
                # restricted to the last 7 days by the WHERE clause.
                result = await conn.execute(text("""
                    SELECT
                        COUNT(*) as total_evaluations,
                        COUNT(*) FILTER (WHERE status = 'completed') as completed_evaluations,
                        COUNT(*) FILTER (WHERE status = 'running') as running_evaluations,
                        COUNT(*) FILTER (WHERE status = 'failed') as failed_evaluations,
                        COUNT(*) FILTER (WHERE created_at >= NOW() - INTERVAL '24 hours') as evaluations_24h,
                        AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_execution_time_sec
                    FROM evaluations
                    WHERE created_at >= NOW() - INTERVAL '7 days'
                """))
                eval_stats = result.fetchone()

                # Get API key usage
                result = await conn.execute(text("""
                    SELECT
                        COUNT(*) as total_api_keys,
                        COUNT(*) FILTER (WHERE is_active = true) as active_api_keys,
                        SUM(usage_count) as total_usage,
                        COUNT(*) FILTER (WHERE last_used_at >= NOW() - INTERVAL '24 hours') as used_24h
                    FROM api_keys
                """))
                api_stats = result.fetchone()

            # Get database performance metrics. The rows above are already
            # fetched, so the connection is released before this call.
            perf_summary = (await performance_monitor.get_performance_summary()) or {}

            # Compile metrics
            metrics = {
                "timestamp": datetime.utcnow().isoformat(),
                "users": {
                    "total": user_stats.total_users,
                    "active": user_stats.active_users,
                    "verified": user_stats.verified_users,
                    "new_24h": user_stats.new_users_24h,
                    "active_24h": user_stats.active_users_24h
                },
                "evaluations": {
                    "total": eval_stats.total_evaluations or 0,
                    "completed": eval_stats.completed_evaluations or 0,
                    "running": eval_stats.running_evaluations or 0,
                    "failed": eval_stats.failed_evaluations or 0,
                    "new_24h": eval_stats.evaluations_24h or 0,
                    "avg_execution_time_sec": float(eval_stats.avg_execution_time_sec or 0)
                },
                "api_keys": {
                    "total": api_stats.total_api_keys,
                    "active": api_stats.active_api_keys,
                    # SUM() is NULL on an empty table and may be returned as a
                    # Decimal, which json.dumps cannot serialize.
                    "total_usage": int(api_stats.total_usage or 0),
                    "used_24h": api_stats.used_24h
                },
                "performance": {
                    "query_performance": perf_summary.get("query_performance", {}),
                    "connection_pool": perf_summary.get("connection_pool", {}),
                    "database": perf_summary.get("database", {})
                }
            }

            # Store in analytics history
            self.usage_history.append(metrics)

            # Store in Redis (newest entry at the head of the list)
            redis_client = await self.get_redis()
            await redis_client.lpush(_REDIS_METRICS_KEY, json.dumps(metrics))
            await redis_client.ltrim(_REDIS_METRICS_KEY, 0, 999)  # Keep last 1000
            await redis_client.expire(_REDIS_METRICS_KEY, self.analytics_retention_days * 24 * 3600)

            return metrics

        except Exception as e:
            logger.exception("Failed to collect usage metrics: %s", e)
            return None

    async def detect_usage_patterns(self) -> List[UsagePattern]:
        """Detect usage patterns from collected data.

        Runs every pattern analyzer over the in-memory history.

        Returns:
            The detected patterns; empty when fewer than 10 snapshots are held
            in memory or when detection fails.
        """
        # NOTE: only the in-memory history is consulted, so a freshly started
        # process reports no patterns until enough snapshots were collected.
        if len(self.usage_history) < _MIN_POINTS_FOR_PATTERNS:
            return []

        analyzers = (
            self._analyze_user_activity_pattern,
            self._analyze_evaluation_pattern,
            self._analyze_api_usage_pattern,
            self._analyze_performance_pattern,
            self._analyze_query_pattern,
        )

        try:
            patterns = []
            for analyze in analyzers:
                pattern = await analyze()
                if pattern is not None:
                    patterns.append(pattern)
            return patterns

        except Exception as e:
            logger.exception("Failed to detect usage patterns: %s", e)
            return []

    def _recent_metrics(self) -> List[Dict[str, Any]]:
        """Return the most recent in-memory snapshots, oldest first."""
        return list(self.usage_history)[-_PATTERN_SAMPLE_SIZE:]

    async def _analyze_user_activity_pattern(self) -> Optional[UsagePattern]:
        """Analyze user activity patterns.

        Flags growth above 10% across the recent snapshots, otherwise an
        average 24h-active ratio below 20%.
        """
        try:
            recent_metrics = self._recent_metrics()  # Last 7 data points

            # Calculate user growth rate
            user_counts = [m["users"]["total"] for m in recent_metrics]
            if len(user_counts) < 2:
                return None

            first, last = user_counts[0], user_counts[-1]
            growth_rate = (last - first) / first * 100 if first > 0 else 0

            # Calculate active user ratio
            active_ratios = [
                m["users"]["active_24h"] / max(m["users"]["active"], 1) * 100
                for m in recent_metrics
            ]
            avg_active_ratio = statistics.mean(active_ratios)

            # Determine pattern
            if growth_rate > 10:
                return UsagePattern(
                    "high_user_growth",
                    f"High user growth detected: {growth_rate:.1f}% growth rate",
                    0.8,
                    0.9,
                    [
                        "Ensure database capacity can handle growth",
                        "Monitor user onboarding funnel",
                        "Consider scaling customer support"
                    ],
                )
            if avg_active_ratio < 20:
                return UsagePattern(
                    "low_user_engagement",
                    f"Low user engagement: {avg_active_ratio:.1f}% active users",
                    0.7,
                    0.8,
                    [
                        "Investigate user onboarding process",
                        "Implement user engagement campaigns",
                        "Review product features for usability"
                    ],
                )
            return None

        except Exception as e:
            logger.exception("Failed to analyze user activity pattern: %s", e)
            return None

    async def _analyze_evaluation_pattern(self) -> Optional[UsagePattern]:
        """Analyze evaluation usage patterns.

        Flags an average failure rate above 15%, otherwise an average
        completion rate below 70%. Snapshots with no evaluations are ignored.
        """
        try:
            completion_rates = []
            failure_rates = []
            for m in self._recent_metrics():
                evaluations = m["evaluations"]
                total = evaluations["total"]
                if total > 0:
                    completion_rates.append(evaluations["completed"] / total * 100)
                    failure_rates.append(evaluations["failed"] / total * 100)

            if not completion_rates:
                return None

            avg_completion_rate = statistics.mean(completion_rates)
            avg_failure_rate = statistics.mean(failure_rates) if failure_rates else 0

            # Determine pattern
            if avg_failure_rate > 15:
                return UsagePattern(
                    "high_failure_rate",
                    f"High evaluation failure rate: {avg_failure_rate:.1f}%",
                    0.8,
                    0.9,
                    [
                        "Investigate common failure causes",
                        "Improve error handling and logging",
                        "Review evaluation pipeline stability"
                    ],
                )
            if avg_completion_rate < 70:
                return UsagePattern(
                    "low_completion_rate",
                    f"Low evaluation completion rate: {avg_completion_rate:.1f}%",
                    0.7,
                    0.8,
                    [
                        "Analyze evaluation bottlenecks",
                        "Optimize evaluation performance",
                        "Review resource allocation"
                    ],
                )
            return None

        except Exception as e:
            logger.exception("Failed to analyze evaluation pattern: %s", e)
            return None

    async def _analyze_api_usage_pattern(self) -> Optional[UsagePattern]:
        """Analyze API key usage patterns.

        Flags usage growth above 50% across the recent snapshots, otherwise an
        average share of keys used in the last 24h below 10%.
        """
        try:
            recent_metrics = self._recent_metrics()

            # Calculate API usage growth
            usage_counts = [m["api_keys"]["total_usage"] for m in recent_metrics]
            if len(usage_counts) < 2:
                return None

            usage_growth = (usage_counts[-1] - usage_counts[0]) / max(usage_counts[0], 1) * 100

            # Calculate active API key ratio
            active_ratios = [
                m["api_keys"]["used_24h"] / max(m["api_keys"]["active"], 1) * 100
                for m in recent_metrics
            ]
            avg_active_ratio = statistics.mean(active_ratios)

            # Determine pattern
            if usage_growth > 50:
                return UsagePattern(
                    "high_api_usage_growth",
                    f"High API usage growth: {usage_growth:.1f}% increase",
                    0.7,
                    0.8,
                    [
                        "Monitor API rate limits",
                        "Consider API tier upgrades",
                        "Review API usage documentation"
                    ],
                )
            if avg_active_ratio < 10:
                return UsagePattern(
                    "low_api_adoption",
                    f"Low API adoption: {avg_active_ratio:.1f}% active keys",
                    0.6,
                    0.7,
                    [
                        "Improve API documentation",
                        "Create API usage tutorials",
                        "Review API pricing and features"
                    ],
                )
            return None

        except Exception as e:
            logger.exception("Failed to analyze API usage pattern: %s", e)
            return None

    async def _analyze_performance_pattern(self) -> Optional[UsagePattern]:
        """Analyze performance patterns.

        Flags an average query time above 0.5s, otherwise more than 50 slow
        queries summed over the recent snapshots.
        """
        try:
            recent_metrics = self._recent_metrics()

            # Analyze query performance; a missing or None value counts as 0.
            query_perf = [m["performance"]["query_performance"] for m in recent_metrics]
            avg_times = [qp.get("avg_execution_time", 0) or 0 for qp in query_perf]
            slow_queries = [qp.get("slow_queries_count", 0) or 0 for qp in query_perf]

            if not avg_times:
                return None

            avg_query_time = statistics.mean(avg_times)
            total_slow_queries = sum(slow_queries)

            # Determine pattern
            if avg_query_time > 0.5:  # 500ms threshold
                return UsagePattern(
                    "slow_query_performance",
                    f"Slow query performance: {avg_query_time:.3f}s average",
                    0.8,
                    0.9,
                    [
                        "Optimize slow queries",
                        "Review database indexes",
                        "Consider query result caching"
                    ],
                )
            if total_slow_queries > 50:
                return UsagePattern(
                    "high_slow_query_volume",
                    f"High volume of slow queries: {total_slow_queries} in recent period",
                    0.7,
                    0.8,
                    [
                        "Investigate query patterns",
                        "Implement query optimization",
                        "Review database configuration"
                    ],
                )
            return None

        except Exception as e:
            logger.exception("Failed to analyze performance pattern: %s", e)
            return None

    async def _analyze_query_pattern(self) -> Optional[UsagePattern]:
        """Analyze query patterns.

        Flags the most frequent query type among the optimizer's slow-query
        analyses when it occurs more than 5 times.
        """
        try:
            # Get slow query analysis
            slow_analysis = await query_optimizer.analyze_slow_queries(20)
            if not slow_analysis:
                return None

            # Analyze query types
            query_types = Counter(
                analysis["query_data"].get("query_type", "UNKNOWN")
                for analysis in slow_analysis
            )

            # Find most problematic query type
            most_common_type, count = query_types.most_common(1)[0]
            if count > 5:
                return UsagePattern(
                    "problematic_query_type",
                    f"High volume of slow {most_common_type} queries: {count} instances",
                    0.6,
                    0.7,
                    [
                        f"Optimize {most_common_type} query patterns",
                        "Review query execution plans",
                        "Consider database schema optimization"
                    ],
                )
            return None

        except Exception as e:
            logger.exception("Failed to analyze query pattern: %s", e)
            return None

    @staticmethod
    def _extract_metric(snapshot: Any, path_parts: List[str]) -> Any:
        """Walk a dotted path through nested dicts; None if any part is missing."""
        value = snapshot
        for part in path_parts:
            if isinstance(value, dict) and part in value:
                value = value[part]
            else:
                return None
        return value

    @staticmethod
    def _linear_slope(values: List[float]) -> float:
        """Least-squares slope of ``values`` against their indices 0..n-1."""
        n = len(values)
        sum_x = sum(range(n))
        sum_y = sum(values)
        sum_xy = sum(x * y for x, y in enumerate(values))
        sum_x2 = sum(x * x for x in range(n))
        denominator = n * sum_x2 - sum_x ** 2
        if denominator == 0:  # fewer than two points
            return 0.0
        return (n * sum_xy - sum_x * sum_y) / denominator

    async def generate_trend_analysis(self, metric_path: str, days: int = 30) -> Optional[TrendAnalysis]:
        """Generate trend analysis for a specific metric.

        Args:
            metric_path: Dotted path into a stored snapshot, e.g.
                "users.total" or "evaluations.completed".
            days: Only snapshots whose timestamp lies within this many days
                before now are analyzed.

        Returns:
            A TrendAnalysis, or ``None`` when fewer than 3 usable data points
            exist or the analysis fails (the failure is logged).
        """
        try:
            # Get historical data
            redis_client = await self.get_redis()
            raw_data = await redis_client.lrange(_REDIS_METRICS_KEY, 0, -1)

            if not raw_data:
                return None

            # Snapshot timestamps are naive UTC (see collect_usage_metrics).
            cutoff = datetime.utcnow() - timedelta(days=days)
            path_parts = metric_path.split('.')

            # Collect (timestamp, value) pairs; malformed entries are skipped
            # individually so one bad record cannot abort the whole analysis.
            series = []
            for raw in raw_data:
                try:
                    snapshot = json.loads(raw)
                    value = self._extract_metric(snapshot, path_parts)
                    if value is None:
                        continue
                    timestamp = datetime.fromisoformat(snapshot["timestamp"])
                    if timestamp < cutoff:
                        continue
                    series.append((timestamp, float(value)))
                except (ValueError, KeyError, TypeError):
                    continue

            if len(series) < 3:
                return None

            # Snapshots are LPUSHed, so Redis returns them newest first; the
            # regression and forecast need them oldest first.
            series.sort(key=lambda point: point[0])
            timestamps = [timestamp for timestamp, _ in series]
            metric_values = [value for _, value in series]

            # Simple linear regression for trend detection
            slope = self._linear_slope(metric_values)

            # Determine trend
            if abs(slope) < 0.01:
                trend = "stable"
            elif slope > 0:
                trend = "increasing"
            else:
                trend = "decreasing"

            # Calculate confidence (simplified).
            # NOTE(review): derived from the raw slope, so it depends on the
            # metric's scale — consider a goodness-of-fit measure instead.
            confidence = min(abs(slope) * 10, 1.0)

            # Calculate change rate
            first_value = metric_values[0]
            change_rate = (metric_values[-1] - first_value) / first_value * 100 if first_value != 0 else 0

            # Generate simple forecast (next 5 points).
            # NOTE(review): timestamps step by one day per point, which
            # presumes roughly daily snapshots — confirm against the scheduler.
            last_value = metric_values[-1]
            last_timestamp = timestamps[-1]
            forecast = [
                {
                    "point": i,
                    "value": last_value + (slope * i),
                    "timestamp": (last_timestamp + timedelta(days=i)).isoformat()
                }
                for i in range(1, 6)
            ]

            return TrendAnalysis(metric_path, trend, confidence, change_rate, forecast)

        except Exception as e:
            logger.exception("Failed to generate trend analysis: %s", e)
            return None

    async def get_business_intelligence_report(
        self, current_metrics: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Generate comprehensive business intelligence report.

        Args:
            current_metrics: A snapshot already obtained from
                collect_usage_metrics(). When omitted, a fresh snapshot is
                collected (and therefore recorded) here.

        Returns:
            The report dict, or ``{"error": <message>}`` if generation failed.
        """
        try:
            # Collect current metrics unless the caller already did, so one
            # run does not record two snapshots.
            if current_metrics is None:
                current_metrics = await self.collect_usage_metrics()

            # Detect patterns
            patterns = await self.detect_usage_patterns()

            # Generate trend analyses
            key_metrics = [
                "users.total",
                "users.active_24h",
                "evaluations.total",
                "evaluations.completed",
                "api_keys.total_usage"
            ]
            trend_analyses = []
            for metric in key_metrics:
                trend = await self.generate_trend_analysis(metric)
                if trend is not None:
                    trend_analyses.append(trend)

            # Calculate business KPIs
            kpis = self._calculate_business_kpis(current_metrics)

            # Generate recommendations
            recommendations = self._generate_business_recommendations(patterns, trend_analyses, kpis)

            history_size = len(self.usage_history)
            return {
                "timestamp": datetime.utcnow().isoformat(),
                "current_metrics": current_metrics,
                "detected_patterns": [p.__dict__ for p in patterns],
                "trend_analyses": [t.__dict__ for t in trend_analyses],
                "business_kpis": kpis,
                "recommendations": recommendations,
                "data_quality": {
                    "metrics_collected": history_size,
                    "data_points_analyzed": history_size,
                    "analysis_confidence": "high" if history_size > 50 else "medium"
                }
            }

        except Exception as e:
            logger.exception("Failed to generate business intelligence report: %s", e)
            return {"error": str(e)}

    def _calculate_business_kpis(self, current_metrics: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate business KPIs.

        Returns an empty dict when no metrics are supplied or a calculation
        fails (the failure is logged).
        """
        if not current_metrics:
            return {}

        try:
            users = current_metrics["users"]
            evals = current_metrics["evaluations"]
            api_keys = current_metrics["api_keys"]

            # max(..., 1) guards every ratio against a zero denominator.
            active_users = max(users["active"], 1)
            total_users = max(users["total"], 1)
            total_evals = max(evals["total"], 1)
            active_keys = max(api_keys["active"], 1)
            total_keys = max(api_keys["total"], 1)

            return {
                "user_metrics": {
                    "user_growth_rate": 0,  # Would need historical data
                    "user_engagement_rate": users["active_24h"] / active_users * 100,
                    "user_verification_rate": users["verified"] / total_users * 100,
                    "new_user_acquisition_rate": users["new_24h"]
                },
                "evaluation_metrics": {
                    "evaluation_completion_rate": evals["completed"] / total_evals * 100,
                    "evaluation_failure_rate": evals["failed"] / total_evals * 100,
                    "avg_execution_time_sec": evals["avg_execution_time_sec"],
                    "evaluation_velocity": evals["new_24h"]
                },
                "api_metrics": {
                    "api_adoption_rate": api_keys["used_24h"] / active_keys * 100,
                    "api_usage_per_active_key": api_keys["total_usage"] / active_keys,
                    "api_key_utilization": api_keys["active"] / total_keys * 100
                },
                "overall_health": {
                    "system_health_score": self._calculate_health_score(current_metrics),
                    "growth_indicator": "positive" if users["new_24h"] > 0 else "stable",
                    "performance_indicator": "good" if evals["avg_execution_time_sec"] < 30 else "needs_attention"
                }
            }

        except Exception as e:
            logger.exception("Failed to calculate business KPIs: %s", e)
            return {}

    def _calculate_health_score(self, metrics: Dict[str, Any]) -> float:
        """Calculate overall system health score.

        Mean of four 0-100 sub-scores: user engagement, evaluation success,
        API utilization and evaluation speed. Returns 0.0 if the metrics are
        malformed (the failure is logged).
        """
        try:
            users = metrics["users"]
            evals = metrics["evaluations"]
            api_keys = metrics["api_keys"]

            # User engagement score (0-100)
            user_engagement = users["active_24h"] / max(users["active"], 1) * 100
            # Evaluation success score (0-100)
            eval_success = evals["completed"] / max(evals["total"], 1) * 100
            # API utilization score (0-100)
            api_util = api_keys["used_24h"] / max(api_keys["active"], 1) * 100
            # Performance score (0-100): deduct 2 points per second
            perf_score = max(0, 100 - (evals["avg_execution_time_sec"] * 2))

            scores = [min(score, 100) for score in (user_engagement, eval_success, api_util, perf_score)]
            return statistics.mean(scores)

        except Exception as e:
            logger.exception("Failed to calculate health score: %s", e)
            return 0.0

    def _generate_business_recommendations(self, patterns: List[UsagePattern],
                                           trends: List[TrendAnalysis],
                                           kpis: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate business recommendations.

        Combines pattern-, trend- and KPI-based recommendations and returns at
        most 10 of them, high priority first.
        """
        recommendations = []

        # Pattern-based recommendations
        for pattern in patterns:
            if pattern.impact_score > 0.7:
                recommendations.append({
                    "type": "pattern_based",
                    "priority": "high" if pattern.impact_score > 0.8 else "medium",
                    "title": pattern.description,
                    "actions": pattern.recommendations,
                    "impact_score": pattern.impact_score
                })

        # Trend-based recommendations
        for trend in trends:
            if trend.confidence > 0.7 and trend.trend == "decreasing":
                recommendations.append({
                    "type": "trend_based",
                    "priority": "high",
                    "title": f"Declining trend in {trend.metric}",
                    "actions": [
                        f"Investigate causes of declining {trend.metric}",
                        "Implement corrective measures",
                        "Monitor trend closely"
                    ],
                    "confidence": trend.confidence
                })

        # KPI-based recommendations
        if kpis:
            user_engagement = kpis.get("user_metrics", {}).get("user_engagement_rate", 0)
            if user_engagement < 30:
                recommendations.append({
                    "type": "kpi_based",
                    "priority": "medium",
                    "title": "Low user engagement detected",
                    "actions": [
                        "Review user onboarding process",
                        "Implement engagement campaigns",
                        "Analyze user behavior patterns"
                    ],
                    "current_value": user_engagement
                })

        # Stable sort: high priority first, original order kept within a
        # priority, so truncation drops the least urgent entries.
        recommendations.sort(
            key=lambda rec: _PRIORITY_RANK.get(rec["priority"], len(_PRIORITY_RANK))
        )
        return recommendations[:_MAX_RECOMMENDATIONS]  # Top 10 recommendations


# Global analytics engine instance
analytics_engine = AnalyticsEngine()


# Scheduled analytics task
async def scheduled_analytics_task():
    """Run scheduled analytics collection and analysis.

    Collects one metrics snapshot, builds the business intelligence report
    from it and caches the report in Redis for one hour. Failures are logged,
    never raised.
    """
    logger.info("Starting scheduled analytics collection...")

    try:
        # Collect current metrics
        metrics = await analytics_engine.collect_usage_metrics()

        if metrics:
            # Generate business intelligence report from the snapshot just
            # collected, so this run records exactly one data point.
            report = await analytics_engine.get_business_intelligence_report(metrics)

            # Store report in Redis
            redis_client = await analytics_engine.get_redis()
            await redis_client.setex(
                _REDIS_REPORT_KEY,
                3600,  # 1 hour
                json.dumps(report, default=str)
            )

            logger.info("Scheduled analytics completed successfully")
        else:
            logger.warning("No metrics collected in scheduled analytics")

    except Exception as e:
        logger.exception("Scheduled analytics failed: %s", e)


if __name__ == "__main__":
    import sys

    async def main():
        """Dispatch the CLI command given as the first argument."""
        command = sys.argv[1] if len(sys.argv) > 1 else "help"

        if command == "collect":
            metrics = await analytics_engine.collect_usage_metrics()
            if metrics:
                print(json.dumps(metrics, indent=2))
            else:
                print("Failed to collect metrics")

        elif command == "patterns":
            patterns = await analytics_engine.detect_usage_patterns()
            print(f"Detected {len(patterns)} patterns:")
            for pattern in patterns:
                print(f" - {pattern.pattern_type}: {pattern.description}")
                print(f" Recommendations: {', '.join(pattern.recommendations[:2])}")

        elif command == "trends":
            trend = await analytics_engine.generate_trend_analysis("users.total")
            if trend:
                print(json.dumps(trend.__dict__, indent=2))
            else:
                print("No trend data available")

        elif command == "report":
            report = await analytics_engine.get_business_intelligence_report()
            print(json.dumps(report, indent=2, default=str))

        else:
            print("Usage: python analytics_engine.py <command>")
            print("Commands: collect, patterns, trends, report")

    asyncio.run(main())