| """ |
| Built-in Analytics Engine for AegisLM SaaS Backend. |
| |
| Production-ready usage analytics with pattern detection, |
| trend analysis, and business intelligence. |
| """ |
|
|
| import asyncio |
| import json |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Any, Optional, Tuple |
| from sqlalchemy import text, func, and_, or_ |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from collections import defaultdict, deque |
| import logging |
| import statistics |
|
|
| from .database import async_engine, get_redis |
| from .performance_monitor import performance_monitor |
| from .query_optimizer import query_optimizer |
| from .config import settings |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class UsagePattern: |
| """Usage pattern definition.""" |
| |
| def __init__(self, pattern_type: str, description: str, frequency: float, |
| impact_score: float, recommendations: List[str]): |
| self.pattern_type = pattern_type |
| self.description = description |
| self.frequency = frequency |
| self.impact_score = impact_score |
| self.recommendations = recommendations |
|
|
|
|
| class TrendAnalysis: |
| """Trend analysis results.""" |
| |
| def __init__(self, metric: str, trend: str, confidence: float, |
| change_rate: float, forecast: List[Dict[str, Any]]): |
| self.metric = metric |
| self.trend = trend |
| self.confidence = confidence |
| self.change_rate = change_rate |
| self.forecast = forecast |
|
|
|
|
| class AnalyticsEngine: |
| """Built-in analytics engine for usage patterns and business intelligence.""" |
| |
| def __init__(self): |
| self.redis_client = None |
| self.analytics_retention_days = getattr(settings, 'ANALYTICS_RETENTION_DAYS', 90) |
| self.pattern_detection_window = getattr(settings, 'PATTERN_DETECTION_WINDOW', 7) |
| |
| |
| self.usage_history = deque(maxlen=1000) |
| self.query_patterns = defaultdict(int) |
| self.user_activity = defaultdict(list) |
| |
| async def get_redis(self): |
| """Get Redis client.""" |
| if not self.redis_client: |
| self.redis_client = await get_redis() |
| return self.redis_client |
| |
| async def collect_usage_metrics(self): |
| """Collect current usage metrics.""" |
| try: |
| async with async_engine.begin() as conn: |
| |
| result = await conn.execute(text(""" |
| SELECT |
| COUNT(*) as total_users, |
| COUNT(*) FILTER (WHERE is_active = true) as active_users, |
| COUNT(*) FILTER (WHERE is_verified = true) as verified_users, |
| COUNT(*) FILTER (WHERE created_at >= NOW() - INTERVAL '24 hours') as new_users_24h, |
| COUNT(*) FILTER (WHERE last_login_at >= NOW() - INTERVAL '24 hours') as active_users_24h |
| FROM users |
| """)) |
| user_stats = result.fetchone() |
| |
| |
| result = await conn.execute(text(""" |
| SELECT |
| COUNT(*) as total_evaluations, |
| COUNT(*) FILTER (WHERE status = 'completed') as completed_evaluations, |
| COUNT(*) FILTER (WHERE status = 'running') as running_evaluations, |
| COUNT(*) FILTER (WHERE status = 'failed') as failed_evaluations, |
| COUNT(*) FILTER (WHERE created_at >= NOW() - INTERVAL '24 hours') as evaluations_24h, |
| AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_execution_time_sec |
| FROM evaluations |
| WHERE created_at >= NOW() - INTERVAL '7 days' |
| """)) |
| eval_stats = result.fetchone() |
| |
| |
| result = await conn.execute(text(""" |
| SELECT |
| COUNT(*) as total_api_keys, |
| COUNT(*) FILTER (WHERE is_active = true) as active_api_keys, |
| SUM(usage_count) as total_usage, |
| COUNT(*) FILTER (WHERE last_used_at >= NOW() - INTERVAL '24 hours') as used_24h |
| FROM api_keys |
| """)) |
| api_stats = result.fetchone() |
| |
| |
| perf_summary = await performance_monitor.get_performance_summary() |
| |
| |
| metrics = { |
| "timestamp": datetime.utcnow().isoformat(), |
| "users": { |
| "total": user_stats.total_users, |
| "active": user_stats.active_users, |
| "verified": user_stats.verified_users, |
| "new_24h": user_stats.new_users_24h, |
| "active_24h": user_stats.active_users_24h |
| }, |
| "evaluations": { |
| "total": eval_stats.total_evaluations or 0, |
| "completed": eval_stats.completed_evaluations or 0, |
| "running": eval_stats.running_evaluations or 0, |
| "failed": eval_stats.failed_evaluations or 0, |
| "new_24h": eval_stats.evaluations_24h or 0, |
| "avg_execution_time_sec": float(eval_stats.avg_execution_time_sec or 0) |
| }, |
| "api_keys": { |
| "total": api_stats.total_api_keys, |
| "active": api_stats.active_api_keys, |
| "total_usage": api_stats.total_usage or 0, |
| "used_24h": api_stats.used_24h |
| }, |
| "performance": { |
| "query_performance": perf_summary.get("query_performance", {}), |
| "connection_pool": perf_summary.get("connection_pool", {}), |
| "database": perf_summary.get("database", {}) |
| } |
| } |
| |
| |
| self.usage_history.append(metrics) |
| |
| |
| redis_client = await self.get_redis() |
| await redis_client.lpush( |
| "analytics_metrics", |
| json.dumps(metrics) |
| ) |
| await redis_client.ltrim("analytics_metrics", 0, 999) |
| await redis_client.expire("analytics_metrics", self.analytics_retention_days * 24 * 3600) |
| |
| return metrics |
| |
| except Exception as e: |
| logger.error(f"Failed to collect usage metrics: {e}") |
| return None |
| |
| async def detect_usage_patterns(self) -> List[UsagePattern]: |
| """Detect usage patterns from collected data.""" |
| patterns = [] |
| |
| try: |
| if len(self.usage_history) < 10: |
| return patterns |
| |
| |
| user_pattern = await self._analyze_user_activity_pattern() |
| if user_pattern: |
| patterns.append(user_pattern) |
| |
| |
| eval_pattern = await self._analyze_evaluation_pattern() |
| if eval_pattern: |
| patterns.append(eval_pattern) |
| |
| |
| api_pattern = await self._analyze_api_usage_pattern() |
| if api_pattern: |
| patterns.append(api_pattern) |
| |
| |
| perf_pattern = await self._analyze_performance_pattern() |
| if perf_pattern: |
| patterns.append(perf_pattern) |
| |
| |
| query_pattern = await self._analyze_query_pattern() |
| if query_pattern: |
| patterns.append(query_pattern) |
| |
| return patterns |
| |
| except Exception as e: |
| logger.error(f"Failed to detect usage patterns: {e}") |
| return [] |
| |
| async def _analyze_user_activity_pattern(self) -> Optional[UsagePattern]: |
| """Analyze user activity patterns.""" |
| try: |
| recent_metrics = list(self.usage_history)[-7:] |
| |
| |
| user_counts = [m["users"]["total"] for m in recent_metrics] |
| if len(user_counts) < 2: |
| return None |
| |
| growth_rate = (user_counts[-1] - user_counts[0]) / user_counts[0] * 100 if user_counts[0] > 0 else 0 |
| |
| |
| active_ratios = [m["users"]["active_24h"] / max(m["users"]["active"], 1) * 100 for m in recent_metrics] |
| avg_active_ratio = statistics.mean(active_ratios) |
| |
| |
| if growth_rate > 10: |
| pattern_type = "high_user_growth" |
| description = f"High user growth detected: {growth_rate:.1f}% growth rate" |
| frequency = 0.8 |
| impact_score = 0.9 |
| recommendations = [ |
| "Ensure database capacity can handle growth", |
| "Monitor user onboarding funnel", |
| "Consider scaling customer support" |
| ] |
| elif avg_active_ratio < 20: |
| pattern_type = "low_user_engagement" |
| description = f"Low user engagement: {avg_active_ratio:.1f}% active users" |
| frequency = 0.7 |
| impact_score = 0.8 |
| recommendations = [ |
| "Investigate user onboarding process", |
| "Implement user engagement campaigns", |
| "Review product features for usability" |
| ] |
| else: |
| return None |
| |
| return UsagePattern(pattern_type, description, frequency, impact_score, recommendations) |
| |
| except Exception as e: |
| logger.error(f"Failed to analyze user activity pattern: {e}") |
| return None |
| |
| async def _analyze_evaluation_pattern(self) -> Optional[UsagePattern]: |
| """Analyze evaluation usage patterns.""" |
| try: |
| recent_metrics = list(self.usage_history)[-7:] |
| |
| |
| completion_rates = [] |
| for m in recent_metrics: |
| total = m["evaluations"]["total"] |
| completed = m["evaluations"]["completed"] |
| if total > 0: |
| completion_rates.append(completed / total * 100) |
| |
| if not completion_rates: |
| return None |
| |
| avg_completion_rate = statistics.mean(completion_rates) |
| |
| |
| failure_rates = [] |
| for m in recent_metrics: |
| total = m["evaluations"]["total"] |
| failed = m["evaluations"]["failed"] |
| if total > 0: |
| failure_rates.append(failed / total * 100) |
| |
| avg_failure_rate = statistics.mean(failure_rates) if failure_rates else 0 |
| |
| |
| if avg_failure_rate > 15: |
| pattern_type = "high_failure_rate" |
| description = f"High evaluation failure rate: {avg_failure_rate:.1f}%" |
| frequency = 0.8 |
| impact_score = 0.9 |
| recommendations = [ |
| "Investigate common failure causes", |
| "Improve error handling and logging", |
| "Review evaluation pipeline stability" |
| ] |
| elif avg_completion_rate < 70: |
| pattern_type = "low_completion_rate" |
| description = f"Low evaluation completion rate: {avg_completion_rate:.1f}%" |
| frequency = 0.7 |
| impact_score = 0.8 |
| recommendations = [ |
| "Analyze evaluation bottlenecks", |
| "Optimize evaluation performance", |
| "Review resource allocation" |
| ] |
| else: |
| return None |
| |
| return UsagePattern(pattern_type, description, frequency, impact_score, recommendations) |
| |
| except Exception as e: |
| logger.error(f"Failed to analyze evaluation pattern: {e}") |
| return None |
| |
| async def _analyze_api_usage_pattern(self) -> Optional[UsagePattern]: |
| """Analyze API key usage patterns.""" |
| try: |
| recent_metrics = list(self.usage_history)[-7:] |
| |
| |
| usage_counts = [m["api_keys"]["total_usage"] for m in recent_metrics] |
| if len(usage_counts) < 2: |
| return None |
| |
| usage_growth = (usage_counts[-1] - usage_counts[0]) / max(usage_counts[0], 1) * 100 |
| |
| |
| active_ratios = [m["api_keys"]["used_24h"] / max(m["api_keys"]["active"], 1) * 100 for m in recent_metrics] |
| avg_active_ratio = statistics.mean(active_ratios) |
| |
| |
| if usage_growth > 50: |
| pattern_type = "high_api_usage_growth" |
| description = f"High API usage growth: {usage_growth:.1f}% increase" |
| frequency = 0.7 |
| impact_score = 0.8 |
| recommendations = [ |
| "Monitor API rate limits", |
| "Consider API tier upgrades", |
| "Review API usage documentation" |
| ] |
| elif avg_active_ratio < 10: |
| pattern_type = "low_api_adoption" |
| description = f"Low API adoption: {avg_active_ratio:.1f}% active keys" |
| frequency = 0.6 |
| impact_score = 0.7 |
| recommendations = [ |
| "Improve API documentation", |
| "Create API usage tutorials", |
| "Review API pricing and features" |
| ] |
| else: |
| return None |
| |
| return UsagePattern(pattern_type, description, frequency, impact_score, recommendations) |
| |
| except Exception as e: |
| logger.error(f"Failed to analyze API usage pattern: {e}") |
| return None |
| |
| async def _analyze_performance_pattern(self) -> Optional[UsagePattern]: |
| """Analyze performance patterns.""" |
| try: |
| recent_metrics = list(self.usage_history)[-7:] |
| |
| |
| avg_times = [m["performance"]["query_performance"].get("avg_execution_time", 0) for m in recent_metrics] |
| slow_queries = [m["performance"]["query_performance"].get("slow_queries_count", 0) for m in recent_metrics] |
| |
| if not avg_times: |
| return None |
| |
| avg_query_time = statistics.mean(avg_times) |
| total_slow_queries = sum(slow_queries) |
| |
| |
| if avg_query_time > 0.5: |
| pattern_type = "slow_query_performance" |
| description = f"Slow query performance: {avg_query_time:.3f}s average" |
| frequency = 0.8 |
| impact_score = 0.9 |
| recommendations = [ |
| "Optimize slow queries", |
| "Review database indexes", |
| "Consider query result caching" |
| ] |
| elif total_slow_queries > 50: |
| pattern_type = "high_slow_query_volume" |
| description = f"High volume of slow queries: {total_slow_queries} in recent period" |
| frequency = 0.7 |
| impact_score = 0.8 |
| recommendations = [ |
| "Investigate query patterns", |
| "Implement query optimization", |
| "Review database configuration" |
| ] |
| else: |
| return None |
| |
| return UsagePattern(pattern_type, description, frequency, impact_score, recommendations) |
| |
| except Exception as e: |
| logger.error(f"Failed to analyze performance pattern: {e}") |
| return None |
| |
| async def _analyze_query_pattern(self) -> Optional[UsagePattern]: |
| """Analyze query patterns.""" |
| try: |
| |
| slow_analysis = await query_optimizer.analyze_slow_queries(20) |
| |
| if not slow_analysis: |
| return None |
| |
| |
| query_types = defaultdict(int) |
| for analysis in slow_analysis: |
| query_data = analysis["query_data"] |
| query_type = query_data.get("query_type", "UNKNOWN") |
| query_types[query_type] += 1 |
| |
| |
| if query_types: |
| most_common_type = max(query_types, key=query_types.get) |
| count = query_types[most_common_type] |
| |
| if count > 5: |
| pattern_type = "problematic_query_type" |
| description = f"High volume of slow {most_common_type} queries: {count} instances" |
| frequency = 0.6 |
| impact_score = 0.7 |
| recommendations = [ |
| f"Optimize {most_common_type} query patterns", |
| "Review query execution plans", |
| "Consider database schema optimization" |
| ] |
| |
| return UsagePattern(pattern_type, description, frequency, impact_score, recommendations) |
| |
| return None |
| |
| except Exception as e: |
| logger.error(f"Failed to analyze query pattern: {e}") |
| return None |
| |
| async def generate_trend_analysis(self, metric_path: str, days: int = 30) -> Optional[TrendAnalysis]: |
| """Generate trend analysis for a specific metric.""" |
| try: |
| |
| redis_client = await self.get_redis() |
| raw_data = await redis_client.lrange("analytics_metrics", 0, -1) |
| |
| if not raw_data: |
| return None |
| |
| |
| metrics_data = [json.loads(data) for data in raw_data] |
| |
| |
| metric_values = [] |
| timestamps = [] |
| |
| for metrics in metrics_data: |
| try: |
| |
| path_parts = metric_path.split('.') |
| value = metrics |
| |
| for part in path_parts: |
| if isinstance(value, dict) and part in value: |
| value = value[part] |
| else: |
| value = None |
| break |
| |
| if value is not None: |
| metric_values.append(float(value)) |
| timestamps.append(datetime.fromisoformat(metrics["timestamp"])) |
| |
| except (ValueError, KeyError, TypeError): |
| continue |
| |
| if len(metric_values) < 3: |
| return None |
| |
| |
| if len(metric_values) >= 2: |
| |
| x = list(range(len(metric_values))) |
| y = metric_values |
| |
| n = len(x) |
| sum_x = sum(x) |
| sum_y = sum(y) |
| sum_xy = sum(x[i] * y[i] for i in range(n)) |
| sum_x2 = sum(x[i] ** 2 for i in range(n)) |
| |
| |
| slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2) |
| |
| |
| if abs(slope) < 0.01: |
| trend = "stable" |
| elif slope > 0: |
| trend = "increasing" |
| else: |
| trend = "decreasing" |
| |
| |
| confidence = min(abs(slope) * 10, 1.0) |
| |
| |
| change_rate = (metric_values[-1] - metric_values[0]) / metric_values[0] * 100 if metric_values[0] != 0 else 0 |
| |
| |
| forecast = [] |
| last_value = metric_values[-1] |
| for i in range(1, 6): |
| forecast_value = last_value + (slope * i) |
| forecast.append({ |
| "point": i, |
| "value": forecast_value, |
| "timestamp": (timestamps[-1] + timedelta(days=i)).isoformat() |
| }) |
| |
| return TrendAnalysis(metric_path, trend, confidence, change_rate, forecast) |
| |
| return None |
| |
| except Exception as e: |
| logger.error(f"Failed to generate trend analysis: {e}") |
| return None |
| |
| async def get_business_intelligence_report(self) -> Dict[str, Any]: |
| """Generate comprehensive business intelligence report.""" |
| try: |
| |
| current_metrics = await self.collect_usage_metrics() |
| |
| |
| patterns = await self.detect_usage_patterns() |
| |
| |
| trend_analyses = [] |
| key_metrics = [ |
| "users.total", |
| "users.active_24h", |
| "evaluations.total", |
| "evaluations.completed", |
| "api_keys.total_usage" |
| ] |
| |
| for metric in key_metrics: |
| trend = await self.generate_trend_analysis(metric) |
| if trend: |
| trend_analyses.append(trend) |
| |
| |
| kpis = self._calculate_business_kpis(current_metrics) |
| |
| |
| recommendations = self._generate_business_recommendations(patterns, trend_analyses, kpis) |
| |
| report = { |
| "timestamp": datetime.utcnow().isoformat(), |
| "current_metrics": current_metrics, |
| "detected_patterns": [p.__dict__ for p in patterns], |
| "trend_analyses": [t.__dict__ for t in trend_analyses], |
| "business_kpis": kpis, |
| "recommendations": recommendations, |
| "data_quality": { |
| "metrics_collected": len(self.usage_history), |
| "data_points_analyzed": len(list(self.usage_history)), |
| "analysis_confidence": "high" if len(self.usage_history) > 50 else "medium" |
| } |
| } |
| |
| return report |
| |
| except Exception as e: |
| logger.error(f"Failed to generate business intelligence report: {e}") |
| return {"error": str(e)} |
| |
| def _calculate_business_kpis(self, current_metrics: Optional[Dict[str, Any]]) -> Dict[str, Any]: |
| """Calculate business KPIs.""" |
| if not current_metrics: |
| return {} |
| |
| try: |
| users = current_metrics["users"] |
| evals = current_metrics["evaluations"] |
| api_keys = current_metrics["api_keys"] |
| |
| kpis = { |
| "user_metrics": { |
| "user_growth_rate": 0, |
| "user_engagement_rate": users["active_24h"] / max(users["active"], 1) * 100, |
| "user_verification_rate": users["verified"] / max(users["total"], 1) * 100, |
| "new_user_acquisition_rate": users["new_24h"] |
| }, |
| "evaluation_metrics": { |
| "evaluation_completion_rate": evals["completed"] / max(evals["total"], 1) * 100, |
| "evaluation_failure_rate": evals["failed"] / max(evals["total"], 1) * 100, |
| "avg_execution_time_sec": evals["avg_execution_time_sec"], |
| "evaluation_velocity": evals["new_24h"] |
| }, |
| "api_metrics": { |
| "api_adoption_rate": api_keys["used_24h"] / max(api_keys["active"], 1) * 100, |
| "api_usage_per_active_key": api_keys["total_usage"] / max(api_keys["active"], 1), |
| "api_key_utilization": api_keys["active"] / max(api_keys["total"], 1) * 100 |
| }, |
| "overall_health": { |
| "system_health_score": self._calculate_health_score(current_metrics), |
| "growth_indicator": "positive" if users["new_24h"] > 0 else "stable", |
| "performance_indicator": "good" if evals["avg_execution_time_sec"] < 30 else "needs_attention" |
| } |
| } |
| |
| return kpis |
| |
| except Exception as e: |
| logger.error(f"Failed to calculate business KPIs: {e}") |
| return {} |
| |
| def _calculate_health_score(self, metrics: Dict[str, Any]) -> float: |
| """Calculate overall system health score.""" |
| try: |
| scores = [] |
| |
| |
| user_engagement = metrics["users"]["active_24h"] / max(metrics["users"]["active"], 1) * 100 |
| scores.append(min(user_engagement, 100)) |
| |
| |
| eval_success = metrics["evaluations"]["completed"] / max(metrics["evaluations"]["total"], 1) * 100 |
| scores.append(min(eval_success, 100)) |
| |
| |
| api_util = metrics["api_keys"]["used_24h"] / max(metrics["api_keys"]["active"], 1) * 100 |
| scores.append(min(api_util, 100)) |
| |
| |
| avg_time = metrics["evaluations"]["avg_execution_time_sec"] |
| perf_score = max(0, 100 - (avg_time * 2)) |
| scores.append(min(perf_score, 100)) |
| |
| return statistics.mean(scores) if scores else 0.0 |
| |
| except Exception: |
| return 0.0 |
| |
| def _generate_business_recommendations(self, patterns: List[UsagePattern], |
| trends: List[TrendAnalysis], |
| kpis: Dict[str, Any]) -> List[Dict[str, Any]]: |
| """Generate business recommendations.""" |
| recommendations = [] |
| |
| |
| for pattern in patterns: |
| if pattern.impact_score > 0.7: |
| recommendations.append({ |
| "type": "pattern_based", |
| "priority": "high" if pattern.impact_score > 0.8 else "medium", |
| "title": pattern.description, |
| "actions": pattern.recommendations, |
| "impact_score": pattern.impact_score |
| }) |
| |
| |
| for trend in trends: |
| if trend.confidence > 0.7 and trend.trend == "decreasing": |
| recommendations.append({ |
| "type": "trend_based", |
| "priority": "high", |
| "title": f"Declining trend in {trend.metric}", |
| "actions": [ |
| f"Investigate causes of declining {trend.metric}", |
| "Implement corrective measures", |
| "Monitor trend closely" |
| ], |
| "confidence": trend.confidence |
| }) |
| |
| |
| if kpis: |
| user_engagement = kpis.get("user_metrics", {}).get("user_engagement_rate", 0) |
| if user_engagement < 30: |
| recommendations.append({ |
| "type": "kpi_based", |
| "priority": "medium", |
| "title": "Low user engagement detected", |
| "actions": [ |
| "Review user onboarding process", |
| "Implement engagement campaigns", |
| "Analyze user behavior patterns" |
| ], |
| "current_value": user_engagement |
| }) |
| |
| return recommendations[:10] |
|
|
|
|
| |
| analytics_engine = AnalyticsEngine() |
|
|
|
|
| |
| async def scheduled_analytics_task(): |
| """Run scheduled analytics collection and analysis.""" |
| logger.info("Starting scheduled analytics collection...") |
| |
| try: |
| |
| metrics = await analytics_engine.collect_usage_metrics() |
| |
| if metrics: |
| |
| report = await analytics_engine.get_business_intelligence_report() |
| |
| |
| redis_client = await analytics_engine.get_redis() |
| await redis_client.setex( |
| "business_intelligence_report", |
| 3600, |
| json.dumps(report, default=str) |
| ) |
| |
| logger.info("Scheduled analytics completed successfully") |
| else: |
| logger.warning("No metrics collected in scheduled analytics") |
| |
| except Exception as e: |
| logger.error(f"Scheduled analytics failed: {e}") |
|
|
|
|
| if __name__ == "__main__": |
| import sys |
| |
| async def main(): |
| command = sys.argv[1] if len(sys.argv) > 1 else "help" |
| |
| if command == "collect": |
| metrics = await analytics_engine.collect_usage_metrics() |
| if metrics: |
| print(json.dumps(metrics, indent=2)) |
| else: |
| print("Failed to collect metrics") |
| |
| elif command == "patterns": |
| patterns = await analytics_engine.detect_usage_patterns() |
| print(f"Detected {len(patterns)} patterns:") |
| for pattern in patterns: |
| print(f" - {pattern.pattern_type}: {pattern.description}") |
| print(f" Recommendations: {', '.join(pattern.recommendations[:2])}") |
| |
| elif command == "trends": |
| trend = await analytics_engine.generate_trend_analysis("users.total") |
| if trend: |
| print(json.dumps(trend.__dict__, indent=2)) |
| else: |
| print("No trend data available") |
| |
| elif command == "report": |
| report = await analytics_engine.get_business_intelligence_report() |
| print(json.dumps(report, indent=2, default=str)) |
| |
| else: |
| print("Usage: python analytics_engine.py <command>") |
| print("Commands: collect, patterns, trends, report") |
| |
| asyncio.run(main()) |
|
|