# zenith-backend/app/infrastructure/business_analytics.py
# NOTE: removed web-scrape residue (contributor avatar line; commit header
# "fix(backend): fix port and health check robustness", d29a5a0 verified).
"""
Business Analytics Dashboard Implementation
"""
import json
import logging
import os
from datetime import datetime, timedelta
from enum import Enum
from typing import Any
logger = logging.getLogger(__name__)
class MetricType(Enum):
    """Closed set of metric families tracked by this module.

    NOTE(review): not referenced anywhere else in this file — presumably
    consumed by importers; confirm before removing.
    """

    USERS = "users"
    REQUESTS = "requests"
    REVENUE = "revenue"
    CONVERSION = "conversion"
    PERFORMANCE = "performance"
    ENGAGEMENT = "engagement"
    BUSINESS_KPI = "business_kpi"
class BusinessMetrics:
    """Business metrics collection and analysis."""

    def __init__(self, redis_client=None):
        # Async Redis client (awaitable commands expected); when None, tracking
        # methods become no-ops and readers return empty/default values.
        self.redis = redis_client
        # Retention for per-user activity data, in days.
        self.metrics_retention_days = int(os.getenv("METRICS_RETENTION_DAYS", "90"))
        # Window used for "real-time" views, in hours.
        self.real_time_window_hours = int(os.getenv("REAL_TIME_WINDOW_HOURS", "24"))
        # Feature flags — NOTE(review): only set here; nothing in this file
        # reads them. Verify against whatever scheduler consumes them.
        self.daily_aggregation_enabled = os.getenv("DAILY_AGGREGATION", "true").lower() == "true"
        self.weekly_report_enabled = os.getenv("WEEKLY_REPORT_ENABLED", "true").lower() == "true"
async def track_user_activity(self, user_id: str, action: str, metadata: dict[str, Any] = None):
"""Track user activity"""
try:
activity = {
"user_id": user_id,
"action": action,
"timestamp": datetime.utcnow().isoformat(),
"metadata": metadata or {},
"session_id": metadata.get("session_id") if metadata else None,
}
# Store in Redis
if self.redis:
await self.redis.lpush("user_activities", json.dumps(activity))
await self.redis.expire(
f"user_activities:{user_id}", self.metrics_retention_days * 86400
) # Days to seconds
# Update real-time counters
await self.redis.hincrby("daily_user_actions", action, 1)
await self.redis.hincrby("real_time_metrics", "user_activities", 1)
logger.debug(f"User activity tracked: {user_id} - {action}")
except Exception as e:
logger.error(f"Error tracking user activity: {e}")
async def track_request_metrics(self, request_data: dict[str, Any]):
"""Track request metrics for business analytics"""
try:
timestamp = datetime.utcnow().isoformat()
# Extract key metrics
user_id = request_data.get("user_id")
endpoint = request_data.get("endpoint")
method = request_data.get("method", "GET")
status_code = request_data.get("status_code", 200)
response_time = request_data.get("response_time_ms", 0)
user_agent = request_data.get("user_agent")
ip_address = request_data.get("ip_address")
referer = request_data.get("referer")
content_type = request_data.get("content_type", "")
content_length = request_data.get("content_length", 0)
# Request categorization
category = self._categorize_request(endpoint, method, status_code)
metrics = {
"timestamp": timestamp,
"user_id": user_id,
"endpoint": endpoint,
"category": category,
"method": method,
"status_code": status_code,
"response_time_ms": response_time,
"is_success": 200 <= status_code < 300,
"user_agent": user_agent,
"ip_address": ip_address,
"referer": referer,
"content_type": content_type,
"content_length": content_length,
}
# Store in Redis
if self.redis:
await self.redis.lpush("request_metrics", json.dumps(metrics))
await self.redis.ltrim("request_metrics", 0, 10000) # Keep last 10K requests
# Update real-time counters
await self.redis.hincrby("daily_metrics", "requests", 1)
if status_code >= 400:
await self.redis.hincrby("daily_metrics", "errors", 1)
if status_code >= 500:
await self.redis.hincrby("daily_metrics", "server_errors", 1)
# Category-specific counters
await self.redis.hincrby("daily_metrics", f"requests_{category}", 1)
if status_code >= 400:
await self.redis.hincrby("daily_metrics", f"errors_{category}", 1)
# Response time tracking
if response_time > 0:
await self.redis.lpush("response_times", str(response_time))
await self.redis.ltrim("response_times", 0, 1000)
logger.debug(f"Request metrics tracked: {endpoint} - {status_code}")
except Exception as e:
logger.error(f"Error tracking request metrics: {e}")
def _categorize_request(self, endpoint: str, method: str, status_code: int) -> str:
"""Categorize request for analytics"""
endpoint_lower = endpoint.lower()
# API endpoints
if endpoint_lower.startswith("/api/auth"):
return "authentication"
elif endpoint_lower.startswith("/api/users"):
return "user_management"
elif endpoint_lower.startswith("/api/analytics"):
return "analytics"
elif endpoint_lower.startswith("/api/reports"):
return "reporting"
elif endpoint_lower.startswith("/api/admin"):
return "administration"
# Public pages
elif endpoint_lower in ["/", "/home", "/dashboard", "/login"]:
return "page_view"
elif endpoint_lower.startswith("/api/"):
if method == "GET":
return "api_read"
else:
return "api_write"
# Error categorization
if status_code >= 500:
return "server_error"
elif status_code >= 400:
return "client_error"
else:
return "success"
async def track_revenue_metrics(
self,
amount: float,
currency: str = "USD",
product_id: str = None,
user_id: str = None,
metadata: dict[str, Any] = None,
):
"""Track revenue metrics"""
try:
revenue = {
"amount": amount,
"currency": currency,
"product_id": product_id,
"user_id": user_id,
"timestamp": datetime.utcnow().isoformat(),
"metadata": metadata or {},
}
# Store in Redis
if self.redis:
await self.redis.lpush("revenue_metrics", json.dumps(revenue))
await self.redis.ltrim("revenue_metrics", 0, 10000)
# Update revenue counters
await self.redis.hincrby("daily_metrics", "revenue_total", int(amount * 100)) # Store in cents
await self.redis.hincrby("daily_metrics", "transactions", 1)
# Currency-specific
await self.redis.hincrby("daily_metrics", f"revenue_{currency.lower()}", int(amount * 100))
logger.info(f"Revenue tracked: {amount} {currency}")
except Exception as e:
logger.error(f"Error tracking revenue: {e}")
async def track_conversion_metrics(
self, conversion_type: str, value: float, user_id: str = None, metadata: dict[str, Any] = None
):
"""Track conversion metrics"""
try:
conversion = {
"type": conversion_type,
"value": value,
"user_id": user_id,
"timestamp": datetime.utcnow().isoformat(),
"metadata": metadata or {},
}
# Store in Redis
if self.redis:
await self.redis.lpush("conversion_metrics", json.dumps(conversion))
await self.redis.ltrim("conversion_metrics", 0, 10000)
# Update conversion counters
await self.redis.hincrby("daily_metrics", f"conversions_{conversion_type}", 1)
await self.redis.hincrby("daily_metrics", "conversions_total", 1)
logger.info(f"Conversion tracked: {conversion_type} - {value}")
except Exception as e:
logger.error(f"Error tracking conversion: {e}")
async def track_engagement_metrics(self, user_id: str, event_type: str, properties: dict[str, Any]):
"""Track user engagement metrics"""
try:
engagement = {
"user_id": user_id,
"event_type": event_type,
"properties": properties,
"timestamp": datetime.utcnow().isoformat(),
"session_duration_seconds": properties.get("session_duration", 0),
}
# Store in Redis
if self.redis:
await self.redis.lpush("engagement_metrics", json.dumps(engagement))
await self.redis.ltrim("engagement_metrics", 0, 10000)
# Update engagement counters
await self.redis.hincrby("daily_metrics", "engagement_events", 1)
# Event-specific tracking
await self.redis.hincrby("daily_metrics", f"engagement_{event_type}", 1)
logger.debug(f"Engagement tracked: {user_id} - {event_type}")
except Exception as e:
logger.error(f"Error tracking engagement: {e}")
async def _calculate_retention_rate(self, date: str) -> float:
"""Calculate retention rate"""
try:
# Get users who were active yesterday
yesterday_active = await self.redis.smembers(f"active_users:{date}")
if not yesterday_active:
return 100.0
# Check how many are still active today
today_active = await self.redis.smembers(f"active_users:{datetime.utcnow().strftime('%Y-%m-%d')}")
if not today_active:
return 0.0
return (len(today_active) / len(yesterday_active)) * 100
except Exception as e:
logger.error(f"Error calculating retention rate: {e}")
return 0.0
async def _calculate_conversion_rate(self, date: str) -> float:
"""Calculate conversion rate"""
try:
total_visitors = await self.redis.get(f"total_visitors:{date}")
total_conversions = await self.redis.get(f"total_conversions:{date}")
if total_visitors and total_conversions:
return (int(total_conversions) / int(total_visitors)) * 100
except Exception as e:
logger.error(f"Error calculating conversion rate: {e}")
return 0.0
async def _calculate_avg_session_duration(self, date: str) -> float:
"""Calculate average session duration"""
try:
# Get session durations for date
durations = await self.redis.lrange(f"session_durations:{date}", 0, 100)
if durations:
# Parse durations from Redis
duration_values = [float(d) for d in durations]
return sum(duration_values) / len(duration_values)
return 0.0
except Exception as e:
logger.error(f"Error calculating average session duration: {e}")
return 0.0
async def _calculate_engagement_score(self, date: str) -> float:
"""Calculate engagement score"""
try:
# Get engagement metrics for date
if self.redis:
events = await self.redis.lrange(f"engagement_metrics:{date}", 0, 1000)
if not events:
return 0.0
# Simple engagement score based on events and session duration
engagement_score = 0
session_durations = []
for event_json in events:
try:
event = json.loads(event_json)
if event.get("event_type") == "session_end":
session_durations.append(event.get("session_duration_seconds", 0))
# Count different engagement events
engagement_score += 1
except Exception:
pass
# Factor in session duration
if session_durations:
avg_duration = sum(session_durations) / len(session_durations)
engagement_score += min(avg_duration / 300, 2) # Cap at 2 points
# Normalize score
return min(engagement_score, 100)
return 0.0
except Exception as e:
logger.error(f"Error calculating engagement score: {e}")
return 0.0
async def _calculate_avg_response_time(self) -> float:
"""Calculate average response time from recent data"""
try:
if self.redis:
response_times = await self.redis.lrange("response_times", 0, 1000)
if response_times:
times = [float(rt) for rt in response_times]
return sum(times) / len(times)
return 0.0
except Exception as e:
logger.error(f"Error calculating average response time: {e}")
return 0.0
async def get_real_time_metrics(self) -> dict[str, Any]:
"""Get real-time business metrics"""
try:
if not self.redis:
return {}
current_time = datetime.utcnow()
# Get metrics from last N hours
real_time = {
"timestamp": current_time.isoformat(),
"active_users": await self.redis.get("active_users_realtime") or 0,
"user_activities_last_hour": await self.redis.llen("user_activities", 0, 1000),
"requests_last_hour": await self.redis.llen("request_metrics", 0, 1000),
"engagement_events_last_hour": await self.redis.llen("engagement_metrics", 0, 1000),
"conversions_last_hour": await self.redis.llen("conversion_metrics", 0, 1000),
}
# Calculate rates per hour
real_time["requests_per_hour"] = real_time["requests_last_hour"]
real_time["conversions_per_hour"] = real_time["conversions_last_hour"]
real_time["engagement_events_per_hour"] = real_time["engagement_events_last_hour"]
real_time["error_rate"] = self._calculate_real_time_error_rate()
return real_time
except Exception as e:
logger.error(f"Error getting real-time metrics: {e}")
return {"error": str(e)}
async def _calculate_real_time_error_rate(self) -> float:
"""Calculate real-time error rate"""
try:
if not self.redis:
return 0.0
requests = await self.redis.lrange("request_metrics", 0, 1000)
if not requests:
return 0.0
recent_requests = [json.loads(req) for req in requests]
recent_errors = [req for req in recent_requests if json.loads(req).get("status_code", 200) >= 400]
return (len(recent_errors) / len(recent_requests)) * 100
except Exception as e:
logger.error(f"Error calculating real-time error rate: {e}")
return 0.0
async def get_analytics_dashboard_data(self, time_range: str = "7d") -> dict[str, Any]:
"""Get data for analytics dashboard"""
try:
# Parse time range
days = int(time_range.rstrip("d"))
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=days)
# Get metrics for the time range
dashboard_data = {
"time_range": time_range,
"start_date": start_date.isoformat(),
"end_date": end_date.isoformat(),
"summary": await self._get_time_range_summary(start_date, end_date),
"user_metrics": await self._get_user_time_range_metrics(start_date, end_date),
"revenue_metrics": await self._get_revenue_time_range_metrics(start_date, end_date),
"conversion_metrics": await self._get_conversion_time_range_metrics(start_date, end_date),
"performance_metrics": await self._get_performance_time_range_metrics(start_date, end_date),
"engagement_metrics": await self._get_engagement_time_range_metrics(start_date, end_date),
}
return dashboard_data
except Exception as e:
logger.error(f"Error getting dashboard data: {e}")
return {"error": str(e)}
    async def _get_time_range_summary(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
        """Summarize persisted daily KPI snapshots over [start_date, end_date].

        Reads one "business_kpis:<YYYY-MM-DD>" JSON blob per day and aggregates
        the "revenue" and "users" sections; any other section keeps only its
        most recent value.

        NOTE(review): no redis-None guard here — AttributeError propagates to
        get_analytics_dashboard_data's error handler.
        """
        summary = {
            "start_date": start_date.isoformat(),
            "end_date": end_date.isoformat(),
            "days": (end_date - start_date).days,
        }
        # Collect each day's KPI sections, keyed by section name.
        current_date = start_date
        total_metrics = {}
        while current_date <= end_date:
            date_str = current_date.strftime("%Y-%m-%d")
            day_kpis = await self.redis.get(f"business_kpis:{date_str}")
            if day_kpis:
                kpis_data = json.loads(day_kpis)
                for key, value in kpis_data.items():
                    if key not in total_metrics:
                        total_metrics[key] = []
                    total_metrics[key].append(value)
            current_date += timedelta(days=1)
        # Aggregate metrics per section.
        for key, values in total_metrics.items():
            if values:
                if key == "revenue":
                    # NOTE(review): each v here is already the day's "revenue"
                    # section, yet v.get("revenue", {}) is applied again — this
                    # only works if the KPI writer nests revenue twice. Verify
                    # against whatever produces business_kpis:<date>.
                    # The /100 presumably converts stored cents to major units
                    # — TODO confirm.
                    summary[key] = {
                        "total": sum(v.get("revenue", {}).get("total", 0) for v in values),
                        "transactions": sum(v.get("revenue", {}).get("transactions", 0) for v in values),
                        "daily_average": sum(v.get("revenue", {}).get("total", 0) for v in values) / len(values) / 100,
                    }
                elif key == "users":
                    # Same nesting caveat as "revenue" above.
                    summary[key] = {
                        "total_active": sum(v.get("users", {}).get("active", 0) for v in values),
                        "total_new": sum(v.get("users", {}).get("new", 0) for v in values),
                        "daily_average_active": sum(v.get("users", {}).get("active", 0) for v in values) / len(values),
                        "daily_average_new": sum(v.get("users", {}).get("new", 0) for v in values) / len(values),
                    }
                # Add other metrics as needed
                else:
                    # Unaggregated sections keep only the latest day's value.
                    summary[key] = values[-1] if values else {}
        return summary
async def _get_user_time_range_metrics(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
"""Get user metrics for time range"""
user_metrics = {"new_users": 0, "active_users": 0, "returning_users": 0, "user_retention_rate": 0}
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime("%Y-%m-%d")
day_metrics = await self.redis.hgetall(f"daily_metrics:{date_str}")
if day_metrics:
user_metrics["new_users"] += day_metrics.get("new_users", 0)
user_metrics["active_users"] += day_metrics.get("active_users", 0)
user_metrics["returning_users"] += day_metrics.get("returning_users", 0)
current_date += timedelta(days=1)
# Calculate retention rate for the period
if user_metrics["new_users"] > 0:
retention_sum = 0
retention_count = 0
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime("%Y-%m-%d")
retention_rate = await self._calculate_retention_rate(date_str)
retention_sum += retention_rate
retention_count += 1
current_date += timedelta(days=1)
if retention_count > 0:
user_metrics["user_retention_rate"] = retention_sum / retention_count
return user_metrics
async def _get_revenue_time_range_metrics(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
"""Get revenue metrics for time range"""
revenue_metrics = {
"total_revenue": 0,
"total_transactions": 0,
"average_transaction_value": 0,
"daily_average": 0,
}
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime("%Y-%m-%d")
day_kpis = await self.redis.get(f"business_kpis:{date_str}")
if day_kpis:
kpis_data = json.loads(day_kpis)
revenue = kpis_data.get("revenue", {})
revenue_metrics["total_revenue"] += revenue.get("total", 0)
revenue_metrics["total_transactions"] += revenue.get("transactions", 0)
revenue_metrics["daily_average"] += revenue.get("daily_average", 0)
current_date += timedelta(days=1)
if revenue_metrics["total_transactions"] > 0:
revenue_metrics["average_transaction_value"] = (
revenue_metrics["total_revenue"] / revenue_metrics["total_transactions"]
)
return revenue_metrics
async def _get_conversion_time_range_metrics(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
"""Get conversion metrics for time range"""
conversion_metrics = {"total_conversions": 0, "conversion_rate": 0, "conversions_by_type": {}}
current_date = start_date
total_visitors = 0
while current_date <= end_date:
date_str = current_date.strftime("%Y-%m-%d")
day_kpis = await self.redis.get(f"business_kpis:{date_str}")
if day_kpis:
kpis_data = json.loads(day_kpis)
conversions = kpis_data.get("conversions", {})
conversion_metrics["total_conversions"] += conversions.get("total", 0)
conversion_metrics["conversions_by_type"] = conversions
# Accumulate unique conversion types
for conv_type, count in conversions.items():
if conv_type not in conversion_metrics["conversions_by_type"]:
conversion_metrics["conversions_by_type"][conv_type] = count
else:
conversion_metrics["conversions_by_type"][conv_type] += count
current_date += timedelta(days=1)
# Calculate daily average conversion rate
if total_visitors > 0:
conversion_metrics["conversion_rate"] = conversion_metrics["total_conversions"] / total_visitors
return conversion_metrics
async def _get_performance_time_range_metrics(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
"""Get performance metrics for time range"""
performance_metrics = {"total_requests": 0, "error_rate": 0, "server_error_rate": 0, "avg_response_time": 0}
current_date = start_date
response_times = []
while current_date <= end_date:
date_str = current_date.strftime("%Y-%m-%d")
day_kpis = await self.redis.get(f"business_kpis:{date_str}")
if day_kpis:
kpis_data = json.loads(day_kpis)
performance = kpis_data.get("performance", {})
performance_metrics["total_requests"] += performance.get("total_requests", 0)
performance_metrics["error_rate"] = performance.get("error_rate", 0)
performance_metrics["server_error_rate"] = performance.get("server_error_rate", 0)
performance_metrics["avg_response_time"] = performance.get("avg_response_time_ms", 0) / 1000
# Collect response times for average calculation
date_response_times = await self.redis.lrange(f"response_times:{date_str}", 0, 1000)
if date_response_times:
response_times.extend([float(rt) for rt in date_response_times])
current_date += timedelta(days=1)
if response_times:
performance_metrics["avg_response_time"] = sum(response_times) / len(response_times)
# Calculate error rates
if performance_metrics["total_requests"] > 0:
performance_metrics["error_rate"] = (
(performance_metrics["error_sum"] + performance_metrics["server_error_sum"])
/ performance_metrics["total_requests"]
* 100
)
return performance_metrics
async def _get_engagement_time_range_metrics(self, start_date: datetime, end_date: datetime) -> dict[str, Any]:
"""Get engagement metrics for time range"""
engagement_metrics = {"total_events": 0, "engagement_score": 0, "avg_session_duration": 0}
current_date = start_date
engagement_scores = []
session_durations = []
while current_date <= end_date:
date_str = current_date.strftime("%Y-%m-%d")
day_kpis = await self.redis.get(f"business_kpis:{date_str}")
if day_kpis:
kpis_data = json.loads(day_kpis)
engagement = kpis_data.get("engagement", {})
engagement_metrics["total_events"] += engagement.get("total_events", 0)
engagement_metrics["engagement_score"] += engagement.get("engagement_score", 0)
engagement_metrics["avg_session_duration"] += engagement.get("avg_session_duration_seconds", 0)
current_date += timedelta(days=1)
if engagement_scores:
engagement_metrics["engagement_score"] = sum(engagement_scores) / len(engagement_scores)
if session_durations:
engagement_metrics["avg_session_duration"] = sum(session_durations) / len(session_durations)
return engagement_metrics
class BusinessAnalyticsDashboard:
    """Business analytics dashboard."""

    def __init__(self, redis_client=None):
        # Optional async Redis client; forwarded to BusinessMetrics, which
        # treats None as "no persistence".
        self.redis = redis_client
        self.business_metrics = BusinessMetrics(redis_client)
async def get_dashboard_data(self, time_range: str = "7d", metrics: list[str] = None) -> dict[str, Any]:
"""Get dashboard data"""
try:
data = {
"dashboard_config": {
"available_time_ranges": ["1d", "7d", "30d", "90d"],
"available_metrics": [
"users",
"revenue",
"conversions",
"engagement",
"performance",
"business_kpis",
],
"default_time_range": "7d",
},
"data": await self.business_metrics.get_analytics_dashboard_data(time_range),
"real_time_metrics": await self.business_metrics.get_real_time_metrics(),
"last_updated": datetime.utcnow().isoformat(),
}
return data
except Exception as e:
logger.error(f"Error getting dashboard data: {e}")
return {"error": str(e)}
async def generate_report(self, report_type: str, time_range: str = "7d") -> dict[str, Any]:
"""Generate business report"""
try:
if report_type == "executive":
return await self._generate_executive_report(time_range)
elif report_type == "detailed":
return await self._generate_detailed_report(time_range)
elif report_type == "performance":
return await self._generate_performance_report(time_range)
elif report_type == "revenue":
return await self._generate_revenue_report(time_range)
else:
return {"error": "Unknown report type"}
except Exception as e:
logger.error(f"Error generating report: {e}")
return {"error": str(e)}
async def _generate_executive_report(self, time_range: str) -> dict[str, Any]:
"""Generate executive summary report"""
try:
dashboard_data = await self.get_dashboard_data(time_range)
summary = dashboard_data.get("data", {}).get("summary", {})
report = {
"report_type": "executive_summary",
"time_range": time_range,
"generated_at": datetime.utcnow().isoformat(),
"key_metrics": {
"total_revenue": summary.get("revenue", {}).get("total_revenue", 0),
"total_users": summary.get("users", {}).get("total_new", 0),
"conversion_rate": summary.get("conversions", {}).get("conversion_rate", 0),
"engagement_score": summary.get("engagement", {}).get("engagement_score", 0),
"avg_response_time": summary.get("performance", {}).get("avg_response_time_ms", 0),
},
"trends": await self._calculate_trends(dashboard_data.get("data")),
"recommendations": await self._generate_recommendations(dashboard_data),
}
return report
except Exception as e:
logger.error(f"Error generating executive report: {e}")
return {"error": str(e)}
async def _calculate_trends(self, data: dict[str, Any]) -> list[dict[str, Any]]:
"""Calculate business trends"""
try:
if not data or "data" not in data or "summary" not in data["data"]:
return []
summary = data["data"]["summary"]
trends = []
# Revenue trend
revenue_summary = summary.get("revenue", {})
if revenue_summary.get("daily_average", 0) > 0:
trends.append(
{
"metric": "revenue",
"trend": self._calculate_trend(data, "revenue", "daily_average"),
"description": f"Daily average revenue: ${revenue_summary['daily_average']:.2f}",
"growth_rate": f"{summary['comparison_with_yesterday'].get('revenue_growth', 0):.1f}%",
}
)
# User growth trend
user_summary = summary.get("users", {})
if user_summary.get("daily_average_new", 0) > 0:
trends.append(
{
"metric": "user_growth",
"trend": self._calculate_trend(data, "new_users", "daily_average_new"),
"description": f"Daily new users: {user_summary['daily_average_new']: .0f}",
"growth_rate": f"{summary['comparison_with_yesterday'].get('user_growth', 0):.1f}%",
}
)
# Conversion trend
conversion_summary = summary.get("conversions", {})
if conversion_summary.get("conversion_rate", 0) > 0:
trends.append(
{
"metric": "conversion_rate",
"trend": self._calculate_trend(data, "conversion_rate", "conversion_rate"),
"description": f"Conversion rate: {conversion_summary['conversion_rate']: .1f}%",
}
)
return trends
except Exception as e:
logger.error(f"Error calculating trends: {e}")
return []
    def _calculate_trend(self, data: dict[str, Any], metric: str, average_key: str) -> str:
        """Return a trend direction: "up", "down", "stable", or "unknown".

        NOTE(review): callers pass the analytics data dict (with "summary" at
        its top level), so data["data"] raises KeyError here and every call
        currently resolves to "unknown" via the except path — verify the
        intended data shape. Also: summary[metric] holds a dict for keys like
        "revenue", making the <,> comparisons raise TypeError (also swallowed),
        and `average_key` is never used.
        """
        try:
            if "summary" not in data["data"]:
                return "stable"
            current = data["data"]["summary"]
            comparison = current.get("comparison_with_yesterday", {})
            current_value = current.get(metric, 0)
            previous_value = comparison.get(metric, 0)
            if current_value > previous_value:
                return "up"
            elif current_value < previous_value:
                return "down"
            else:
                return "stable"
        except Exception as e:
            logger.error(f"Error calculating trend: {e}")
            return "unknown"
async def _generate_recommendations(self, dashboard_data: dict[str, Any]) -> list[str]:
"""Generate business recommendations"""
recommendations = []
try:
summary = dashboard_data["data"]["summary"]
# Performance recommendations
performance = summary.get("performance", {})
if performance.get("error_rate", 0) > 5:
recommendations.append("Investigate and optimize high error rate")
if performance.get("avg_response_time_ms", 0) > 1000:
recommendations.append("Optimize backend performance for better user experience")
# Conversion recommendations
conversions = summary.get("conversions", {})
if conversions.get("conversion_rate", 0) < 2:
recommendations.append("Optimize user onboarding and conversion funnel")
# User engagement recommendations
engagement = summary.get("engagement", {})
if engagement.get("engagement_score", 0) < 50:
recommendations.append("Improve user engagement through feature enhancements")
# Revenue recommendations
revenue = summary.get("revenue", {})
if revenue.get("daily_average", 0) < 100:
recommendations.append("Implement monetization strategies or pricing adjustments")
return recommendations
except Exception as e:
logger.error(f"Error generating recommendations: {e}")
return []
# Global business analytics instance
# NOTE(review): constructed without a Redis client, so this module-level
# singleton returns empty/error payloads until a wired-up instance is created
# via initialize_business_analytics(redis_client).
business_analytics = BusinessAnalyticsDashboard()
# Initialization function
async def initialize_business_analytics(redis_client=None):
    """Build a BusinessAnalyticsDashboard wired to the given Redis client.

    Args:
        redis_client: Optional async Redis client; None means no persistence.

    Returns:
        The initialized BusinessAnalyticsDashboard instance.
    """
    dashboard = BusinessAnalyticsDashboard(redis_client)
    logger.info("Business analytics system initialized")
    return dashboard