Spaces:
Paused
Paused
| from datetime import datetime, timedelta | |
| from ..db.database import db | |
| from ..utils.cache import cache | |
| from typing import Dict, List, Any | |
| class AnalyticsService: | |
| async def get_sales_analytics(start_date: datetime, end_date: datetime) -> Dict[str, Any]: | |
| cache_key = f"sales_analytics:{start_date.date()}:{end_date.date()}" | |
| cached_data = await cache.get_cache(cache_key) | |
| if cached_data: | |
| return cached_data | |
| pipeline = [ | |
| { | |
| "$match": { | |
| "created_at": { | |
| "$gte": start_date, | |
| "$lte": end_date | |
| }, | |
| "status": {"$in": ["completed", "delivered"]} | |
| } | |
| }, | |
| { | |
| "$group": { | |
| "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}}, | |
| "total_sales": {"$sum": "$total_amount"}, | |
| "order_count": {"$sum": 1} | |
| } | |
| }, | |
| {"$sort": {"_id": 1}} | |
| ] | |
| sales_data = await db.db["orders"].aggregate(pipeline).to_list(None) | |
| result = { | |
| "daily_sales": sales_data, | |
| "total_revenue": sum(day["total_sales"] for day in sales_data), | |
| "total_orders": sum(day["order_count"] for day in sales_data), | |
| "average_order_value": sum(day["total_sales"] for day in sales_data) / | |
| (sum(day["order_count"] for day in sales_data) or 1) | |
| } | |
| await cache.set_cache(cache_key, result, expire=3600) # Cache for 1 hour | |
| return result | |
| async def get_product_analytics() -> Dict[str, Any]: | |
| cache_key = "product_analytics" | |
| cached_data = await cache.get_cache(cache_key) | |
| if cached_data: | |
| return cached_data | |
| pipeline = [ | |
| { | |
| "$unwind": "$products" | |
| }, | |
| { | |
| "$group": { | |
| "_id": "$products.product_id", | |
| "total_quantity": {"$sum": "$products.quantity"}, | |
| "total_revenue": { | |
| "$sum": { | |
| "$multiply": ["$products.price", "$products.quantity"] | |
| } | |
| } | |
| } | |
| }, | |
| { | |
| "$sort": {"total_revenue": -1} | |
| }, | |
| { | |
| "$limit": 10 | |
| } | |
| ] | |
| top_products = await db.db["orders"].aggregate(pipeline).to_list(None) | |
| # Get product details | |
| for product in top_products: | |
| product_detail = await db.db["products"].find_one({"_id": product["_id"]}) | |
| if product_detail: | |
| product["name"] = product_detail["name"] | |
| product["category"] = product_detail["category"] | |
| result = { | |
| "top_products": top_products, | |
| "total_products": await db.db["products"].count_documents({}), | |
| "low_stock_products": await db.db["products"].count_documents({"inventory_count": {"$lt": 10}}) | |
| } | |
| await cache.set_cache(cache_key, result, expire=3600) # Cache for 1 hour | |
| return result | |
| async def get_customer_analytics() -> Dict[str, Any]: | |
| cache_key = "customer_analytics" | |
| cached_data = await cache.get_cache(cache_key) | |
| if cached_data: | |
| return cached_data | |
| pipeline = [ | |
| { | |
| "$group": { | |
| "_id": "$customer_id", | |
| "total_orders": {"$sum": 1}, | |
| "total_spent": {"$sum": "$total_amount"}, | |
| "last_order": {"$max": "$created_at"} | |
| } | |
| }, | |
| { | |
| "$sort": {"total_spent": -1} | |
| } | |
| ] | |
| customer_data = await db.db["orders"].aggregate(pipeline).to_list(None) | |
| result = { | |
| "total_customers": len(customer_data), | |
| "top_customers": customer_data[:10], | |
| "average_customer_value": sum(c["total_spent"] for c in customer_data) / (len(customer_data) or 1), | |
| "customer_segments": { | |
| "high_value": len([c for c in customer_data if c["total_spent"] > 1000]), | |
| "medium_value": len([c for c in customer_data if 500 <= c["total_spent"] <= 1000]), | |
| "low_value": len([c for c in customer_data if c["total_spent"] < 500]) | |
| } | |
| } | |
| await cache.set_cache(cache_key, result, expire=3600) # Cache for 1 hour | |
| return result | |
| analytics = AnalyticsService() |