# Source path: cuatrolabs-pos-ms/app/sync/sync_service.py
# Latest commit 1988d77 (MukeshKapoor25): feat(logging): implement structured logging across services
"""
Main synchronization service for POS MongoDB to PostgreSQL sync.
Coordinates all sync operations across different entities.
"""
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Dict, Any, Optional
from datetime import datetime
from app.core.logging import get_logger
from app.sync.customers.sync_service import CustomerSyncService
from app.sync.staff.sync_service import StaffSyncService
from app.sync.catalogue_services.sync_service import CatalogueServiceSyncService
from app.customers.models.model import CustomerModel
# Module-level logger from the project's logging helper, keyed by this module's name.
logger = get_logger(__name__)
class POSSyncService:
    """Main service to coordinate all POS sync operations.

    Wraps the customer, staff and catalogue-service sync services, all built
    on the same PostgreSQL session.

    Error-handling contract:
      * single-entity ``sync_*`` methods log a structured error and re-raise;
      * ``bulk_sync_*`` methods log each failing item, record it in the
        returned list and continue with the next item;
      * ``check_sync_health`` reports failures in its return value instead of
        raising.
    """

    def __init__(self, pg_session: AsyncSession):
        """Build the per-entity sync services on top of ``pg_session``."""
        self.pg_session = pg_session
        self.customer_sync = CustomerSyncService(pg_session)
        self.staff_sync = StaffSyncService(pg_session)
        self.catalogue_service_sync = CatalogueServiceSyncService(pg_session)

    # Internal helpers

    @staticmethod
    def _log_sync_error(message: str, operation: str, error: Exception, **context: Any) -> None:
        """Emit one structured error record for a failed operation.

        Must be called from inside an ``except`` block so that
        ``exc_info=True`` picks up the exception currently being handled.

        Args:
            message: Human-readable prefix; ``": <error>"`` is appended.
            operation: Name of the public method that failed.
            error: The exception that was caught.
            **context: Extra structured fields (ids, status, component).
        """
        logger.error(
            f"{message}: {error}",
            extra={"operation": operation, **context, "error": str(error)},
            exc_info=True,
        )

    @staticmethod
    def _attr(obj: Any, name: str) -> Any:
        """Read ``obj.name`` for logging, returning None instead of raising."""
        return getattr(obj, name, None)

    @staticmethod
    def _field(data: Any, key: str) -> Any:
        """Read ``data.get(key)`` for logging; None when ``data`` has no ``get``."""
        getter = getattr(data, "get", None)
        return getter(key) if callable(getter) else None

    @classmethod
    def _service_id(cls, service_data: Any) -> Any:
        """Return the service identifier: ``_id`` if truthy, else ``service_id``."""
        return cls._field(service_data, "_id") or cls._field(service_data, "service_id")

    @staticmethod
    def _utc_timestamp() -> datetime:
        """Return the current UTC time as a naive datetime.

        Same value ``datetime.utcnow()`` produced, without calling the
        deprecated API; the result stays naive so callers see no change.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    async def _probe_schema(self, component: str, label: str, ensure_schema) -> bool:
        """Await ``ensure_schema()`` and report whether it completed.

        Args:
            component: Value logged under the ``component`` field.
            label: Prefix of the log message for a failure.
            ensure_schema: Zero-argument callable returning an awaitable.

        Returns:
            True when the call finished without raising, False otherwise
            (the failure is logged, not propagated).
        """
        try:
            await ensure_schema()
        except Exception as e:
            self._log_sync_error(
                f"{label} health check failed", "check_sync_health", e, component=component
            )
            return False
        return True

    # Customer sync operations

    async def sync_customer_create(self, customer: CustomerModel):
        """Sync customer creation to PostgreSQL.

        Delegates to ``customer_sync.create_customer_ref`` and returns its
        result. Any exception is logged and re-raised.
        """
        try:
            return await self.customer_sync.create_customer_ref(customer)
        except Exception as e:
            # Safe attribute reads: a malformed ``customer`` must not raise a
            # second exception here and hide the real failure.
            self._log_sync_error(
                "Error syncing customer creation",
                "sync_customer_create",
                e,
                customer_id=self._attr(customer, "customer_id"),
                merchant_id=self._attr(customer, "merchant_id"),
            )
            raise

    async def sync_customer_update(self, customer_id: str, customer: CustomerModel):
        """Sync customer update to PostgreSQL.

        Delegates to ``customer_sync.update_customer_ref`` and returns its
        result. Any exception is logged and re-raised.
        """
        try:
            return await self.customer_sync.update_customer_ref(customer_id, customer)
        except Exception as e:
            self._log_sync_error(
                "Error syncing customer update",
                "sync_customer_update",
                e,
                customer_id=customer_id,
            )
            raise

    async def sync_customer_delete(self, customer_id: str):
        """Sync customer deletion to PostgreSQL.

        Delegates to ``customer_sync.delete_customer_ref`` and returns its
        result. Any exception is logged and re-raised.
        """
        try:
            return await self.customer_sync.delete_customer_ref(customer_id)
        except Exception as e:
            self._log_sync_error(
                "Error syncing customer deletion",
                "sync_customer_delete",
                e,
                customer_id=customer_id,
            )
            raise

    async def sync_customer_status(self, customer_id: str, status: str):
        """Sync customer status change to PostgreSQL.

        Delegates to ``customer_sync.sync_customer_status`` and returns its
        result. Any exception is logged and re-raised.
        """
        try:
            return await self.customer_sync.sync_customer_status(customer_id, status)
        except Exception as e:
            self._log_sync_error(
                "Error syncing customer status",
                "sync_customer_status",
                e,
                customer_id=customer_id,
                status=status,
            )
            raise

    # Staff sync operations

    async def sync_staff_create(self, staff_data: Dict[str, Any]):
        """Sync staff creation to PostgreSQL.

        Delegates to ``staff_sync.create_staff_ref`` and returns its result.
        Any exception is logged and re-raised.
        """
        try:
            return await self.staff_sync.create_staff_ref(staff_data)
        except Exception as e:
            self._log_sync_error(
                "Error syncing staff creation",
                "sync_staff_create",
                e,
                staff_id=self._field(staff_data, "staff_id"),
                merchant_id=self._field(staff_data, "merchant_id"),
            )
            raise

    async def sync_staff_update(self, staff_id: str, staff_data: Dict[str, Any]):
        """Sync staff update to PostgreSQL.

        Delegates to ``staff_sync.update_staff_ref`` and returns its result.
        Any exception is logged and re-raised.
        """
        try:
            return await self.staff_sync.update_staff_ref(staff_id, staff_data)
        except Exception as e:
            self._log_sync_error(
                "Error syncing staff update",
                "sync_staff_update",
                e,
                staff_id=staff_id,
            )
            raise

    async def sync_staff_delete(self, staff_id: str):
        """Sync staff deletion to PostgreSQL.

        Delegates to ``staff_sync.delete_staff_ref`` and returns its result.
        Any exception is logged and re-raised.
        """
        try:
            return await self.staff_sync.delete_staff_ref(staff_id)
        except Exception as e:
            self._log_sync_error(
                "Error syncing staff deletion",
                "sync_staff_delete",
                e,
                staff_id=staff_id,
            )
            raise

    async def sync_staff_status(self, staff_id: str, status: str):
        """Sync staff status change to PostgreSQL.

        Delegates to ``staff_sync.sync_staff_status`` and returns its result.
        Any exception is logged and re-raised.
        """
        try:
            return await self.staff_sync.sync_staff_status(staff_id, status)
        except Exception as e:
            self._log_sync_error(
                "Error syncing staff status",
                "sync_staff_status",
                e,
                staff_id=staff_id,
                status=status,
            )
            raise

    # Catalogue service sync operations

    async def sync_catalogue_service_create(self, service_data: Dict[str, Any]):
        """Sync catalogue service creation to PostgreSQL.

        Delegates to ``catalogue_service_sync.create_catalogue_service_ref``
        and returns its result. Any exception is logged and re-raised.
        """
        try:
            return await self.catalogue_service_sync.create_catalogue_service_ref(service_data)
        except Exception as e:
            self._log_sync_error(
                "Error syncing catalogue service creation",
                "sync_catalogue_service_create",
                e,
                service_id=self._service_id(service_data),
                merchant_id=self._field(service_data, "merchant_id"),
            )
            raise

    async def sync_catalogue_service_update(self, service_id: str, service_data: Dict[str, Any]):
        """Sync catalogue service update to PostgreSQL.

        Delegates to ``catalogue_service_sync.update_catalogue_service_ref``
        and returns its result. Any exception is logged and re-raised.
        """
        try:
            return await self.catalogue_service_sync.update_catalogue_service_ref(
                service_id, service_data
            )
        except Exception as e:
            self._log_sync_error(
                "Error syncing catalogue service update",
                "sync_catalogue_service_update",
                e,
                service_id=service_id,
            )
            raise

    async def sync_catalogue_service_delete(self, service_id: str):
        """Sync catalogue service deletion to PostgreSQL.

        Delegates to ``catalogue_service_sync.delete_catalogue_service_ref``
        and returns its result. Any exception is logged and re-raised.
        """
        try:
            return await self.catalogue_service_sync.delete_catalogue_service_ref(service_id)
        except Exception as e:
            self._log_sync_error(
                "Error syncing catalogue service deletion",
                "sync_catalogue_service_delete",
                e,
                service_id=service_id,
            )
            raise

    async def sync_catalogue_service_status(self, service_id: str, status: str):
        """Sync catalogue service status change to PostgreSQL.

        Delegates to ``catalogue_service_sync.sync_catalogue_service_status``
        and returns its result. Any exception is logged and re-raised.
        """
        try:
            return await self.catalogue_service_sync.sync_catalogue_service_status(
                service_id, status
            )
        except Exception as e:
            self._log_sync_error(
                "Error syncing catalogue service status",
                "sync_catalogue_service_status",
                e,
                service_id=service_id,
                status=status,
            )
            raise

    # Bulk sync operations

    async def bulk_sync_customers(self, customers: list):
        """Bulk sync multiple customers.

        Items are processed one at a time, in order. A failing item is
        logged and recorded; it does not stop the remaining items.

        Returns:
            One dict per input item: ``{"customer_id", "status": "success",
            "result"}`` or ``{"customer_id", "status": "error", "error"}``.
        """
        results = []
        for customer in customers:
            # Resolved up front with a safe read so a malformed item is
            # reported as a per-item error instead of aborting the batch.
            customer_id = self._attr(customer, "customer_id")
            try:
                result = await self.customer_sync.create_customer_ref(customer)
                results.append({"customer_id": customer_id, "status": "success", "result": result})
            except Exception as e:
                self._log_sync_error(
                    f"Error bulk syncing customer {customer_id}",
                    "bulk_sync_customers",
                    e,
                    customer_id=customer_id,
                    merchant_id=self._attr(customer, "merchant_id"),
                )
                results.append({"customer_id": customer_id, "status": "error", "error": str(e)})
        return results

    async def bulk_sync_staff(self, staff_list: list):
        """Bulk sync multiple staff members.

        Items are processed one at a time, in order. A failing item is
        logged and recorded; it does not stop the remaining items.

        Returns:
            One dict per input item: ``{"staff_id", "status": "success",
            "result"}`` or ``{"staff_id", "status": "error", "error"}``.
        """
        results = []
        for staff_data in staff_list:
            staff_id = self._field(staff_data, "staff_id")
            try:
                result = await self.staff_sync.create_staff_ref(staff_data)
                results.append({"staff_id": staff_id, "status": "success", "result": result})
            except Exception as e:
                self._log_sync_error(
                    f"Error bulk syncing staff {staff_id}",
                    "bulk_sync_staff",
                    e,
                    staff_id=staff_id,
                    merchant_id=self._field(staff_data, "merchant_id"),
                )
                results.append({"staff_id": staff_id, "status": "error", "error": str(e)})
        return results

    async def bulk_sync_catalogue_services(self, service_list: list):
        """Bulk sync multiple catalogue services.

        Items are processed one at a time, in order. A failing item is
        logged and recorded; it does not stop the remaining items.

        Returns:
            One dict per input item: ``{"service_id", "status": "success",
            "result"}`` or ``{"service_id", "status": "error", "error"}``.
        """
        results = []
        for service_data in service_list:
            service_id = self._service_id(service_data)
            try:
                result = await self.catalogue_service_sync.create_catalogue_service_ref(
                    service_data
                )
                results.append({"service_id": service_id, "status": "success", "result": result})
            except Exception as e:
                self._log_sync_error(
                    f"Error bulk syncing catalogue service {service_id}",
                    "bulk_sync_catalogue_services",
                    e,
                    service_id=service_id,
                    merchant_id=self._field(service_data, "merchant_id"),
                )
                results.append({"service_id": service_id, "status": "error", "error": str(e)})
        return results

    # Health check operations

    async def check_sync_health(self) -> Dict[str, Any]:
        """Check the health of sync operations.

        Awaits each sub-service's ``_ensure_*_ref_schema`` method; a component
        is healthy when that call does not raise.

        Returns:
            A dict with ``overall_health``, ``customer_sync``, ``staff_sync``,
            ``catalogue_service_sync`` (all bool) and ``timestamp`` (naive UTC
            datetime). If the check itself fails unexpectedly, every flag is
            False and an ``error`` string is included.
        """
        try:
            customer_health = await self._probe_schema(
                "customer_sync",
                "Customer sync",
                lambda: self.customer_sync._ensure_customer_ref_schema(),
            )
            staff_health = await self._probe_schema(
                "staff_sync",
                "Staff sync",
                lambda: self.staff_sync._ensure_staff_ref_schema(),
            )
            catalogue_service_health = await self._probe_schema(
                "catalogue_service_sync",
                "Catalogue service sync",
                lambda: self.catalogue_service_sync._ensure_catalogue_service_ref_schema(),
            )
            logger.info("Sync health check completed", extra={"operation": "check_sync_health"})
            return {
                "overall_health": customer_health and staff_health and catalogue_service_health,
                "customer_sync": customer_health,
                "staff_sync": staff_health,
                "catalogue_service_sync": catalogue_service_health,
                "timestamp": self._utc_timestamp(),
            }
        except Exception as e:
            self._log_sync_error("Error checking sync health", "check_sync_health", e)
            # Same keys as the success payload so callers can index either
            # one; components are reported unhealthy because the check did
            # not complete.
            return {
                "overall_health": False,
                "customer_sync": False,
                "staff_sync": False,
                "catalogue_service_sync": False,
                "error": str(e),
                "timestamp": self._utc_timestamp(),
            }