"""
Merchant sync handler for MongoDB to PostgreSQL synchronization.
"""

from datetime import datetime, timezone
from typing import Any, Dict, Optional

from dateutil import parser as date_parser
from motor.motor_asyncio import AsyncIOMotorDatabase
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection

from app.core.logging import get_logger
from app.sync.common.handler import SyncHandler
from app.sync.merchants.models import (
    MERCHANT_FIELD_MAPPING,
    MERCHANT_REQUIRED_FIELDS,
)
from app.constants import SCM_MERCHANTS_COLLECTION

logger = get_logger(__name__)

# Fields whose values may arrive as enum members and are stored as their
# underlying ``.value``.
_ENUM_FIELDS = ("merchant_type", "status")

# Fields whose values may arrive as ISO-8601 strings and are parsed into
# ``datetime`` objects.
_TIMESTAMP_FIELDS = ("created_at", "updated_at", "synced_at")


class MerchantSyncHandler(SyncHandler):
    """
    Handler for syncing merchant data from MongoDB to PostgreSQL.

    Implements entity-specific logic for merchant synchronization
    including field mapping, validation, and upsert operations.
    """

    def __init__(self):
        super().__init__(entity_type="merchant")

    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: Any
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch merchant from MongoDB by merchant_id.

        Args:
            entity_id: merchant_id to fetch
            mongo_db: MongoDB database instance

        Returns:
            Merchant document or None if not found

        Raises:
            Exception: any error from the lookup is logged and re-raised.
        """
        try:
            collection = mongo_db[SCM_MERCHANTS_COLLECTION]
            return await collection.find_one({"merchant_id": entity_id})
        except Exception as e:
            logger.error(
                "Error fetching merchant from MongoDB",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            raise

    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names
        """
        return MERCHANT_FIELD_MAPPING

    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that all required fields are present in merchant document.

        The required fields are whatever MERCHANT_REQUIRED_FIELDS lists
        (documented as merchant_id, merchant_code, merchant_type, status).
        A field counts as missing when the key is absent or its value is None.

        Args:
            entity: Merchant document from MongoDB

        Returns:
            True if all required fields present, False otherwise
        """
        missing_fields = [
            field
            for field in MERCHANT_REQUIRED_FIELDS
            if entity.get(field) is None
        ]
        if not missing_fields:
            return True

        logger.error(
            "Merchant missing required fields",
            extra={
                "entity_type": self.entity_type,
                "entity_id": entity.get("merchant_id"),
                "missing_fields": missing_fields
            }
        )
        return False

    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform field value for PostgreSQL.

        Handles enum-to-value conversion for merchant_type/status and
        ISO-string-to-datetime conversion for the timestamp fields. Any
        other value is returned unchanged.

        Args:
            field_name: Name of the field
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL. A timestamp string that cannot
            be parsed yields None (a warning is logged).
        """
        if value is None:
            return None

        # Enum members expose their raw value via ``.value``.
        if field_name in _ENUM_FIELDS and hasattr(value, "value"):
            return value.value

        # Only strings need parsing; datetime objects (and anything else)
        # pass through untouched below.
        if field_name in _TIMESTAMP_FIELDS and isinstance(value, str):
            try:
                return date_parser.isoparse(value)
            except Exception as e:
                # Best-effort: an unparseable timestamp must not abort the sync.
                logger.warning(
                    f"Failed to parse datetime field {field_name}: {value}",
                    exc_info=e
                )
                return None

        return value

    def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract nested fields from merchant document.

        Extracts city, state, and gst_number from nested structures:
        - city and state from contact.city and contact.state
        - gst_number from kyc.gst_number
        - Ensures created_at/updated_at have default values

        Args:
            entity: Merchant document from MongoDB

        Returns:
            Dictionary with flattened fields. city, state and gst_number are
            always present (None when the source is missing or empty);
            created_at/updated_at are present only when the document lacks
            them, in which case both get the same current-UTC default.
        """
        # A missing or empty sub-document is treated as "no values".
        contact = entity.get("contact") or {}
        kyc = entity.get("kyc") or {}

        flattened = {
            "city": contact.get("city"),
            "state": contact.get("state"),
            "gst_number": kyc.get("gst_number"),
        }

        # Naive UTC timestamp: the non-deprecated equivalent of
        # datetime.utcnow(), kept naive so stored values are unchanged.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        for ts_field in ("created_at", "updated_at"):
            if entity.get(ts_field) is None:
                flattened[ts_field] = now

        return flattened

    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert merchant to PostgreSQL trans.merchants_ref table.

        The decision whether to write is delegated to the inherited
        check_timestamp_conflict (not defined in this file); it is documented
        as timestamp-based conflict resolution:
        - If record doesn't exist, insert
        - If MongoDB updated_at >= PostgreSQL updated_at, update
        - Otherwise, skip update

        Args:
            entity: Merchant document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True when the merchant was written or deliberately skipped.

        Raises:
            Exception: any failure is logged and re-raised; this method never
                returns False.
        """
        try:
            merchant_id = entity["merchant_id"]
            updated_at = entity.get("updated_at") or entity.get("created_at")

            # Check timestamp conflict
            should_update = await self.check_timestamp_conflict(
                entity_id=merchant_id,
                mongo_updated_at=updated_at,
                pg_conn=pg_conn,
                table_name="trans.merchants_ref",
                id_column="merchant_id"
            )

            if not should_update:
                logger.debug(
                    "Skipping merchant sync due to timestamp conflict",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": merchant_id,
                        "mongo_updated_at": updated_at
                    }
                )
                return True  # Not an error, just skipped

            # Flattened nested values take precedence over same-named
            # top-level keys before mapping.
            entity_with_nested = {**entity, **self.extract_nested_fields(entity)}
            mapped_entity = self.map_fields(entity_with_nested)

            # NOTE(review): column names are interpolated into the SQL text;
            # presumably they originate only from MERCHANT_FIELD_MAPPING via
            # map_fields - confirm they can never carry document-supplied keys.
            columns = list(mapped_entity)
            column_list = ", ".join(columns)
            placeholders = ", ".join(f":{col}" for col in columns)

            # Every column except the primary key is overwritten on conflict.
            update_clause = ", ".join(
                f"{col} = EXCLUDED.{col}"
                for col in columns
                if col != "merchant_id"
            )
            # An empty SET list is a SQL syntax error; with nothing to update
            # the existing row is simply left alone.
            if update_clause:
                conflict_action = f"DO UPDATE SET {update_clause}"
            else:
                conflict_action = "DO NOTHING"

            query = text(f"""
                INSERT INTO trans.merchants_ref ({column_list})
                VALUES ({placeholders})
                ON CONFLICT (merchant_id)
                {conflict_action}
            """)

            await pg_conn.execute(query, mapped_entity)

            logger.debug(
                "Merchant upserted to PostgreSQL",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": merchant_id
                }
            )
            return True

        except Exception as e:
            logger.error(
                "Error upserting merchant to PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("merchant_id"),
                    "error": str(e)
                }
            )
            raise

    async def delete_from_postgres(
        self,
        entity_id: str,
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Delete merchant from PostgreSQL trans.merchants_ref table.

        Args:
            entity_id: merchant_id to delete
            pg_conn: PostgreSQL connection

        Returns:
            True both when a row was deleted and when no matching row existed
            (the latter is only logged as a warning).

        Raises:
            Exception: any failure is logged and re-raised; this method never
                returns False.
        """
        try:
            query = text("DELETE FROM trans.merchants_ref WHERE merchant_id = :id")
            result = await pg_conn.execute(query, {"id": entity_id})
            rows_deleted = result.rowcount

            if rows_deleted > 0:
                logger.info(
                    "Merchant deleted from PostgreSQL",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id,
                        "rows_deleted": rows_deleted
                    }
                )
            else:
                # Deleting an absent row is treated as success.
                logger.warning(
                    "Merchant not found in PostgreSQL for deletion",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
            return True

        except Exception as e:
            logger.error(
                "Error deleting merchant from PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            raise