MukeshKapoor25's picture
feat(sync-worker-ms): add delete_from_postgres method to merchant handler
a2386f0
"""
Merchant sync handler for MongoDB to PostgreSQL synchronization.
"""
from datetime import datetime, timezone
from typing import Dict, Any, Optional

from dateutil import parser as date_parser
from motor.motor_asyncio import AsyncIOMotorDatabase
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection

from app.constants import SCM_MERCHANTS_COLLECTION
from app.core.logging import get_logger
from app.sync.common.handler import SyncHandler
from app.sync.merchants.models import MERCHANT_FIELD_MAPPING, MERCHANT_REQUIRED_FIELDS
# Module-level logger obtained from the project's get_logger helper, keyed by this module's __name__.
logger = get_logger(__name__)
class MerchantSyncHandler(SyncHandler):
    """
    Handler for syncing merchant data from MongoDB to PostgreSQL.

    Implements entity-specific logic for merchant synchronization including
    field mapping, validation, and upsert/delete operations against the
    PostgreSQL ``trans.merchants_ref`` table.
    """

    def __init__(self):
        super().__init__(entity_type="merchant")

    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: Any
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch a merchant document from MongoDB by ``merchant_id``.

        Args:
            entity_id: merchant_id to fetch
            mongo_db: MongoDB database instance (indexed by collection name)

        Returns:
            Merchant document, or None if no document matches.

        Raises:
            Exception: any driver error is logged and re-raised.
        """
        try:
            collection = mongo_db[SCM_MERCHANTS_COLLECTION]
            return await collection.find_one({"merchant_id": entity_id})
        except Exception as e:
            logger.error(
                "Error fetching merchant from MongoDB",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            raise

    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get the field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names
            (``MERCHANT_FIELD_MAPPING``).
        """
        return MERCHANT_FIELD_MAPPING

    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that every field in ``MERCHANT_REQUIRED_FIELDS`` is present
        in the merchant document and not None.

        Per the original documentation the required fields are merchant_id,
        merchant_code, merchant_type and status; the authoritative list is
        ``MERCHANT_REQUIRED_FIELDS``. Missing fields are logged at error level.

        Args:
            entity: Merchant document from MongoDB

        Returns:
            True if all required fields are present, False otherwise
        """
        # A key that exists but holds None is treated the same as an absent key.
        missing_fields = [
            field for field in MERCHANT_REQUIRED_FIELDS
            if entity.get(field) is None
        ]
        if missing_fields:
            logger.error(
                "Merchant missing required fields",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("merchant_id"),
                    "missing_fields": missing_fields
                }
            )
            return False
        return True

    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform a single field value for PostgreSQL.

        - None stays None.
        - ``merchant_type`` / ``status`` values exposing a ``.value`` attribute
          (enum-like) are replaced by that raw value.
        - ``created_at`` / ``updated_at`` / ``synced_at`` ISO-8601 strings are
          parsed into datetimes; unparseable strings become None (logged).
        - Everything else, including datetime objects, is returned unchanged.

        Args:
            field_name: Name of the field
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL
        """
        if value is None:
            return None

        # Enum-like values are stored as their underlying value.
        if field_name in ("merchant_type", "status") and hasattr(value, "value"):
            return value.value

        # Timestamps may arrive as ISO strings; the driver needs datetime objects.
        if field_name in ("created_at", "updated_at", "synced_at") and isinstance(value, str):
            try:
                return date_parser.isoparse(value)
            except Exception as e:
                # Deliberate best-effort: a malformed timestamp becomes NULL
                # instead of failing the whole record.
                logger.warning(
                    f"Failed to parse datetime field {field_name}: {value}",
                    exc_info=e
                )
                return None

        return value

    def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Flatten nested merchant fields and fill default timestamps.

        - ``city`` and ``state`` come from ``contact.city`` / ``contact.state``
        - ``gst_number`` comes from ``kyc.gst_number``
        - ``created_at`` / ``updated_at`` are set to the current UTC time
          (naive datetime) only when missing or None in ``entity``

        Args:
            entity: Merchant document from MongoDB

        Returns:
            Dictionary with the flattened fields (and any defaulted timestamps)
        """
        # A missing or falsy sub-document yields None for each nested field.
        contact = entity.get("contact") or {}
        kyc = entity.get("kyc") or {}

        flattened: Dict[str, Any] = {
            "city": contact.get("city"),
            "state": contact.get("state"),
            "gst_number": kyc.get("gst_number"),
        }

        # Same naive-UTC value datetime.utcnow() produced, without the API that
        # is deprecated since Python 3.12.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        for ts_field in ("created_at", "updated_at"):
            if entity.get(ts_field) is None:
                flattened[ts_field] = now
        return flattened

    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert a merchant into PostgreSQL ``trans.merchants_ref``.

        Conflict resolution is delegated to the inherited
        ``check_timestamp_conflict`` (not shown here), called with the MongoDB
        ``updated_at`` (falling back to ``created_at``). When it returns a
        falsy value the write is skipped. Presumably it allows the write when
        no row exists or when MongoDB updated_at >= PostgreSQL updated_at —
        confirm against SyncHandler.

        Args:
            entity: Merchant document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True when the row was written or the write was intentionally skipped.

        Raises:
            Exception: any failure (including KeyError for a missing
                ``merchant_id``) is logged and re-raised.
        """
        try:
            merchant_id = entity["merchant_id"]
            updated_at = entity.get("updated_at") or entity.get("created_at")

            should_update = await self.check_timestamp_conflict(
                entity_id=merchant_id,
                mongo_updated_at=updated_at,
                pg_conn=pg_conn,
                table_name="trans.merchants_ref",
                id_column="merchant_id"
            )
            if not should_update:
                logger.debug(
                    "Skipping merchant sync due to timestamp conflict",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": merchant_id,
                        "mongo_updated_at": updated_at
                    }
                )
                return True  # Not an error, just skipped

            # Merge flattened contact/kyc fields and default timestamps so they
            # pass through the same mapping as top-level fields.
            entity_with_nested = {**entity, **self.extract_nested_fields(entity)}
            mapped_entity = self.map_fields(entity_with_nested)

            # Column names presumably come from the static field mapping, not
            # from user input; all values are sent as bound parameters.
            columns = list(mapped_entity.keys())
            column_list = ", ".join(columns)
            placeholders = ", ".join(f":{col}" for col in columns)

            update_columns = [col for col in columns if col != "merchant_id"]
            if update_columns:
                update_clause = ", ".join(
                    f"{col} = EXCLUDED.{col}" for col in update_columns
                )
                conflict_action = f"DO UPDATE SET {update_clause}"
            else:
                # "DO UPDATE SET" with an empty list is invalid SQL; with only
                # the key to write there is nothing to update on an existing row.
                conflict_action = "DO NOTHING"

            query = text(
                f"INSERT INTO trans.merchants_ref ({column_list}) "
                f"VALUES ({placeholders}) "
                f"ON CONFLICT (merchant_id) {conflict_action}"
            )
            await pg_conn.execute(query, mapped_entity)

            logger.debug(
                "Merchant upserted to PostgreSQL",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": merchant_id
                }
            )
            return True
        except Exception as e:
            logger.error(
                "Error upserting merchant to PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("merchant_id"),
                    "error": str(e)
                }
            )
            raise

    async def delete_from_postgres(
        self,
        entity_id: str,
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Delete a merchant from PostgreSQL ``trans.merchants_ref``.

        Deleting a merchant with no matching row is treated as success (a
        warning is logged), so repeated deletes of the same id return True.

        Args:
            entity_id: merchant_id to delete
            pg_conn: PostgreSQL connection

        Returns:
            True once the DELETE has executed, whether or not a row matched.

        Raises:
            Exception: database errors are logged and re-raised.
        """
        try:
            query = text("DELETE FROM trans.merchants_ref WHERE merchant_id = :id")
            result = await pg_conn.execute(query, {"id": entity_id})
            rows_deleted = result.rowcount

            if rows_deleted > 0:
                logger.info(
                    "Merchant deleted from PostgreSQL",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id,
                        "rows_deleted": rows_deleted
                    }
                )
            else:
                logger.warning(
                    "Merchant not found in PostgreSQL for deletion",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
            return True
        except Exception as e:
            logger.error(
                "Error deleting merchant from PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            raise