"""
Sync handler for POS MongoDB to PostgreSQL synchronization.
Provides event-driven sync operations that can be called from service layers.
"""
import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, Optional

from app.customers.models.model import CustomerModel
from app.sql import get_postgres_session
from app.sync.sync_service import POSSyncService
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class POSSyncHandler:
    """Handler for POS sync operations with automatic session management.

    Each public handler opens a sync service through :meth:`get_sync_service`,
    performs a single call on it, logs the outcome and reports failure via its
    return value: any ``Exception`` raised along the way is caught and logged
    (with traceback) rather than propagated to the caller.
    """

    @staticmethod
    @asynccontextmanager
    async def get_sync_service() -> AsyncIterator[POSSyncService]:
        """Yield a ``POSSyncService`` built on a ``get_postgres_session`` session.

        The session context is entered before the service is yielded and is
        exited when the caller's ``async with`` body finishes.
        """
        async with get_postgres_session() as pg_session:
            yield POSSyncService(pg_session)

    @staticmethod
    async def _run_sync(
        operation: Callable[[POSSyncService], Awaitable[Any]],
        success_message: str,
        failure_message: str,
    ) -> bool:
        """Run one sync call inside a managed sync service and log the outcome.

        Args:
            operation: Called with the sync service; must return the awaitable
                that performs the sync.
            success_message: Logged at INFO level after the call completed and
                the sync service context exited without raising.
            failure_message: Logged at ERROR level, followed by ``": "`` and
                the exception text, together with the traceback.

        Returns:
            ``True`` on success, ``False`` if an ``Exception`` was raised.
        """
        try:
            async with POSSyncHandler.get_sync_service() as sync_service:
                await operation(sync_service)
        except Exception as e:
            # logger.exception logs at ERROR level and keeps the traceback.
            logger.exception("%s: %s", failure_message, e)
            return False
        else:
            # Logged only once the context has exited, so an error raised
            # while leaving the session context is never preceded by a
            # misleading success message.
            logger.info(success_message)
            return True

    @staticmethod
    async def _run_bulk_sync(
        operation: Callable[[POSSyncService], Awaitable[Any]],
        items: list,
        label: str,
    ) -> Dict[str, Any]:
        """Run one bulk sync call and summarise its per-item results.

        Args:
            operation: Called with the sync service; must return an awaitable
                resolving to an iterable of results, each subscriptable with
                a ``"status"`` key.
            items: The input collection; only its length is used here.
            label: Entity name inserted into the log messages.

        Returns:
            On success a dict with ``total``, ``success``, ``errors`` and
            ``results``. If an ``Exception`` is raised, a dict with ``total``,
            ``success`` (0), ``errors`` (all items) and ``error`` (its text).
        """
        try:
            async with POSSyncHandler.get_sync_service() as sync_service:
                results = await operation(sync_service)
                # Counted while the service context is still open, in case the
                # result objects depend on the session (not visible from here).
                success_count = sum(1 for r in results if r["status"] == "success")
                error_count = sum(1 for r in results if r["status"] == "error")
            logger.info(
                "Bulk %s sync completed: %s success, %s errors",
                label,
                success_count,
                error_count,
            )
            return {
                "total": len(items),
                "success": success_count,
                "errors": error_count,
                "results": results,
            }
        except Exception as e:
            logger.exception("Failed bulk %s sync: %s", label, e)
            return {
                "total": len(items),
                "success": 0,
                "errors": len(items),
                "error": str(e),
            }

    # ------------------------------------------------------------------
    # Customer events
    # ------------------------------------------------------------------

    @staticmethod
    async def handle_customer_created(customer: CustomerModel) -> bool:
        """Handle customer creation event; return ``True`` if the sync succeeded."""
        customer_ref = customer.customer_id
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_customer_create(customer),
            f"Successfully synced customer creation: {customer_ref}",
            f"Failed to sync customer creation {customer_ref}",
        )

    @staticmethod
    async def handle_customer_updated(customer_id: str, customer: CustomerModel) -> bool:
        """Handle customer update event; return ``True`` if the sync succeeded."""
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_customer_update(customer_id, customer),
            f"Successfully synced customer update: {customer_id}",
            f"Failed to sync customer update {customer_id}",
        )

    @staticmethod
    async def handle_customer_deleted(customer_id: str) -> bool:
        """Handle customer deletion event; return ``True`` if the sync succeeded."""
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_customer_delete(customer_id),
            f"Successfully synced customer deletion: {customer_id}",
            f"Failed to sync customer deletion {customer_id}",
        )

    @staticmethod
    async def handle_customer_status_changed(customer_id: str, status: str) -> bool:
        """Handle customer status change event; return ``True`` if the sync succeeded."""
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_customer_status(customer_id, status),
            f"Successfully synced customer status change: {customer_id} -> {status}",
            f"Failed to sync customer status change {customer_id}",
        )

    # ------------------------------------------------------------------
    # Staff events
    # ------------------------------------------------------------------

    @staticmethod
    async def handle_staff_created(staff_data: Dict[str, Any]) -> bool:
        """Handle staff creation event; return ``True`` if the sync succeeded.

        The ``staff_id`` entry of ``staff_data`` is used only for log messages.
        """
        staff_ref = staff_data.get('staff_id')
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_staff_create(staff_data),
            f"Successfully synced staff creation: {staff_ref}",
            f"Failed to sync staff creation {staff_ref}",
        )

    @staticmethod
    async def handle_staff_updated(staff_id: str, staff_data: Dict[str, Any]) -> bool:
        """Handle staff update event; return ``True`` if the sync succeeded."""
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_staff_update(staff_id, staff_data),
            f"Successfully synced staff update: {staff_id}",
            f"Failed to sync staff update {staff_id}",
        )

    @staticmethod
    async def handle_staff_deleted(staff_id: str) -> bool:
        """Handle staff deletion event; return ``True`` if the sync succeeded."""
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_staff_delete(staff_id),
            f"Successfully synced staff deletion: {staff_id}",
            f"Failed to sync staff deletion {staff_id}",
        )

    @staticmethod
    async def handle_staff_status_changed(staff_id: str, status: str) -> bool:
        """Handle staff status change event; return ``True`` if the sync succeeded."""
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_staff_status(staff_id, status),
            f"Successfully synced staff status change: {staff_id} -> {status}",
            f"Failed to sync staff status change {staff_id}",
        )

    # ------------------------------------------------------------------
    # Catalogue service events
    # ------------------------------------------------------------------

    @staticmethod
    async def handle_catalogue_service_created(service_data: Dict[str, Any]) -> bool:
        """Handle catalogue service creation event; return ``True`` if the sync succeeded.

        For log messages the service is identified by ``service_data['_id']``,
        falling back to ``service_data['service_id']`` when that is missing or falsy.
        """
        service_ref = service_data.get('_id') or service_data.get('service_id')
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_catalogue_service_create(service_data),
            f"Successfully synced catalogue service creation: {service_ref}",
            f"Failed to sync catalogue service creation {service_ref}",
        )

    @staticmethod
    async def handle_catalogue_service_updated(service_id: str, service_data: Dict[str, Any]) -> bool:
        """Handle catalogue service update event; return ``True`` if the sync succeeded."""
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_catalogue_service_update(service_id, service_data),
            f"Successfully synced catalogue service update: {service_id}",
            f"Failed to sync catalogue service update {service_id}",
        )

    @staticmethod
    async def handle_catalogue_service_deleted(service_id: str) -> bool:
        """Handle catalogue service deletion event; return ``True`` if the sync succeeded."""
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_catalogue_service_delete(service_id),
            f"Successfully synced catalogue service deletion: {service_id}",
            f"Failed to sync catalogue service deletion {service_id}",
        )

    @staticmethod
    async def handle_catalogue_service_status_changed(service_id: str, status: str) -> bool:
        """Handle catalogue service status change event; return ``True`` if the sync succeeded."""
        return await POSSyncHandler._run_sync(
            lambda svc: svc.sync_catalogue_service_status(service_id, status),
            f"Successfully synced catalogue service status change: {service_id} -> {status}",
            f"Failed to sync catalogue service status change {service_id}",
        )

    # ------------------------------------------------------------------
    # Bulk operations
    # ------------------------------------------------------------------

    @staticmethod
    async def handle_bulk_customer_sync(customers: list) -> Dict[str, Any]:
        """Handle bulk customer sync and return a summary dict of the outcome."""
        return await POSSyncHandler._run_bulk_sync(
            lambda svc: svc.bulk_sync_customers(customers),
            customers,
            "customer",
        )

    @staticmethod
    async def handle_bulk_staff_sync(staff_list: list) -> Dict[str, Any]:
        """Handle bulk staff sync and return a summary dict of the outcome."""
        return await POSSyncHandler._run_bulk_sync(
            lambda svc: svc.bulk_sync_staff(staff_list),
            staff_list,
            "staff",
        )

    @staticmethod
    async def handle_bulk_catalogue_service_sync(service_list: list) -> Dict[str, Any]:
        """Handle bulk catalogue service sync and return a summary dict of the outcome."""
        return await POSSyncHandler._run_bulk_sync(
            lambda svc: svc.bulk_sync_catalogue_services(service_list),
            service_list,
            "catalogue service",
        )

    # ------------------------------------------------------------------
    # Health
    # ------------------------------------------------------------------

    @staticmethod
    async def check_sync_health() -> Dict[str, Any]:
        """Check sync system health.

        Returns:
            Whatever ``POSSyncService.check_sync_health`` returns; if an
            ``Exception`` is raised, a dict with ``overall_health`` set to
            ``False`` and ``error`` set to the exception text.
        """
        try:
            async with POSSyncHandler.get_sync_service() as sync_service:
                return await sync_service.check_sync_health()
        except Exception as e:
            logger.exception("Failed to check sync health: %s", e)
            return {
                "overall_health": False,
                "error": str(e),
            }