"""Analytics services.

``AnalyticsService`` builds sales/product/customer reports from aggregation
pipelines run against ``db.db[...]`` collections, and ``StaffAnalyticsService``
records staff activities and daily performance metrics through SQLAlchemy
sessions obtained from ``db()``. Report results are cached via ``cache``.
"""
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
from sqlalchemy import and_, extract, func, select

from ..db.database import db
from ..utils.cache import cache
from ..db.models import StaffActivity, PerformanceMetric, User, ActivityType

# Lifetime of every cached report, in seconds (1 hour).
CACHE_TTL_SECONDS = 3600

# Number of entries returned in "top N" listings.
TOP_N = 10

# Products with an inventory count below this are reported as low stock.
LOW_STOCK_THRESHOLD = 10

# Customer segmentation boundaries on total amount spent.
HIGH_VALUE_THRESHOLD = 1000
MEDIUM_VALUE_THRESHOLD = 500

# Upper bound for every individual performance / efficiency score.
MAX_SCORE = 10

# Length of the working day used by the speed component (8 hours, expressed
# in the same unit as ``PerformanceMetric.login_time``).
WORKDAY_LENGTH = 8 * 60

# Relative weight of each component in the overall efficiency score.
EFFICIENCY_WEIGHTS = {
    'sales': 0.4,
    'speed': 0.2,
    'accuracy': 0.2,
    'customer_service': 0.2,
}

# Numeric PerformanceMetric attributes that are accumulated with ``+=``.
_METRIC_COUNTER_FIELDS = (
    'total_sales',
    'transaction_count',
    'void_count',
    'customer_interaction_count',
    'login_time',
)


class AnalyticsService:
    """Cached sales, product and customer reports built from the orders data."""

    @staticmethod
    async def get_sales_analytics(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
        """Return per-day and overall sales figures for completed/delivered orders.

        Args:
            start_date: Inclusive lower bound on the order ``created_at``.
            end_date: Inclusive upper bound on the order ``created_at``.

        Returns:
            A dict with ``daily_sales`` (one entry per day, sorted by date),
            ``total_revenue``, ``total_orders`` and ``average_order_value``
            (0 when there are no orders).
        """
        # NOTE(review): the key uses only the date part while the query filters
        # on the full datetimes, so two ranges that differ only in time-of-day
        # share one cache entry.
        cache_key = f"sales_analytics:{start_date.date()}:{end_date.date()}"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            {
                "$match": {
                    "created_at": {"$gte": start_date, "$lte": end_date},
                    "status": {"$in": ["completed", "delivered"]},
                }
            },
            {
                "$group": {
                    "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
                    "total_sales": {"$sum": "$total_amount"},
                    "order_count": {"$sum": 1},
                }
            },
            {"$sort": {"_id": 1}},
        ]
        sales_data = await db.db["orders"].aggregate(pipeline).to_list(None)

        total_revenue = sum(day["total_sales"] for day in sales_data)
        total_orders = sum(day["order_count"] for day in sales_data)
        result = {
            "daily_sales": sales_data,
            "total_revenue": total_revenue,
            "total_orders": total_orders,
            # ``or 1`` avoids a division by zero when no order matched.
            "average_order_value": total_revenue / (total_orders or 1),
        }

        await cache.set_cache(cache_key, result, expire=CACHE_TTL_SECONDS)
        return result

    @staticmethod
    async def get_product_analytics() -> Dict[str, Any]:
        """Return the top products by revenue plus catalogue-wide counts.

        Returns:
            A dict with ``top_products`` (at most ``TOP_N`` entries, each
            enriched with ``name`` and ``category`` when the product document
            exists), ``total_products`` and ``low_stock_products``.
        """
        cache_key = "product_analytics"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            {"$unwind": "$products"},
            {
                "$group": {
                    "_id": "$products.product_id",
                    "total_quantity": {"$sum": "$products.quantity"},
                    "total_revenue": {
                        "$sum": {"$multiply": ["$products.price", "$products.quantity"]}
                    },
                }
            },
            {"$sort": {"total_revenue": -1}},
            {"$limit": TOP_N},
        ]
        top_products = await db.db["orders"].aggregate(pipeline).to_list(None)

        # Fetch all product details in one query instead of one query per product.
        product_ids = [product["_id"] for product in top_products]
        details_by_id: Dict[Any, Dict[str, Any]] = {}
        if product_ids:
            detail_docs = await db.db["products"].find(
                {"_id": {"$in": product_ids}}
            ).to_list(None)
            details_by_id = {doc["_id"]: doc for doc in detail_docs}

        for product in top_products:
            product_detail = details_by_id.get(product["_id"])
            if product_detail:
                product["name"] = product_detail["name"]
                product["category"] = product_detail["category"]

        products = db.db["products"]
        result = {
            "top_products": top_products,
            "total_products": await products.count_documents({}),
            "low_stock_products": await products.count_documents(
                {"inventory_count": {"$lt": LOW_STOCK_THRESHOLD}}
            ),
        }

        await cache.set_cache(cache_key, result, expire=CACHE_TTL_SECONDS)
        return result

    @staticmethod
    async def get_customer_analytics() -> Dict[str, Any]:
        """Return per-customer spending totals and value segments.

        Returns:
            A dict with ``total_customers``, ``top_customers`` (the ``TOP_N``
            highest spenders), ``average_customer_value`` (0 when there are no
            customers) and ``customer_segments`` counts for high (> 1000),
            medium (500..1000 inclusive) and low (< 500) total spend.
        """
        cache_key = "customer_analytics"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            {
                "$group": {
                    "_id": "$customer_id",
                    "total_orders": {"$sum": 1},
                    "total_spent": {"$sum": "$total_amount"},
                    "last_order": {"$max": "$created_at"},
                }
            },
            {"$sort": {"total_spent": -1}},
        ]
        customer_data = await db.db["orders"].aggregate(pipeline).to_list(None)

        # Classify every customer in a single pass.
        segments = {"high_value": 0, "medium_value": 0, "low_value": 0}
        for customer in customer_data:
            spent = customer["total_spent"]
            if spent > HIGH_VALUE_THRESHOLD:
                segments["high_value"] += 1
            elif spent >= MEDIUM_VALUE_THRESHOLD:
                segments["medium_value"] += 1
            else:
                segments["low_value"] += 1

        result = {
            "total_customers": len(customer_data),
            # The pipeline already sorts by total_spent descending.
            "top_customers": customer_data[:TOP_N],
            "average_customer_value": (
                sum(c["total_spent"] for c in customer_data) / (len(customer_data) or 1)
            ),
            "customer_segments": segments,
        }

        await cache.set_cache(cache_key, result, expire=CACHE_TTL_SECONDS)
        return result


class StaffAnalyticsService:
    """Records staff activities and maintains daily performance metrics."""

    @staticmethod
    def _clamp_score(value: float) -> float:
        """Clamp *value* into the inclusive range [0, MAX_SCORE]."""
        return max(0, min(value, MAX_SCORE))

    @staticmethod
    def _score_activity(
        activity_type: ActivityType,
        details: dict,
        duration: Optional[float],
    ) -> Optional[float]:
        """Return the performance score for one activity, or None if not scorable.

        Only SALE and CUSTOMER_SERVICE activities are scored, and only when
        their time component (sale duration / resolution time) is positive.
        Scores are capped at ``MAX_SCORE``.
        """
        if activity_type == ActivityType.SALE:
            # Larger amounts completed faster score higher.
            amount = details.get('amount', 0) or 0
            if duration and duration > 0:
                return min((amount / 100) * (5 / duration), MAX_SCORE)
        elif activity_type == ActivityType.CUSTOMER_SERVICE:
            # Higher satisfaction resolved faster scores higher.
            satisfaction = details.get('customer_satisfaction', 0) or 0
            resolution_time = details.get('resolution_time', 0) or 0
            if resolution_time > 0:
                return min((satisfaction * 2) * (10 / resolution_time), MAX_SCORE)
        return None

    @staticmethod
    def _calculate_efficiency_score(metric: PerformanceMetric) -> float:
        """Return the weighted 0..MAX_SCORE efficiency score for a daily metric.

        Every component is clamped to [0, MAX_SCORE] so that a long login time
        or more voids than transactions cannot push the total below zero.
        """
        clamp = StaffAnalyticsService._clamp_score
        sales_score = clamp((metric.total_sales / 1000) * 10)
        speed_score = clamp(10 * (1 - (metric.login_time / WORKDAY_LENGTH)))
        accuracy_score = clamp(
            10 * (1 - (metric.void_count / max(metric.transaction_count, 1)))
        )
        cs_score = clamp((metric.customer_interaction_count / 10) * 10)

        return (
            (sales_score * EFFICIENCY_WEIGHTS['sales'])
            + (speed_score * EFFICIENCY_WEIGHTS['speed'])
            + (accuracy_score * EFFICIENCY_WEIGHTS['accuracy'])
            + (cs_score * EFFICIENCY_WEIGHTS['customer_service'])
        )

    @staticmethod
    async def record_activity(
        user_id: int,
        branch_id: int,
        activity_type: ActivityType,
        details: dict,
        duration: Optional[float] = None,
    ) -> Tuple[StaffActivity, Optional[Dict[str, Any]], Dict[str, Any]]:
        """Persist a staff activity and update the user's daily metrics.

        Args:
            user_id: Staff member performing the activity.
            branch_id: Branch where the activity took place.
            activity_type: Kind of activity being recorded.
            details: Activity payload; ``amount``/``duration`` are read for
                sales, ``customer_satisfaction``/``resolution_time`` for
                customer service.
            duration: Activity duration. For sales, ``details['duration']``
                takes precedence and this value is the fallback.

        Returns:
            A tuple ``(activity, prev_metrics, new_metrics)``: the stored
            activity, today's metrics before the update (``None`` if no row
            existed yet) and today's metrics after the update.
        """
        if activity_type == ActivityType.SALE:
            # The sale payload carries its own duration; fall back to the
            # explicit argument instead of discarding it, and never store None.
            duration = details.get('duration', duration) or 0

        score = StaffAnalyticsService._score_activity(activity_type, details, duration)

        async with db() as session:
            activity = StaffActivity(
                user_id=user_id,
                branch_id=branch_id,
                activity_type=activity_type,
                details=details,
                duration=duration,
                performance_score=score,
            )
            session.add(activity)
            await session.commit()
            # Reload so the attributes stay readable after the session closes.
            await session.refresh(activity)

        # Snapshot the metrics before they are updated for this activity.
        prev_metrics = await StaffAnalyticsService._get_current_metrics(user_id, branch_id)

        new_metrics = await StaffAnalyticsService._update_performance_metrics(
            user_id, branch_id, activity
        )

        return activity, prev_metrics, new_metrics

    @staticmethod
    async def _get_current_metrics(user_id: int, branch_id: int) -> Optional[Dict[str, Any]]:
        """Return today's (UTC) metrics for a user at a branch, or None if absent."""
        today = datetime.utcnow().date()

        async with db() as session:
            stmt = select(PerformanceMetric).where(
                and_(
                    PerformanceMetric.user_id == user_id,
                    PerformanceMetric.branch_id == branch_id,
                    func.date(PerformanceMetric.metric_date) == today,
                )
            )
            result = await session.execute(stmt)
            metric = result.scalar_one_or_none()

            if not metric:
                return None

            return {
                "total_sales": metric.total_sales,
                "transaction_count": metric.transaction_count,
                "void_count": metric.void_count,
                "efficiency_score": metric.efficiency_score,
                "customer_interaction_count": metric.customer_interaction_count,
            }

    @staticmethod
    async def _update_performance_metrics(
        user_id: int,
        branch_id: int,
        activity: StaffActivity,
    ) -> Dict[str, Any]:
        """Fold *activity* into today's (UTC) metrics row and return the new values.

        Creates the row when none exists for the user/branch/day, updates the
        counters relevant to the activity type, recomputes the efficiency
        score and commits.
        """
        today = datetime.utcnow().date()

        async with db() as session:
            stmt = select(PerformanceMetric).where(
                and_(
                    PerformanceMetric.user_id == user_id,
                    PerformanceMetric.branch_id == branch_id,
                    func.date(PerformanceMetric.metric_date) == today,
                )
            )
            result = await session.execute(stmt)
            metric = result.scalar_one_or_none()

            if not metric:
                metric = PerformanceMetric(
                    user_id=user_id,
                    branch_id=branch_id,
                    metric_date=datetime.utcnow(),
                )
                session.add(metric)

            # Column defaults are only applied when the row is inserted, so a
            # freshly constructed metric (or a row with NULLs) holds None here;
            # normalise to 0 before accumulating.
            for field in _METRIC_COUNTER_FIELDS:
                if getattr(metric, field) is None:
                    setattr(metric, field, 0)

            if activity.activity_type == ActivityType.SALE:
                metric.total_sales += (activity.details or {}).get('amount', 0) or 0
                metric.transaction_count += 1
                metric.average_transaction_value = (
                    metric.total_sales / metric.transaction_count
                )
            elif activity.activity_type == ActivityType.VOID:
                metric.void_count += 1
            elif activity.activity_type == ActivityType.CUSTOMER_SERVICE:
                metric.customer_interaction_count += 1
            elif activity.activity_type in [ActivityType.LOGIN, ActivityType.LOGOUT]:
                if activity.duration:
                    metric.login_time += activity.duration

            metric.efficiency_score = StaffAnalyticsService._calculate_efficiency_score(metric)

            await session.commit()
            await session.refresh(metric)

            return {
                "total_sales": metric.total_sales,
                "transaction_count": metric.transaction_count,
                "average_transaction_value": metric.average_transaction_value,
                "void_count": metric.void_count,
                "customer_interaction_count": metric.customer_interaction_count,
                "login_time": metric.login_time,
                "efficiency_score": metric.efficiency_score,
            }

    @staticmethod
    async def get_staff_performance(
        branch_id: Optional[int] = None,
        user_id: Optional[int] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> Dict[str, Any]:
        """Return aggregated per-user performance metrics for a date range.

        Args:
            branch_id: Restrict to one branch; also enables ``branch_averages``.
            user_id: Restrict to one user.
            start_date: Range start; defaults to 30 days before now (UTC).
            end_date: Range end; defaults to now (UTC).

        Returns:
            A dict with ``staff_performance`` (one entry per user that still
            exists in the users table), ``date_range`` and, when a branch is
            given, ``branch_averages``.
        """
        if not start_date:
            start_date = datetime.utcnow() - timedelta(days=30)
        if not end_date:
            end_date = datetime.utcnow()

        cache_key = (
            f"staff_performance:{branch_id}:{user_id}:{start_date.date()}:{end_date.date()}"
        )
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        in_range = PerformanceMetric.metric_date.between(start_date, end_date)

        async with db() as session:
            conditions = [in_range]
            if branch_id:
                conditions.append(PerformanceMetric.branch_id == branch_id)
            if user_id:
                conditions.append(PerformanceMetric.user_id == user_id)

            # Per-user aggregates over the selected range.
            metrics_stmt = select(
                PerformanceMetric.user_id,
                func.sum(PerformanceMetric.total_sales).label('total_sales'),
                func.sum(PerformanceMetric.transaction_count).label('transaction_count'),
                func.avg(PerformanceMetric.average_transaction_value).label('avg_transaction_value'),
                func.sum(PerformanceMetric.void_count).label('void_count'),
                func.sum(PerformanceMetric.customer_interaction_count).label('customer_interactions'),
                func.sum(PerformanceMetric.login_time).label('total_login_time'),
                func.avg(PerformanceMetric.efficiency_score).label('avg_efficiency_score'),
            ).where(
                and_(*conditions)
            ).group_by(
                PerformanceMetric.user_id
            )
            metrics_data = (await session.execute(metrics_stmt)).all()

            # Load the matching users in a single query.
            user_ids = [row.user_id for row in metrics_data]
            users_stmt = select(User).where(User.id.in_(user_ids))
            users = (await session.execute(users_stmt)).scalars().all()
            users_dict = {u.id: u for u in users}

            performance_data = []
            for row in metrics_data:
                user = users_dict.get(row.user_id)
                if not user:
                    # Metrics for users missing from the users table are skipped.
                    continue
                performance_data.append({
                    "user_id": user.id,
                    "username": user.username,
                    "full_name": user.full_name,
                    "metrics": {
                        "total_sales": row.total_sales,
                        "transaction_count": row.transaction_count,
                        "average_transaction_value": row.avg_transaction_value,
                        "void_count": row.void_count,
                        "customer_interactions": row.customer_interactions,
                        "total_login_time": row.total_login_time,
                        "efficiency_score": row.avg_efficiency_score,
                    },
                })

            # Branch-wide averages are only meaningful for a specific branch.
            branch_averages = None
            if branch_id:
                avg_stmt = select(
                    func.avg(PerformanceMetric.total_sales).label('avg_sales'),
                    func.avg(PerformanceMetric.transaction_count).label('avg_transactions'),
                    func.avg(PerformanceMetric.average_transaction_value).label('avg_transaction_value'),
                    func.avg(PerformanceMetric.efficiency_score).label('avg_efficiency'),
                ).where(
                    and_(
                        PerformanceMetric.branch_id == branch_id,
                        in_range,
                    )
                )
                branch_avg = (await session.execute(avg_stmt)).one()
                branch_averages = {
                    "average_daily_sales": branch_avg.avg_sales,
                    "average_daily_transactions": branch_avg.avg_transactions,
                    "average_transaction_value": branch_avg.avg_transaction_value,
                    "average_efficiency_score": branch_avg.avg_efficiency,
                }

        response = {
            "staff_performance": performance_data,
            "date_range": {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat(),
            },
        }
        if branch_averages:
            response["branch_averages"] = branch_averages

        await cache.set_cache(cache_key, response, expire=CACHE_TTL_SECONDS)
        return response


# Module-level service instances.
analytics = AnalyticsService()
staff_analytics = StaffAnalyticsService()