# Desk-Back2/app/services/analytics.py
# Uploaded by Fred808: "Upload 32 files" (commit 8dafdf7, verified)
from datetime import datetime, timedelta
from ..db.database import db
from ..utils.cache import cache
from typing import Dict, List, Any
class AnalyticsService:
    """Cached sales, product and customer reports built from MongoDB aggregations.

    Each public method is a static coroutine that follows the same steps:
    look the report up in the shared ``cache``; on a miss run the queries
    against ``db.db``, store the result for ``CACHE_TTL_SECONDS`` and return
    it.  A cached value that is falsy is treated the same as a miss.
    """

    CACHE_TTL_SECONDS = 3600  # reports are cached for one hour
    TOP_N = 10  # size of the "top products" / "top customers" lists
    LOW_STOCK_THRESHOLD = 10  # inventory_count below this counts as low stock
    HIGH_VALUE_ABOVE = 1000  # total_spent strictly above this -> "high_value"
    MEDIUM_VALUE_FROM = 500  # total_spent from this up to HIGH_VALUE_ABOVE -> "medium_value"

    @staticmethod
    async def get_sales_analytics(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
        """Return per-day sales totals for completed/delivered orders in a date range.

        Args:
            start_date: Inclusive lower bound applied to ``created_at``.
            end_date: Inclusive upper bound applied to ``created_at``.

        Returns:
            A dict with ``daily_sales`` (one ``{_id: "YYYY-MM-DD", total_sales,
            order_count}`` entry per day, ascending), ``total_revenue``,
            ``total_orders`` and ``average_order_value``.
        """
        # NOTE(review): the key uses only the calendar dates, so two ranges that
        # differ only in time-of-day share one cache entry. Confirm this
        # coarseness is intended.
        cache_key = f"sales_analytics:{start_date.date()}:{end_date.date()}"
        cached = await cache.get_cache(cache_key)
        if cached:
            return cached

        match_stage = {
            "$match": {
                "created_at": {"$gte": start_date, "$lte": end_date},
                "status": {"$in": ["completed", "delivered"]},
            }
        }
        group_by_day = {
            "$group": {
                "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
                "total_sales": {"$sum": "$total_amount"},
                "order_count": {"$sum": 1},
            }
        }
        pipeline = [match_stage, group_by_day, {"$sort": {"_id": 1}}]

        daily_sales = await db.db["orders"].aggregate(pipeline).to_list(None)
        report = AnalyticsService._summarize_sales(daily_sales)

        await cache.set_cache(cache_key, report, expire=AnalyticsService.CACHE_TTL_SECONDS)
        return report

    @staticmethod
    async def get_product_analytics() -> Dict[str, Any]:
        """Return the top-selling products by revenue plus catalogue counters.

        Returns:
            A dict with ``top_products`` (at most ``TOP_N`` entries holding
            ``_id``, ``total_quantity``, ``total_revenue`` and, when the product
            document exists, ``name`` and ``category``), ``total_products`` and
            ``low_stock_products``.
        """
        cache_key = "product_analytics"
        cached = await cache.get_cache(cache_key)
        if cached:
            return cached

        # NOTE(review): unlike the sales report there is no status filter here,
        # so every order contributes regardless of status. Confirm intended.
        pipeline = [
            {"$unwind": "$products"},
            {
                "$group": {
                    "_id": "$products.product_id",
                    "total_quantity": {"$sum": "$products.quantity"},
                    "total_revenue": {
                        "$sum": {"$multiply": ["$products.price", "$products.quantity"]}
                    },
                }
            },
            {"$sort": {"total_revenue": -1}},
            {"$limit": AnalyticsService.TOP_N},
        ]
        top_products = await db.db["orders"].aggregate(pipeline).to_list(None)

        # Fetch all product documents in one query instead of one find_one
        # round-trip per product.
        product_ids = [entry["_id"] for entry in top_products]
        if product_ids:
            details = await db.db["products"].find({"_id": {"$in": product_ids}}).to_list(None)
            AnalyticsService._attach_product_details(top_products, details)

        products = db.db["products"]
        total_products = await products.count_documents({})
        low_stock_products = await products.count_documents(
            {"inventory_count": {"$lt": AnalyticsService.LOW_STOCK_THRESHOLD}}
        )
        report = {
            "top_products": top_products,
            "total_products": total_products,
            "low_stock_products": low_stock_products,
        }

        await cache.set_cache(cache_key, report, expire=AnalyticsService.CACHE_TTL_SECONDS)
        return report

    @staticmethod
    async def get_customer_analytics() -> Dict[str, Any]:
        """Return per-customer spending totals and value segments.

        Returns:
            A dict with ``total_customers``, ``top_customers`` (the first
            ``TOP_N`` customers by ``total_spent``, descending),
            ``average_customer_value`` and ``customer_segments`` (counts for
            ``high_value``, ``medium_value`` and ``low_value``).
        """
        cache_key = "customer_analytics"
        cached = await cache.get_cache(cache_key)
        if cached:
            return cached

        # NOTE(review): no status filter here either; all orders are counted.
        pipeline = [
            {
                "$group": {
                    "_id": "$customer_id",
                    "total_orders": {"$sum": 1},
                    "total_spent": {"$sum": "$total_amount"},
                    "last_order": {"$max": "$created_at"},
                }
            },
            {"$sort": {"total_spent": -1}},
        ]
        customers = await db.db["orders"].aggregate(pipeline).to_list(None)
        report = AnalyticsService._summarize_customers(customers)

        await cache.set_cache(cache_key, report, expire=AnalyticsService.CACHE_TTL_SECONDS)
        return report

    @staticmethod
    def _summarize_sales(daily_sales: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Fold per-day sales rows into the sales report dict."""
        total_revenue = sum(day["total_sales"] for day in daily_sales)
        total_orders = sum(day["order_count"] for day in daily_sales)
        return {
            "daily_sales": daily_sales,
            "total_revenue": total_revenue,
            "total_orders": total_orders,
            # "or 1" avoids dividing by zero when there are no orders.
            "average_order_value": total_revenue / (total_orders or 1),
        }

    @staticmethod
    def _attach_product_details(
        products: List[Dict[str, Any]], details: List[Dict[str, Any]]
    ) -> None:
        """Copy ``name`` and ``category`` onto each product row that has a matching detail doc."""
        details_by_id = {doc["_id"]: doc for doc in details}
        for entry in products:
            doc = details_by_id.get(entry["_id"])
            if doc is not None:
                entry["name"] = doc["name"]
                entry["category"] = doc["category"]

    @staticmethod
    def _summarize_customers(customers: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build the customer report from rows already sorted by ``total_spent`` descending."""
        high_above = AnalyticsService.HIGH_VALUE_ABOVE
        medium_from = AnalyticsService.MEDIUM_VALUE_FROM

        segments = {"high_value": 0, "medium_value": 0, "low_value": 0}
        for customer in customers:
            spent = customer["total_spent"]
            # Three explicit comparisons (no bare else) so a value that matches
            # none of them, such as NaN, is not counted in any segment.
            if spent > high_above:
                segments["high_value"] += 1
            elif medium_from <= spent <= high_above:
                segments["medium_value"] += 1
            elif spent < medium_from:
                segments["low_value"] += 1

        total_spent = sum(customer["total_spent"] for customer in customers)
        return {
            "total_customers": len(customers),
            "top_customers": customers[: AnalyticsService.TOP_N],
            # "or 1" avoids dividing by zero when there are no customers.
            "average_customer_value": total_spent / (len(customers) or 1),
            "customer_segments": segments,
        }
# Shared module-level instance (presumably what other modules import);
# the class defines only static methods.
analytics = AnalyticsService()