Spaces:
Paused
Paused
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, Optional, List | |
| from ..core.config import settings | |
| from ..utils.logger import logger, log_health_check | |
| from ..services.analytics import staff_analytics | |
| from ..db.models import ActivityType | |
| from ..routes.websocket import broadcast_staff_update | |
| from ..utils.retry import with_retry, retry_with_backoff | |
| from ..services.performance_notifications import performance_notifications | |
| import httpx | |
| import asyncio | |
| import aiohttp | |
class POSAnalyticsService:
    """Syncs staff activity and metrics between the POS system and internal analytics."""

    def __init__(self):
        # POS API connection settings pulled from application config.
        self.pos_api_url = settings.POS_API_URL
        self.pos_api_key = settings.POS_API_KEY
        self.pos_api_version = settings.POS_API_VERSION
        # Default headers sent with httpx requests to the POS API.
        self._headers = {
            "Authorization": f"Bearer {self.pos_api_key}",
            "X-API-Version": self.pos_api_version,
            "Content-Type": "application/json"
        }
        self._last_sync_time = None  # datetime of the last sync_all_metrics run, or None if never synced
        self._metrics_cache = {}  # latest raw payload per metric family ("sales", "staff", ...)
        self._failed_operations = []  # error strings collected during the last sync
        self._last_error = None  # most recent failure message, if any
| async def sync_pos_activity(self, pos_data: Dict[str, Any]) -> bool: | |
| """Sync staff activity data from POS system with retry mechanism""" | |
| try: | |
| # Map POS activity types to our ActivityType enum | |
| activity_mapping = { | |
| "sale": ActivityType.SALE, | |
| "void": ActivityType.VOID, | |
| "refund": ActivityType.REFUND, | |
| "inventory": ActivityType.INVENTORY, | |
| "customer_service": ActivityType.CUSTOMER_SERVICE, | |
| "login": ActivityType.LOGIN, | |
| "logout": ActivityType.LOGOUT | |
| } | |
| activity_type = activity_mapping.get(pos_data["activity_type"].lower()) | |
| if not activity_type: | |
| logger.warning(f"Unknown POS activity type: {pos_data['activity_type']}") | |
| return False | |
| # Record the activity in our system | |
| activity, prev_metrics, new_metrics = await staff_analytics.record_activity( | |
| user_id=pos_data["user_id"], | |
| branch_id=pos_data["branch_id"], | |
| activity_type=activity_type, | |
| details=pos_data["details"], | |
| duration=pos_data.get("duration") | |
| ) | |
| # Process notifications for the activity | |
| await performance_notifications.process_activity_notifications( | |
| user_id=pos_data["user_id"], | |
| branch_id=pos_data["branch_id"], | |
| activity=activity, | |
| prev_metrics=prev_metrics, | |
| new_metrics=new_metrics | |
| ) | |
| # Get updated metrics for the branch and broadcast | |
| metrics = await staff_analytics.get_staff_performance( | |
| branch_id=pos_data["branch_id"], | |
| start_date=datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) | |
| ) | |
| await broadcast_staff_update(metrics) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error syncing POS activity: {str(e)}") | |
| raise | |
| async def get_pos_metrics(self, user_id: int, branch_id: int, date: datetime) -> Optional[Dict[str, Any]]: | |
| """Fetch staff metrics from POS system with retry mechanism""" | |
| try: | |
| async with httpx.AsyncClient() as client: | |
| response = await client.get( | |
| f"{self.pos_api_url}/api/v1/staff/metrics", | |
| headers=self._headers, | |
| params={ | |
| "user_id": user_id, | |
| "branch_id": branch_id, | |
| "date": date.date().isoformat() | |
| }, | |
| timeout=10.0 | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| except httpx.HTTPError as e: | |
| logger.error(f"HTTP error fetching POS metrics: {str(e)}") | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error fetching POS metrics: {str(e)}") | |
| return None | |
| async def sync_all_metrics(self, branch_id: Optional[int] = None) -> bool: | |
| """Sync all metrics from POS system""" | |
| try: | |
| # Reset failed operations list | |
| self._failed_operations = [] | |
| # Sync various metrics | |
| tasks = [ | |
| self._sync_sales_metrics(), | |
| self._sync_inventory_metrics(), | |
| self._sync_staff_metrics(), | |
| self._sync_customer_metrics() | |
| ] | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # Check for failures | |
| for result in results: | |
| if isinstance(result, Exception): | |
| self._failed_operations.append(str(result)) | |
| logger.error(f"Metric sync failed: {str(result)}") | |
| self._last_sync_time = datetime.utcnow() | |
| self._last_error = None if not self._failed_operations else self._failed_operations[-1] | |
| # Process metrics and sync staff activities | |
| sync_tasks = [] | |
| processed_branches = set() | |
| for metric in (r for r in results if not isinstance(r, Exception)): | |
| activities = self._convert_metrics_to_activities(metric) | |
| if "branch_id" in metric: | |
| processed_branches.add(metric["branch_id"]) | |
| for activity in activities: | |
| sync_tasks.append(self.sync_pos_activity(activity)) | |
| # Run all sync tasks concurrently with individual retries | |
| activity_results = await asyncio.gather(*sync_tasks, return_exceptions=True) | |
| # Broadcast updates for each affected branch | |
| for branch_id in processed_branches: | |
| metrics = await staff_analytics.get_staff_performance(branch_id=branch_id) | |
| await broadcast_staff_update(metrics) | |
| # Check overall success | |
| success = len(self._failed_operations) == 0 and all( | |
| result is True if not isinstance(result, Exception) else False | |
| for result in activity_results | |
| ) | |
| if not success: | |
| logger.warning("Some metrics failed to sync") | |
| return success | |
| except Exception as e: | |
| self._last_error = str(e) | |
| logger.error(f"Failed to sync metrics: {str(e)}") | |
| return False | |
| def _convert_metrics_to_activities(self, metric: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """Convert POS metrics into individual activity records""" | |
| activities = [] | |
| timestamp = datetime.utcnow().isoformat() | |
| # Convert sales metrics | |
| if metric.get("sales_amount"): | |
| activities.append({ | |
| "user_id": metric["user_id"], | |
| "branch_id": metric["branch_id"], | |
| "activity_type": "sale", | |
| "details": { | |
| "amount": metric["sales_amount"], | |
| "transaction_count": metric["transaction_count"] | |
| }, | |
| "timestamp": timestamp | |
| }) | |
| # Convert void transactions | |
| if metric.get("void_count"): | |
| activities.append({ | |
| "user_id": metric["user_id"], | |
| "branch_id": metric["branch_id"], | |
| "activity_type": "void", | |
| "details": { | |
| "count": metric["void_count"], | |
| "amount": metric.get("void_amount", 0) | |
| }, | |
| "timestamp": timestamp | |
| }) | |
| # Convert customer service interactions | |
| if metric.get("customer_interactions"): | |
| activities.append({ | |
| "user_id": metric["user_id"], | |
| "branch_id": metric["branch_id"], | |
| "activity_type": "customer_service", | |
| "details": { | |
| "interaction_count": metric["customer_interactions"], | |
| "satisfaction_score": metric.get("customer_satisfaction", 0) | |
| }, | |
| "timestamp": timestamp | |
| }) | |
| # Convert login time | |
| if metric.get("login_duration"): | |
| activities.append({ | |
| "user_id": metric["user_id"], | |
| "branch_id": metric["branch_id"], | |
| "activity_type": "login", | |
| "details": { | |
| "session_type": "pos", | |
| "login_time": metric["login_duration"] | |
| }, | |
| "timestamp": timestamp, | |
| "duration": metric["login_duration"] | |
| }) | |
| return activities | |
| async def health_check(self) -> Dict[str, Any]: | |
| """Check POS integration health status""" | |
| try: | |
| status = "healthy" | |
| details = { | |
| "last_sync": self._last_sync_time, | |
| "failed_operations": len(self._failed_operations), | |
| "cache_size": len(self._metrics_cache) | |
| } | |
| if self._last_error: | |
| details["last_error"] = str(self._last_error) | |
| if (datetime.utcnow() - self._last_sync_time) > timedelta(minutes=30): | |
| status = "warning" | |
| if len(self._failed_operations) > 5: | |
| status = "degraded" | |
| # Check POS API connectivity | |
| try: | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get( | |
| f"{settings.POS_API_URL}/health", | |
| timeout=5 | |
| ) as response: | |
| if response.status == 200: | |
| details["api_status"] = "connected" | |
| else: | |
| details["api_status"] = "error" | |
| status = "unhealthy" | |
| except Exception as e: | |
| details["api_status"] = "connection_failed" | |
| details["api_error"] = str(e) | |
| status = "unhealthy" | |
| log_health_check("pos_integration", status, details) | |
| return { | |
| "status": status, | |
| "details": details | |
| } | |
| except Exception as e: | |
| logger.error(f"POS health check failed: {str(e)}") | |
| return { | |
| "status": "error", | |
| "details": {"error": str(e)} | |
| } | |
| async def _sync_sales_metrics(self) -> Dict[str, Any]: | |
| """Sync sales metrics from POS""" | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get( | |
| f"{settings.POS_API_URL}/metrics/sales", | |
| headers=self._get_auth_headers() | |
| ) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| self._metrics_cache["sales"] = data | |
| return data | |
| else: | |
| raise Exception(f"Failed to sync sales metrics: {response.status}") | |
| async def _sync_inventory_metrics(self) -> Dict[str, Any]: | |
| """Sync inventory metrics from POS""" | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get( | |
| f"{settings.POS_API_URL}/metrics/inventory", | |
| headers=self._get_auth_headers() | |
| ) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| self._metrics_cache["inventory"] = data | |
| return data | |
| else: | |
| raise Exception(f"Failed to sync inventory metrics: {response.status}") | |
| async def _sync_staff_metrics(self) -> Dict[str, Any]: | |
| """Sync staff performance metrics from POS""" | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get( | |
| f"{settings.POS_API_URL}/metrics/staff", | |
| headers=self._get_auth_headers() | |
| ) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| self._metrics_cache["staff"] = data | |
| return data | |
| else: | |
| raise Exception(f"Failed to sync staff metrics: {response.status}") | |
| async def _sync_customer_metrics(self) -> Dict[str, Any]: | |
| """Sync customer metrics from POS""" | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get( | |
| f"{settings.POS_API_URL}/metrics/customers", | |
| headers=self._get_auth_headers() | |
| ) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| self._metrics_cache["customers"] = data | |
| return data | |
| else: | |
| raise Exception(f"Failed to sync customer metrics: {response.status}") | |
| def _get_auth_headers(self) -> Dict[str, str]: | |
| """Get authentication headers for POS API""" | |
| return { | |
| "Authorization": f"Bearer {settings.POS_API_KEY}", | |
| "X-API-Version": settings.POS_API_VERSION | |
| } | |
| def get_cached_metrics(self, metric_type: str) -> Optional[Dict[str, Any]]: | |
| """Get cached metrics by type""" | |
| return self._metrics_cache.get(metric_type) | |
| def get_sync_status(self) -> Dict[str, Any]: | |
| """Get current sync status""" | |
| return { | |
| "last_sync": self._last_sync_time, | |
| "failed_operations": self._failed_operations, | |
| "cache_status": { | |
| k: "cached" for k in self._metrics_cache.keys() | |
| }, | |
| "last_error": self._last_error | |
| } | |
# Module-level singleton shared by routes/services that integrate with the POS.
pos_analytics = POSAnalyticsService()