from datetime import datetime, timedelta from typing import Dict, Any, Optional, List from ..core.config import settings from ..utils.logger import logger, log_health_check from ..services.analytics import staff_analytics from ..db.models import ActivityType from ..routes.websocket import broadcast_staff_update from ..utils.retry import with_retry, retry_with_backoff from ..services.performance_notifications import performance_notifications import httpx import asyncio import aiohttp class POSAnalyticsService: def __init__(self): self.pos_api_url = settings.POS_API_URL self.pos_api_key = settings.POS_API_KEY self.pos_api_version = settings.POS_API_VERSION self._headers = { "Authorization": f"Bearer {self.pos_api_key}", "X-API-Version": self.pos_api_version, "Content-Type": "application/json" } self._last_sync_time = None self._metrics_cache = {} self._failed_operations = [] self._last_error = None @with_retry(max_retries=3, delay=1.0, exceptions=(httpx.HTTPError, asyncio.TimeoutError)) async def sync_pos_activity(self, pos_data: Dict[str, Any]) -> bool: """Sync staff activity data from POS system with retry mechanism""" try: # Map POS activity types to our ActivityType enum activity_mapping = { "sale": ActivityType.SALE, "void": ActivityType.VOID, "refund": ActivityType.REFUND, "inventory": ActivityType.INVENTORY, "customer_service": ActivityType.CUSTOMER_SERVICE, "login": ActivityType.LOGIN, "logout": ActivityType.LOGOUT } activity_type = activity_mapping.get(pos_data["activity_type"].lower()) if not activity_type: logger.warning(f"Unknown POS activity type: {pos_data['activity_type']}") return False # Record the activity in our system activity, prev_metrics, new_metrics = await staff_analytics.record_activity( user_id=pos_data["user_id"], branch_id=pos_data["branch_id"], activity_type=activity_type, details=pos_data["details"], duration=pos_data.get("duration") ) # Process notifications for the activity await performance_notifications.process_activity_notifications( user_id=pos_data["user_id"], branch_id=pos_data["branch_id"], activity=activity, prev_metrics=prev_metrics, new_metrics=new_metrics ) # Get updated metrics for the branch and broadcast metrics = await staff_analytics.get_staff_performance( branch_id=pos_data["branch_id"], start_date=datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) ) await broadcast_staff_update(metrics) return True except Exception as e: logger.error(f"Error syncing POS activity: {str(e)}") raise @with_retry(max_retries=3, delay=1.0, exceptions=(httpx.HTTPError, asyncio.TimeoutError)) async def get_pos_metrics(self, user_id: int, branch_id: int, date: datetime) -> Optional[Dict[str, Any]]: """Fetch staff metrics from POS system with retry mechanism""" try: async with httpx.AsyncClient() as client: response = await client.get( f"{self.pos_api_url}/api/v1/staff/metrics", headers=self._headers, params={ "user_id": user_id, "branch_id": branch_id, "date": date.date().isoformat() }, timeout=10.0 ) response.raise_for_status() return response.json() except httpx.HTTPError as e: logger.error(f"HTTP error fetching POS metrics: {str(e)}") raise except Exception as e: logger.error(f"Error fetching POS metrics: {str(e)}") return None @retry_with_backoff(max_retries=3) async def sync_all_metrics(self, branch_id: Optional[int] = None) -> bool: """Sync all metrics from POS system""" try: # Reset failed operations list self._failed_operations = [] # Sync various metrics tasks = [ self._sync_sales_metrics(), self._sync_inventory_metrics(), self._sync_staff_metrics(), self._sync_customer_metrics() ] results = await asyncio.gather(*tasks, return_exceptions=True) # Check for failures for result in results: if isinstance(result, Exception): self._failed_operations.append(str(result)) logger.error(f"Metric sync failed: {str(result)}") self._last_sync_time = datetime.utcnow() self._last_error = None if not self._failed_operations else self._failed_operations[-1] # Process metrics and sync staff activities sync_tasks = [] processed_branches = set() for metric in (r for r in results if not isinstance(r, Exception)): activities = self._convert_metrics_to_activities(metric) if "branch_id" in metric: processed_branches.add(metric["branch_id"]) for activity in activities: sync_tasks.append(self.sync_pos_activity(activity)) # Run all sync tasks concurrently with individual retries activity_results = await asyncio.gather(*sync_tasks, return_exceptions=True) # Broadcast updates for each affected branch for branch_id in processed_branches: metrics = await staff_analytics.get_staff_performance(branch_id=branch_id) await broadcast_staff_update(metrics) # Check overall success success = len(self._failed_operations) == 0 and all( result is True if not isinstance(result, Exception) else False for result in activity_results ) if not success: logger.warning("Some metrics failed to sync") return success except Exception as e: self._last_error = str(e) logger.error(f"Failed to sync metrics: {str(e)}") return False def _convert_metrics_to_activities(self, metric: Dict[str, Any]) -> List[Dict[str, Any]]: """Convert POS metrics into individual activity records""" activities = [] timestamp = datetime.utcnow().isoformat() # Convert sales metrics if metric.get("sales_amount"): activities.append({ "user_id": metric["user_id"], "branch_id": metric["branch_id"], "activity_type": "sale", "details": { "amount": metric["sales_amount"], "transaction_count": metric["transaction_count"] }, "timestamp": timestamp }) # Convert void transactions if metric.get("void_count"): activities.append({ "user_id": metric["user_id"], "branch_id": metric["branch_id"], "activity_type": "void", "details": { "count": metric["void_count"], "amount": metric.get("void_amount", 0) }, "timestamp": timestamp }) # Convert customer service interactions if metric.get("customer_interactions"): activities.append({ "user_id": metric["user_id"], "branch_id": metric["branch_id"], "activity_type": "customer_service", "details": { "interaction_count": metric["customer_interactions"], "satisfaction_score": metric.get("customer_satisfaction", 0) }, "timestamp": timestamp }) # Convert login time if metric.get("login_duration"): activities.append({ "user_id": metric["user_id"], "branch_id": metric["branch_id"], "activity_type": "login", "details": { "session_type": "pos", "login_time": metric["login_duration"] }, "timestamp": timestamp, "duration": metric["login_duration"] }) return activities async def health_check(self) -> Dict[str, Any]: """Check POS integration health status""" try: status = "healthy" details = { "last_sync": self._last_sync_time, "failed_operations": len(self._failed_operations), "cache_size": len(self._metrics_cache) } if self._last_error: details["last_error"] = str(self._last_error) if (datetime.utcnow() - self._last_sync_time) > timedelta(minutes=30): status = "warning" if len(self._failed_operations) > 5: status = "degraded" # Check POS API connectivity try: async with aiohttp.ClientSession() as session: async with session.get( f"{settings.POS_API_URL}/health", timeout=5 ) as response: if response.status == 200: details["api_status"] = "connected" else: details["api_status"] = "error" status = "unhealthy" except Exception as e: details["api_status"] = "connection_failed" details["api_error"] = str(e) status = "unhealthy" log_health_check("pos_integration", status, details) return { "status": status, "details": details } except Exception as e: logger.error(f"POS health check failed: {str(e)}") return { "status": "error", "details": {"error": str(e)} } @retry_with_backoff(max_retries=2) async def _sync_sales_metrics(self) -> Dict[str, Any]: """Sync sales metrics from POS""" async with aiohttp.ClientSession() as session: async with session.get( f"{settings.POS_API_URL}/metrics/sales", headers=self._get_auth_headers() ) as response: if response.status == 200: data = await response.json() self._metrics_cache["sales"] = data return data else: raise Exception(f"Failed to sync sales metrics: {response.status}") @retry_with_backoff(max_retries=2) async def _sync_inventory_metrics(self) -> Dict[str, Any]: """Sync inventory metrics from POS""" async with aiohttp.ClientSession() as session: async with session.get( f"{settings.POS_API_URL}/metrics/inventory", headers=self._get_auth_headers() ) as response: if response.status == 200: data = await response.json() self._metrics_cache["inventory"] = data return data else: raise Exception(f"Failed to sync inventory metrics: {response.status}") @retry_with_backoff(max_retries=2) async def _sync_staff_metrics(self) -> Dict[str, Any]: """Sync staff performance metrics from POS""" async with aiohttp.ClientSession() as session: async with session.get( f"{settings.POS_API_URL}/metrics/staff", headers=self._get_auth_headers() ) as response: if response.status == 200: data = await response.json() self._metrics_cache["staff"] = data return data else: raise Exception(f"Failed to sync staff metrics: {response.status}") @retry_with_backoff(max_retries=2) async def _sync_customer_metrics(self) -> Dict[str, Any]: """Sync customer metrics from POS""" async with aiohttp.ClientSession() as session: async with session.get( f"{settings.POS_API_URL}/metrics/customers", headers=self._get_auth_headers() ) as response: if response.status == 200: data = await response.json() self._metrics_cache["customers"] = data return data else: raise Exception(f"Failed to sync customer metrics: {response.status}") def _get_auth_headers(self) -> Dict[str, str]: """Get authentication headers for POS API""" return { "Authorization": f"Bearer {settings.POS_API_KEY}", "X-API-Version": settings.POS_API_VERSION } def get_cached_metrics(self, metric_type: str) -> Optional[Dict[str, Any]]: """Get cached metrics by type""" return self._metrics_cache.get(metric_type) def get_sync_status(self) -> Dict[str, Any]: """Get current sync status""" return { "last_sync": self._last_sync_time, "failed_operations": self._failed_operations, "cache_status": { k: "cached" for k in self._metrics_cache.keys() }, "last_error": self._last_error } pos_analytics = POSAnalyticsService()