Admin-Desk2 / app /api /analytics.py
Fred808's picture
Update app/api/analytics.py
edded39 verified
from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, cast, Date, and_, distinct, case
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from ..core.dependencies import get_current_active_user
from ..db.database import get_db
from ..db.models import Order, Product, User, Brand, OrderItem
# Router on which the analytics endpoints in this module are registered.
router = APIRouter()
@router.get("/sales")
async def get_sales_analytics(
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Return per-day sales figures and overall totals for a date window.

    Args:
        start_date: ISO-8601 start of the window; defaults to 30 days
            before the end of the window.
        end_date: ISO-8601 end of the window; defaults to the current time.
        branch_id: Branch to report on. Non-superusers may only name their
            own branch and are restricted to it when the value is omitted.
        current_user: The authenticated user making the request.
        db: Async database session.

    Returns:
        A dict with ``daily_sales`` (one entry per calendar day),
        ``total_revenue``, ``total_orders`` and ``average_order_value``.

    Raises:
        HTTPException: 403 when a non-superuser requests another branch.
    """
    default_span = timedelta(days=30)

    # Unparseable dates are not an error here: the whole window falls back
    # to the last 30 days.
    try:
        end = datetime.fromisoformat(end_date) if end_date else datetime.now()
        start = datetime.fromisoformat(start_date) if start_date else end - default_span
    except ValueError:
        end = datetime.now()
        start = end - default_span

    # Only finished orders inside the window count as sales.
    filters = [
        Order.created_at.between(start, end),
        Order.status.in_(['completed', 'delivered']),
    ]

    # Branch scoping.
    if not branch_id:
        if not current_user.is_superuser:
            filters.append(Order.branch_id == current_user.branch_id)
    elif current_user.is_superuser or branch_id == current_user.branch_id:
        filters.append(Order.branch_id == branch_id)
    else:
        raise HTTPException(
            status_code=403,
            detail="You can only view analytics from your own branch"
        )

    # Aggregate revenue and order count per calendar day.
    sale_day = cast(Order.created_at, Date)
    stmt = (
        select(
            sale_day.label('date'),
            func.sum(Order.total_amount).label('total_sales'),
            func.count().label('order_count'),
        )
        .where(and_(*filters))
        .group_by(sale_day)
        .order_by(sale_day)
    )
    rows = (await db.execute(stmt)).all()

    # Build the per-day payload and the grand totals in a single pass.
    daily_sales = []
    total_revenue = 0
    total_orders = 0
    for row in rows:
        day_sales = row.total_sales or 0
        day_orders = row.order_count or 0
        total_revenue += day_sales
        total_orders += day_orders
        daily_sales.append({
            "date": row.date.isoformat(),
            "total_sales": float(day_sales),
            "order_count": day_orders,
        })

    if total_orders > 0:
        avg_order_value = total_revenue / total_orders
    else:
        avg_order_value = 0

    return {
        "daily_sales": daily_sales,
        "total_revenue": float(total_revenue),
        "total_orders": total_orders,
        "average_order_value": float(avg_order_value),
    }
@router.get("/products")
async def get_product_analytics(
    branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Return the top-selling products plus product and low-stock counts.

    Args:
        branch_id: Branch to report on. Non-superusers may only name their
            own branch and are restricted to it when the value is omitted.
        current_user: The authenticated user making the request.
        db: Async database session.

    Returns:
        A dict with ``top_products`` (at most 10, ordered by revenue from
        completed/delivered orders), ``total_products`` and
        ``low_stock_products`` (products with ``inventory_count`` below 10).

    Raises:
        HTTPException: 403 when a non-superuser requests another branch.
    """
    # Branch scoping; stays empty for a superuser who does not pick a branch.
    conditions = []
    if branch_id:
        if not current_user.is_superuser and branch_id != current_user.branch_id:
            raise HTTPException(
                status_code=403,
                detail="You can only view analytics from your own branch"
            )
        conditions.append(Product.branch_id == branch_id)
    elif not current_user.is_superuser:
        conditions.append(Product.branch_id == current_user.branch_id)

    # Top selling products. ``total_orders`` counts joined order-item rows.
    revenue = func.sum(OrderItem.price * OrderItem.quantity)
    stmt = (
        select(
            Product,
            revenue.label('total_revenue'),
            func.count().label('total_orders'),
        )
        .join(OrderItem, Product.id == OrderItem.product_id)
        .join(Order, OrderItem.order_id == Order.id)
        # Criteria are passed straight to where() (AND-ed together) rather
        # than through and_(): the list may be empty, and calling and_()
        # without arguments is deprecated in SQLAlchemy.
        .where(*conditions, Order.status.in_(['completed', 'delivered']))
        .group_by(Product.id)
        .order_by(revenue.desc())
        .limit(10)
    )
    result = await db.execute(stmt)
    top_products = result.all()

    # Count total and low stock products within the same branch scope.
    total_products = await db.scalar(
        select(func.count()).select_from(Product).where(*conditions)
    )
    low_stock_count = await db.scalar(
        select(func.count())
        .select_from(Product)
        .where(*conditions, Product.inventory_count < 10)
    )

    return {
        "top_products": [
            {
                "id": product.id,
                "name": product.name,
                "category": product.category,
                "total_revenue": float(product_revenue or 0),
                "total_orders": int(orders or 0),
                "inventory_count": product.inventory_count
            }
            for product, product_revenue, orders in top_products
        ],
        "total_products": total_products or 0,
        "low_stock_products": low_stock_count or 0
    }
@router.get("/customers")
async def get_customer_analytics(
    branch_id: Optional[int] = Query(None, description="Filter analytics by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Return customer spending statistics, value segments and top spenders.

    Args:
        branch_id: Branch to report on. Non-superusers may only name their
            own branch and are restricted to it when the value is omitted.
        current_user: The authenticated user making the request.
        db: Async database session.

    Returns:
        A dict with ``total_customers`` (users with at least one matching
        order), ``average_customer_value``, ``customer_segments`` (counts of
        customers spending more than 1000, between 500 and 1000 inclusive,
        and less than 500) and ``top_customers`` (at most 10 entries).

    Raises:
        HTTPException: 403 when a non-superuser requests another branch.
    """
    # Branch scoping; stays empty for a superuser who does not pick a branch.
    conditions = []
    if branch_id:
        if not current_user.is_superuser and branch_id != current_user.branch_id:
            raise HTTPException(
                status_code=403,
                detail="You can only view analytics from your own branch"
            )
        conditions.append(Order.branch_id == branch_id)
    elif not current_user.is_superuser:
        conditions.append(Order.branch_id == current_user.branch_id)

    # Per-customer spend and order count, biggest spenders first.
    total_spent = func.sum(Order.total_amount)
    stmt = (
        select(
            User,
            total_spent.label('total_spent'),
            func.count().label('total_orders'),
        )
        .join(Order, User.id == Order.customer_id)
        # where() AND-s its criteria and accepts an empty list, unlike
        # and_(), whose argument-less form is deprecated in SQLAlchemy.
        .where(*conditions)
        .group_by(User.id)
        .order_by(total_spent.desc())
    )
    result = await db.execute(stmt)

    # SUM() yields NULL when every amount in the group is NULL; normalise to
    # plain numbers once so the arithmetic and comparisons below cannot hit
    # None, and so the payload uses floats like the other endpoints.
    customer_data = [
        (customer, float(spent or 0), int(orders or 0))
        for customer, spent, orders in result.all()
    ]

    total_customers = len(customer_data)
    total_revenue = sum(spent for _, spent, _ in customer_data)
    avg_customer_value = total_revenue / total_customers if total_customers > 0 else 0

    # Customer segments, counted in a single pass.
    segments = {"high_value": 0, "medium_value": 0, "low_value": 0}
    for _, spent, _ in customer_data:
        if spent > 1000:
            segments["high_value"] += 1
        elif spent >= 500:
            segments["medium_value"] += 1
        else:
            segments["low_value"] += 1

    return {
        "total_customers": total_customers,
        "average_customer_value": avg_customer_value,
        "customer_segments": segments,
        "top_customers": [
            {
                "id": customer.id,
                "email": customer.email,
                "total_spent": spent,
                "total_orders": orders
            }
            for customer, spent, orders in customer_data[:10]  # Top 10 customers
        ]
    }
@router.get("/dashboard")
async def get_dashboard_metrics(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Return headline dashboard metrics compared with the preceding period.

    Args:
        start_date: ISO-8601 start of the window; defaults to 30 days
            before the end of the window.
        end_date: ISO-8601 end of the window; defaults to the current time.
        current_user: The authenticated user making the request.
        db: Async database session.

    Returns:
        A dict with ``metrics`` (revenue, orders, voids with delta and
        trend, plus stock counts), ``traffic_sources`` and ``overview``.
    """
    default_period = timedelta(days=30)

    # Same policy as the /sales endpoint: unparseable dates fall back to the
    # last 30 days instead of surfacing as an unhandled ValueError.
    try:
        end = datetime.fromisoformat(end_date) if end_date else datetime.now()
        start = datetime.fromisoformat(start_date) if start_date else end - default_period
    except ValueError:
        end = datetime.now()
        start = end - default_period

    # The comparison window has the same length as the requested one and
    # ends where it starts. For the default 30-day window this is the
    # previous 30 days. If the length cannot be determined (mixed
    # naive/aware datetimes) or is not positive, shift by 30 days.
    try:
        period = end - start
    except TypeError:
        period = default_period
    if period <= timedelta(0):
        period = default_period
    prev_start = start - period
    prev_end = end - period

    current_metrics = await get_period_metrics(db, start, end, current_user)
    prev_metrics = await get_period_metrics(db, prev_start, prev_end, current_user)

    def trend(key: str) -> str:
        # "up" only on a strict increase; an unchanged value reports "down".
        return "up" if current_metrics[key] > prev_metrics[key] else "down"

    def compared(key: str) -> Dict[str, Any]:
        return {
            "total": current_metrics[key],
            "delta": calculate_delta(current_metrics[key], prev_metrics[key]),
            "trend": trend(key),
        }

    metrics = {
        "revenue": compared("revenue"),
        "orders": compared("orders"),
        "voids": compared("voids"),
        "stock": {
            "total": current_metrics["stock_total"],
            "items_below_threshold": current_metrics["stock_low"]
        }
    }

    # Traffic sources are hard-coded figures, not derived from the database.
    traffic_sources = {
        "direct": {"value": 45, "trend": "up", "delta": 12.5},
        "social": {"value": 25, "trend": "down", "delta": -5.0},
        "marketing": {"value": 20, "trend": "up", "delta": 8.2},
        "affiliates": {"value": 10, "trend": "down", "delta": -2.1}
    }

    # Financial overview; investment and savings are passed through as
    # returned by get_period_metrics.
    overview = {
        "revenue": {"value": current_metrics["revenue"], "trend": trend("revenue")},
        "expenses": {"value": current_metrics["expenses"], "trend": trend("expenses")},
        "investment": {"value": current_metrics["investment"]},
        "savings": {"value": current_metrics["savings"]}
    }

    return {
        "metrics": metrics,
        "traffic_sources": traffic_sources,
        "overview": overview
    }
@router.get("/brands")
async def get_brands_analytics(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Return every brand with its product count and distinct branch count.

    Args:
        current_user: The authenticated user making the request.
        db: Async database session.

    Returns:
        A dict with ``total`` (number of brands), ``active`` (brands whose
        ``is_active`` flag is truthy) and ``brands`` (one detail dict each).

    Raises:
        HTTPException: 500 wrapping any exception raised while querying or
            formatting the result.
    """
    try:
        # Outer join so that brands without any product are still listed.
        stmt = (
            select(
                Brand,
                func.count(Product.id).label("product_count"),
                func.count(distinct(Product.branch_id)).label("store_count"),
            )
            .outerjoin(Product)
            .group_by(Brand.id)
        )
        rows = (await db.execute(stmt)).all()

        # Format each brand and tally the active ones in the same pass.
        brands = []
        active_brands = 0
        for row in rows:
            brand = row.Brand
            if brand.is_active:
                active_brands += 1
            established = brand.established_date
            brands.append({
                "name": brand.name,
                "category": brand.category,
                "established": established.date().isoformat() if established else None,
                "stores": int(row.store_count or 0),
                "products": int(row.product_count or 0),
                "status": "Active" if brand.is_active else "Inactive",
            })

        return {
            "total": len(rows),
            "active": active_brands,
            "brands": brands,
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to fetch brands analytics: {str(e)}"
        )
async def get_period_metrics(
    db: AsyncSession,
    start_date: datetime,
    end_date: datetime,
    current_user: User
) -> Dict[str, Any]:
    """Collect order and stock metrics for one reporting period.

    Args:
        db: Async database session.
        start_date: Inclusive lower bound on ``Order.created_at``.
        end_date: Inclusive upper bound on ``Order.created_at``.
        current_user: Requesting user; non-superusers only see data from
            their own branch.

    Returns:
        A dict with ``revenue``, ``orders``, ``voids``, ``expenses``,
        ``investment``, ``savings``, ``stock_total`` and ``stock_low``.
        ``investment`` and ``savings`` are fixed example values.
    """
    restrict_to_branch = not current_user.is_superuser

    # Order metrics: every order created in the period, whatever its status.
    conditions = [
        Order.created_at.between(start_date, end_date)
    ]
    if restrict_to_branch:
        conditions.append(Order.branch_id == current_user.branch_id)

    metrics_query = select(
        func.sum(Order.total_amount).label("revenue"),
        func.count().label("orders"),
        func.sum(case((Order.status == "void", 1), else_=0)).label("voids"),
        func.sum(Order.expenses).label("expenses")
    ).where(*conditions)
    result = await db.execute(metrics_query)
    row = result.first()

    # Stock metrics are not tied to the period. They are scoped to the
    # user's branch like the order metrics, so a non-superuser does not see
    # stock figures aggregated over other branches.
    stock_query = select(
        func.count(Product.id).label("total"),
        func.sum(case((Product.stock_level < Product.reorder_threshold, 1), else_=0)).label("low_stock")
    )
    if restrict_to_branch:
        stock_query = stock_query.where(Product.branch_id == current_user.branch_id)
    stock_result = await db.execute(stock_query)
    stock_row = stock_result.first()

    # SUM() is NULL over an empty set, hence the ``or 0`` fallbacks.
    return {
        "revenue": float(row.revenue or 0),
        "orders": int(row.orders or 0),
        "voids": int(row.voids or 0),
        "expenses": float(row.expenses or 0),
        "investment": 50000.00,  # Example fixed value, replace with actual calculation
        "savings": 25000.00,  # Example fixed value, replace with actual calculation
        "stock_total": int(stock_row.total or 0),
        "stock_low": int(stock_row.low_stock or 0)
    }
def calculate_delta(current: float, previous: float) -> float:
    """Return the percentage change from *previous* to *current*.

    The change is measured relative to the magnitude of *previous*, so the
    sign of the result always follows the direction of the change, even
    when *previous* is negative.

    With a zero baseline no ratio exists; the result is then 100.0 for a
    positive *current*, -100.0 for a negative one and 0.0 when both are
    zero.
    """
    if previous == 0:
        if current > 0:
            return 100.0
        if current < 0:
            return -100.0
        return 0.0
    # abs() keeps a negative baseline from flipping the sign of the delta.
    return (current - previous) / abs(previous) * 100