ALM-2 / backend /core /analytics_engine.py
ACA050's picture
Upload 520 files
2ed8996 verified
"""
Built-in Analytics Engine for AegisLM SaaS Backend.
Production-ready usage analytics with pattern detection,
trend analysis, and business intelligence.
"""
import asyncio
import json
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple
from sqlalchemy import text, func, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession
from collections import defaultdict, deque
import logging
import statistics
from .database import async_engine, get_redis
from .performance_monitor import performance_monitor
from .query_optimizer import query_optimizer
from .config import settings
logger = logging.getLogger(__name__)
class UsagePattern:
"""Usage pattern definition."""
def __init__(self, pattern_type: str, description: str, frequency: float,
impact_score: float, recommendations: List[str]):
self.pattern_type = pattern_type
self.description = description
self.frequency = frequency
self.impact_score = impact_score
self.recommendations = recommendations
class TrendAnalysis:
"""Trend analysis results."""
def __init__(self, metric: str, trend: str, confidence: float,
change_rate: float, forecast: List[Dict[str, Any]]):
self.metric = metric
self.trend = trend # 'increasing', 'decreasing', 'stable'
self.confidence = confidence
self.change_rate = change_rate
self.forecast = forecast
class AnalyticsEngine:
"""Built-in analytics engine for usage patterns and business intelligence."""
def __init__(self):
self.redis_client = None
self.analytics_retention_days = getattr(settings, 'ANALYTICS_RETENTION_DAYS', 90)
self.pattern_detection_window = getattr(settings, 'PATTERN_DETECTION_WINDOW', 7) # days
# Analytics data storage
self.usage_history = deque(maxlen=1000) # Last 1000 data points
self.query_patterns = defaultdict(int)
self.user_activity = defaultdict(list)
async def get_redis(self):
"""Get Redis client."""
if not self.redis_client:
self.redis_client = await get_redis()
return self.redis_client
async def collect_usage_metrics(self):
"""Collect current usage metrics."""
try:
async with async_engine.begin() as conn:
# Get user activity metrics
result = await conn.execute(text("""
SELECT
COUNT(*) as total_users,
COUNT(*) FILTER (WHERE is_active = true) as active_users,
COUNT(*) FILTER (WHERE is_verified = true) as verified_users,
COUNT(*) FILTER (WHERE created_at >= NOW() - INTERVAL '24 hours') as new_users_24h,
COUNT(*) FILTER (WHERE last_login_at >= NOW() - INTERVAL '24 hours') as active_users_24h
FROM users
"""))
user_stats = result.fetchone()
# Get evaluation metrics
result = await conn.execute(text("""
SELECT
COUNT(*) as total_evaluations,
COUNT(*) FILTER (WHERE status = 'completed') as completed_evaluations,
COUNT(*) FILTER (WHERE status = 'running') as running_evaluations,
COUNT(*) FILTER (WHERE status = 'failed') as failed_evaluations,
COUNT(*) FILTER (WHERE created_at >= NOW() - INTERVAL '24 hours') as evaluations_24h,
AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_execution_time_sec
FROM evaluations
WHERE created_at >= NOW() - INTERVAL '7 days'
"""))
eval_stats = result.fetchone()
# Get API key usage
result = await conn.execute(text("""
SELECT
COUNT(*) as total_api_keys,
COUNT(*) FILTER (WHERE is_active = true) as active_api_keys,
SUM(usage_count) as total_usage,
COUNT(*) FILTER (WHERE last_used_at >= NOW() - INTERVAL '24 hours') as used_24h
FROM api_keys
"""))
api_stats = result.fetchone()
# Get database performance metrics
perf_summary = await performance_monitor.get_performance_summary()
# Compile metrics
metrics = {
"timestamp": datetime.utcnow().isoformat(),
"users": {
"total": user_stats.total_users,
"active": user_stats.active_users,
"verified": user_stats.verified_users,
"new_24h": user_stats.new_users_24h,
"active_24h": user_stats.active_users_24h
},
"evaluations": {
"total": eval_stats.total_evaluations or 0,
"completed": eval_stats.completed_evaluations or 0,
"running": eval_stats.running_evaluations or 0,
"failed": eval_stats.failed_evaluations or 0,
"new_24h": eval_stats.evaluations_24h or 0,
"avg_execution_time_sec": float(eval_stats.avg_execution_time_sec or 0)
},
"api_keys": {
"total": api_stats.total_api_keys,
"active": api_stats.active_api_keys,
"total_usage": api_stats.total_usage or 0,
"used_24h": api_stats.used_24h
},
"performance": {
"query_performance": perf_summary.get("query_performance", {}),
"connection_pool": perf_summary.get("connection_pool", {}),
"database": perf_summary.get("database", {})
}
}
# Store in analytics history
self.usage_history.append(metrics)
# Store in Redis
redis_client = await self.get_redis()
await redis_client.lpush(
"analytics_metrics",
json.dumps(metrics)
)
await redis_client.ltrim("analytics_metrics", 0, 999) # Keep last 1000
await redis_client.expire("analytics_metrics", self.analytics_retention_days * 24 * 3600)
return metrics
except Exception as e:
logger.error(f"Failed to collect usage metrics: {e}")
return None
async def detect_usage_patterns(self) -> List[UsagePattern]:
"""Detect usage patterns from collected data."""
patterns = []
try:
if len(self.usage_history) < 10:
return patterns
# Analyze user activity patterns
user_pattern = await self._analyze_user_activity_pattern()
if user_pattern:
patterns.append(user_pattern)
# Analyze evaluation patterns
eval_pattern = await self._analyze_evaluation_pattern()
if eval_pattern:
patterns.append(eval_pattern)
# Analyze API usage patterns
api_pattern = await self._analyze_api_usage_pattern()
if api_pattern:
patterns.append(api_pattern)
# Analyze performance patterns
perf_pattern = await self._analyze_performance_pattern()
if perf_pattern:
patterns.append(perf_pattern)
# Analyze query patterns
query_pattern = await self._analyze_query_pattern()
if query_pattern:
patterns.append(query_pattern)
return patterns
except Exception as e:
logger.error(f"Failed to detect usage patterns: {e}")
return []
async def _analyze_user_activity_pattern(self) -> Optional[UsagePattern]:
"""Analyze user activity patterns."""
try:
recent_metrics = list(self.usage_history)[-7:] # Last 7 data points
# Calculate user growth rate
user_counts = [m["users"]["total"] for m in recent_metrics]
if len(user_counts) < 2:
return None
growth_rate = (user_counts[-1] - user_counts[0]) / user_counts[0] * 100 if user_counts[0] > 0 else 0
# Calculate active user ratio
active_ratios = [m["users"]["active_24h"] / max(m["users"]["active"], 1) * 100 for m in recent_metrics]
avg_active_ratio = statistics.mean(active_ratios)
# Determine pattern
if growth_rate > 10:
pattern_type = "high_user_growth"
description = f"High user growth detected: {growth_rate:.1f}% growth rate"
frequency = 0.8
impact_score = 0.9
recommendations = [
"Ensure database capacity can handle growth",
"Monitor user onboarding funnel",
"Consider scaling customer support"
]
elif avg_active_ratio < 20:
pattern_type = "low_user_engagement"
description = f"Low user engagement: {avg_active_ratio:.1f}% active users"
frequency = 0.7
impact_score = 0.8
recommendations = [
"Investigate user onboarding process",
"Implement user engagement campaigns",
"Review product features for usability"
]
else:
return None
return UsagePattern(pattern_type, description, frequency, impact_score, recommendations)
except Exception as e:
logger.error(f"Failed to analyze user activity pattern: {e}")
return None
async def _analyze_evaluation_pattern(self) -> Optional[UsagePattern]:
"""Analyze evaluation usage patterns."""
try:
recent_metrics = list(self.usage_history)[-7:]
# Calculate evaluation completion rate
completion_rates = []
for m in recent_metrics:
total = m["evaluations"]["total"]
completed = m["evaluations"]["completed"]
if total > 0:
completion_rates.append(completed / total * 100)
if not completion_rates:
return None
avg_completion_rate = statistics.mean(completion_rates)
# Calculate failure rate
failure_rates = []
for m in recent_metrics:
total = m["evaluations"]["total"]
failed = m["evaluations"]["failed"]
if total > 0:
failure_rates.append(failed / total * 100)
avg_failure_rate = statistics.mean(failure_rates) if failure_rates else 0
# Determine pattern
if avg_failure_rate > 15:
pattern_type = "high_failure_rate"
description = f"High evaluation failure rate: {avg_failure_rate:.1f}%"
frequency = 0.8
impact_score = 0.9
recommendations = [
"Investigate common failure causes",
"Improve error handling and logging",
"Review evaluation pipeline stability"
]
elif avg_completion_rate < 70:
pattern_type = "low_completion_rate"
description = f"Low evaluation completion rate: {avg_completion_rate:.1f}%"
frequency = 0.7
impact_score = 0.8
recommendations = [
"Analyze evaluation bottlenecks",
"Optimize evaluation performance",
"Review resource allocation"
]
else:
return None
return UsagePattern(pattern_type, description, frequency, impact_score, recommendations)
except Exception as e:
logger.error(f"Failed to analyze evaluation pattern: {e}")
return None
async def _analyze_api_usage_pattern(self) -> Optional[UsagePattern]:
"""Analyze API key usage patterns."""
try:
recent_metrics = list(self.usage_history)[-7:]
# Calculate API usage growth
usage_counts = [m["api_keys"]["total_usage"] for m in recent_metrics]
if len(usage_counts) < 2:
return None
usage_growth = (usage_counts[-1] - usage_counts[0]) / max(usage_counts[0], 1) * 100
# Calculate active API key ratio
active_ratios = [m["api_keys"]["used_24h"] / max(m["api_keys"]["active"], 1) * 100 for m in recent_metrics]
avg_active_ratio = statistics.mean(active_ratios)
# Determine pattern
if usage_growth > 50:
pattern_type = "high_api_usage_growth"
description = f"High API usage growth: {usage_growth:.1f}% increase"
frequency = 0.7
impact_score = 0.8
recommendations = [
"Monitor API rate limits",
"Consider API tier upgrades",
"Review API usage documentation"
]
elif avg_active_ratio < 10:
pattern_type = "low_api_adoption"
description = f"Low API adoption: {avg_active_ratio:.1f}% active keys"
frequency = 0.6
impact_score = 0.7
recommendations = [
"Improve API documentation",
"Create API usage tutorials",
"Review API pricing and features"
]
else:
return None
return UsagePattern(pattern_type, description, frequency, impact_score, recommendations)
except Exception as e:
logger.error(f"Failed to analyze API usage pattern: {e}")
return None
async def _analyze_performance_pattern(self) -> Optional[UsagePattern]:
"""Analyze performance patterns."""
try:
recent_metrics = list(self.usage_history)[-7:]
# Analyze query performance
avg_times = [m["performance"]["query_performance"].get("avg_execution_time", 0) for m in recent_metrics]
slow_queries = [m["performance"]["query_performance"].get("slow_queries_count", 0) for m in recent_metrics]
if not avg_times:
return None
avg_query_time = statistics.mean(avg_times)
total_slow_queries = sum(slow_queries)
# Determine pattern
if avg_query_time > 0.5: # 500ms threshold
pattern_type = "slow_query_performance"
description = f"Slow query performance: {avg_query_time:.3f}s average"
frequency = 0.8
impact_score = 0.9
recommendations = [
"Optimize slow queries",
"Review database indexes",
"Consider query result caching"
]
elif total_slow_queries > 50:
pattern_type = "high_slow_query_volume"
description = f"High volume of slow queries: {total_slow_queries} in recent period"
frequency = 0.7
impact_score = 0.8
recommendations = [
"Investigate query patterns",
"Implement query optimization",
"Review database configuration"
]
else:
return None
return UsagePattern(pattern_type, description, frequency, impact_score, recommendations)
except Exception as e:
logger.error(f"Failed to analyze performance pattern: {e}")
return None
async def _analyze_query_pattern(self) -> Optional[UsagePattern]:
"""Analyze query patterns."""
try:
# Get slow query analysis
slow_analysis = await query_optimizer.analyze_slow_queries(20)
if not slow_analysis:
return None
# Analyze query types
query_types = defaultdict(int)
for analysis in slow_analysis:
query_data = analysis["query_data"]
query_type = query_data.get("query_type", "UNKNOWN")
query_types[query_type] += 1
# Find most problematic query type
if query_types:
most_common_type = max(query_types, key=query_types.get)
count = query_types[most_common_type]
if count > 5:
pattern_type = "problematic_query_type"
description = f"High volume of slow {most_common_type} queries: {count} instances"
frequency = 0.6
impact_score = 0.7
recommendations = [
f"Optimize {most_common_type} query patterns",
"Review query execution plans",
"Consider database schema optimization"
]
return UsagePattern(pattern_type, description, frequency, impact_score, recommendations)
return None
except Exception as e:
logger.error(f"Failed to analyze query pattern: {e}")
return None
async def generate_trend_analysis(self, metric_path: str, days: int = 30) -> Optional[TrendAnalysis]:
"""Generate trend analysis for a specific metric."""
try:
# Get historical data
redis_client = await self.get_redis()
raw_data = await redis_client.lrange("analytics_metrics", 0, -1)
if not raw_data:
return None
# Parse metrics and extract specific metric
metrics_data = [json.loads(data) for data in raw_data]
# Extract metric values over time
metric_values = []
timestamps = []
for metrics in metrics_data:
try:
# Navigate metric path (e.g., "users.total" or "evaluations.completed")
path_parts = metric_path.split('.')
value = metrics
for part in path_parts:
if isinstance(value, dict) and part in value:
value = value[part]
else:
value = None
break
if value is not None:
metric_values.append(float(value))
timestamps.append(datetime.fromisoformat(metrics["timestamp"]))
except (ValueError, KeyError, TypeError):
continue
if len(metric_values) < 3:
return None
# Calculate trend
if len(metric_values) >= 2:
# Simple linear regression for trend detection
x = list(range(len(metric_values)))
y = metric_values
n = len(x)
sum_x = sum(x)
sum_y = sum(y)
sum_xy = sum(x[i] * y[i] for i in range(n))
sum_x2 = sum(x[i] ** 2 for i in range(n))
# Calculate slope
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
# Determine trend
if abs(slope) < 0.01:
trend = "stable"
elif slope > 0:
trend = "increasing"
else:
trend = "decreasing"
# Calculate confidence (simplified)
confidence = min(abs(slope) * 10, 1.0)
# Calculate change rate
change_rate = (metric_values[-1] - metric_values[0]) / metric_values[0] * 100 if metric_values[0] != 0 else 0
# Generate simple forecast (next 5 points)
forecast = []
last_value = metric_values[-1]
for i in range(1, 6):
forecast_value = last_value + (slope * i)
forecast.append({
"point": i,
"value": forecast_value,
"timestamp": (timestamps[-1] + timedelta(days=i)).isoformat()
})
return TrendAnalysis(metric_path, trend, confidence, change_rate, forecast)
return None
except Exception as e:
logger.error(f"Failed to generate trend analysis: {e}")
return None
async def get_business_intelligence_report(self) -> Dict[str, Any]:
"""Generate comprehensive business intelligence report."""
try:
# Collect current metrics
current_metrics = await self.collect_usage_metrics()
# Detect patterns
patterns = await self.detect_usage_patterns()
# Generate trend analyses
trend_analyses = []
key_metrics = [
"users.total",
"users.active_24h",
"evaluations.total",
"evaluations.completed",
"api_keys.total_usage"
]
for metric in key_metrics:
trend = await self.generate_trend_analysis(metric)
if trend:
trend_analyses.append(trend)
# Calculate business KPIs
kpis = self._calculate_business_kpis(current_metrics)
# Generate recommendations
recommendations = self._generate_business_recommendations(patterns, trend_analyses, kpis)
report = {
"timestamp": datetime.utcnow().isoformat(),
"current_metrics": current_metrics,
"detected_patterns": [p.__dict__ for p in patterns],
"trend_analyses": [t.__dict__ for t in trend_analyses],
"business_kpis": kpis,
"recommendations": recommendations,
"data_quality": {
"metrics_collected": len(self.usage_history),
"data_points_analyzed": len(list(self.usage_history)),
"analysis_confidence": "high" if len(self.usage_history) > 50 else "medium"
}
}
return report
except Exception as e:
logger.error(f"Failed to generate business intelligence report: {e}")
return {"error": str(e)}
def _calculate_business_kpis(self, current_metrics: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""Calculate business KPIs."""
if not current_metrics:
return {}
try:
users = current_metrics["users"]
evals = current_metrics["evaluations"]
api_keys = current_metrics["api_keys"]
kpis = {
"user_metrics": {
"user_growth_rate": 0, # Would need historical data
"user_engagement_rate": users["active_24h"] / max(users["active"], 1) * 100,
"user_verification_rate": users["verified"] / max(users["total"], 1) * 100,
"new_user_acquisition_rate": users["new_24h"]
},
"evaluation_metrics": {
"evaluation_completion_rate": evals["completed"] / max(evals["total"], 1) * 100,
"evaluation_failure_rate": evals["failed"] / max(evals["total"], 1) * 100,
"avg_execution_time_sec": evals["avg_execution_time_sec"],
"evaluation_velocity": evals["new_24h"]
},
"api_metrics": {
"api_adoption_rate": api_keys["used_24h"] / max(api_keys["active"], 1) * 100,
"api_usage_per_active_key": api_keys["total_usage"] / max(api_keys["active"], 1),
"api_key_utilization": api_keys["active"] / max(api_keys["total"], 1) * 100
},
"overall_health": {
"system_health_score": self._calculate_health_score(current_metrics),
"growth_indicator": "positive" if users["new_24h"] > 0 else "stable",
"performance_indicator": "good" if evals["avg_execution_time_sec"] < 30 else "needs_attention"
}
}
return kpis
except Exception as e:
logger.error(f"Failed to calculate business KPIs: {e}")
return {}
def _calculate_health_score(self, metrics: Dict[str, Any]) -> float:
"""Calculate overall system health score."""
try:
scores = []
# User engagement score (0-100)
user_engagement = metrics["users"]["active_24h"] / max(metrics["users"]["active"], 1) * 100
scores.append(min(user_engagement, 100))
# Evaluation success score (0-100)
eval_success = metrics["evaluations"]["completed"] / max(metrics["evaluations"]["total"], 1) * 100
scores.append(min(eval_success, 100))
# API utilization score (0-100)
api_util = metrics["api_keys"]["used_24h"] / max(metrics["api_keys"]["active"], 1) * 100
scores.append(min(api_util, 100))
# Performance score (0-100)
avg_time = metrics["evaluations"]["avg_execution_time_sec"]
perf_score = max(0, 100 - (avg_time * 2)) # Deduct 2 points per second
scores.append(min(perf_score, 100))
return statistics.mean(scores) if scores else 0.0
except Exception:
return 0.0
def _generate_business_recommendations(self, patterns: List[UsagePattern],
trends: List[TrendAnalysis],
kpis: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate business recommendations."""
recommendations = []
# Pattern-based recommendations
for pattern in patterns:
if pattern.impact_score > 0.7:
recommendations.append({
"type": "pattern_based",
"priority": "high" if pattern.impact_score > 0.8 else "medium",
"title": pattern.description,
"actions": pattern.recommendations,
"impact_score": pattern.impact_score
})
# Trend-based recommendations
for trend in trends:
if trend.confidence > 0.7 and trend.trend == "decreasing":
recommendations.append({
"type": "trend_based",
"priority": "high",
"title": f"Declining trend in {trend.metric}",
"actions": [
f"Investigate causes of declining {trend.metric}",
"Implement corrective measures",
"Monitor trend closely"
],
"confidence": trend.confidence
})
# KPI-based recommendations
if kpis:
user_engagement = kpis.get("user_metrics", {}).get("user_engagement_rate", 0)
if user_engagement < 30:
recommendations.append({
"type": "kpi_based",
"priority": "medium",
"title": "Low user engagement detected",
"actions": [
"Review user onboarding process",
"Implement engagement campaigns",
"Analyze user behavior patterns"
],
"current_value": user_engagement
})
return recommendations[:10] # Top 10 recommendations
# Global analytics engine instance
analytics_engine = AnalyticsEngine()
# Scheduled analytics task
async def scheduled_analytics_task():
"""Run scheduled analytics collection and analysis."""
logger.info("Starting scheduled analytics collection...")
try:
# Collect current metrics
metrics = await analytics_engine.collect_usage_metrics()
if metrics:
# Generate business intelligence report
report = await analytics_engine.get_business_intelligence_report()
# Store report in Redis
redis_client = await analytics_engine.get_redis()
await redis_client.setex(
"business_intelligence_report",
3600, # 1 hour
json.dumps(report, default=str)
)
logger.info("Scheduled analytics completed successfully")
else:
logger.warning("No metrics collected in scheduled analytics")
except Exception as e:
logger.error(f"Scheduled analytics failed: {e}")
if __name__ == "__main__":
import sys
async def main():
command = sys.argv[1] if len(sys.argv) > 1 else "help"
if command == "collect":
metrics = await analytics_engine.collect_usage_metrics()
if metrics:
print(json.dumps(metrics, indent=2))
else:
print("Failed to collect metrics")
elif command == "patterns":
patterns = await analytics_engine.detect_usage_patterns()
print(f"Detected {len(patterns)} patterns:")
for pattern in patterns:
print(f" - {pattern.pattern_type}: {pattern.description}")
print(f" Recommendations: {', '.join(pattern.recommendations[:2])}")
elif command == "trends":
trend = await analytics_engine.generate_trend_analysis("users.total")
if trend:
print(json.dumps(trend.__dict__, indent=2))
else:
print("No trend data available")
elif command == "report":
report = await analytics_engine.get_business_intelligence_report()
print(json.dumps(report, indent=2, default=str))
else:
print("Usage: python analytics_engine.py <command>")
print("Commands: collect, patterns, trends, report")
asyncio.run(main())