Spaces:
Paused
Paused
| from fastapi import APIRouter, Depends, Query, HTTPException | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, func, cast, Date, and_ | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, Optional | |
| from ..core.dependencies import get_current_active_user | |
| from ..db.database import get_db | |
| from ..db.models import Order, Product, User | |
| router = APIRouter() | |
async def get_sales_analytics(
    start_date: Optional[datetime] = Query(default=None),
    end_date: Optional[datetime] = Query(default=None),
    branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Summarise completed and delivered orders per calendar day.

    Args:
        start_date: Start of the reporting window. Defaults to 30 days
            before ``end_date``.
        end_date: End of the reporting window. Defaults to the current time.
        branch_id: Restrict the report to this branch. When omitted,
            superusers see every branch and other users see their own.
        current_user: The authenticated user; ``is_superuser`` and
            ``branch_id`` decide which branches may be queried.
        db: Async database session used to run the query.

    Returns:
        A dict with ``daily_sales`` (one entry per day that has matching
        orders, ordered by date), ``total_revenue``, ``total_orders`` and
        ``average_order_value`` (0 when there are no orders).

    Raises:
        HTTPException: 403 when a non-superuser requests a branch other
            than their own.
    """
    # Anchor the default window on end_date so that supplying only an
    # end_date yields the 30 days before it rather than an inverted range.
    if end_date is None:
        end_date = datetime.now()
    if start_date is None:
        start_date = end_date - timedelta(days=30)

    conditions = [
        Order.created_at.between(start_date, end_date),
        Order.status.in_(['completed', 'delivered'])
    ]

    # Compare against None explicitly: a branch id of 0 is still a request
    # for a specific branch and must go through the permission check.
    if branch_id is not None:
        if not current_user.is_superuser and branch_id != current_user.branch_id:
            raise HTTPException(
                status_code=403,
                detail="You can only view analytics from your own branch"
            )
        conditions.append(Order.branch_id == branch_id)
    elif not current_user.is_superuser:
        # Non-superusers can only see their branch's analytics
        conditions.append(Order.branch_id == current_user.branch_id)

    # Build the day expression once and reuse it for SELECT / GROUP BY / ORDER BY.
    sale_day = cast(Order.created_at, Date)
    stmt = (
        select(
            sale_day.label('date'),
            func.sum(Order.total_amount).label('total_sales'),
            func.count().label('order_count')
        )
        .where(*conditions)
        .group_by(sale_day)
        .order_by(sale_day)
    )
    result = await db.execute(stmt)

    # SUM() is NULL when every amount in a group is NULL; treat that as 0 so
    # the totals below cannot fail on None.
    daily_sales = [
        {
            "date": row.date,
            "total_sales": row.total_sales or 0,
            "order_count": row.order_count
        }
        for row in result.all()
    ]

    total_revenue = sum(day["total_sales"] for day in daily_sales)
    total_orders = sum(day["order_count"] for day in daily_sales)
    avg_order_value = total_revenue / total_orders if total_orders > 0 else 0

    return {
        "daily_sales": daily_sales,
        "total_revenue": total_revenue,
        "total_orders": total_orders,
        "average_order_value": avg_order_value
    }
async def get_product_analytics(
    branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Report the top-selling products and stock counts.

    Args:
        branch_id: Restrict the report to this branch. When omitted,
            superusers see every branch and other users see their own.
        current_user: The authenticated user; ``is_superuser`` and
            ``branch_id`` decide which branches may be queried.
        db: Async database session used to run the queries.

    Returns:
        A dict with ``top_products`` (at most 10 entries ordered by revenue,
        each with ``id``, ``name``, ``total_revenue`` and ``total_orders``),
        ``total_products`` and ``low_stock_products`` (products whose
        ``inventory_count`` is below 10).

    Raises:
        HTTPException: 403 when a non-superuser requests a branch other
            than their own.
    """
    low_stock_threshold = 10
    conditions = []

    # Compare against None explicitly: a branch id of 0 is still a request
    # for a specific branch and must go through the permission check.
    if branch_id is not None:
        if not current_user.is_superuser and branch_id != current_user.branch_id:
            raise HTTPException(
                status_code=403,
                detail="You can only view analytics from your own branch"
            )
        conditions.append(Product.branch_id == branch_id)
    elif not current_user.is_superuser:
        conditions.append(Product.branch_id == current_user.branch_id)

    # NOTE(review): this joins on Order.id (the order's own key), which pairs
    # a product with whichever order happens to share its id. It most likely
    # should join on the order's product reference (or through an order-item
    # table). The Order model is not visible here, so the condition is left
    # unchanged -- confirm against the schema and fix.
    revenue = func.sum(Order.total_amount)
    stmt = (
        select(
            Product,
            revenue.label('total_revenue'),
            func.count().label('total_orders')
        )
        .join(Order, Product.id == Order.id)
        # where(*conditions) ANDs the criteria together and, unlike and_(),
        # is valid when the list is empty (superuser without a branch filter).
        .where(*conditions)
        .group_by(Product.id)
        .order_by(revenue.desc())
        .limit(10)
    )
    result = await db.execute(stmt)
    top_products = result.all()

    # Count total and low stock products under the same branch scoping.
    total_products = await db.scalar(
        select(func.count()).select_from(Product).where(*conditions)
    )
    low_stock_count = await db.scalar(
        select(func.count())
        .select_from(Product)
        .where(*conditions, Product.inventory_count < low_stock_threshold)
    )

    return {
        "top_products": [
            {
                "id": product.id,
                "name": product.name,
                "total_revenue": product_revenue,
                "total_orders": order_count
            }
            for product, product_revenue, order_count in top_products
        ],
        "total_products": total_products,
        "low_stock_products": low_stock_count
    }
async def get_customer_analytics(
    branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Report per-customer spending, value segments and the top customers.

    Args:
        branch_id: Restrict the report to orders of this branch. When
            omitted, superusers see every branch and other users see their
            own.
        current_user: The authenticated user; ``is_superuser`` and
            ``branch_id`` decide which branches may be queried.
        db: Async database session used to run the query.

    Returns:
        A dict with ``total_customers`` (customers with at least one matching
        order), ``average_customer_value`` (0 when there are no customers),
        ``customer_segments`` (counts for ``high_value`` > 1000,
        ``medium_value`` 500-1000 inclusive, ``low_value`` < 500) and
        ``top_customers`` (the 10 highest spenders).

    Raises:
        HTTPException: 403 when a non-superuser requests a branch other
            than their own.
    """
    conditions = []

    # Compare against None explicitly: a branch id of 0 is still a request
    # for a specific branch and must go through the permission check.
    if branch_id is not None:
        if not current_user.is_superuser and branch_id != current_user.branch_id:
            raise HTTPException(
                status_code=403,
                detail="You can only view analytics from your own branch"
            )
        conditions.append(Order.branch_id == branch_id)
    elif not current_user.is_superuser:
        conditions.append(Order.branch_id == current_user.branch_id)

    # NOTE(review): unlike get_sales_analytics, orders of every status are
    # counted here -- confirm whether cancelled/pending orders should be
    # excluded from customer spend.
    spent_total = func.sum(Order.total_amount)
    stmt = (
        select(
            User,
            spent_total.label('total_spent'),
            func.count().label('total_orders')
        )
        .join(Order, User.id == Order.customer_id)
        # where(*conditions) ANDs the criteria together and, unlike and_(),
        # is valid when the list is empty (superuser without a branch filter).
        .where(*conditions)
        .group_by(User.id)
        .order_by(spent_total.desc())
    )
    result = await db.execute(stmt)

    # SUM() is NULL when all of a customer's amounts are NULL; treat that as
    # 0 so the arithmetic and comparisons below cannot fail on None.
    customer_data = [
        (customer, spent or 0, orders)
        for customer, spent, orders in result.all()
    ]

    total_customers = len(customer_data)
    total_revenue = sum(spent for _, spent, _ in customer_data)
    avg_customer_value = total_revenue / total_customers if total_customers > 0 else 0

    # Bucket every customer in a single pass.
    segments = {"high_value": 0, "medium_value": 0, "low_value": 0}
    for _, spent, _ in customer_data:
        if spent > 1000:
            segments["high_value"] += 1
        elif spent >= 500:
            segments["medium_value"] += 1
        else:
            segments["low_value"] += 1

    return {
        "total_customers": total_customers,
        "average_customer_value": avg_customer_value,
        "customer_segments": segments,
        "top_customers": [
            {
                "id": customer.id,
                "email": customer.email,
                "total_spent": spent,
                "total_orders": orders
            }
            # Rows are already ordered by spend, so the first 10 are the top 10.
            for customer, spent, orders in customer_data[:10]
        ]
    }
async def get_dashboard_analytics(
    branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Assemble the dashboard from the sales, product and customer reports.

    Sales figures cover the 30 days up to now. The three underlying reports
    are awaited one after another on the same session, and each applies its
    own branch permission check.

    Returns:
        A dict with ``sales_summary`` (totals plus the last 7 daily entries),
        ``product_summary`` (counts plus the first 5 top products) and
        ``customer_summary`` (counts, average value and segments).
    """
    window_start = datetime.now() - timedelta(days=30)
    window_end = datetime.now()

    sales = await get_sales_analytics(window_start, window_end, branch_id, current_user, db)
    products = await get_product_analytics(branch_id, current_user, db)
    customers = await get_customer_analytics(branch_id, current_user, db)

    sales_summary = {
        "total_revenue": sales["total_revenue"],
        "total_orders": sales["total_orders"],
        "average_order_value": sales["average_order_value"],
        # Only the 7 most recent daily entries are shown on the dashboard.
        "daily_sales": sales["daily_sales"][-7:],
    }
    product_summary = {
        "total_products": products["total_products"],
        "low_stock_products": products["low_stock_products"],
        # Trim the top-seller list down to 5 entries.
        "top_selling_products": products["top_products"][:5],
    }
    customer_summary = {
        "total_customers": customers["total_customers"],
        "average_customer_value": customers["average_customer_value"],
        "customer_segments": customers["customer_segments"],
    }

    return {
        "sales_summary": sales_summary,
        "product_summary": product_summary,
        "customer_summary": customer_summary,
    }