Spaces:
Runtime error
Runtime error
| """ | |
| Merchant sync handler for MongoDB to PostgreSQL synchronization. | |
| """ | |
| from typing import Dict, Any, Optional | |
| from sqlalchemy.ext.asyncio import AsyncConnection | |
| from sqlalchemy import text | |
| from motor.motor_asyncio import AsyncIOMotorDatabase | |
| from datetime import datetime | |
| from dateutil import parser as date_parser | |
| from app.core.logging import get_logger | |
| from app.sync.common.handler import SyncHandler | |
| from app.sync.merchants.models import MERCHANT_FIELD_MAPPING, MERCHANT_REQUIRED_FIELDS | |
| from app.constants import SCM_MERCHANTS_COLLECTION | |
| logger = get_logger(__name__) | |
| class MerchantSyncHandler(SyncHandler): | |
| """ | |
| Handler for syncing merchant data from MongoDB to PostgreSQL. | |
| Implements entity-specific logic for merchant synchronization including | |
| field mapping, validation, and upsert operations. | |
| """ | |
| def __init__(self): | |
| super().__init__(entity_type="merchant") | |
| async def fetch_from_mongodb( | |
| self, | |
| entity_id: str, | |
| mongo_db: Any | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Fetch merchant from MongoDB by merchant_id. | |
| Args: | |
| entity_id: merchant_id to fetch | |
| mongo_db: MongoDB database instance | |
| Returns: | |
| Merchant document or None if not found | |
| """ | |
| try: | |
| collection = mongo_db[SCM_MERCHANTS_COLLECTION] | |
| merchant = await collection.find_one({"merchant_id": entity_id}) | |
| return merchant | |
| except Exception as e: | |
| logger.error( | |
| "Error fetching merchant from MongoDB", | |
| exc_info=e, | |
| extra={ | |
| "entity_type": self.entity_type, | |
| "entity_id": entity_id | |
| } | |
| ) | |
| raise | |
| def get_field_mapping(self) -> Dict[str, str]: | |
| """ | |
| Get field mapping from MongoDB to PostgreSQL. | |
| Returns: | |
| Dictionary mapping MongoDB field names to PostgreSQL column names | |
| """ | |
| return MERCHANT_FIELD_MAPPING | |
| def validate_required_fields(self, entity: Dict[str, Any]) -> bool: | |
| """ | |
| Validate that all required fields are present in merchant document. | |
| Required fields: merchant_id, merchant_code, merchant_type, status | |
| Args: | |
| entity: Merchant document from MongoDB | |
| Returns: | |
| True if all required fields present, False otherwise | |
| """ | |
| missing_fields = [] | |
| for field in MERCHANT_REQUIRED_FIELDS: | |
| if field not in entity or entity[field] is None: | |
| missing_fields.append(field) | |
| if missing_fields: | |
| logger.error( | |
| "Merchant missing required fields", | |
| extra={ | |
| "entity_type": self.entity_type, | |
| "entity_id": entity.get("merchant_id"), | |
| "missing_fields": missing_fields | |
| } | |
| ) | |
| return False | |
| return True | |
| def transform_field_value(self, field_name: str, value: Any) -> Any: | |
| """ | |
| Transform field value for PostgreSQL. | |
| Handles type conversions and nested field extraction. | |
| Args: | |
| field_name: Name of the field | |
| value: Value from MongoDB | |
| Returns: | |
| Transformed value for PostgreSQL | |
| """ | |
| if value is None: | |
| return None | |
| # Convert merchant_type enum to string if needed | |
| if field_name == "merchant_type" and hasattr(value, 'value'): | |
| return value.value | |
| # Convert status enum to string if needed | |
| if field_name == "status" and hasattr(value, 'value'): | |
| return value.value | |
| # Handle datetime objects - convert ISO strings to datetime | |
| if field_name in ["created_at", "updated_at", "synced_at"]: | |
| if isinstance(value, str): | |
| try: | |
| return date_parser.isoparse(value) | |
| except Exception as e: | |
| logger.warning( | |
| f"Failed to parse datetime field {field_name}: {value}", | |
| exc_info=e | |
| ) | |
| return None | |
| elif isinstance(value, datetime): | |
| return value | |
| return value | |
| def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Extract nested fields from merchant document. | |
| Extracts city, state, and gst_number from nested structures: | |
| - city and state from contact.city and contact.state | |
| - gst_number from kyc.gst_number | |
| - Ensures created_at/updated_at have default values | |
| Args: | |
| entity: Merchant document from MongoDB | |
| Returns: | |
| Dictionary with flattened fields | |
| """ | |
| flattened = {} | |
| # Extract city and state from contact | |
| if "contact" in entity and entity["contact"]: | |
| contact = entity["contact"] | |
| flattened["city"] = contact.get("city") | |
| flattened["state"] = contact.get("state") | |
| else: | |
| flattened["city"] = None | |
| flattened["state"] = None | |
| # Extract gst_number from kyc | |
| if "kyc" in entity and entity["kyc"]: | |
| kyc = entity["kyc"] | |
| flattened["gst_number"] = kyc.get("gst_number") | |
| else: | |
| flattened["gst_number"] = None | |
| # Ensure timestamps have default values if missing | |
| now = datetime.utcnow() | |
| if "created_at" not in entity or entity["created_at"] is None: | |
| flattened["created_at"] = now | |
| if "updated_at" not in entity or entity["updated_at"] is None: | |
| flattened["updated_at"] = now | |
| return flattened | |
| async def upsert_to_postgres( | |
| self, | |
| entity: Dict[str, Any], | |
| pg_conn: AsyncConnection | |
| ) -> bool: | |
| """ | |
| Upsert merchant to PostgreSQL trans.merchants_ref table. | |
| Performs timestamp-based conflict resolution: | |
| - If record doesn't exist, insert | |
| - If MongoDB updated_at >= PostgreSQL updated_at, update | |
| - Otherwise, skip update | |
| Args: | |
| entity: Merchant document from MongoDB | |
| pg_conn: PostgreSQL connection | |
| Returns: | |
| True if upsert successful, False otherwise | |
| """ | |
| try: | |
| merchant_id = entity["merchant_id"] | |
| updated_at = entity.get("updated_at") or entity.get("created_at") | |
| # Check timestamp conflict | |
| should_update = await self.check_timestamp_conflict( | |
| entity_id=merchant_id, | |
| mongo_updated_at=updated_at, | |
| pg_conn=pg_conn, | |
| table_name="trans.merchants_ref", | |
| id_column="merchant_id" | |
| ) | |
| if not should_update: | |
| logger.debug( | |
| "Skipping merchant sync due to timestamp conflict", | |
| extra={ | |
| "entity_type": self.entity_type, | |
| "entity_id": merchant_id, | |
| "mongo_updated_at": updated_at | |
| } | |
| ) | |
| return True # Not an error, just skipped | |
| # Extract nested fields | |
| nested_fields = self.extract_nested_fields(entity) | |
| # Merge nested fields into entity for mapping | |
| entity_with_nested = {**entity, **nested_fields} | |
| # Map fields | |
| mapped_entity = self.map_fields(entity_with_nested) | |
| # Build UPSERT query | |
| columns = list(mapped_entity.keys()) | |
| placeholders = [f":{col}" for col in columns] | |
| # Build UPDATE clause (exclude primary key) | |
| update_columns = [col for col in columns if col != "merchant_id"] | |
| update_clause = ", ".join([f"{col} = EXCLUDED.{col}" for col in update_columns]) | |
| query = text(f""" | |
| INSERT INTO trans.merchants_ref ({', '.join(columns)}) | |
| VALUES ({', '.join(placeholders)}) | |
| ON CONFLICT (merchant_id) | |
| DO UPDATE SET {update_clause} | |
| """) | |
| await pg_conn.execute(query, mapped_entity) | |
| logger.debug( | |
| "Merchant upserted to PostgreSQL", | |
| extra={ | |
| "entity_type": self.entity_type, | |
| "entity_id": merchant_id | |
| } | |
| ) | |
| return True | |
| except Exception as e: | |
| logger.error( | |
| "Error upserting merchant to PostgreSQL", | |
| exc_info=e, | |
| extra={ | |
| "entity_type": self.entity_type, | |
| "entity_id": entity.get("merchant_id"), | |
| "error": str(e) | |
| } | |
| ) | |
| raise | |
| async def delete_from_postgres( | |
| self, | |
| entity_id: str, | |
| pg_conn: AsyncConnection | |
| ) -> bool: | |
| """ | |
| Delete merchant from PostgreSQL trans.merchants_ref table. | |
| Args: | |
| entity_id: merchant_id to delete | |
| pg_conn: PostgreSQL connection | |
| Returns: | |
| True if delete successful, False otherwise | |
| """ | |
| try: | |
| merchant_id = entity_id | |
| # Delete from PostgreSQL | |
| query = text("DELETE FROM trans.merchants_ref WHERE merchant_id = :id") | |
| result = await pg_conn.execute(query, {"id": merchant_id}) | |
| rows_deleted = result.rowcount | |
| if rows_deleted > 0: | |
| logger.info( | |
| f"Merchant deleted from PostgreSQL", | |
| extra={ | |
| "entity_type": self.entity_type, | |
| "entity_id": merchant_id, | |
| "rows_deleted": rows_deleted | |
| } | |
| ) | |
| return True | |
| else: | |
| logger.warning( | |
| f"Merchant not found in PostgreSQL for deletion", | |
| extra={ | |
| "entity_type": self.entity_type, | |
| "entity_id": merchant_id | |
| } | |
| ) | |
| return True | |
| except Exception as e: | |
| logger.error( | |
| "Error deleting merchant from PostgreSQL", | |
| exc_info=e, | |
| extra={ | |
| "entity_type": self.entity_type, | |
| "entity_id": entity_id, | |
| "error": str(e) | |
| } | |
| ) | |
| raise | |