Spaces:
Paused
Paused
| """ | |
| Business Analytics Dashboard Implementation | |
| """ | |
| import json | |
| import logging | |
| import os | |
| from datetime import datetime, timedelta | |
| from enum import Enum | |
| from typing import Any | |
| logger = logging.getLogger(__name__) | |
class MetricType(Enum):
    """Families of metrics the analytics layer distinguishes.

    Each member's value is the lowercase string form of its name.
    """

    # Audience and traffic
    USERS = "users"
    REQUESTS = "requests"

    # Commercial outcomes
    REVENUE = "revenue"
    CONVERSION = "conversion"

    # Service quality and product usage
    PERFORMANCE = "performance"
    ENGAGEMENT = "engagement"

    # Aggregated key performance indicators
    BUSINESS_KPI = "business_kpi"
class BusinessMetrics:
    """Collect business metrics in Redis and aggregate them for dashboards.

    The ``track_*`` methods are best-effort: any failure is logged and
    swallowed so that analytics can never break the operation being measured.
    Without a Redis client the tracking methods only build the record and log,
    and the read paths return empty or zero values.

    NOTE(review): the writers in this class push to undated keys
    (``daily_metrics``, ``response_times``, ``engagement_metrics``) while the
    time-range readers use dated keys (``daily_metrics:<date>``,
    ``business_kpis:<date>``, ``active_users:<date>`` ...). Presumably a
    separate aggregation job produces the dated keys - confirm.
    """

    # Newest-first event lists are capped at this many entries.
    _MAX_EVENTS = 10000

    # Endpoint prefixes checked in order; first match wins.
    _API_CATEGORIES = (
        ("/api/auth", "authentication"),
        ("/api/users", "user_management"),
        ("/api/analytics", "analytics"),
        ("/api/reports", "reporting"),
        ("/api/admin", "administration"),
    )
    _PAGE_VIEW_PATHS = frozenset({"/", "/home", "/dashboard", "/login"})

    def __init__(self, redis_client=None):
        """Store the Redis client (its methods are awaited) and read settings from the environment."""
        self.redis = redis_client
        self.metrics_retention_days = int(os.getenv("METRICS_RETENTION_DAYS", "90"))
        self.real_time_window_hours = int(os.getenv("REAL_TIME_WINDOW_HOURS", "24"))
        self.daily_aggregation_enabled = os.getenv("DAILY_AGGREGATION", "true").lower() == "true"
        self.weekly_report_enabled = os.getenv("WEEKLY_REPORT_ENABLED", "true").lower() == "true"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_number(value: Any):
        """Coerce a Redis/JSON value (number, str, bytes, None) to a number; 0 if impossible."""
        if value is None or isinstance(value, bool):
            return 0
        if isinstance(value, (int, float)):
            return value
        for cast in (int, float):
            try:
                return cast(value)
            except (TypeError, ValueError):
                continue
        return 0

    @classmethod
    def _hash_int(cls, mapping: dict, field: str) -> int:
        """Read an integer hash field, accepting both ``str`` and ``bytes`` keys/values."""
        raw = mapping.get(field)
        if raw is None:
            # Clients that do not decode responses return bytes keys.
            raw = mapping.get(field.encode())
        return int(cls._as_number(raw))

    @staticmethod
    def _iter_dates(start_date: datetime, end_date: datetime):
        """Yield ``YYYY-MM-DD`` strings from ``start_date`` to ``end_date`` inclusive, one per day."""
        current = start_date
        while current <= end_date:
            yield current.strftime("%Y-%m-%d")
            current += timedelta(days=1)

    async def _push_capped(self, key: str, payload: str) -> None:
        """Push ``payload`` onto the head of list ``key`` and keep only the newest ``_MAX_EVENTS``."""
        await self.redis.lpush(key, payload)
        # LTRIM bounds are inclusive, hence the ``- 1``.
        await self.redis.ltrim(key, 0, self._MAX_EVENTS - 1)

    async def _load_daily_kpis(self, date_str: str) -> dict[str, Any]:
        """Return the decoded ``business_kpis:<date>`` document, or ``{}`` if absent or invalid."""
        if not self.redis:
            return {}
        raw = await self.redis.get(f"business_kpis:{date_str}")
        if not raw:
            return {}
        try:
            decoded = json.loads(raw)
        except (TypeError, ValueError):
            logger.warning("Ignoring malformed KPI document for %s", date_str)
            return {}
        return decoded if isinstance(decoded, dict) else {}

    # ------------------------------------------------------------------
    # Tracking (write path)
    # ------------------------------------------------------------------

    async def track_user_activity(self, user_id: str, action: str, metadata: dict[str, Any] = None):
        """Record one user action and bump the per-action and real-time counters.

        Args:
            user_id: Identifier of the acting user.
            action: Action name; also used as the hash field in ``daily_user_actions``.
            metadata: Optional extra data; ``metadata["session_id"]`` is copied
                to the top-level ``session_id`` field when present.
        """
        try:
            details = metadata or {}
            activity = {
                "user_id": user_id,
                "action": action,
                "timestamp": datetime.utcnow().isoformat(),
                "metadata": details,
                "session_id": details.get("session_id"),
            }
            if self.redis:
                await self.redis.lpush("user_activities", json.dumps(activity))
                # The TTL must be set on the key that was written; the previous
                # per-user key never existed, so nothing ever expired.
                await self.redis.expire("user_activities", self.metrics_retention_days * 86400)
                await self.redis.hincrby("daily_user_actions", action, 1)
                await self.redis.hincrby("real_time_metrics", "user_activities", 1)
            logger.debug(f"User activity tracked: {user_id} - {action}")
        except Exception as e:
            logger.error(f"Error tracking user activity: {e}")

    async def track_request_metrics(self, request_data: dict[str, Any]):
        """Record one HTTP request sample and update request/error counters.

        Args:
            request_data: Mapping read with ``.get``; recognised keys are
                ``user_id``, ``endpoint``, ``method`` (default ``"GET"``),
                ``status_code`` (default 200), ``response_time_ms`` (default 0),
                ``user_agent``, ``ip_address``, ``referer``, ``content_type``
                and ``content_length``.
        """
        try:
            endpoint = request_data.get("endpoint")
            method = request_data.get("method", "GET")
            status_code = request_data.get("status_code", 200)
            response_time = request_data.get("response_time_ms", 0)
            category = self._categorize_request(endpoint, method, status_code)
            is_error = status_code >= 400

            metrics = {
                "timestamp": datetime.utcnow().isoformat(),
                "user_id": request_data.get("user_id"),
                "endpoint": endpoint,
                "category": category,
                "method": method,
                "status_code": status_code,
                "response_time_ms": response_time,
                "is_success": 200 <= status_code < 300,
                "user_agent": request_data.get("user_agent"),
                "ip_address": request_data.get("ip_address"),
                "referer": request_data.get("referer"),
                "content_type": request_data.get("content_type", ""),
                "content_length": request_data.get("content_length", 0),
            }

            if self.redis:
                await self._push_capped("request_metrics", json.dumps(metrics))

                await self.redis.hincrby("daily_metrics", "requests", 1)
                if is_error:
                    await self.redis.hincrby("daily_metrics", "errors", 1)
                if status_code >= 500:
                    await self.redis.hincrby("daily_metrics", "server_errors", 1)

                await self.redis.hincrby("daily_metrics", f"requests_{category}", 1)
                if is_error:
                    await self.redis.hincrby("daily_metrics", f"errors_{category}", 1)

                # Only positive timings are sampled.
                if response_time > 0:
                    await self.redis.lpush("response_times", str(response_time))
                    await self.redis.ltrim("response_times", 0, 1000)
            logger.debug(f"Request metrics tracked: {endpoint} - {status_code}")
        except Exception as e:
            logger.error(f"Error tracking request metrics: {e}")

    def _categorize_request(self, endpoint: str, method: str, status_code: int) -> str:
        """Map a request to an analytics category.

        Known API prefixes and page paths win; other ``/api/`` paths become
        ``api_read`` (GET) or ``api_write``; everything else is classified by
        status code. A missing endpoint is treated as an empty path instead of
        raising, so the request sample is still recorded.
        """
        path = (endpoint or "").lower()
        for prefix, category in self._API_CATEGORIES:
            if path.startswith(prefix):
                return category
        if path in self._PAGE_VIEW_PATHS:
            return "page_view"
        if path.startswith("/api/"):
            return "api_read" if method == "GET" else "api_write"
        if status_code >= 500:
            return "server_error"
        if status_code >= 400:
            return "client_error"
        return "success"

    async def track_revenue_metrics(
        self,
        amount: float,
        currency: str = "USD",
        product_id: str = None,
        user_id: str = None,
        metadata: dict[str, Any] = None,
    ):
        """Record one revenue event and add it to the daily totals (stored in cents)."""
        try:
            revenue = {
                "amount": amount,
                "currency": currency,
                "product_id": product_id,
                "user_id": user_id,
                "timestamp": datetime.utcnow().isoformat(),
                "metadata": metadata or {},
            }
            if self.redis:
                # round(), not int(): 19.99 * 100 is 1998.999..., which int()
                # would truncate to 1998 cents.
                cents = int(round(amount * 100))
                await self._push_capped("revenue_metrics", json.dumps(revenue))
                await self.redis.hincrby("daily_metrics", "revenue_total", cents)
                await self.redis.hincrby("daily_metrics", "transactions", 1)
                await self.redis.hincrby("daily_metrics", f"revenue_{currency.lower()}", cents)
            logger.info(f"Revenue tracked: {amount} {currency}")
        except Exception as e:
            logger.error(f"Error tracking revenue: {e}")

    async def track_conversion_metrics(
        self, conversion_type: str, value: float, user_id: str = None, metadata: dict[str, Any] = None
    ):
        """Record one conversion event and bump the per-type and total counters."""
        try:
            conversion = {
                "type": conversion_type,
                "value": value,
                "user_id": user_id,
                "timestamp": datetime.utcnow().isoformat(),
                "metadata": metadata or {},
            }
            if self.redis:
                await self._push_capped("conversion_metrics", json.dumps(conversion))
                await self.redis.hincrby("daily_metrics", f"conversions_{conversion_type}", 1)
                await self.redis.hincrby("daily_metrics", "conversions_total", 1)
            logger.info(f"Conversion tracked: {conversion_type} - {value}")
        except Exception as e:
            logger.error(f"Error tracking conversion: {e}")

    async def track_engagement_metrics(self, user_id: str, event_type: str, properties: dict[str, Any]):
        """Record one engagement event; ``properties["session_duration"]`` is copied out if present."""
        try:
            props = properties or {}
            engagement = {
                "user_id": user_id,
                "event_type": event_type,
                "properties": props,
                "timestamp": datetime.utcnow().isoformat(),
                "session_duration_seconds": props.get("session_duration", 0),
            }
            if self.redis:
                await self._push_capped("engagement_metrics", json.dumps(engagement))
                await self.redis.hincrby("daily_metrics", "engagement_events", 1)
                await self.redis.hincrby("daily_metrics", f"engagement_{event_type}", 1)
            logger.debug(f"Engagement tracked: {user_id} - {event_type}")
        except Exception as e:
            logger.error(f"Error tracking engagement: {e}")

    # ------------------------------------------------------------------
    # Per-day calculations
    # ------------------------------------------------------------------

    async def _calculate_retention_rate(self, date: str) -> float:
        """Return the percentage of users active on ``date`` who are also active today.

        Returns 100.0 when nobody was active on ``date`` and 0.0 on error or
        when no Redis client is configured.
        """
        if not self.redis:
            return 0.0
        try:
            earlier_active = await self.redis.smembers(f"active_users:{date}")
            if not earlier_active:
                return 100.0
            today_key = f"active_users:{datetime.utcnow().strftime('%Y-%m-%d')}"
            today_active = await self.redis.smembers(today_key)
            if not today_active:
                return 0.0
            # Retention counts returning users only; comparing raw set sizes
            # would let brand-new users push the rate above 100%.
            retained = set(earlier_active) & set(today_active)
            return (len(retained) / len(earlier_active)) * 100
        except Exception as e:
            logger.error(f"Error calculating retention rate: {e}")
            return 0.0

    async def _calculate_conversion_rate(self, date: str) -> float:
        """Return conversions per visitor for ``date`` as a percentage (0.0 if unavailable)."""
        if not self.redis:
            return 0.0
        try:
            visitors = await self.redis.get(f"total_visitors:{date}")
            conversions = await self.redis.get(f"total_conversions:{date}")
            if visitors and conversions:
                visitor_count = int(visitors)
                # A stored "0" is truthy as a string; guard the division explicitly.
                if visitor_count > 0:
                    return (int(conversions) / visitor_count) * 100
        except Exception as e:
            logger.error(f"Error calculating conversion rate: {e}")
        return 0.0

    async def _calculate_avg_session_duration(self, date: str) -> float:
        """Return the mean of the first 101 entries of ``session_durations:<date>`` (0.0 if none)."""
        if not self.redis:
            return 0.0
        try:
            durations = await self.redis.lrange(f"session_durations:{date}", 0, 100)
            if not durations:
                return 0.0
            values = [float(d) for d in durations]
            return sum(values) / len(values)
        except Exception as e:
            logger.error(f"Error calculating average session duration: {e}")
            return 0.0

    async def _calculate_engagement_score(self, date: str) -> float:
        """Return an engagement score for ``date`` in the range 0-100.

        One point per stored event, plus up to two points for the average
        ``session_end`` duration (one point per 300 seconds), capped at 100.
        """
        try:
            if not self.redis:
                return 0.0
            events = await self.redis.lrange(f"engagement_metrics:{date}", 0, 1000)
            if not events:
                return 0.0

            score = 0
            session_durations = []
            for raw_event in events:
                try:
                    event = json.loads(raw_event)
                    if event.get("event_type") == "session_end":
                        session_durations.append(self._as_number(event.get("session_duration_seconds", 0)))
                    score += 1
                except (TypeError, ValueError, AttributeError):
                    # Malformed entries are skipped, but no longer silently.
                    logger.debug("Skipping malformed engagement event for %s", date)

            if session_durations:
                avg_duration = sum(session_durations) / len(session_durations)
                score += min(avg_duration / 300, 2)
            return float(min(score, 100))
        except Exception as e:
            logger.error(f"Error calculating engagement score: {e}")
            return 0.0

    async def _calculate_avg_response_time(self) -> float:
        """Return the mean of the recent ``response_times`` samples (0.0 if none)."""
        try:
            if self.redis:
                samples = await self.redis.lrange("response_times", 0, 1000)
                if samples:
                    values = [float(rt) for rt in samples]
                    return sum(values) / len(values)
            return 0.0
        except Exception as e:
            logger.error(f"Error calculating average response time: {e}")
            return 0.0

    # ------------------------------------------------------------------
    # Real-time view
    # ------------------------------------------------------------------

    async def get_real_time_metrics(self) -> dict[str, Any]:
        """Return a snapshot of current counters.

        Returns ``{}`` without Redis and ``{"error": ...}`` on failure.

        NOTE(review): the ``*_last_hour`` / ``*_per_hour`` values are the
        current lengths of the capped event lists, not counts filtered to the
        last hour; the names are kept for compatibility.
        """
        try:
            if not self.redis:
                return {}
            real_time = {
                "timestamp": datetime.utcnow().isoformat(),
                "active_users": await self.redis.get("active_users_realtime") or 0,
                # LLEN takes only the key; extra range arguments raise.
                "user_activities_last_hour": await self.redis.llen("user_activities"),
                "requests_last_hour": await self.redis.llen("request_metrics"),
                "engagement_events_last_hour": await self.redis.llen("engagement_metrics"),
                "conversions_last_hour": await self.redis.llen("conversion_metrics"),
            }
            real_time["requests_per_hour"] = real_time["requests_last_hour"]
            real_time["conversions_per_hour"] = real_time["conversions_last_hour"]
            real_time["engagement_events_per_hour"] = real_time["engagement_events_last_hour"]
            # Must be awaited; otherwise a coroutine object ends up in the payload.
            real_time["error_rate"] = await self._calculate_real_time_error_rate()
            return real_time
        except Exception as e:
            logger.error(f"Error getting real-time metrics: {e}")
            return {"error": str(e)}

    async def _calculate_real_time_error_rate(self) -> float:
        """Return the percentage of recent request samples with ``status_code >= 400``."""
        try:
            if not self.redis:
                return 0.0
            raw_requests = await self.redis.lrange("request_metrics", 0, 1000)
            if not raw_requests:
                return 0.0
            recent = [json.loads(raw) for raw in raw_requests]
            # Entries are already decoded here; decoding them again would raise.
            error_count = sum(1 for req in recent if req.get("status_code", 200) >= 400)
            return (error_count / len(recent)) * 100
        except Exception as e:
            logger.error(f"Error calculating real-time error rate: {e}")
            return 0.0

    # ------------------------------------------------------------------
    # Time-range aggregation
    # ------------------------------------------------------------------

    async def get_analytics_dashboard_data(self, time_range: str = "7d") -> dict[str, Any]:
        """Build the dashboard payload for a range such as ``"7d"``.

        Returns a dict with the range bounds plus ``summary``, ``user_metrics``,
        ``revenue_metrics``, ``conversion_metrics``, ``performance_metrics`` and
        ``engagement_metrics``; ``{"error": ...}`` if anything fails (including
        an unparsable ``time_range``).
        """
        try:
            days = int(time_range.rstrip("d"))
            end_date = datetime.utcnow()
            start_date = end_date - timedelta(days=days)
            return {
                "time_range": time_range,
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat(),
                "summary": await self._get_time_range_summary(start_date, end_date),
                "user_metrics": await self._get_user_time_range_metrics(start_date, end_date),
                "revenue_metrics": await self._get_revenue_time_range_metrics(start_date, end_date),
                "conversion_metrics": await self._get_conversion_time_range_metrics(start_date, end_date),
                "performance_metrics": await self._get_performance_time_range_metrics(start_date, end_date),
                "engagement_metrics": await self._get_engagement_time_range_metrics(start_date, end_date),
            }
        except Exception as e:
            logger.error(f"Error getting dashboard data: {e}")
            return {"error": str(e)}

    async def _get_time_range_summary(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
        """Aggregate the daily KPI documents in the range into one summary.

        ``revenue`` and ``users`` sections are summed/averaged over the days
        that have data; every other KPI key takes its most recent value.
        """
        summary = {
            "start_date": start_date.isoformat(),
            "end_date": end_date.isoformat(),
            "days": (end_date - start_date).days,
        }

        collected: dict[str, list] = {}
        for date_str in self._iter_dates(start_date, end_date):
            for key, value in (await self._load_daily_kpis(date_str)).items():
                collected.setdefault(key, []).append(value)

        num = self._as_number
        for key, values in collected.items():
            sections = [v for v in values if isinstance(v, dict)]
            if key == "revenue" and sections:
                # Each value already IS the revenue section; do not look up
                # "revenue" inside it again.
                total = sum(num(v.get("total", 0)) for v in sections)
                summary[key] = {
                    "total": total,
                    "transactions": sum(num(v.get("transactions", 0)) for v in sections),
                    # NOTE(review): only the average is divided by 100
                    # (presumably cents to currency units) while "total" is
                    # not - confirm the intended unit.
                    "daily_average": total / len(sections) / 100,
                }
            elif key == "users" and sections:
                active = sum(num(v.get("active", 0)) for v in sections)
                new = sum(num(v.get("new", 0)) for v in sections)
                summary[key] = {
                    "total_active": active,
                    "total_new": new,
                    "daily_average_active": active / len(sections),
                    "daily_average_new": new / len(sections),
                }
            else:
                summary[key] = values[-1]
        return summary

    async def _get_user_time_range_metrics(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
        """Sum the per-day user counters and average the daily retention rate."""
        user_metrics = {"new_users": 0, "active_users": 0, "returning_users": 0, "user_retention_rate": 0}
        if not self.redis:
            return user_metrics

        dates = list(self._iter_dates(start_date, end_date))
        for date_str in dates:
            day_metrics = await self.redis.hgetall(f"daily_metrics:{date_str}")
            if day_metrics:
                # Hash values come back as strings/bytes and must be converted.
                for field in ("new_users", "active_users", "returning_users"):
                    user_metrics[field] += self._hash_int(day_metrics, field)

        if user_metrics["new_users"] > 0 and dates:
            rates = [await self._calculate_retention_rate(date_str) for date_str in dates]
            user_metrics["user_retention_rate"] = sum(rates) / len(rates)
        return user_metrics

    async def _get_revenue_time_range_metrics(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
        """Sum revenue and transactions over the range and derive the averages."""
        num = self._as_number
        total_revenue = 0
        total_transactions = 0
        days_with_data = 0
        for date_str in self._iter_dates(start_date, end_date):
            revenue = (await self._load_daily_kpis(date_str)).get("revenue")
            if not isinstance(revenue, dict):
                continue
            days_with_data += 1
            total_revenue += num(revenue.get("total", 0))
            total_transactions += num(revenue.get("transactions", 0))

        return {
            "total_revenue": total_revenue,
            "total_transactions": total_transactions,
            "average_transaction_value": total_revenue / total_transactions if total_transactions > 0 else 0,
            # An average over days with data, not a running sum of daily averages.
            "daily_average": total_revenue / days_with_data if days_with_data else 0,
        }

    async def _get_conversion_time_range_metrics(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
        """Sum conversions (overall and per type) and compute the rate as a percentage.

        Visitors are read from ``total_visitors:<date>``, the same key
        ``_calculate_conversion_rate`` uses.
        """
        num = self._as_number
        total_conversions = 0
        total_visitors = 0
        by_type: dict[str, Any] = {}
        for date_str in self._iter_dates(start_date, end_date):
            conversions = (await self._load_daily_kpis(date_str)).get("conversions")
            if isinstance(conversions, dict):
                total_conversions += num(conversions.get("total", 0))
                for conv_type, count in conversions.items():
                    # "total" is the aggregate, not a conversion type.
                    if conv_type == "total":
                        continue
                    by_type[conv_type] = by_type.get(conv_type, 0) + num(count)
            if self.redis:
                total_visitors += num(await self.redis.get(f"total_visitors:{date_str}"))

        rate = total_conversions / total_visitors * 100 if total_visitors > 0 else 0
        return {"total_conversions": total_conversions, "conversion_rate": rate, "conversions_by_type": by_type}

    async def _get_performance_time_range_metrics(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
        """Aggregate request volume, error rates and response time over the range.

        Daily rates are weighted by that day's request count (assumes the
        stored ``error_rate`` / ``server_error_rate`` are percentages - TODO
        confirm). ``avg_response_time`` is reported in seconds: raw samples
        from ``response_times:<date>`` are preferred, falling back to the
        request-weighted daily ``avg_response_time_ms``.
        """
        num = self._as_number
        total_requests = 0
        error_count = 0.0
        server_error_count = 0.0
        weighted_response_ms = 0.0
        samples_ms: list[float] = []

        for date_str in self._iter_dates(start_date, end_date):
            performance = (await self._load_daily_kpis(date_str)).get("performance")
            if isinstance(performance, dict):
                day_requests = num(performance.get("total_requests", 0))
                total_requests += day_requests
                error_count += num(performance.get("error_rate", 0)) / 100 * day_requests
                server_error_count += num(performance.get("server_error_rate", 0)) / 100 * day_requests
                weighted_response_ms += num(performance.get("avg_response_time_ms", 0)) * day_requests
            if self.redis:
                day_samples = await self.redis.lrange(f"response_times:{date_str}", 0, 1000)
                if day_samples:
                    samples_ms.extend(float(rt) for rt in day_samples)

        metrics = {"total_requests": total_requests, "error_rate": 0, "server_error_rate": 0, "avg_response_time": 0}
        if total_requests > 0:
            metrics["error_rate"] = error_count / total_requests * 100
            metrics["server_error_rate"] = server_error_count / total_requests * 100
            metrics["avg_response_time"] = weighted_response_ms / total_requests / 1000
        if samples_ms:
            # Same unit as the fallback above (seconds).
            metrics["avg_response_time"] = sum(samples_ms) / len(samples_ms) / 1000
        return metrics

    async def _get_engagement_time_range_metrics(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
        """Sum engagement events and average the daily score and session duration."""
        num = self._as_number
        total_events = 0
        daily_scores = []
        daily_durations = []
        for date_str in self._iter_dates(start_date, end_date):
            engagement = (await self._load_daily_kpis(date_str)).get("engagement")
            if not isinstance(engagement, dict):
                continue
            total_events += num(engagement.get("total_events", 0))
            daily_scores.append(num(engagement.get("engagement_score", 0)))
            daily_durations.append(num(engagement.get("avg_session_duration_seconds", 0)))

        return {
            "total_events": total_events,
            "engagement_score": sum(daily_scores) / len(daily_scores) if daily_scores else 0,
            "avg_session_duration": sum(daily_durations) / len(daily_durations) if daily_durations else 0,
        }
class BusinessAnalyticsDashboard:
    """Assemble dashboard payloads and reports on top of ``BusinessMetrics``."""

    # Report type -> name of the method that builds it.
    _REPORT_BUILDERS = {
        "executive": "_generate_executive_report",
        "detailed": "_generate_detailed_report",
        "performance": "_generate_performance_report",
        "revenue": "_generate_revenue_report",
    }

    def __init__(self, redis_client=None):
        """Keep the Redis client and create the metrics collector that uses it."""
        self.redis = redis_client
        self.business_metrics = BusinessMetrics(redis_client)

    @staticmethod
    def _section(container: Any, key: str) -> dict:
        """Return ``container[key]`` if it is a dict, otherwise an empty dict."""
        value = container.get(key) if isinstance(container, dict) else None
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _number(value: Any):
        """Return ``value`` if it is a real number, otherwise 0."""
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return value
        return 0

    async def get_dashboard_data(self, time_range: str = "7d", metrics: list[str] = None) -> dict[str, Any]:
        """Return the static dashboard config plus time-range and real-time data.

        Args:
            time_range: Range string such as ``"7d"``.
            metrics: Accepted for interface compatibility; currently not used
                to filter the payload.

        Returns:
            Dict with ``dashboard_config``, ``data``, ``real_time_metrics`` and
            ``last_updated``; ``{"error": ...}`` on failure.
        """
        try:
            return {
                "dashboard_config": {
                    "available_time_ranges": ["1d", "7d", "30d", "90d"],
                    "available_metrics": [
                        "users",
                        "revenue",
                        "conversions",
                        "engagement",
                        "performance",
                        "business_kpis",
                    ],
                    "default_time_range": "7d",
                },
                "data": await self.business_metrics.get_analytics_dashboard_data(time_range),
                "real_time_metrics": await self.business_metrics.get_real_time_metrics(),
                "last_updated": datetime.utcnow().isoformat(),
            }
        except Exception as e:
            logger.error(f"Error getting dashboard data: {e}")
            return {"error": str(e)}

    async def generate_report(self, report_type: str, time_range: str = "7d") -> dict[str, Any]:
        """Build the report named by ``report_type``.

        Returns ``{"error": "Unknown report type"}`` for unrecognised types and
        an explicit "not implemented" error for recognised types whose builder
        method is not defined on this class.
        """
        try:
            builder_name = self._REPORT_BUILDERS.get(report_type)
            if builder_name is None:
                return {"error": "Unknown report type"}
            builder = getattr(self, builder_name, None)
            if builder is None:
                # Several report types are advertised here without a builder in
                # this class; say so instead of leaking an AttributeError text.
                logger.warning("Report type %r has no builder (%s)", report_type, builder_name)
                return {"error": f"Report type '{report_type}' is not implemented"}
            return await builder(time_range)
        except Exception as e:
            logger.error(f"Error generating report: {e}")
            return {"error": str(e)}

    async def _generate_executive_report(self, time_range: str) -> dict[str, Any]:
        """Build the executive summary: key metrics, trends and recommendations."""
        try:
            dashboard_data = await self.get_dashboard_data(time_range)
            data = self._section(dashboard_data, "data")
            summary = self._section(data, "summary")
            return {
                "report_type": "executive_summary",
                "time_range": time_range,
                "generated_at": datetime.utcnow().isoformat(),
                "key_metrics": {
                    # Revenue total and conversion rate live in the time-range
                    # sections; the summary sections never carry those keys.
                    "total_revenue": self._section(data, "revenue_metrics").get("total_revenue", 0),
                    "total_users": self._section(summary, "users").get("total_new", 0),
                    "conversion_rate": self._section(data, "conversion_metrics").get("conversion_rate", 0),
                    "engagement_score": self._section(summary, "engagement").get("engagement_score", 0),
                    "avg_response_time": self._section(summary, "performance").get("avg_response_time_ms", 0),
                },
                # _calculate_trends expects the full payload (with its "data" key).
                "trends": await self._calculate_trends(dashboard_data),
                "recommendations": await self._generate_recommendations(dashboard_data),
            }
        except Exception as e:
            logger.error(f"Error generating executive report: {e}")
            return {"error": str(e)}

    async def _calculate_trends(self, data: dict[str, Any]) -> list[dict[str, Any]]:
        """Describe revenue, user-growth and conversion trends.

        Args:
            data: The full dashboard payload, i.e. a dict whose ``"data"`` entry
                contains ``"summary"``.

        Returns:
            One dict per metric with a positive current value; ``[]`` when the
            summary is missing or on error.
        """
        try:
            summary = self._section(data, "data").get("summary")
            if not isinstance(summary, dict):
                return []
            # Optional section; growth rates fall back to 0 when it is absent.
            comparison = self._section(summary, "comparison_with_yesterday")
            trends = []

            revenue_avg = self._number(self._section(summary, "revenue").get("daily_average", 0))
            if revenue_avg > 0:
                trends.append(
                    {
                        "metric": "revenue",
                        "trend": self._calculate_trend(data, "revenue", "daily_average"),
                        "description": f"Daily average revenue: ${revenue_avg:.2f}",
                        "growth_rate": f"{self._number(comparison.get('revenue_growth', 0)):.1f}%",
                    }
                )

            new_users_avg = self._number(self._section(summary, "users").get("daily_average_new", 0))
            if new_users_avg > 0:
                trends.append(
                    {
                        "metric": "user_growth",
                        "trend": self._calculate_trend(data, "users", "daily_average_new"),
                        "description": f"Daily new users: {new_users_avg:.0f}",
                        "growth_rate": f"{self._number(comparison.get('user_growth', 0)):.1f}%",
                    }
                )

            conversion_rate = self._number(self._section(summary, "conversions").get("conversion_rate", 0))
            if conversion_rate > 0:
                trends.append(
                    {
                        "metric": "conversion_rate",
                        "trend": self._calculate_trend(data, "conversions", "conversion_rate"),
                        "description": f"Conversion rate: {conversion_rate:.1f}%",
                    }
                )
            return trends
        except Exception as e:
            logger.error(f"Error calculating trends: {e}")
            return []

    def _calculate_trend(self, data: dict[str, Any], metric: str, average_key: str) -> str:
        """Compare a summary metric with its ``comparison_with_yesterday`` counterpart.

        Args:
            data: Full dashboard payload (``data["data"]["summary"]`` is read).
            metric: Summary section name, e.g. ``"revenue"``.
            average_key: Field inside the section to compare when the section
                is a dict; plain numbers are compared directly.

        Returns:
            ``"up"``, ``"down"`` or ``"stable"``; ``"stable"`` when there is no
            summary and ``"unknown"`` when the payload is malformed.
        """
        try:
            if "summary" not in data["data"]:
                return "stable"
            summary = data["data"]["summary"]
            comparison = self._section(summary, "comparison_with_yesterday")

            def extract(entry):
                # Sections are dicts; pick the requested field out of them.
                if isinstance(entry, dict):
                    entry = entry.get(average_key, 0)
                return self._number(entry)

            current_value = extract(summary.get(metric, 0))
            previous_value = extract(comparison.get(metric, 0))
            if current_value > previous_value:
                return "up"
            if current_value < previous_value:
                return "down"
            return "stable"
        except (KeyError, TypeError, AttributeError) as e:
            logger.error(f"Error calculating trend: {e}")
            return "unknown"

    async def _generate_recommendations(self, dashboard_data: dict[str, Any]) -> list[str]:
        """Return recommendation strings for metrics outside their thresholds.

        Thresholds: error rate above 5, response time above 1000 ms,
        conversion rate below 2, engagement score below 50, daily average
        revenue below 100. Missing metrics count as 0. Returns ``[]`` when the
        payload has no ``data``/``summary``.
        """
        try:
            data = dashboard_data["data"]
            summary = data["summary"]
            recommendations = []

            performance = self._section(summary, "performance")
            if self._number(performance.get("error_rate", 0)) > 5:
                recommendations.append("Investigate and optimize high error rate")
            if self._number(performance.get("avg_response_time_ms", 0)) > 1000:
                recommendations.append("Optimize backend performance for better user experience")

            # The rate is computed in the time-range section; fall back to the
            # summary section in case a KPI document provides it there.
            conversion_rate = self._section(data, "conversion_metrics").get(
                "conversion_rate", self._section(summary, "conversions").get("conversion_rate", 0)
            )
            if self._number(conversion_rate) < 2:
                recommendations.append("Optimize user onboarding and conversion funnel")

            engagement = self._section(summary, "engagement")
            if self._number(engagement.get("engagement_score", 0)) < 50:
                recommendations.append("Improve user engagement through feature enhancements")

            revenue = self._section(summary, "revenue")
            if self._number(revenue.get("daily_average", 0)) < 100:
                recommendations.append("Implement monetization strategies or pricing adjustments")
            return recommendations
        except Exception as e:
            logger.error(f"Error generating recommendations: {e}")
            return []
# Module-level dashboard shared by importers; created without a Redis client.
business_analytics = BusinessAnalyticsDashboard(redis_client=None)
# Initialization function
async def initialize_business_analytics(redis_client=None):
    """Create the Redis-backed dashboard and publish it as the module-level instance.

    Args:
        redis_client: Redis client handed to ``BusinessAnalyticsDashboard``;
            ``None`` yields a dashboard without storage.

    Returns:
        The newly created ``BusinessAnalyticsDashboard``.
    """
    # Rebind the module-level instance as well; otherwise code that reads
    # ``business_analytics`` from this module keeps the Redis-less default.
    global business_analytics
    analytics = BusinessAnalyticsDashboard(redis_client)
    business_analytics = analytics
    logger.info("Business analytics system initialized")
    return analytics