"""Analytics API endpoints: sales, products, customers, dashboard and brands."""

from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import Date, and_, case, cast, distinct, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from ..core.dependencies import get_current_active_user
from ..db.database import get_db
from ..db.models import Brand, Order, OrderItem, Product, User

router = APIRouter()

# Length of the default reporting window when no dates are supplied.
DEFAULT_PERIOD_DAYS = 30
# Order statuses that count as realised sales.
SALE_STATUSES = ['completed', 'delivered']
# Products with fewer units than this are reported as low stock.
LOW_STOCK_THRESHOLD = 10
# Number of entries returned in "top N" lists.
TOP_LIST_SIZE = 10
# Customer segmentation boundaries on total amount spent.
HIGH_VALUE_THRESHOLD = 1000
MEDIUM_VALUE_THRESHOLD = 500


def _parse_date_range(
    start_date: Optional[str],
    end_date: Optional[str],
) -> Tuple[datetime, datetime]:
    """Turn optional ISO-8601 strings into a ``(start, end)`` datetime pair.

    A missing ``end_date`` defaults to now; a missing ``start_date`` defaults
    to ``DEFAULT_PERIOD_DAYS`` before the end. If either supplied string is
    not valid ISO format, BOTH values fall back to the default window
    (best-effort behaviour, no error is raised).
    """
    try:
        end = datetime.fromisoformat(end_date) if end_date else datetime.now()
        if start_date:
            start = datetime.fromisoformat(start_date)
        else:
            start = end - timedelta(days=DEFAULT_PERIOD_DAYS)
    except ValueError:
        # Invalid date format: ignore the input and use the default window.
        end = datetime.now()
        start = end - timedelta(days=DEFAULT_PERIOD_DAYS)
    return start, end


def _previous_period(start: datetime, end: datetime) -> Tuple[datetime, datetime]:
    """Return the window of equal length immediately preceding ``start``-``end``.

    Falls back to a ``DEFAULT_PERIOD_DAYS`` shift when the length cannot be
    computed (mixed naive/aware datetimes) or is not positive.
    """
    default_span = timedelta(days=DEFAULT_PERIOD_DAYS)
    try:
        span = end - start
    except TypeError:
        # One bound is timezone-aware and the other naive.
        span = default_span
    if span <= timedelta(0):
        span = default_span
    return start - span, end - span


def _branch_conditions(branch_column: Any, branch_id: Optional[int], current_user: User) -> List[Any]:
    """Build the branch filter for ``branch_column`` and enforce access rules.

    * An explicit ``branch_id`` is honoured for superusers and for users
      asking for their own branch; anyone else gets HTTP 403.
    * Without ``branch_id`` non-superusers are restricted to their own
      branch and superusers are not filtered at all (empty list).

    Raises:
        HTTPException: 403 when a non-superuser requests another branch.
    """
    # ``is not None`` so that a branch id of 0 is still treated as a filter.
    if branch_id is not None:
        if not current_user.is_superuser and branch_id != current_user.branch_id:
            raise HTTPException(
                status_code=403,
                detail="You can only view analytics from your own branch"
            )
        return [branch_column == branch_id]
    if not current_user.is_superuser:
        return [branch_column == current_user.branch_id]
    return []


@router.get("/sales")
async def get_sales_analytics(
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Return per-day sales plus overall totals for completed/delivered orders.

    Dates are ISO-8601 strings; missing or invalid values fall back to the
    last ``DEFAULT_PERIOD_DAYS`` days.

    Raises:
        HTTPException: 403 when a non-superuser requests another branch.
    """
    start, end = _parse_date_range(start_date, end_date)

    conditions = [
        Order.created_at.between(start, end),
        Order.status.in_(SALE_STATUSES),
    ]
    conditions.extend(_branch_conditions(Order.branch_id, branch_id, current_user))

    # One row per calendar day, oldest first.
    order_day = cast(Order.created_at, Date)
    stmt = (
        select(
            order_day.label('date'),
            func.sum(Order.total_amount).label('total_sales'),
            func.count().label('order_count'),
        )
        .where(and_(*conditions))
        .group_by(order_day)
        .order_by(order_day)
    )
    daily_sales = (await db.execute(stmt)).all()

    total_revenue = sum(day.total_sales or 0 for day in daily_sales)
    total_orders = sum(day.order_count or 0 for day in daily_sales)
    avg_order_value = total_revenue / total_orders if total_orders > 0 else 0

    return {
        "daily_sales": [
            {
                "date": day.date.isoformat(),
                "total_sales": float(day.total_sales or 0),
                "order_count": day.order_count or 0,
            }
            for day in daily_sales
        ],
        "total_revenue": float(total_revenue),
        "total_orders": total_orders,
        "average_order_value": float(avg_order_value),
    }


@router.get("/products")
async def get_product_analytics(
    branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Return the top-selling products and product/low-stock counts.

    Raises:
        HTTPException: 403 when a non-superuser requests another branch.
    """
    conditions = _branch_conditions(Product.branch_id, branch_id, current_user)

    # Top sellers by revenue across completed/delivered orders.
    revenue = func.sum(OrderItem.price * OrderItem.quantity)
    stmt = (
        select(
            Product,
            revenue.label('total_revenue'),
            # Count distinct orders, not order-item rows, so an order with
            # several lines for the same product is counted once.
            func.count(distinct(Order.id)).label('total_orders'),
        )
        .join(OrderItem, Product.id == OrderItem.product_id)
        .join(Order, OrderItem.order_id == Order.id)
        .where(*conditions, Order.status.in_(SALE_STATUSES))
        .group_by(Product.id)
        .order_by(revenue.desc())
        .limit(TOP_LIST_SIZE)
    )
    top_products = (await db.execute(stmt)).all()

    # ``where(*conditions)`` tolerates an empty list (superuser, no branch),
    # unlike ``and_()`` with no arguments.
    total_products = await db.scalar(
        select(func.count()).select_from(Product).where(*conditions)
    )
    low_stock_count = await db.scalar(
        select(func.count())
        .select_from(Product)
        .where(*conditions, Product.inventory_count < LOW_STOCK_THRESHOLD)
    )

    return {
        "top_products": [
            {
                "id": product.id,
                "name": product.name,
                "category": product.category,
                "total_revenue": float(product_revenue or 0),
                "total_orders": int(orders or 0),
                "inventory_count": product.inventory_count,
            }
            for product, product_revenue, orders in top_products
        ],
        "total_products": total_products or 0,
        "low_stock_products": low_stock_count or 0,
    }


@router.get("/customers")
async def get_customer_analytics(
    branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Return customer counts, value segments and the top customers by spend.

    NOTE(review): unlike the sales/products endpoints this query does not
    filter on order status -- confirm whether that is intended.

    Raises:
        HTTPException: 403 when a non-superuser requests another branch.
    """
    conditions = _branch_conditions(Order.branch_id, branch_id, current_user)

    spent_total = func.sum(Order.total_amount)
    stmt = (
        select(
            User,
            spent_total.label('total_spent'),
            func.count().label('total_orders'),
        )
        .join(Order, User.id == Order.customer_id)
        .where(*conditions)
        .group_by(User.id)
        .order_by(spent_total.desc())
    )
    rows = (await db.execute(stmt)).all()

    # SUM() is NULL when every total_amount of a customer is NULL; treat as 0
    # so the arithmetic and comparisons below cannot raise TypeError.
    customer_data = [(customer, spent or 0, orders) for customer, spent, orders in rows]

    total_customers = len(customer_data)
    total_revenue = sum(spent for _, spent, _ in customer_data)
    avg_customer_value = total_revenue / total_customers if total_customers > 0 else 0

    segments = {"high_value": 0, "medium_value": 0, "low_value": 0}
    for _, spent, _ in customer_data:
        if spent > HIGH_VALUE_THRESHOLD:
            segments["high_value"] += 1
        elif spent >= MEDIUM_VALUE_THRESHOLD:
            segments["medium_value"] += 1
        else:
            segments["low_value"] += 1

    return {
        "total_customers": total_customers,
        "average_customer_value": float(avg_customer_value),
        "customer_segments": segments,
        "top_customers": [
            {
                "id": customer.id,
                "email": customer.email,
                "total_spent": float(spent),
                "total_orders": orders,
            }
            # Rows are already ordered by spend, highest first.
            for customer, spent, orders in customer_data[:TOP_LIST_SIZE]
        ],
    }


def _metric_with_trend(current: float, previous: float) -> Dict[str, Any]:
    """Package a metric with its percentage delta and up/down trend."""
    return {
        "total": current,
        "delta": calculate_delta(current, previous),
        "trend": "up" if current > previous else "down",
    }


@router.get("/dashboard")
async def get_dashboard_metrics(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Return headline metrics for a period compared with the preceding one.

    Dates are ISO-8601 strings; missing or invalid values fall back to the
    last ``DEFAULT_PERIOD_DAYS`` days (same handling as ``/sales``). The
    comparison period has the same length as the requested one and ends
    where it starts.
    """
    start, end = _parse_date_range(start_date, end_date)
    current_metrics = await get_period_metrics(db, start, end, current_user)

    prev_start, prev_end = _previous_period(start, end)
    prev_metrics = await get_period_metrics(db, prev_start, prev_end, current_user)

    metrics = {
        "revenue": _metric_with_trend(current_metrics["revenue"], prev_metrics["revenue"]),
        "orders": _metric_with_trend(current_metrics["orders"], prev_metrics["orders"]),
        "voids": _metric_with_trend(current_metrics["voids"], prev_metrics["voids"]),
        "stock": {
            "total": current_metrics["stock_total"],
            "items_below_threshold": current_metrics["stock_low"],
        },
    }

    # Static placeholder figures; nothing here is read from the database.
    traffic_sources = {
        "direct": {"value": 45, "trend": "up", "delta": 12.5},
        "social": {"value": 25, "trend": "down", "delta": -5.0},
        "marketing": {"value": 20, "trend": "up", "delta": 8.2},
        "affiliates": {"value": 10, "trend": "down", "delta": -2.1},
    }

    overview = {
        "revenue": {
            "value": current_metrics["revenue"],
            "trend": "up" if current_metrics["revenue"] > prev_metrics["revenue"] else "down",
        },
        "expenses": {
            "value": current_metrics["expenses"],
            "trend": "up" if current_metrics["expenses"] > prev_metrics["expenses"] else "down",
        },
        "investment": {"value": current_metrics["investment"]},
        "savings": {"value": current_metrics["savings"]},
    }

    return {
        "metrics": metrics,
        "traffic_sources": traffic_sources,
        "overview": overview,
    }


@router.get("/brands")
async def get_brands_analytics(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Return every brand with its product count and distinct-branch count.

    Raises:
        HTTPException: 500 when the query or result formatting fails.
    """
    try:
        # Outer join so brands without any product are still listed.
        query = (
            select(
                Brand,
                func.count(Product.id).label("product_count"),
                func.count(distinct(Product.branch_id)).label("store_count"),
            )
            .outerjoin(Product)
            .group_by(Brand.id)
        )
        brands_data = (await db.execute(query)).all()

        brands = [
            {
                "name": row.Brand.name,
                "category": row.Brand.category,
                "established": (
                    row.Brand.established_date.date().isoformat()
                    if row.Brand.established_date
                    else None
                ),
                "stores": int(row.store_count or 0),
                "products": int(row.product_count or 0),
                "status": "Active" if row.Brand.is_active else "Inactive",
            }
            for row in brands_data
        ]

        return {
            "total": len(brands_data),
            "active": sum(1 for row in brands_data if row.Brand.is_active),
            "brands": brands,
        }
    except Exception as e:
        # Endpoint boundary: surface any failure as a 500, keeping the cause
        # chained for server-side tracebacks.
        raise HTTPException(
            status_code=500,
            detail=f"Failed to fetch brands analytics: {str(e)}"
        ) from e


async def get_period_metrics(
    db: AsyncSession,
    start_date: datetime,
    end_date: datetime,
    current_user: User
) -> Dict[str, Any]:
    """Aggregate order and stock metrics for one period.

    Non-superusers only see figures for their own branch, for both the order
    metrics and the stock metrics.

    Returns:
        Dict with ``revenue``, ``orders``, ``voids``, ``expenses``,
        ``investment``, ``savings``, ``stock_total`` and ``stock_low``.
    """
    order_conditions = [Order.created_at.between(start_date, end_date)]
    stock_conditions = []
    if not current_user.is_superuser:
        order_conditions.append(Order.branch_id == current_user.branch_id)
        # Scope stock figures to the same branch as the order figures.
        stock_conditions.append(Product.branch_id == current_user.branch_id)

    metrics_query = select(
        func.sum(Order.total_amount).label("revenue"),
        func.count().label("orders"),
        func.sum(case((Order.status == "void", 1), else_=0)).label("voids"),
        func.sum(Order.expenses).label("expenses"),
    ).where(*order_conditions)
    row = (await db.execute(metrics_query)).first()

    stock_query = select(
        func.count(Product.id).label("total"),
        func.sum(
            case((Product.stock_level < Product.reorder_threshold, 1), else_=0)
        ).label("low_stock"),
    ).where(*stock_conditions)
    stock_row = (await db.execute(stock_query)).first()

    return {
        "revenue": float(row.revenue or 0),
        "orders": int(row.orders or 0),
        "voids": int(row.voids or 0),
        "expenses": float(row.expenses or 0),
        "investment": 50000.00,  # Example fixed value, replace with actual calculation
        "savings": 25000.00,  # Example fixed value, replace with actual calculation
        "stock_total": int(stock_row.total or 0),
        "stock_low": int(stock_row.low_stock or 0),
    }


def calculate_delta(current: float, previous: float) -> float:
    """Return the percentage change from ``previous`` to ``current``.

    With a zero baseline the change is undefined, so it is reported as
    ``100.0`` when ``current`` is positive and ``0.0`` otherwise.
    """
    if previous == 0:
        return 100.0 if current > 0 else 0.0
    return ((current - previous) / previous) * 100