# Admin-Desk2/app/services/pos_analytics.py
# Fred808's picture
# Upload 94 files
# 349f43e verified
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, List
from ..core.config import settings
from ..utils.logger import logger, log_health_check
from ..services.analytics import staff_analytics
from ..db.models import ActivityType
from ..routes.websocket import broadcast_staff_update
from ..utils.retry import with_retry, retry_with_backoff
from ..services.performance_notifications import performance_notifications
import httpx
import asyncio
import aiohttp
class POSAnalyticsService:
    """Integration layer between the POS API and the staff analytics services.

    Fetches metric payloads from the POS API, caches the raw payloads,
    converts them into activity records that are handed to
    ``staff_analytics``, and pushes refreshed performance data out through
    ``broadcast_staff_update``.
    """

    def __init__(self):
        """Read POS connection settings and initialise sync bookkeeping."""
        self.pos_api_url = settings.POS_API_URL
        self.pos_api_key = settings.POS_API_KEY
        self.pos_api_version = settings.POS_API_VERSION
        # Headers sent by get_pos_metrics.
        self._headers = {
            "Authorization": f"Bearer {self.pos_api_key}",
            "X-API-Version": self.pos_api_version,
            "Content-Type": "application/json"
        }
        # None until sync_all_metrics has completed its fetch phase once.
        self._last_sync_time: Optional[datetime] = None
        # Raw payloads keyed by metric endpoint name ("sales", "inventory", ...).
        self._metrics_cache: Dict[str, Any] = {}
        # String descriptions of failures from the most recent sync_all_metrics run.
        self._failed_operations: List[str] = []
        self._last_error: Optional[str] = None

    @with_retry(max_retries=3, delay=1.0, exceptions=(httpx.HTTPError, asyncio.TimeoutError))
    async def sync_pos_activity(self, pos_data: Dict[str, Any]) -> bool:
        """Record one POS activity, run its notifications and broadcast metrics.

        Args:
            pos_data: Activity payload. ``activity_type``, ``user_id``,
                ``branch_id`` and ``details`` are required; ``duration`` is
                optional.

        Returns:
            True once the activity was recorded and the branch metrics were
            broadcast; False when ``activity_type`` is not a known POS type.

        Raises:
            Exception: Any error raised while processing (including
                ``KeyError`` for a missing required key) is logged and
                re-raised.
        """
        try:
            # Map POS activity types to our ActivityType enum
            activity_mapping = {
                "sale": ActivityType.SALE,
                "void": ActivityType.VOID,
                "refund": ActivityType.REFUND,
                "inventory": ActivityType.INVENTORY,
                "customer_service": ActivityType.CUSTOMER_SERVICE,
                "login": ActivityType.LOGIN,
                "logout": ActivityType.LOGOUT
            }
            activity_type = activity_mapping.get(pos_data["activity_type"].lower())
            # Explicit None check: a mapped enum member must never be mistaken
            # for "unknown" just because it happens to evaluate as falsy.
            if activity_type is None:
                logger.warning(f"Unknown POS activity type: {pos_data['activity_type']}")
                return False

            # Record the activity in our system
            activity, prev_metrics, new_metrics = await staff_analytics.record_activity(
                user_id=pos_data["user_id"],
                branch_id=pos_data["branch_id"],
                activity_type=activity_type,
                details=pos_data["details"],
                duration=pos_data.get("duration")
            )

            # Process notifications for the activity
            await performance_notifications.process_activity_notifications(
                user_id=pos_data["user_id"],
                branch_id=pos_data["branch_id"],
                activity=activity,
                prev_metrics=prev_metrics,
                new_metrics=new_metrics
            )

            # Get updated metrics for the branch (from today's midnight, UTC
            # clock) and broadcast them
            metrics = await staff_analytics.get_staff_performance(
                branch_id=pos_data["branch_id"],
                start_date=datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
            )
            await broadcast_staff_update(metrics)
            return True
        except Exception as e:
            logger.error(f"Error syncing POS activity: {str(e)}")
            raise

    @with_retry(max_retries=3, delay=1.0, exceptions=(httpx.HTTPError, asyncio.TimeoutError))
    async def get_pos_metrics(self, user_id: int, branch_id: int, date: datetime) -> Optional[Dict[str, Any]]:
        """Fetch one staff member's metrics for a given day from the POS API.

        Args:
            user_id: Staff member identifier sent as a query parameter.
            branch_id: Branch identifier sent as a query parameter.
            date: Only the date part is sent, in ISO format.

        Returns:
            The decoded JSON response, or None when a non-HTTP error occurred.

        Raises:
            httpx.HTTPError: On transport errors or non-success status codes
                (logged, then re-raised).
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{self.pos_api_url}/api/v1/staff/metrics",
                    headers=self._headers,
                    params={
                        "user_id": user_id,
                        "branch_id": branch_id,
                        "date": date.date().isoformat()
                    },
                    timeout=10.0
                )
                response.raise_for_status()
                return response.json()
        except httpx.HTTPError as e:
            logger.error(f"HTTP error fetching POS metrics: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error fetching POS metrics: {str(e)}")
            return None

    @retry_with_backoff(max_retries=3)
    async def sync_all_metrics(self, branch_id: Optional[int] = None) -> bool:
        """Fetch every metric family from the POS API and sync derived activities.

        Args:
            branch_id: Accepted for interface compatibility; it is not
                currently used to restrict which branches are synced.

        Returns:
            True only when every metric fetch, every payload conversion and
            every activity sync succeeded; False otherwise.
        """
        try:
            # Reset failed operations list
            self._failed_operations = []

            # Fetch all metric families concurrently
            results = await asyncio.gather(
                self._sync_sales_metrics(),
                self._sync_inventory_metrics(),
                self._sync_staff_metrics(),
                self._sync_customer_metrics(),
                return_exceptions=True
            )

            # Separate failures from payloads. BaseException (not Exception)
            # because gather(return_exceptions=True) may also return
            # CancelledError instances as results.
            fetched = []
            for result in results:
                if isinstance(result, BaseException):
                    self._failed_operations.append(str(result))
                    logger.error(f"Metric sync failed: {str(result)}")
                else:
                    fetched.append(result)

            self._last_sync_time = datetime.utcnow()

            # Convert payloads into activity sync tasks. A malformed payload is
            # recorded as a failure instead of aborting the remaining metrics.
            sync_tasks = []
            processed_branches = set()
            for metric in fetched:
                try:
                    activities = self._convert_metrics_to_activities(metric)
                    if "branch_id" in metric:
                        processed_branches.add(metric["branch_id"])
                except (KeyError, TypeError, AttributeError) as exc:
                    self._failed_operations.append(f"Metric conversion failed: {exc!r}")
                    logger.error(f"Metric conversion failed: {exc!r}")
                    continue
                # Coroutines are only created after the conversion succeeded,
                # so none is left un-awaited.
                sync_tasks.extend(self.sync_pos_activity(activity) for activity in activities)

            self._last_error = self._failed_operations[-1] if self._failed_operations else None

            # Run all sync tasks concurrently with individual retries
            activity_results = await asyncio.gather(*sync_tasks, return_exceptions=True)

            # Broadcast updates for each affected branch (distinct loop name so
            # the ``branch_id`` parameter is not shadowed)
            for affected_branch in processed_branches:
                metrics = await staff_analytics.get_staff_performance(branch_id=affected_branch)
                await broadcast_staff_update(metrics)

            # An exception instance is never ``True``, so one identity check
            # covers both failed and rejected activities.
            success = not self._failed_operations and all(
                result is True for result in activity_results
            )
            if not success:
                logger.warning("Some metrics failed to sync")
            return success
        except Exception as e:
            self._last_error = str(e)
            logger.error(f"Failed to sync metrics: {str(e)}")
            return False

    def _convert_metrics_to_activities(self, metric: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Convert a POS metrics payload into individual activity records.

        One record is produced for each of ``sales_amount``, ``void_count``,
        ``customer_interactions`` and ``login_duration`` that is present with
        a truthy value. All records share one timestamp.

        Args:
            metric: Metrics payload from the POS API.

        Returns:
            Activity dicts in the shape consumed by ``sync_pos_activity``;
            empty when none of the fields above is set.

        Raises:
            KeyError: If a record has to be built but ``user_id`` or
                ``branch_id`` is missing from ``metric``.
        """
        timestamp = datetime.utcnow().isoformat()

        def build(activity_type: str, details: Dict[str, Any], **extra: Any) -> Dict[str, Any]:
            # Common envelope for every activity type.
            return {
                "user_id": metric["user_id"],
                "branch_id": metric["branch_id"],
                "activity_type": activity_type,
                "details": details,
                "timestamp": timestamp,
                **extra
            }

        activities = []

        # Convert sales metrics
        if metric.get("sales_amount"):
            activities.append(build("sale", {
                "amount": metric["sales_amount"],
                # Optional like the other secondary fields; default to 0
                "transaction_count": metric.get("transaction_count", 0)
            }))

        # Convert void transactions
        if metric.get("void_count"):
            activities.append(build("void", {
                "count": metric["void_count"],
                "amount": metric.get("void_amount", 0)
            }))

        # Convert customer service interactions
        if metric.get("customer_interactions"):
            activities.append(build("customer_service", {
                "interaction_count": metric["customer_interactions"],
                "satisfaction_score": metric.get("customer_satisfaction", 0)
            }))

        # Convert login time
        if metric.get("login_duration"):
            activities.append(build(
                "login",
                {
                    "session_type": "pos",
                    "login_time": metric["login_duration"]
                },
                duration=metric["login_duration"]
            ))

        return activities

    async def health_check(self) -> Dict[str, Any]:
        """Check POS integration health status.

        Returns:
            A dict with ``status`` ("healthy", "warning", "degraded",
            "unhealthy" or "error") and a ``details`` dict holding the sync
            bookkeeping plus the API connectivity result.
        """
        try:
            status = "healthy"
            details = {
                "last_sync": self._last_sync_time,
                "failed_operations": len(self._failed_operations),
                "cache_size": len(self._metrics_cache)
            }
            if self._last_error:
                details["last_error"] = str(self._last_error)

            # Never synced, or last sync older than 30 minutes. The None check
            # must come first: subtracting None from a datetime raises
            # TypeError, which used to turn every pre-sync check into "error".
            if self._last_sync_time is None or (
                datetime.utcnow() - self._last_sync_time
            ) > timedelta(minutes=30):
                status = "warning"
            if len(self._failed_operations) > 5:
                status = "degraded"

            # Check POS API connectivity
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        f"{settings.POS_API_URL}/health",
                        timeout=aiohttp.ClientTimeout(total=5)
                    ) as response:
                        if response.status == 200:
                            details["api_status"] = "connected"
                        else:
                            details["api_status"] = "error"
                            status = "unhealthy"
            except Exception as e:
                details["api_status"] = "connection_failed"
                details["api_error"] = str(e)
                status = "unhealthy"

            log_health_check("pos_integration", status, details)
            return {
                "status": status,
                "details": details
            }
        except Exception as e:
            logger.error(f"POS health check failed: {str(e)}")
            return {
                "status": "error",
                "details": {"error": str(e)}
            }

    async def _fetch_and_cache_metrics(self, endpoint: str, label: str) -> Dict[str, Any]:
        """GET ``/metrics/<endpoint>`` from the POS API and cache the payload.

        Args:
            endpoint: Last URL path segment; also used as the cache key.
            label: Human-readable metric name used in the error message.

        Returns:
            The decoded JSON payload.

        Raises:
            Exception: If the response status is not 200.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{settings.POS_API_URL}/metrics/{endpoint}",
                headers=self._get_auth_headers()
            ) as response:
                if response.status != 200:
                    raise Exception(f"Failed to sync {label} metrics: {response.status}")
                data = await response.json()
                self._metrics_cache[endpoint] = data
                return data

    @retry_with_backoff(max_retries=2)
    async def _sync_sales_metrics(self) -> Dict[str, Any]:
        """Sync sales metrics from POS"""
        return await self._fetch_and_cache_metrics("sales", "sales")

    @retry_with_backoff(max_retries=2)
    async def _sync_inventory_metrics(self) -> Dict[str, Any]:
        """Sync inventory metrics from POS"""
        return await self._fetch_and_cache_metrics("inventory", "inventory")

    @retry_with_backoff(max_retries=2)
    async def _sync_staff_metrics(self) -> Dict[str, Any]:
        """Sync staff performance metrics from POS"""
        return await self._fetch_and_cache_metrics("staff", "staff")

    @retry_with_backoff(max_retries=2)
    async def _sync_customer_metrics(self) -> Dict[str, Any]:
        """Sync customer metrics from POS"""
        return await self._fetch_and_cache_metrics("customers", "customer")

    def _get_auth_headers(self) -> Dict[str, str]:
        """Get authentication headers for POS API"""
        return {
            "Authorization": f"Bearer {settings.POS_API_KEY}",
            "X-API-Version": settings.POS_API_VERSION
        }

    def get_cached_metrics(self, metric_type: str) -> Optional[Dict[str, Any]]:
        """Return the cached payload for ``metric_type``, or None if absent."""
        return self._metrics_cache.get(metric_type)

    def get_sync_status(self) -> Dict[str, Any]:
        """Return a snapshot of the current sync bookkeeping.

        Returns:
            A dict with ``last_sync``, ``failed_operations``, ``cache_status``
            (every cached key mapped to ``"cached"``) and ``last_error``.
        """
        return {
            "last_sync": self._last_sync_time,
            # Copy so callers cannot mutate the service's internal list
            "failed_operations": list(self._failed_operations),
            "cache_status": {key: "cached" for key in self._metrics_cache},
            "last_error": self._last_error
        }
# Module-level service instance, created once when this module is imported.
pos_analytics = POSAnalyticsService()