""" Main synchronization service for POS MongoDB to PostgreSQL sync. Coordinates all sync operations across different entities. """ from sqlalchemy.ext.asyncio import AsyncSession from typing import Dict, Any, Optional from datetime import datetime from app.core.logging import get_logger from app.sync.customers.sync_service import CustomerSyncService from app.sync.staff.sync_service import StaffSyncService from app.sync.catalogue_services.sync_service import CatalogueServiceSyncService from app.customers.models.model import CustomerModel logger = get_logger(__name__) class POSSyncService: """Main service to coordinate all POS sync operations""" def __init__(self, pg_session: AsyncSession): self.pg_session = pg_session self.customer_sync = CustomerSyncService(pg_session) self.staff_sync = StaffSyncService(pg_session) self.catalogue_service_sync = CatalogueServiceSyncService(pg_session) # Customer sync operations async def sync_customer_create(self, customer: CustomerModel): """Sync customer creation to PostgreSQL""" try: return await self.customer_sync.create_customer_ref(customer) except Exception as e: logger.error( f"Error syncing customer creation: {e}", extra={ "operation": "sync_customer_create", "customer_id": customer.customer_id, "merchant_id": customer.merchant_id, "error": str(e) }, exc_info=True ) raise async def sync_customer_update(self, customer_id: str, customer: CustomerModel): """Sync customer update to PostgreSQL""" try: return await self.customer_sync.update_customer_ref(customer_id, customer) except Exception as e: logger.error( f"Error syncing customer update: {e}", extra={ "operation": "sync_customer_update", "customer_id": customer_id, "error": str(e) }, exc_info=True ) raise async def sync_customer_delete(self, customer_id: str): """Sync customer deletion to PostgreSQL""" try: return await self.customer_sync.delete_customer_ref(customer_id) except Exception as e: logger.error( f"Error syncing customer deletion: {e}", extra={ "operation": "sync_customer_delete", "customer_id": customer_id, "error": str(e) }, exc_info=True ) raise async def sync_customer_status(self, customer_id: str, status: str): """Sync customer status change to PostgreSQL""" try: return await self.customer_sync.sync_customer_status(customer_id, status) except Exception as e: logger.error( f"Error syncing customer status: {e}", extra={ "operation": "sync_customer_status", "customer_id": customer_id, "status": status, "error": str(e) }, exc_info=True ) raise # Staff sync operations async def sync_staff_create(self, staff_data: Dict[str, Any]): """Sync staff creation to PostgreSQL""" try: return await self.staff_sync.create_staff_ref(staff_data) except Exception as e: logger.error( f"Error syncing staff creation: {e}", extra={ "operation": "sync_staff_create", "staff_id": staff_data.get('staff_id'), "merchant_id": staff_data.get('merchant_id'), "error": str(e) }, exc_info=True ) raise async def sync_staff_update(self, staff_id: str, staff_data: Dict[str, Any]): """Sync staff update to PostgreSQL""" try: return await self.staff_sync.update_staff_ref(staff_id, staff_data) except Exception as e: logger.error( f"Error syncing staff update: {e}", extra={ "operation": "sync_staff_update", "staff_id": staff_id, "error": str(e) }, exc_info=True ) raise async def sync_staff_delete(self, staff_id: str): """Sync staff deletion to PostgreSQL""" try: return await self.staff_sync.delete_staff_ref(staff_id) except Exception as e: logger.error( f"Error syncing staff deletion: {e}", extra={ "operation": "sync_staff_delete", "staff_id": staff_id, "error": str(e) }, exc_info=True ) raise async def sync_staff_status(self, staff_id: str, status: str): """Sync staff status change to PostgreSQL""" try: return await self.staff_sync.sync_staff_status(staff_id, status) except Exception as e: logger.error( f"Error syncing staff status: {e}", extra={ "operation": "sync_staff_status", "staff_id": staff_id, "status": status, "error": str(e) }, exc_info=True ) raise # Catalogue service sync operations async def sync_catalogue_service_create(self, service_data: Dict[str, Any]): """Sync catalogue service creation to PostgreSQL""" try: return await self.catalogue_service_sync.create_catalogue_service_ref(service_data) except Exception as e: logger.error( f"Error syncing catalogue service creation: {e}", extra={ "operation": "sync_catalogue_service_create", "service_id": service_data.get('_id') or service_data.get('service_id'), "merchant_id": service_data.get('merchant_id'), "error": str(e) }, exc_info=True ) raise async def sync_catalogue_service_update(self, service_id: str, service_data: Dict[str, Any]): """Sync catalogue service update to PostgreSQL""" try: return await self.catalogue_service_sync.update_catalogue_service_ref(service_id, service_data) except Exception as e: logger.error( f"Error syncing catalogue service update: {e}", extra={ "operation": "sync_catalogue_service_update", "service_id": service_id, "error": str(e) }, exc_info=True ) raise async def sync_catalogue_service_delete(self, service_id: str): """Sync catalogue service deletion to PostgreSQL""" try: return await self.catalogue_service_sync.delete_catalogue_service_ref(service_id) except Exception as e: logger.error( f"Error syncing catalogue service deletion: {e}", extra={ "operation": "sync_catalogue_service_delete", "service_id": service_id, "error": str(e) }, exc_info=True ) raise async def sync_catalogue_service_status(self, service_id: str, status: str): """Sync catalogue service status change to PostgreSQL""" try: return await self.catalogue_service_sync.sync_catalogue_service_status(service_id, status) except Exception as e: logger.error( f"Error syncing catalogue service status: {e}", extra={ "operation": "sync_catalogue_service_status", "service_id": service_id, "status": status, "error": str(e) }, exc_info=True ) raise # Bulk sync operations async def bulk_sync_customers(self, customers: list): """Bulk sync multiple customers""" results = [] for customer in customers: try: result = await self.customer_sync.create_customer_ref(customer) results.append({"customer_id": customer.customer_id, "status": "success", "result": result}) except Exception as e: logger.error( f"Error bulk syncing customer {customer.customer_id}: {e}", extra={ "operation": "bulk_sync_customers", "customer_id": customer.customer_id, "merchant_id": customer.merchant_id, "error": str(e) }, exc_info=True ) results.append({"customer_id": customer.customer_id, "status": "error", "error": str(e)}) return results async def bulk_sync_staff(self, staff_list: list): """Bulk sync multiple staff members""" results = [] for staff_data in staff_list: try: result = await self.staff_sync.create_staff_ref(staff_data) results.append({"staff_id": staff_data.get('staff_id'), "status": "success", "result": result}) except Exception as e: logger.error( f"Error bulk syncing staff {staff_data.get('staff_id')}: {e}", extra={ "operation": "bulk_sync_staff", "staff_id": staff_data.get('staff_id'), "merchant_id": staff_data.get('merchant_id'), "error": str(e) }, exc_info=True ) results.append({"staff_id": staff_data.get('staff_id'), "status": "error", "error": str(e)}) return results async def bulk_sync_catalogue_services(self, service_list: list): """Bulk sync multiple catalogue services""" results = [] for service_data in service_list: try: result = await self.catalogue_service_sync.create_catalogue_service_ref(service_data) results.append({"service_id": service_data.get('_id') or service_data.get('service_id'), "status": "success", "result": result}) except Exception as e: logger.error( f"Error bulk syncing catalogue service {service_data.get('_id') or service_data.get('service_id')}: {e}", extra={ "operation": "bulk_sync_catalogue_services", "service_id": service_data.get('_id') or service_data.get('service_id'), "merchant_id": service_data.get('merchant_id'), "error": str(e) }, exc_info=True ) results.append({"service_id": service_data.get('_id') or service_data.get('service_id'), "status": "error", "error": str(e)}) return results # Health check operations async def check_sync_health(self) -> Dict[str, Any]: """Check the health of sync operations""" try: # Test customer sync customer_health = True try: await self.customer_sync._ensure_customer_ref_schema() except Exception as e: customer_health = False logger.error( f"Customer sync health check failed: {e}", extra={"operation": "check_sync_health", "component": "customer_sync", "error": str(e)}, exc_info=True ) # Test staff sync staff_health = True try: await self.staff_sync._ensure_staff_ref_schema() except Exception as e: staff_health = False logger.error( f"Staff sync health check failed: {e}", extra={"operation": "check_sync_health", "component": "staff_sync", "error": str(e)}, exc_info=True ) # Test catalogue service sync catalogue_service_health = True try: await self.catalogue_service_sync._ensure_catalogue_service_ref_schema() except Exception as e: catalogue_service_health = False logger.error( f"Catalogue service sync health check failed: {e}", extra={"operation": "check_sync_health", "component": "catalogue_service_sync", "error": str(e)}, exc_info=True ) logger.info("Sync health check completed", extra={"operation": "check_sync_health"}) return { "overall_health": customer_health and staff_health and catalogue_service_health, "customer_sync": customer_health, "staff_sync": staff_health, "catalogue_service_sync": catalogue_service_health, "timestamp": datetime.utcnow() } except Exception as e: logger.error( f"Error checking sync health: {e}", extra={"operation": "check_sync_health", "error": str(e)}, exc_info=True ) return { "overall_health": False, "error": str(e) }