MukeshKapoor25's picture
feat(sync): Add comprehensive POS data sync infrastructure and seed data management
9fd3989
"""
Sync handler for POS MongoDB to PostgreSQL synchronization.
Provides event-driven sync operations that can be called from service layers.
"""
from typing import Dict, Any, Optional
import logging
from contextlib import asynccontextmanager
from app.sql import get_postgres_session
from app.sync.sync_service import POSSyncService
from app.customers.models.model import CustomerModel
logger = logging.getLogger(__name__)
class POSSyncHandler:
"""Handler for POS sync operations with automatic session management"""
@staticmethod
@asynccontextmanager
async def get_sync_service():
"""Context manager to get sync service with proper session handling"""
async with get_postgres_session() as pg_session:
yield POSSyncService(pg_session)
@staticmethod
async def handle_customer_created(customer: CustomerModel) -> bool:
"""Handle customer creation event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_customer_create(customer)
logger.info(f"Successfully synced customer creation: {customer.customer_id}")
return True
except Exception as e:
logger.error(f"Failed to sync customer creation {customer.customer_id}: {e}")
return False
@staticmethod
async def handle_customer_updated(customer_id: str, customer: CustomerModel) -> bool:
"""Handle customer update event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_customer_update(customer_id, customer)
logger.info(f"Successfully synced customer update: {customer_id}")
return True
except Exception as e:
logger.error(f"Failed to sync customer update {customer_id}: {e}")
return False
@staticmethod
async def handle_customer_deleted(customer_id: str) -> bool:
"""Handle customer deletion event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_customer_delete(customer_id)
logger.info(f"Successfully synced customer deletion: {customer_id}")
return True
except Exception as e:
logger.error(f"Failed to sync customer deletion {customer_id}: {e}")
return False
@staticmethod
async def handle_customer_status_changed(customer_id: str, status: str) -> bool:
"""Handle customer status change event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_customer_status(customer_id, status)
logger.info(f"Successfully synced customer status change: {customer_id} -> {status}")
return True
except Exception as e:
logger.error(f"Failed to sync customer status change {customer_id}: {e}")
return False
@staticmethod
async def handle_staff_created(staff_data: Dict[str, Any]) -> bool:
"""Handle staff creation event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_staff_create(staff_data)
logger.info(f"Successfully synced staff creation: {staff_data.get('staff_id')}")
return True
except Exception as e:
logger.error(f"Failed to sync staff creation {staff_data.get('staff_id')}: {e}")
return False
@staticmethod
async def handle_staff_updated(staff_id: str, staff_data: Dict[str, Any]) -> bool:
"""Handle staff update event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_staff_update(staff_id, staff_data)
logger.info(f"Successfully synced staff update: {staff_id}")
return True
except Exception as e:
logger.error(f"Failed to sync staff update {staff_id}: {e}")
return False
@staticmethod
async def handle_staff_deleted(staff_id: str) -> bool:
"""Handle staff deletion event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_staff_delete(staff_id)
logger.info(f"Successfully synced staff deletion: {staff_id}")
return True
except Exception as e:
logger.error(f"Failed to sync staff deletion {staff_id}: {e}")
return False
@staticmethod
async def handle_staff_status_changed(staff_id: str, status: str) -> bool:
"""Handle staff status change event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_staff_status(staff_id, status)
logger.info(f"Successfully synced staff status change: {staff_id} -> {status}")
return True
except Exception as e:
logger.error(f"Failed to sync staff status change {staff_id}: {e}")
return False
@staticmethod
async def handle_catalogue_service_created(service_data: Dict[str, Any]) -> bool:
"""Handle catalogue service creation event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_catalogue_service_create(service_data)
logger.info(f"Successfully synced catalogue service creation: {service_data.get('_id') or service_data.get('service_id')}")
return True
except Exception as e:
logger.error(f"Failed to sync catalogue service creation {service_data.get('_id') or service_data.get('service_id')}: {e}")
return False
@staticmethod
async def handle_catalogue_service_updated(service_id: str, service_data: Dict[str, Any]) -> bool:
"""Handle catalogue service update event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_catalogue_service_update(service_id, service_data)
logger.info(f"Successfully synced catalogue service update: {service_id}")
return True
except Exception as e:
logger.error(f"Failed to sync catalogue service update {service_id}: {e}")
return False
@staticmethod
async def handle_catalogue_service_deleted(service_id: str) -> bool:
"""Handle catalogue service deletion event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_catalogue_service_delete(service_id)
logger.info(f"Successfully synced catalogue service deletion: {service_id}")
return True
except Exception as e:
logger.error(f"Failed to sync catalogue service deletion {service_id}: {e}")
return False
@staticmethod
async def handle_catalogue_service_status_changed(service_id: str, status: str) -> bool:
"""Handle catalogue service status change event"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
await sync_service.sync_catalogue_service_status(service_id, status)
logger.info(f"Successfully synced catalogue service status change: {service_id} -> {status}")
return True
except Exception as e:
logger.error(f"Failed to sync catalogue service status change {service_id}: {e}")
return False
@staticmethod
async def handle_bulk_customer_sync(customers: list) -> Dict[str, Any]:
"""Handle bulk customer sync"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
results = await sync_service.bulk_sync_customers(customers)
success_count = sum(1 for r in results if r["status"] == "success")
error_count = sum(1 for r in results if r["status"] == "error")
logger.info(f"Bulk customer sync completed: {success_count} success, {error_count} errors")
return {
"total": len(customers),
"success": success_count,
"errors": error_count,
"results": results
}
except Exception as e:
logger.error(f"Failed bulk customer sync: {e}")
return {
"total": len(customers),
"success": 0,
"errors": len(customers),
"error": str(e)
}
@staticmethod
async def handle_bulk_staff_sync(staff_list: list) -> Dict[str, Any]:
"""Handle bulk staff sync"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
results = await sync_service.bulk_sync_staff(staff_list)
success_count = sum(1 for r in results if r["status"] == "success")
error_count = sum(1 for r in results if r["status"] == "error")
logger.info(f"Bulk staff sync completed: {success_count} success, {error_count} errors")
return {
"total": len(staff_list),
"success": success_count,
"errors": error_count,
"results": results
}
except Exception as e:
logger.error(f"Failed bulk staff sync: {e}")
return {
"total": len(staff_list),
"success": 0,
"errors": len(staff_list),
"error": str(e)
}
@staticmethod
async def handle_bulk_catalogue_service_sync(service_list: list) -> Dict[str, Any]:
"""Handle bulk catalogue service sync"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
results = await sync_service.bulk_sync_catalogue_services(service_list)
success_count = sum(1 for r in results if r["status"] == "success")
error_count = sum(1 for r in results if r["status"] == "error")
logger.info(f"Bulk catalogue service sync completed: {success_count} success, {error_count} errors")
return {
"total": len(service_list),
"success": success_count,
"errors": error_count,
"results": results
}
except Exception as e:
logger.error(f"Failed bulk catalogue service sync: {e}")
return {
"total": len(service_list),
"success": 0,
"errors": len(service_list),
"error": str(e)
}
@staticmethod
async def check_sync_health() -> Dict[str, Any]:
"""Check sync system health"""
try:
async with POSSyncHandler.get_sync_service() as sync_service:
return await sync_service.check_sync_health()
except Exception as e:
logger.error(f"Failed to check sync health: {e}")
return {
"overall_health": False,
"error": str(e)
}