Admin-Desk / app /api /analytics.py
Fred808's picture
Upload 56 files
3e5c7dd verified
from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, cast, Date, and_
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from ..core.dependencies import get_current_active_user
from ..db.database import get_db
from ..db.models import Order, Product, User
router = APIRouter()
@router.get("/sales")
async def get_sales_analytics(
start_date: datetime = Query(default=None),
end_date: datetime = Query(default=None),
branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
if not start_date:
start_date = datetime.now() - timedelta(days=30)
if not end_date:
end_date = datetime.now()
# Build query conditions
conditions = [
Order.created_at.between(start_date, end_date),
Order.status.in_(['completed', 'delivered'])
]
# Add branch filter
if branch_id:
if not current_user.is_superuser and branch_id != current_user.branch_id:
raise HTTPException(
status_code=403,
detail="You can only view analytics from your own branch"
)
conditions.append(Order.branch_id == branch_id)
elif not current_user.is_superuser:
# Non-superusers can only see their branch's analytics
conditions.append(Order.branch_id == current_user.branch_id)
# Daily sales query
stmt = select(
cast(Order.created_at, Date).label('date'),
func.sum(Order.total_amount).label('total_sales'),
func.count().label('order_count')
).where(
and_(*conditions)
).group_by(
cast(Order.created_at, Date)
).order_by(
cast(Order.created_at, Date)
)
result = await db.execute(stmt)
daily_sales = result.all()
# Calculate totals
total_revenue = sum(day.total_sales for day in daily_sales)
total_orders = sum(day.order_count for day in daily_sales)
avg_order_value = total_revenue / total_orders if total_orders > 0 else 0
return {
"daily_sales": [
{"date": day.date, "total_sales": day.total_sales, "order_count": day.order_count}
for day in daily_sales
],
"total_revenue": total_revenue,
"total_orders": total_orders,
"average_order_value": avg_order_value
}
@router.get("/products")
async def get_product_analytics(
branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
# Build base conditions
conditions = []
# Add branch filter
if branch_id:
if not current_user.is_superuser and branch_id != current_user.branch_id:
raise HTTPException(
status_code=403,
detail="You can only view analytics from your own branch"
)
conditions.append(Product.branch_id == branch_id)
elif not current_user.is_superuser:
conditions.append(Product.branch_id == current_user.branch_id)
# Top selling products
stmt = select(
Product,
func.sum(Order.total_amount).label('total_revenue'),
func.count().label('total_orders')
).join(
Order, Product.id == Order.id
).where(
and_(*conditions)
).group_by(
Product.id
).order_by(
func.sum(Order.total_amount).desc()
).limit(10)
result = await db.execute(stmt)
top_products = result.all()
# Count total and low stock products
total_products = await db.scalar(
select(func.count()).select_from(Product).where(and_(*conditions))
)
low_stock_conditions = conditions + [Product.inventory_count < 10]
low_stock_count = await db.scalar(
select(func.count()).select_from(Product).where(and_(*low_stock_conditions))
)
return {
"top_products": [
{
"id": product.id,
"name": product.name,
"total_revenue": revenue,
"total_orders": orders
}
for product, revenue, orders in top_products
],
"total_products": total_products,
"low_stock_products": low_stock_count
}
@router.get("/customers")
async def get_customer_analytics(
branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
# Build base conditions
conditions = []
# Add branch filter
if branch_id:
if not current_user.is_superuser and branch_id != current_user.branch_id:
raise HTTPException(
status_code=403,
detail="You can only view analytics from your own branch"
)
conditions.append(Order.branch_id == branch_id)
elif not current_user.is_superuser:
conditions.append(Order.branch_id == current_user.branch_id)
# Customer statistics
stmt = select(
User,
func.sum(Order.total_amount).label('total_spent'),
func.count().label('total_orders')
).join(
Order, User.id == Order.customer_id
).where(
and_(*conditions)
).group_by(
User.id
).order_by(
func.sum(Order.total_amount).desc()
)
result = await db.execute(stmt)
customer_data = result.all()
total_customers = len(customer_data)
total_revenue = sum(spent for _, spent, _ in customer_data)
avg_customer_value = total_revenue / total_customers if total_customers > 0 else 0
# Customer segments
segments = {
"high_value": len([c for c, spent, _ in customer_data if spent > 1000]),
"medium_value": len([c for c, spent, _ in customer_data if 500 <= spent <= 1000]),
"low_value": len([c for c, spent, _ in customer_data if spent < 500])
}
return {
"total_customers": total_customers,
"average_customer_value": avg_customer_value,
"customer_segments": segments,
"top_customers": [
{
"id": customer.id,
"email": customer.email,
"total_spent": spent,
"total_orders": orders
}
for customer, spent, orders in customer_data[:10] # Top 10 customers
]
}
@router.get("/dashboard")
async def get_dashboard_analytics(
branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
"""Get a comprehensive dashboard with key metrics"""
# Get last 30 days of sales data
start_date = datetime.now() - timedelta(days=30)
end_date = datetime.now()
sales_data = await get_sales_analytics(start_date, end_date, branch_id, current_user, db)
product_data = await get_product_analytics(branch_id, current_user, db)
customer_data = await get_customer_analytics(branch_id, current_user, db)
return {
"sales_summary": {
"total_revenue": sales_data["total_revenue"],
"total_orders": sales_data["total_orders"],
"average_order_value": sales_data["average_order_value"],
"daily_sales": sales_data["daily_sales"][-7:] # Last 7 days
},
"product_summary": {
"total_products": product_data["total_products"],
"low_stock_products": product_data["low_stock_products"],
"top_selling_products": product_data["top_products"][:5] # Top 5 products
},
"customer_summary": {
"total_customers": customer_data["total_customers"],
"average_customer_value": customer_data["average_customer_value"],
"customer_segments": customer_data["customer_segments"]
}
}