from datetime import datetime, timedelta from ..db.database import db from ..utils.cache import cache from typing import Dict, List, Any class AnalyticsService: @staticmethod async def get_sales_analytics(start_date: datetime, end_date: datetime) -> Dict[str, Any]: cache_key = f"sales_analytics:{start_date.date()}:{end_date.date()}" cached_data = await cache.get_cache(cache_key) if cached_data: return cached_data pipeline = [ { "$match": { "created_at": { "$gte": start_date, "$lte": end_date }, "status": {"$in": ["completed", "delivered"]} } }, { "$group": { "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}}, "total_sales": {"$sum": "$total_amount"}, "order_count": {"$sum": 1} } }, {"$sort": {"_id": 1}} ] sales_data = await db.db["orders"].aggregate(pipeline).to_list(None) result = { "daily_sales": sales_data, "total_revenue": sum(day["total_sales"] for day in sales_data), "total_orders": sum(day["order_count"] for day in sales_data), "average_order_value": sum(day["total_sales"] for day in sales_data) / (sum(day["order_count"] for day in sales_data) or 1) } await cache.set_cache(cache_key, result, expire=3600) # Cache for 1 hour return result @staticmethod async def get_product_analytics() -> Dict[str, Any]: cache_key = "product_analytics" cached_data = await cache.get_cache(cache_key) if cached_data: return cached_data pipeline = [ { "$unwind": "$products" }, { "$group": { "_id": "$products.product_id", "total_quantity": {"$sum": "$products.quantity"}, "total_revenue": { "$sum": { "$multiply": ["$products.price", "$products.quantity"] } } } }, { "$sort": {"total_revenue": -1} }, { "$limit": 10 } ] top_products = await db.db["orders"].aggregate(pipeline).to_list(None) # Get product details for product in top_products: product_detail = await db.db["products"].find_one({"_id": product["_id"]}) if product_detail: product["name"] = product_detail["name"] product["category"] = product_detail["category"] result = { "top_products": top_products, "total_products": await db.db["products"].count_documents({}), "low_stock_products": await db.db["products"].count_documents({"inventory_count": {"$lt": 10}}) } await cache.set_cache(cache_key, result, expire=3600) # Cache for 1 hour return result @staticmethod async def get_customer_analytics() -> Dict[str, Any]: cache_key = "customer_analytics" cached_data = await cache.get_cache(cache_key) if cached_data: return cached_data pipeline = [ { "$group": { "_id": "$customer_id", "total_orders": {"$sum": 1}, "total_spent": {"$sum": "$total_amount"}, "last_order": {"$max": "$created_at"} } }, { "$sort": {"total_spent": -1} } ] customer_data = await db.db["orders"].aggregate(pipeline).to_list(None) result = { "total_customers": len(customer_data), "top_customers": customer_data[:10], "average_customer_value": sum(c["total_spent"] for c in customer_data) / (len(customer_data) or 1), "customer_segments": { "high_value": len([c for c in customer_data if c["total_spent"] > 1000]), "medium_value": len([c for c in customer_data if 500 <= c["total_spent"] <= 1000]), "low_value": len([c for c in customer_data if c["total_spent"] < 500]) } } await cache.set_cache(cache_key, result, expire=3600) # Cache for 1 hour return result analytics = AnalyticsService()