Spaces:
Sleeping
Sleeping
| """ | |
| Main synchronization service for POS MongoDB to PostgreSQL sync. | |
| Coordinates all sync operations across different entities. | |
| """ | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime | |
| from app.core.logging import get_logger | |
| from app.sync.customers.sync_service import CustomerSyncService | |
| from app.sync.staff.sync_service import StaffSyncService | |
| from app.sync.catalogue_services.sync_service import CatalogueServiceSyncService | |
| from app.customers.models.model import CustomerModel | |
| logger = get_logger(__name__) | |
| class POSSyncService: | |
| """Main service to coordinate all POS sync operations""" | |
| def __init__(self, pg_session: AsyncSession): | |
| self.pg_session = pg_session | |
| self.customer_sync = CustomerSyncService(pg_session) | |
| self.staff_sync = StaffSyncService(pg_session) | |
| self.catalogue_service_sync = CatalogueServiceSyncService(pg_session) | |
| # Customer sync operations | |
| async def sync_customer_create(self, customer: CustomerModel): | |
| """Sync customer creation to PostgreSQL""" | |
| try: | |
| return await self.customer_sync.create_customer_ref(customer) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing customer creation: {e}", | |
| extra={ | |
| "operation": "sync_customer_create", | |
| "customer_id": customer.customer_id, | |
| "merchant_id": customer.merchant_id, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| async def sync_customer_update(self, customer_id: str, customer: CustomerModel): | |
| """Sync customer update to PostgreSQL""" | |
| try: | |
| return await self.customer_sync.update_customer_ref(customer_id, customer) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing customer update: {e}", | |
| extra={ | |
| "operation": "sync_customer_update", | |
| "customer_id": customer_id, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| async def sync_customer_delete(self, customer_id: str): | |
| """Sync customer deletion to PostgreSQL""" | |
| try: | |
| return await self.customer_sync.delete_customer_ref(customer_id) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing customer deletion: {e}", | |
| extra={ | |
| "operation": "sync_customer_delete", | |
| "customer_id": customer_id, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| async def sync_customer_status(self, customer_id: str, status: str): | |
| """Sync customer status change to PostgreSQL""" | |
| try: | |
| return await self.customer_sync.sync_customer_status(customer_id, status) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing customer status: {e}", | |
| extra={ | |
| "operation": "sync_customer_status", | |
| "customer_id": customer_id, | |
| "status": status, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| # Staff sync operations | |
| async def sync_staff_create(self, staff_data: Dict[str, Any]): | |
| """Sync staff creation to PostgreSQL""" | |
| try: | |
| return await self.staff_sync.create_staff_ref(staff_data) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing staff creation: {e}", | |
| extra={ | |
| "operation": "sync_staff_create", | |
| "staff_id": staff_data.get('staff_id'), | |
| "merchant_id": staff_data.get('merchant_id'), | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| async def sync_staff_update(self, staff_id: str, staff_data: Dict[str, Any]): | |
| """Sync staff update to PostgreSQL""" | |
| try: | |
| return await self.staff_sync.update_staff_ref(staff_id, staff_data) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing staff update: {e}", | |
| extra={ | |
| "operation": "sync_staff_update", | |
| "staff_id": staff_id, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| async def sync_staff_delete(self, staff_id: str): | |
| """Sync staff deletion to PostgreSQL""" | |
| try: | |
| return await self.staff_sync.delete_staff_ref(staff_id) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing staff deletion: {e}", | |
| extra={ | |
| "operation": "sync_staff_delete", | |
| "staff_id": staff_id, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| async def sync_staff_status(self, staff_id: str, status: str): | |
| """Sync staff status change to PostgreSQL""" | |
| try: | |
| return await self.staff_sync.sync_staff_status(staff_id, status) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing staff status: {e}", | |
| extra={ | |
| "operation": "sync_staff_status", | |
| "staff_id": staff_id, | |
| "status": status, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| # Catalogue service sync operations | |
| async def sync_catalogue_service_create(self, service_data: Dict[str, Any]): | |
| """Sync catalogue service creation to PostgreSQL""" | |
| try: | |
| return await self.catalogue_service_sync.create_catalogue_service_ref(service_data) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing catalogue service creation: {e}", | |
| extra={ | |
| "operation": "sync_catalogue_service_create", | |
| "service_id": service_data.get('_id') or service_data.get('service_id'), | |
| "merchant_id": service_data.get('merchant_id'), | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| async def sync_catalogue_service_update(self, service_id: str, service_data: Dict[str, Any]): | |
| """Sync catalogue service update to PostgreSQL""" | |
| try: | |
| return await self.catalogue_service_sync.update_catalogue_service_ref(service_id, service_data) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing catalogue service update: {e}", | |
| extra={ | |
| "operation": "sync_catalogue_service_update", | |
| "service_id": service_id, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| async def sync_catalogue_service_delete(self, service_id: str): | |
| """Sync catalogue service deletion to PostgreSQL""" | |
| try: | |
| return await self.catalogue_service_sync.delete_catalogue_service_ref(service_id) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing catalogue service deletion: {e}", | |
| extra={ | |
| "operation": "sync_catalogue_service_delete", | |
| "service_id": service_id, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| async def sync_catalogue_service_status(self, service_id: str, status: str): | |
| """Sync catalogue service status change to PostgreSQL""" | |
| try: | |
| return await self.catalogue_service_sync.sync_catalogue_service_status(service_id, status) | |
| except Exception as e: | |
| logger.error( | |
| f"Error syncing catalogue service status: {e}", | |
| extra={ | |
| "operation": "sync_catalogue_service_status", | |
| "service_id": service_id, | |
| "status": status, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| raise | |
| # Bulk sync operations | |
| async def bulk_sync_customers(self, customers: list): | |
| """Bulk sync multiple customers""" | |
| results = [] | |
| for customer in customers: | |
| try: | |
| result = await self.customer_sync.create_customer_ref(customer) | |
| results.append({"customer_id": customer.customer_id, "status": "success", "result": result}) | |
| except Exception as e: | |
| logger.error( | |
| f"Error bulk syncing customer {customer.customer_id}: {e}", | |
| extra={ | |
| "operation": "bulk_sync_customers", | |
| "customer_id": customer.customer_id, | |
| "merchant_id": customer.merchant_id, | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| results.append({"customer_id": customer.customer_id, "status": "error", "error": str(e)}) | |
| return results | |
| async def bulk_sync_staff(self, staff_list: list): | |
| """Bulk sync multiple staff members""" | |
| results = [] | |
| for staff_data in staff_list: | |
| try: | |
| result = await self.staff_sync.create_staff_ref(staff_data) | |
| results.append({"staff_id": staff_data.get('staff_id'), "status": "success", "result": result}) | |
| except Exception as e: | |
| logger.error( | |
| f"Error bulk syncing staff {staff_data.get('staff_id')}: {e}", | |
| extra={ | |
| "operation": "bulk_sync_staff", | |
| "staff_id": staff_data.get('staff_id'), | |
| "merchant_id": staff_data.get('merchant_id'), | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| results.append({"staff_id": staff_data.get('staff_id'), "status": "error", "error": str(e)}) | |
| return results | |
| async def bulk_sync_catalogue_services(self, service_list: list): | |
| """Bulk sync multiple catalogue services""" | |
| results = [] | |
| for service_data in service_list: | |
| try: | |
| result = await self.catalogue_service_sync.create_catalogue_service_ref(service_data) | |
| results.append({"service_id": service_data.get('_id') or service_data.get('service_id'), "status": "success", "result": result}) | |
| except Exception as e: | |
| logger.error( | |
| f"Error bulk syncing catalogue service {service_data.get('_id') or service_data.get('service_id')}: {e}", | |
| extra={ | |
| "operation": "bulk_sync_catalogue_services", | |
| "service_id": service_data.get('_id') or service_data.get('service_id'), | |
| "merchant_id": service_data.get('merchant_id'), | |
| "error": str(e) | |
| }, | |
| exc_info=True | |
| ) | |
| results.append({"service_id": service_data.get('_id') or service_data.get('service_id'), "status": "error", "error": str(e)}) | |
| return results | |
| # Health check operations | |
| async def check_sync_health(self) -> Dict[str, Any]: | |
| """Check the health of sync operations""" | |
| try: | |
| # Test customer sync | |
| customer_health = True | |
| try: | |
| await self.customer_sync._ensure_customer_ref_schema() | |
| except Exception as e: | |
| customer_health = False | |
| logger.error( | |
| f"Customer sync health check failed: {e}", | |
| extra={"operation": "check_sync_health", "component": "customer_sync", "error": str(e)}, | |
| exc_info=True | |
| ) | |
| # Test staff sync | |
| staff_health = True | |
| try: | |
| await self.staff_sync._ensure_staff_ref_schema() | |
| except Exception as e: | |
| staff_health = False | |
| logger.error( | |
| f"Staff sync health check failed: {e}", | |
| extra={"operation": "check_sync_health", "component": "staff_sync", "error": str(e)}, | |
| exc_info=True | |
| ) | |
| # Test catalogue service sync | |
| catalogue_service_health = True | |
| try: | |
| await self.catalogue_service_sync._ensure_catalogue_service_ref_schema() | |
| except Exception as e: | |
| catalogue_service_health = False | |
| logger.error( | |
| f"Catalogue service sync health check failed: {e}", | |
| extra={"operation": "check_sync_health", "component": "catalogue_service_sync", "error": str(e)}, | |
| exc_info=True | |
| ) | |
| logger.info("Sync health check completed", extra={"operation": "check_sync_health"}) | |
| return { | |
| "overall_health": customer_health and staff_health and catalogue_service_health, | |
| "customer_sync": customer_health, | |
| "staff_sync": staff_health, | |
| "catalogue_service_sync": catalogue_service_health, | |
| "timestamp": datetime.utcnow() | |
| } | |
| except Exception as e: | |
| logger.error( | |
| f"Error checking sync health: {e}", | |
| extra={"operation": "check_sync_health", "error": str(e)}, | |
| exc_info=True | |
| ) | |
| return { | |
| "overall_health": False, | |
| "error": str(e) | |
| } |