Spaces:
Paused
Paused
| from fastapi import APIRouter, Depends, Query, HTTPException | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, func, cast, Date, and_, distinct, case | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, Optional | |
| from ..core.dependencies import get_current_active_user | |
| from ..db.database import get_db | |
| from ..db.models import Order, Product, User, Brand, OrderItem | |
| router = APIRouter() | |
| async def get_sales_analytics( | |
| start_date: Optional[str] = Query(None), | |
| end_date: Optional[str] = Query(None), | |
| branch_id: Optional[int] = Query(None, description="Filter analytics by branch"), | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> Dict[str, Any]: | |
| # Parse dates or use last 30 days as default | |
| try: | |
| end = datetime.now() if not end_date else datetime.fromisoformat(end_date) | |
| start = (end - timedelta(days=30)) if not start_date else datetime.fromisoformat(start_date) | |
| except ValueError: | |
| # Handle invalid date format | |
| end = datetime.now() | |
| start = end - timedelta(days=30) | |
| # Build query conditions | |
| conditions = [ | |
| Order.created_at.between(start, end), | |
| Order.status.in_(['completed', 'delivered']) | |
| ] | |
| # Add branch filter | |
| if branch_id: | |
| if not current_user.is_superuser and branch_id != current_user.branch_id: | |
| raise HTTPException( | |
| status_code=403, | |
| detail="You can only view analytics from your own branch" | |
| ) | |
| conditions.append(Order.branch_id == branch_id) | |
| elif not current_user.is_superuser: | |
| conditions.append(Order.branch_id == current_user.branch_id) | |
| # Daily sales query | |
| stmt = select( | |
| cast(Order.created_at, Date).label('date'), | |
| func.sum(Order.total_amount).label('total_sales'), | |
| func.count().label('order_count') | |
| ).where( | |
| and_(*conditions) | |
| ).group_by( | |
| cast(Order.created_at, Date) | |
| ).order_by( | |
| cast(Order.created_at, Date) | |
| ) | |
| result = await db.execute(stmt) | |
| daily_sales = result.all() | |
| # Calculate totals | |
| total_revenue = sum(day.total_sales or 0 for day in daily_sales) | |
| total_orders = sum(day.order_count or 0 for day in daily_sales) | |
| avg_order_value = total_revenue / total_orders if total_orders > 0 else 0 | |
| return { | |
| "daily_sales": [ | |
| { | |
| "date": day.date.isoformat(), | |
| "total_sales": float(day.total_sales or 0), | |
| "order_count": day.order_count or 0 | |
| } | |
| for day in daily_sales | |
| ], | |
| "total_revenue": float(total_revenue), | |
| "total_orders": total_orders, | |
| "average_order_value": float(avg_order_value) | |
| } | |
| async def get_product_analytics( | |
| branch_id: Optional[int] = Query(None, description="Filter analytics by branch"), | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> Dict[str, Any]: | |
| # Build base conditions | |
| conditions = [] | |
| # Add branch filter | |
| if branch_id: | |
| if not current_user.is_superuser and branch_id != current_user.branch_id: | |
| raise HTTPException( | |
| status_code=403, | |
| detail="You can only view analytics from your own branch" | |
| ) | |
| conditions.append(Product.branch_id == branch_id) | |
| elif not current_user.is_superuser: | |
| conditions.append(Product.branch_id == current_user.branch_id) | |
| # Top selling products | |
| stmt = select( | |
| Product, | |
| func.sum(OrderItem.price * OrderItem.quantity).label('total_revenue'), | |
| func.count().label('total_orders') | |
| ).join( | |
| OrderItem, Product.id == OrderItem.product_id | |
| ).join( | |
| Order, OrderItem.order_id == Order.id | |
| ).where( | |
| and_(*conditions, Order.status.in_(['completed', 'delivered'])) | |
| ).group_by( | |
| Product.id | |
| ).order_by( | |
| func.sum(OrderItem.price * OrderItem.quantity).desc() | |
| ).limit(10) | |
| result = await db.execute(stmt) | |
| top_products = result.all() | |
| # Count total and low stock products | |
| total_products = await db.scalar( | |
| select(func.count()).select_from(Product).where(and_(*conditions)) | |
| ) | |
| low_stock_conditions = conditions + [Product.inventory_count < 10] | |
| low_stock_count = await db.scalar( | |
| select(func.count()).select_from(Product).where(and_(*low_stock_conditions)) | |
| ) | |
| return { | |
| "top_products": [ | |
| { | |
| "id": product.id, | |
| "name": product.name, | |
| "category": product.category, | |
| "total_revenue": float(revenue or 0), | |
| "total_orders": int(orders or 0), | |
| "inventory_count": product.inventory_count | |
| } | |
| for product, revenue, orders in top_products | |
| ], | |
| "total_products": total_products or 0, | |
| "low_stock_products": low_stock_count or 0 | |
| } | |
| async def get_customer_analytics( | |
| branch_id: Optional[int] = Query(None, description="Filter analytics by branch"), | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> Dict[str, Any]: | |
| # Build base conditions | |
| conditions = [] | |
| # Add branch filter | |
| if branch_id: | |
| if not current_user.is_superuser and branch_id != current_user.branch_id: | |
| raise HTTPException( | |
| status_code=403, | |
| detail="You can only view analytics from your own branch" | |
| ) | |
| conditions.append(Order.branch_id == branch_id) | |
| elif not current_user.is_superuser: | |
| conditions.append(Order.branch_id == current_user.branch_id) | |
| # Customer statistics | |
| stmt = select( | |
| User, | |
| func.sum(Order.total_amount).label('total_spent'), | |
| func.count().label('total_orders') | |
| ).join( | |
| Order, User.id == Order.customer_id | |
| ).where( | |
| and_(*conditions) | |
| ).group_by( | |
| User.id | |
| ).order_by( | |
| func.sum(Order.total_amount).desc() | |
| ) | |
| result = await db.execute(stmt) | |
| customer_data = result.all() | |
| total_customers = len(customer_data) | |
| total_revenue = sum(spent for _, spent, _ in customer_data) | |
| avg_customer_value = total_revenue / total_customers if total_customers > 0 else 0 | |
| # Customer segments | |
| segments = { | |
| "high_value": len([c for c, spent, _ in customer_data if spent > 1000]), | |
| "medium_value": len([c for c, spent, _ in customer_data if 500 <= spent <= 1000]), | |
| "low_value": len([c for c, spent, _ in customer_data if spent < 500]) | |
| } | |
| return { | |
| "total_customers": total_customers, | |
| "average_customer_value": avg_customer_value, | |
| "customer_segments": segments, | |
| "top_customers": [ | |
| { | |
| "id": customer.id, | |
| "email": customer.email, | |
| "total_spent": spent, | |
| "total_orders": orders | |
| } | |
| for customer, spent, orders in customer_data[:10] # Top 10 customers | |
| ] | |
| } | |
| async def get_dashboard_metrics( | |
| start_date: Optional[str] = None, | |
| end_date: Optional[str] = None, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> Dict[str, Any]: | |
| # Parse dates or use last 30 days as default | |
| end = datetime.now() if not end_date else datetime.fromisoformat(end_date) | |
| start = (end - timedelta(days=30)) if not start_date else datetime.fromisoformat(start_date) | |
| # Get current period metrics | |
| current_metrics = await get_period_metrics(db, start, end, current_user) | |
| # Get previous period metrics for comparison | |
| prev_start = start - timedelta(days=30) | |
| prev_end = end - timedelta(days=30) | |
| prev_metrics = await get_period_metrics(db, prev_start, prev_end, current_user) | |
| # Calculate deltas and trends | |
| metrics = { | |
| "revenue": { | |
| "total": current_metrics["revenue"], | |
| "delta": calculate_delta(current_metrics["revenue"], prev_metrics["revenue"]), | |
| "trend": "up" if current_metrics["revenue"] > prev_metrics["revenue"] else "down" | |
| }, | |
| "orders": { | |
| "total": current_metrics["orders"], | |
| "delta": calculate_delta(current_metrics["orders"], prev_metrics["orders"]), | |
| "trend": "up" if current_metrics["orders"] > prev_metrics["orders"] else "down" | |
| }, | |
| "voids": { | |
| "total": current_metrics["voids"], | |
| "delta": calculate_delta(current_metrics["voids"], prev_metrics["voids"]), | |
| "trend": "up" if current_metrics["voids"] > prev_metrics["voids"] else "down" | |
| }, | |
| "stock": { | |
| "total": current_metrics["stock_total"], | |
| "items_below_threshold": current_metrics["stock_low"] | |
| } | |
| } | |
| # Get traffic sources data | |
| traffic_sources = { | |
| "direct": {"value": 45, "trend": "up", "delta": 12.5}, | |
| "social": {"value": 25, "trend": "down", "delta": -5.0}, | |
| "marketing": {"value": 20, "trend": "up", "delta": 8.2}, | |
| "affiliates": {"value": 10, "trend": "down", "delta": -2.1} | |
| } | |
| # Get financial overview | |
| overview = { | |
| "revenue": {"value": current_metrics["revenue"], "trend": "up" if current_metrics["revenue"] > prev_metrics["revenue"] else "down"}, | |
| "expenses": {"value": current_metrics["expenses"], "trend": "up" if current_metrics["expenses"] > prev_metrics["expenses"] else "down"}, | |
| "investment": {"value": current_metrics["investment"]}, | |
| "savings": {"value": current_metrics["savings"]} | |
| } | |
| return { | |
| "metrics": metrics, | |
| "traffic_sources": traffic_sources, | |
| "overview": overview | |
| } | |
| async def get_brands_analytics( | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> Dict[str, Any]: | |
| try: | |
| # Get brands with their associated metrics | |
| query = select( | |
| Brand, | |
| func.count(Product.id).label("product_count"), | |
| func.count(distinct(Product.branch_id)).label("store_count") | |
| ).outerjoin(Product).group_by(Brand.id) | |
| result = await db.execute(query) | |
| brands_data = result.all() | |
| # Calculate totals | |
| total_brands = len(brands_data) | |
| active_brands = sum(1 for row in brands_data if row.Brand.is_active) | |
| # Format brand details | |
| brands = [ | |
| { | |
| "name": row.Brand.name, | |
| "category": row.Brand.category, | |
| "established": row.Brand.established_date.date().isoformat() if row.Brand.established_date else None, | |
| "stores": int(row.store_count or 0), | |
| "products": int(row.product_count or 0), | |
| "status": "Active" if row.Brand.is_active else "Inactive" | |
| } | |
| for row in brands_data | |
| ] | |
| return { | |
| "total": total_brands, | |
| "active": active_brands, | |
| "brands": brands | |
| } | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Failed to fetch brands analytics: {str(e)}" | |
| ) | |
| async def get_period_metrics( | |
| db: AsyncSession, | |
| start_date: datetime, | |
| end_date: datetime, | |
| current_user: User | |
| ) -> Dict[str, Any]: | |
| # Base query conditions | |
| conditions = [ | |
| Order.created_at.between(start_date, end_date) | |
| ] | |
| if not current_user.is_superuser: | |
| conditions.append(Order.branch_id == current_user.branch_id) | |
| # Get revenue and orders metrics | |
| metrics_query = select( | |
| func.sum(Order.total_amount).label("revenue"), | |
| func.count().label("orders"), | |
| func.sum(case((Order.status == "void", 1), else_=0)).label("voids"), | |
| func.sum(Order.expenses).label("expenses") | |
| ).where(and_(*conditions)) | |
| result = await db.execute(metrics_query) | |
| row = result.first() | |
| # Get stock metrics | |
| stock_query = select( | |
| func.count(Product.id).label("total"), | |
| func.sum(case((Product.stock_level < Product.reorder_threshold, 1), else_=0)).label("low_stock") | |
| ) | |
| stock_result = await db.execute(stock_query) | |
| stock_row = stock_result.first() | |
| return { | |
| "revenue": float(row.revenue or 0), | |
| "orders": int(row.orders or 0), | |
| "voids": int(row.voids or 0), | |
| "expenses": float(row.expenses or 0), | |
| "investment": 50000.00, # Example fixed value, replace with actual calculation | |
| "savings": 25000.00, # Example fixed value, replace with actual calculation | |
| "stock_total": int(stock_row.total or 0), | |
| "stock_low": int(stock_row.low_stock or 0) | |
| } | |
| def calculate_delta(current: float, previous: float) -> float: | |
| if previous == 0: | |
| return 100 if current > 0 else 0 | |
| return ((current - previous) / previous) * 100 |