Spaces:
Running
Running
| """ | |
| Sync handler for POS MongoDB to PostgreSQL synchronization. | |
| Provides event-driven sync operations that can be called from service layers. | |
| """ | |
| from typing import Dict, Any, Optional | |
| import logging | |
| from contextlib import asynccontextmanager | |
| from app.sql import get_postgres_session | |
| from app.sync.sync_service import POSSyncService | |
| from app.customers.models.model import CustomerModel | |
| logger = logging.getLogger(__name__) | |
class POSSyncHandler:
    """Handler for POS sync operations with automatic session management.

    Every public method is a static method: the class carries no state and is
    used purely as a namespace (``POSSyncHandler.handle_customer_created(...)``).
    Each handler opens its own PostgreSQL session through
    :meth:`get_sync_service`, delegates to the matching ``POSSyncService``
    method, logs the outcome, and never re-raises: failures are reported
    through the return value instead.
    """

    @staticmethod
    @asynccontextmanager
    async def get_sync_service():
        """Context manager to get sync service with proper session handling.

        Yields:
            A ``POSSyncService`` bound to a session obtained from
            ``get_postgres_session()``. The session context is exited when the
            caller's ``async with`` block ends.
        """
        # ``async with`` needs an object implementing __aenter__/__aexit__; a
        # bare async generator does not, hence the asynccontextmanager wrapper.
        async with get_postgres_session() as pg_session:
            yield POSSyncService(pg_session)

    @staticmethod
    async def _sync_event(method_name, args, action, target, detail="") -> bool:
        """Run one sync-service call and log its outcome.

        Args:
            method_name: Name of the ``POSSyncService`` coroutine method to call.
            args: Tuple of positional arguments forwarded to that method.
            action: Human-readable event name used in the log messages
                (e.g. ``"customer creation"``).
            target: Zero-argument callable returning the identifier to log. It
                is called only when a message is formatted, so the lookup
                happens at the same point as an inline f-string would.
            detail: Optional suffix appended to the success message only.

        Returns:
            True if the call completed, False if any exception was raised
            (the exception is logged, not propagated).
        """
        try:
            async with POSSyncHandler.get_sync_service() as sync_service:
                await getattr(sync_service, method_name)(*args)
                logger.info(f"Successfully synced {action}: {target()}{detail}")
                return True
        except Exception as e:
            logger.error(f"Failed to sync {action} {target()}: {e}")
            return False

    @staticmethod
    async def _bulk_sync(method_name, items: list, label: str) -> Dict[str, Any]:
        """Run one bulk sync-service call and summarise its per-item results.

        Args:
            method_name: Name of the ``POSSyncService`` bulk coroutine method.
            items: Records forwarded to that method in a single call.
            label: Entity name used in the log messages (e.g. ``"customer"``).

        Returns:
            On success: ``total``, ``success``, ``errors`` and the raw
            ``results``. Each result is indexed with ``["status"]`` and
            counted when it equals ``"success"`` or ``"error"``.
            On failure: ``total``, ``success`` (0), ``errors`` (every item is
            counted as failed) and ``error`` (the exception text).
        """
        try:
            async with POSSyncHandler.get_sync_service() as sync_service:
                results = await getattr(sync_service, method_name)(items)
                success_count = sum(1 for r in results if r["status"] == "success")
                error_count = sum(1 for r in results if r["status"] == "error")
                logger.info(
                    f"Bulk {label} sync completed: {success_count} success, {error_count} errors"
                )
                return {
                    "total": len(items),
                    "success": success_count,
                    "errors": error_count,
                    "results": results,
                }
        except Exception as e:
            logger.error(f"Failed bulk {label} sync: {e}")
            return {
                "total": len(items),
                "success": 0,
                "errors": len(items),
                "error": str(e),
            }

    # ------------------------------------------------------------------
    # Customer events
    # ------------------------------------------------------------------

    @staticmethod
    async def handle_customer_created(customer: CustomerModel) -> bool:
        """Handle customer creation event. Returns True on success, False on failure."""
        return await POSSyncHandler._sync_event(
            "sync_customer_create",
            (customer,),
            "customer creation",
            lambda: customer.customer_id,
        )

    @staticmethod
    async def handle_customer_updated(customer_id: str, customer: CustomerModel) -> bool:
        """Handle customer update event. Returns True on success, False on failure."""
        return await POSSyncHandler._sync_event(
            "sync_customer_update",
            (customer_id, customer),
            "customer update",
            lambda: customer_id,
        )

    @staticmethod
    async def handle_customer_deleted(customer_id: str) -> bool:
        """Handle customer deletion event. Returns True on success, False on failure."""
        return await POSSyncHandler._sync_event(
            "sync_customer_delete",
            (customer_id,),
            "customer deletion",
            lambda: customer_id,
        )

    @staticmethod
    async def handle_customer_status_changed(customer_id: str, status: str) -> bool:
        """Handle customer status change event. Returns True on success, False on failure."""
        return await POSSyncHandler._sync_event(
            "sync_customer_status",
            (customer_id, status),
            "customer status change",
            lambda: customer_id,
            detail=f" -> {status}",
        )

    # ------------------------------------------------------------------
    # Staff events
    # ------------------------------------------------------------------

    @staticmethod
    async def handle_staff_created(staff_data: Dict[str, Any]) -> bool:
        """Handle staff creation event. Returns True on success, False on failure.

        The ``staff_id`` key of ``staff_data`` is used only for log messages.
        """
        return await POSSyncHandler._sync_event(
            "sync_staff_create",
            (staff_data,),
            "staff creation",
            lambda: staff_data.get('staff_id'),
        )

    @staticmethod
    async def handle_staff_updated(staff_id: str, staff_data: Dict[str, Any]) -> bool:
        """Handle staff update event. Returns True on success, False on failure."""
        return await POSSyncHandler._sync_event(
            "sync_staff_update",
            (staff_id, staff_data),
            "staff update",
            lambda: staff_id,
        )

    @staticmethod
    async def handle_staff_deleted(staff_id: str) -> bool:
        """Handle staff deletion event. Returns True on success, False on failure."""
        return await POSSyncHandler._sync_event(
            "sync_staff_delete",
            (staff_id,),
            "staff deletion",
            lambda: staff_id,
        )

    @staticmethod
    async def handle_staff_status_changed(staff_id: str, status: str) -> bool:
        """Handle staff status change event. Returns True on success, False on failure."""
        return await POSSyncHandler._sync_event(
            "sync_staff_status",
            (staff_id, status),
            "staff status change",
            lambda: staff_id,
            detail=f" -> {status}",
        )

    # ------------------------------------------------------------------
    # Catalogue service events
    # ------------------------------------------------------------------

    @staticmethod
    async def handle_catalogue_service_created(service_data: Dict[str, Any]) -> bool:
        """Handle catalogue service creation event. Returns True on success, False on failure.

        For log messages the identifier is taken from ``_id``, falling back to
        ``service_id`` when ``_id`` is missing or falsy.
        """
        return await POSSyncHandler._sync_event(
            "sync_catalogue_service_create",
            (service_data,),
            "catalogue service creation",
            lambda: service_data.get('_id') or service_data.get('service_id'),
        )

    @staticmethod
    async def handle_catalogue_service_updated(service_id: str, service_data: Dict[str, Any]) -> bool:
        """Handle catalogue service update event. Returns True on success, False on failure."""
        return await POSSyncHandler._sync_event(
            "sync_catalogue_service_update",
            (service_id, service_data),
            "catalogue service update",
            lambda: service_id,
        )

    @staticmethod
    async def handle_catalogue_service_deleted(service_id: str) -> bool:
        """Handle catalogue service deletion event. Returns True on success, False on failure."""
        return await POSSyncHandler._sync_event(
            "sync_catalogue_service_delete",
            (service_id,),
            "catalogue service deletion",
            lambda: service_id,
        )

    @staticmethod
    async def handle_catalogue_service_status_changed(service_id: str, status: str) -> bool:
        """Handle catalogue service status change event. Returns True on success, False on failure."""
        return await POSSyncHandler._sync_event(
            "sync_catalogue_service_status",
            (service_id, status),
            "catalogue service status change",
            lambda: service_id,
            detail=f" -> {status}",
        )

    # ------------------------------------------------------------------
    # Bulk operations
    # ------------------------------------------------------------------

    @staticmethod
    async def handle_bulk_customer_sync(customers: list) -> Dict[str, Any]:
        """Handle bulk customer sync.

        Returns a summary dict with ``total``, ``success`` and ``errors``
        counts, plus ``results`` on success or ``error`` on failure.
        """
        return await POSSyncHandler._bulk_sync("bulk_sync_customers", customers, "customer")

    @staticmethod
    async def handle_bulk_staff_sync(staff_list: list) -> Dict[str, Any]:
        """Handle bulk staff sync.

        Returns a summary dict with ``total``, ``success`` and ``errors``
        counts, plus ``results`` on success or ``error`` on failure.
        """
        return await POSSyncHandler._bulk_sync("bulk_sync_staff", staff_list, "staff")

    @staticmethod
    async def handle_bulk_catalogue_service_sync(service_list: list) -> Dict[str, Any]:
        """Handle bulk catalogue service sync.

        Returns a summary dict with ``total``, ``success`` and ``errors``
        counts, plus ``results`` on success or ``error`` on failure.
        """
        return await POSSyncHandler._bulk_sync(
            "bulk_sync_catalogue_services", service_list, "catalogue service"
        )

    # ------------------------------------------------------------------
    # Health
    # ------------------------------------------------------------------

    @staticmethod
    async def check_sync_health() -> Dict[str, Any]:
        """Check sync system health.

        Returns:
            Whatever ``POSSyncService.check_sync_health()`` returns, or
            ``{"overall_health": False, "error": <message>}`` if obtaining the
            service or running the check raised.
        """
        try:
            async with POSSyncHandler.get_sync_service() as sync_service:
                return await sync_service.check_sync_health()
        except Exception as e:
            logger.error(f"Failed to check sync health: {e}")
            return {
                "overall_health": False,
                "error": str(e),
            }