from datetime import datetime
from typing import Dict, Any, List, Optional

from ..services.notifications import create_and_broadcast_notification
from ..db.models import ActivityType
from ..utils.logger import logger


class PerformanceNotificationService:
    """Turn staff performance metrics into user and supervisor notifications.

    Every public method is a best-effort operation: any exception raised
    while building or sending notifications is logged through ``logger`` and
    swallowed, so callers never see a failure from this service.
    """

    # Performance thresholds for notifications
    THRESHOLDS = {
        "high_efficiency": 9.0,    # Notify for efficiency scores above 9
        "low_efficiency": 5.0,     # Warn for efficiency scores below 5
        "sales_milestone": 10000,  # Notify for sales milestones (every 10k)
        "high_void_rate": 0.1,     # Warn when void rate exceeds 10%
        "customer_service": 20,    # Notify for high customer service interactions
    }

    # A milestone counts as "just reached" while total sales sit less than
    # this amount above a multiple of THRESHOLDS["sales_milestone"].
    _SALES_MILESTONE_WINDOW = 100

    @staticmethod
    def _void_rate(metrics: Dict[str, Any]) -> float:
        """Return void_count / transaction_count, or 0 when there are no transactions."""
        if metrics["transaction_count"] > 0:
            return metrics["void_count"] / metrics["transaction_count"]
        return 0

    @staticmethod
    def _build_performance_notifications(
        metrics: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Build (without sending) the notifications that ``metrics`` triggers.

        Reads the keys ``efficiency_score``, ``total_sales``,
        ``transaction_count``, ``void_count`` (only when there are
        transactions) and ``customer_interaction_count``; a missing key
        raises ``KeyError``.

        Returns:
            A list of dicts with ``title``, ``message``, ``type`` and ``data``.
        """
        thresholds = PerformanceNotificationService.THRESHOLDS
        notifications: List[Dict[str, Any]] = []

        # Check efficiency score
        efficiency = metrics["efficiency_score"]
        if efficiency >= thresholds["high_efficiency"]:
            notifications.append({
                "title": "Outstanding Performance!",
                "message": "Your efficiency score is exceptional. Keep up the great work!",
                "type": "performance_achievement",
                "data": {
                    "metric": "efficiency_score",
                    "value": efficiency,
                },
            })
        elif efficiency <= thresholds["low_efficiency"]:
            notifications.append({
                "title": "Performance Alert",
                "message": "Your efficiency score needs improvement. Contact your supervisor for support.",
                "type": "performance_alert",
                "data": {
                    "metric": "efficiency_score",
                    "value": efficiency,
                },
            })

        # Check sales milestones
        milestone = thresholds["sales_milestone"]
        total_sales = metrics["total_sales"]
        # Require at least one full milestone: without this guard any total
        # below the window (e.g. 50) satisfies the modulo test and would be
        # announced as "0k in sales".
        if (
            total_sales >= milestone
            and total_sales % milestone
            < PerformanceNotificationService._SALES_MILESTONE_WINDOW
        ):
            notifications.append({
                "title": "Sales Milestone Achieved!",
                "message": f"Congratulations! You've reached {int(total_sales / 1000)}k in sales.",
                "type": "sales_milestone",
                "data": {
                    "metric": "total_sales",
                    "value": total_sales,
                },
            })

        # Check void rate (only meaningful when there are transactions)
        if metrics["transaction_count"] > 0:
            void_rate = PerformanceNotificationService._void_rate(metrics)
            if void_rate > thresholds["high_void_rate"]:
                notifications.append({
                    "title": "High Void Rate Alert",
                    "message": "Your void transaction rate is above average. Please review procedures.",
                    "type": "void_rate_alert",
                    "data": {
                        "metric": "void_rate",
                        "value": void_rate,
                    },
                })

        # Check customer service interactions
        interactions = metrics["customer_interaction_count"]
        if interactions >= thresholds["customer_service"]:
            notifications.append({
                "title": "Customer Service Achievement",
                "message": "Great job handling customer interactions today!",
                "type": "customer_service_achievement",
                "data": {
                    "metric": "customer_interactions",
                    "value": interactions,
                },
            })

        return notifications

    @staticmethod
    async def check_and_notify_performance(
        user_id: int,
        branch_id: int,
        metrics: Dict[str, Any]
    ) -> None:
        """Check performance metrics and send notifications if thresholds are met.

        Each triggered notification is sent to ``str(user_id)``. Notifications
        whose type contains ``"alert"`` are additionally sent to the
        ``"supervisor"`` recipient with the staff and branch ids attached.

        Args:
            user_id: Staff member the metrics belong to.
            branch_id: Branch included in supervisor alerts.
            metrics: Metric values; see ``_build_performance_notifications``
                for the keys that are read.

        Errors are logged and swallowed; nothing is raised to the caller.
        """
        try:
            notifications = (
                PerformanceNotificationService._build_performance_notifications(metrics)
            )

            # Send all notifications
            for notif in notifications:
                await create_and_broadcast_notification(
                    user_id=str(user_id),
                    title=notif["title"],
                    message=notif["message"],
                    notification_type=notif["type"],
                    data=notif["data"]
                )

                # If it's an alert, also notify supervisor
                if "alert" in notif["type"]:
                    await create_and_broadcast_notification(
                        user_id="supervisor",
                        title=f"Staff Alert: {notif['title']}",
                        message=f"Staff member {user_id} in branch {branch_id}: {notif['message']}",
                        notification_type=f"supervisor_{notif['type']}",
                        data={
                            **notif["data"],
                            "staff_id": user_id,
                            "branch_id": branch_id
                        }
                    )

        except Exception as e:
            # Best-effort: a notification failure must not break the caller.
            logger.error(f"Error in performance notifications: {str(e)}")

    @staticmethod
    async def notify_performance_insights(
        branch_id: int,
        insights: List[Dict[str, Any]]
    ) -> None:
        """Send notifications for performance insights.

        Only insights whose ``type`` is ``"warning"`` or ``"positive"`` are
        forwarded, each to the ``"supervisor"`` recipient. Every forwarded
        insight must also provide ``message`` and ``category``.

        Errors are logged and swallowed; nothing is raised to the caller.
        """
        try:
            for insight in insights:
                if insight["type"] in ("warning", "positive"):
                    await create_and_broadcast_notification(
                        user_id="supervisor",
                        title="Branch Performance Insight",
                        message=insight["message"],
                        notification_type=f"performance_insight_{insight['type']}",
                        data={
                            "branch_id": branch_id,
                            "category": insight["category"],
                            "insight_type": insight["type"]
                        }
                    )
        except Exception as e:
            logger.error(f"Error in insight notifications: {str(e)}")

    @staticmethod
    async def notify_realtime_alerts(
        branch_id: int,
        current_metrics: Dict[str, Any],
        previous_metrics: Optional[Dict[str, Any]] = None
    ) -> None:
        """Send notifications for significant real-time changes.

        Compares ``current_metrics`` with ``previous_metrics`` and alerts the
        ``"supervisor"`` recipient when the efficiency score dropped by more
        than 2 points, or when the void rate more than doubled. Does nothing
        when ``previous_metrics`` is empty or ``None``.

        Errors are logged and swallowed; nothing is raised to the caller.
        """
        try:
            if not previous_metrics:
                return

            alerts = []

            # Check for significant drops in performance
            efficiency_drop = (
                previous_metrics["efficiency_score"]
                - current_metrics["efficiency_score"]
            )
            if efficiency_drop > 2:
                alerts.append({
                    "title": "Sudden Performance Drop",
                    "message": "Notable decrease in staff performance detected.",
                    "type": "realtime_performance_alert"
                })

            # Check for unusual void patterns
            current_void_rate = PerformanceNotificationService._void_rate(current_metrics)
            previous_void_rate = PerformanceNotificationService._void_rate(previous_metrics)

            # NOTE(review): when the previous rate is 0, any non-zero current
            # rate satisfies this comparison — confirm that is intended.
            if current_void_rate > previous_void_rate * 2:
                alerts.append({
                    "title": "Unusual Void Pattern",
                    "message": "Significant increase in void transactions detected.",
                    "type": "realtime_void_alert"
                })

            # Send alerts
            for alert in alerts:
                await create_and_broadcast_notification(
                    user_id="supervisor",
                    title=alert["title"],
                    message=alert["message"],
                    notification_type=alert["type"],
                    data={
                        "branch_id": branch_id,
                        "current_metrics": current_metrics,
                        "previous_metrics": previous_metrics
                    }
                )

        except Exception as e:
            logger.error(f"Error in realtime alert notifications: {str(e)}")

    @staticmethod
    async def process_activity_notifications(
        user_id: int,
        branch_id: int,
        activity: Any,
        prev_metrics: Optional[Dict[str, Any]],
        new_metrics: Dict[str, Any]
    ) -> None:
        """Process notifications for a new staff activity.

        Runs the threshold checks on ``new_metrics`` and, when
        ``prev_metrics`` is available, the real-time comparison against it.
        ``activity`` is accepted for interface compatibility but is not read
        here.
        """
        await PerformanceNotificationService.check_and_notify_performance(
            user_id=user_id,
            branch_id=branch_id,
            metrics=new_metrics
        )

        if prev_metrics:
            await PerformanceNotificationService.notify_realtime_alerts(
                branch_id=branch_id,
                current_metrics=new_metrics,
                previous_metrics=prev_metrics
            )


performance_notifications = PerformanceNotificationService()