Spaces:
Paused
Paused
| from datetime import datetime, timedelta | |
| from sqlalchemy import func, and_, select, extract | |
| from ..db.database import db | |
| from ..utils.cache import cache | |
| from ..db.models import StaffActivity, PerformanceMetric, User, ActivityType | |
| from typing import Dict, List, Any, Optional | |
| import numpy as np | |
| from collections import defaultdict | |
class AnalyticsService:
    """Read-side sales/product/customer analytics over the MongoDB
    ``orders`` and ``products`` collections, with each report cached
    for one hour.

    All methods are ``@staticmethod`` — the original definitions took no
    ``self`` yet the module exposes an instance (``analytics``), so any
    instance call would have mis-bound the first argument.
    """

    @staticmethod
    async def get_sales_analytics(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
        """Return daily sales buckets plus overall totals for the range.

        Only orders with status "completed" or "delivered" are counted.

        Args:
            start_date: Inclusive lower bound on ``created_at``.
            end_date: Inclusive upper bound on ``created_at``.

        Returns:
            Dict with ``daily_sales`` (per-day buckets, ascending by date),
            ``total_revenue``, ``total_orders`` and ``average_order_value``.
        """
        cache_key = f"sales_analytics:{start_date.date()}:{end_date.date()}"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            {
                "$match": {
                    "created_at": {"$gte": start_date, "$lte": end_date},
                    "status": {"$in": ["completed", "delivered"]},
                }
            },
            {
                "$group": {
                    # Bucket by calendar day of the order's creation time.
                    "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
                    "total_sales": {"$sum": "$total_amount"},
                    "order_count": {"$sum": 1},
                }
            },
            {"$sort": {"_id": 1}},
        ]
        sales_data = await db.db["orders"].aggregate(pipeline).to_list(None)

        # Compute grand totals once instead of re-summing per result field.
        total_revenue = sum(day["total_sales"] for day in sales_data)
        total_orders = sum(day["order_count"] for day in sales_data)
        result = {
            "daily_sales": sales_data,
            "total_revenue": total_revenue,
            "total_orders": total_orders,
            # `or 1` guards against division by zero when no orders matched.
            "average_order_value": total_revenue / (total_orders or 1),
        }
        await cache.set_cache(cache_key, result, expire=3600)  # Cache for 1 hour
        return result

    @staticmethod
    async def get_product_analytics() -> Dict[str, Any]:
        """Return the top-10 products by revenue plus inventory counts.

        Returns:
            Dict with ``top_products`` (each enriched with ``name`` and
            ``category`` when the product document exists),
            ``total_products`` and ``low_stock_products`` (< 10 in stock).
        """
        cache_key = "product_analytics"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            # One document per ordered line item.
            {"$unwind": "$products"},
            {
                "$group": {
                    "_id": "$products.product_id",
                    "total_quantity": {"$sum": "$products.quantity"},
                    "total_revenue": {
                        "$sum": {"$multiply": ["$products.price", "$products.quantity"]}
                    },
                }
            },
            {"$sort": {"total_revenue": -1}},
            {"$limit": 10},
        ]
        top_products = await db.db["orders"].aggregate(pipeline).to_list(None)

        # Attach product names/categories with a single batched $in query
        # instead of the original one-find_one-per-product (N+1) loop.
        product_ids = [p["_id"] for p in top_products]
        details_by_id: Dict[Any, Any] = {}
        if product_ids:
            async for doc in db.db["products"].find({"_id": {"$in": product_ids}}):
                details_by_id[doc["_id"]] = doc
        for product in top_products:
            detail = details_by_id.get(product["_id"])
            if detail:
                product["name"] = detail["name"]
                product["category"] = detail["category"]

        result = {
            "top_products": top_products,
            "total_products": await db.db["products"].count_documents({}),
            "low_stock_products": await db.db["products"].count_documents(
                {"inventory_count": {"$lt": 10}}
            ),
        }
        await cache.set_cache(cache_key, result, expire=3600)  # Cache for 1 hour
        return result

    @staticmethod
    async def get_customer_analytics() -> Dict[str, Any]:
        """Return per-customer aggregates and simple value segmentation.

        Returns:
            Dict with ``total_customers``, ``top_customers`` (top 10 by
            lifetime spend), ``average_customer_value`` and
            ``customer_segments`` (high > 1000, medium 500-1000, low < 500).
        """
        cache_key = "customer_analytics"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            {
                "$group": {
                    "_id": "$customer_id",
                    "total_orders": {"$sum": 1},
                    "total_spent": {"$sum": "$total_amount"},
                    "last_order": {"$max": "$created_at"},
                }
            },
            {"$sort": {"total_spent": -1}},
        ]
        customer_data = await db.db["orders"].aggregate(pipeline).to_list(None)

        result = {
            "total_customers": len(customer_data),
            "top_customers": customer_data[:10],
            # `or 1` guards against division by zero with no customers.
            "average_customer_value": sum(c["total_spent"] for c in customer_data)
            / (len(customer_data) or 1),
            "customer_segments": {
                "high_value": len([c for c in customer_data if c["total_spent"] > 1000]),
                "medium_value": len([c for c in customer_data if 500 <= c["total_spent"] <= 1000]),
                "low_value": len([c for c in customer_data if c["total_spent"] < 500]),
            },
        }
        await cache.set_cache(cache_key, result, expire=3600)  # Cache for 1 hour
        return result
class StaffAnalyticsService:
    """Records staff activities and maintains per-user, per-day
    ``PerformanceMetric`` rows via the async SQLAlchemy session factory
    ``db``.

    All methods are ``@staticmethod`` — the original definitions took no
    ``self`` yet the module exposes an instance (``staff_analytics``), so
    any instance call would have mis-bound the first argument.
    """

    @staticmethod
    async def record_activity(
        user_id: int,
        branch_id: int,
        activity_type: ActivityType,
        details: dict,
        duration: Optional[float] = None,
    ) -> tuple:
        """Persist a staff activity and refresh today's performance metrics.

        Args:
            user_id: Staff member who performed the activity.
            branch_id: Branch where the activity took place.
            activity_type: Kind of activity (sale, void, customer service…).
            details: Free-form payload. SALE reads ``amount`` and
                ``duration`` (minutes); CUSTOMER_SERVICE reads
                ``customer_satisfaction`` and ``resolution_time``.
            duration: Optional duration of the activity itself, stored on
                the ``StaffActivity`` row.

        Returns:
            ``(activity, prev_metrics, new_metrics)`` — the stored row and
            the daily metric snapshots before and after the update.
            (Note: the original annotation claimed ``StaffActivity`` but a
            3-tuple was always returned.)
        """
        # Derive an optional 0-10 performance score from the payload.
        score = None
        if activity_type == ActivityType.SALE:
            amount = details.get('amount', 0)
            # Use a local name here: the original code reassigned the
            # `duration` parameter, so SALE activities silently lost the
            # caller-supplied duration on the stored row.
            sale_minutes = details.get('duration', 0)  # duration in minutes
            if sale_minutes > 0:
                # Bigger sales closed faster score higher, capped at 10.
                score = min((amount / 100) * (5 / sale_minutes), 10)
        elif activity_type == ActivityType.CUSTOMER_SERVICE:
            satisfaction = details.get('customer_satisfaction', 0)
            resolution_time = details.get('resolution_time', 0)
            if resolution_time > 0:
                score = min((satisfaction * 2) * (10 / resolution_time), 10)

        async with db() as session:
            activity = StaffActivity(
                user_id=user_id,
                branch_id=branch_id,
                activity_type=activity_type,
                details=details,
                duration=duration,
                performance_score=score,
            )
            session.add(activity)
            await session.commit()
            await session.refresh(activity)

        # Snapshot metrics before and after the update so callers can diff.
        prev_metrics = await StaffAnalyticsService._get_current_metrics(user_id, branch_id)
        new_metrics = await StaffAnalyticsService._update_performance_metrics(
            user_id, branch_id, activity
        )
        return activity, prev_metrics, new_metrics

    @staticmethod
    async def _get_current_metrics(user_id: int, branch_id: int) -> Optional[Dict[str, Any]]:
        """Return today's metric row for (user, branch) as a dict, or None."""
        today = datetime.utcnow().date()
        async with db() as session:
            stmt = select(PerformanceMetric).where(
                and_(
                    PerformanceMetric.user_id == user_id,
                    PerformanceMetric.branch_id == branch_id,
                    func.date(PerformanceMetric.metric_date) == today,
                )
            )
            result = await session.execute(stmt)
            metric = result.scalar_one_or_none()
            if metric:
                return {
                    "total_sales": metric.total_sales,
                    "transaction_count": metric.transaction_count,
                    "void_count": metric.void_count,
                    "efficiency_score": metric.efficiency_score,
                    "customer_interaction_count": metric.customer_interaction_count,
                }
            return None

    @staticmethod
    async def _update_performance_metrics(
        user_id: int,
        branch_id: int,
        activity: StaffActivity,
    ) -> Dict[str, Any]:
        """Fold *activity* into today's metric row (created if missing) and
        recompute the weighted efficiency score.

        Returns:
            The refreshed metric values as a plain dict.
        """
        today = datetime.utcnow().date()
        async with db() as session:
            # Get or create today's metrics row for this user/branch.
            stmt = select(PerformanceMetric).where(
                and_(
                    PerformanceMetric.user_id == user_id,
                    PerformanceMetric.branch_id == branch_id,
                    func.date(PerformanceMetric.metric_date) == today,
                )
            )
            result = await session.execute(stmt)
            metric = result.scalar_one_or_none()
            if not metric:
                # NOTE(review): the += updates below assume the model
                # declares numeric defaults (0) for all counters — confirm
                # against the PerformanceMetric column definitions.
                metric = PerformanceMetric(
                    user_id=user_id,
                    branch_id=branch_id,
                    metric_date=datetime.utcnow(),
                )
                session.add(metric)

            # Update the counters affected by this activity type.
            if activity.activity_type == ActivityType.SALE:
                metric.total_sales += activity.details.get('amount', 0)
                metric.transaction_count += 1
                metric.average_transaction_value = (
                    metric.total_sales / metric.transaction_count
                )
            elif activity.activity_type == ActivityType.VOID:
                metric.void_count += 1
            elif activity.activity_type == ActivityType.CUSTOMER_SERVICE:
                metric.customer_interaction_count += 1
            elif activity.activity_type in (ActivityType.LOGIN, ActivityType.LOGOUT):
                if activity.duration:
                    metric.login_time += activity.duration

            # Weighted 0-10 efficiency score.
            weights = {
                'sales': 0.4,
                'speed': 0.2,
                'accuracy': 0.2,
                'customer_service': 0.2,
            }
            sales_score = min((metric.total_sales / 1000) * 10, 10)  # scale sales to 0-10
            # Clamp at 0: the original formula went negative once login_time
            # exceeded the assumed 8-hour (480-minute) day.
            speed_score = max(0.0, 10 * (1 - (metric.login_time / (8 * 60))))
            accuracy_score = 10 * (1 - (metric.void_count / max(metric.transaction_count, 1)))
            cs_score = min((metric.customer_interaction_count / 10) * 10, 10)
            metric.efficiency_score = (
                (sales_score * weights['sales'])
                + (speed_score * weights['speed'])
                + (accuracy_score * weights['accuracy'])
                + (cs_score * weights['customer_service'])
            )

            await session.commit()
            await session.refresh(metric)
            return {
                "total_sales": metric.total_sales,
                "transaction_count": metric.transaction_count,
                "average_transaction_value": metric.average_transaction_value,
                "void_count": metric.void_count,
                "customer_interaction_count": metric.customer_interaction_count,
                "login_time": metric.login_time,
                "efficiency_score": metric.efficiency_score,
            }

    @staticmethod
    async def get_staff_performance(
        branch_id: Optional[int] = None,
        user_id: Optional[int] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> Dict[str, Any]:
        """Get comprehensive staff performance metrics.

        Args:
            branch_id: Restrict to one branch; also triggers branch-average
                computation. (``is not None`` checks are used so id 0 is a
                valid filter — the original truthiness test dropped it.)
            user_id: Restrict to one staff member.
            start_date: Defaults to 30 days ago.
            end_date: Defaults to now.

        Returns:
            ``staff_performance`` per user, the ``date_range``, and
            ``branch_averages`` when a branch filter was given.
        """
        if not start_date:
            start_date = datetime.utcnow() - timedelta(days=30)
        if not end_date:
            end_date = datetime.utcnow()

        cache_key = (
            f"staff_performance:{branch_id}:{user_id}:"
            f"{start_date.date()}:{end_date.date()}"
        )
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        async with db() as session:
            # Base query conditions; optional filters appended below.
            conditions = [
                PerformanceMetric.metric_date.between(start_date, end_date)
            ]
            if branch_id is not None:
                conditions.append(PerformanceMetric.branch_id == branch_id)
            if user_id is not None:
                conditions.append(PerformanceMetric.user_id == user_id)

            # Aggregate the daily rows per user over the whole range.
            metrics_stmt = select(
                PerformanceMetric.user_id,
                func.sum(PerformanceMetric.total_sales).label('total_sales'),
                func.sum(PerformanceMetric.transaction_count).label('transaction_count'),
                func.avg(PerformanceMetric.average_transaction_value).label('avg_transaction_value'),
                func.sum(PerformanceMetric.void_count).label('void_count'),
                func.sum(PerformanceMetric.customer_interaction_count).label('customer_interactions'),
                func.sum(PerformanceMetric.login_time).label('total_login_time'),
                func.avg(PerformanceMetric.efficiency_score).label('avg_efficiency_score'),
            ).where(
                and_(*conditions)
            ).group_by(
                PerformanceMetric.user_id
            )
            result = await session.execute(metrics_stmt)
            metrics_data = result.all()

            # Resolve user display details in one query.
            user_ids = [m.user_id for m in metrics_data]
            users_stmt = select(User).where(User.id.in_(user_ids))
            users = (await session.execute(users_stmt)).scalars().all()
            users_dict = {u.id: u for u in users}

            # Format response data; rows without a matching user are skipped.
            performance_data = []
            for metric in metrics_data:
                user = users_dict.get(metric.user_id)
                if user:
                    performance_data.append({
                        "user_id": user.id,
                        "username": user.username,
                        "full_name": user.full_name,
                        "metrics": {
                            "total_sales": metric.total_sales,
                            "transaction_count": metric.transaction_count,
                            "average_transaction_value": metric.avg_transaction_value,
                            "void_count": metric.void_count,
                            "customer_interactions": metric.customer_interactions,
                            "total_login_time": metric.total_login_time,
                            "efficiency_score": metric.avg_efficiency_score,
                        },
                    })

            # Branch-wide averages when a branch filter was requested.
            branch_averages = None
            if branch_id is not None:
                avg_stmt = select(
                    func.avg(PerformanceMetric.total_sales).label('avg_sales'),
                    func.avg(PerformanceMetric.transaction_count).label('avg_transactions'),
                    func.avg(PerformanceMetric.average_transaction_value).label('avg_transaction_value'),
                    func.avg(PerformanceMetric.efficiency_score).label('avg_efficiency'),
                ).where(
                    and_(
                        PerformanceMetric.branch_id == branch_id,
                        PerformanceMetric.metric_date.between(start_date, end_date),
                    )
                )
                avg_result = await session.execute(avg_stmt)
                branch_avg = avg_result.one()
                branch_averages = {
                    "average_daily_sales": branch_avg.avg_sales,
                    "average_daily_transactions": branch_avg.avg_transactions,
                    "average_transaction_value": branch_avg.avg_transaction_value,
                    "average_efficiency_score": branch_avg.avg_efficiency,
                }

            response = {
                "staff_performance": performance_data,
                "date_range": {
                    "start_date": start_date.isoformat(),
                    "end_date": end_date.isoformat(),
                },
            }
            if branch_averages:
                response["branch_averages"] = branch_averages

            # Cache the response for 1 hour
            await cache.set_cache(cache_key, response, expire=3600)
            return response
# Module-level singletons: import these shared instances rather than
# constructing a service per call site. Construction has no side effects
# (neither class defines __init__).
analytics = AnalyticsService()
staff_analytics = StaffAnalyticsService()