from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, Union
from sqlalchemy.ext.asyncio import AsyncConnection
from sqlalchemy import text
from motor.motor_asyncio import AsyncIOMotorDatabase
from datetime import datetime, timezone
from dateutil import parser as date_parser

from app.core.logging import get_logger

logger = get_logger(__name__)


class SyncHandler(ABC):
    """
    Abstract base class for entity-specific sync handlers.

    Provides common sync logic (orchestration, field mapping, timestamp
    conflict detection) and defines the interface that entity-specific
    implementations must provide.
    """

    def __init__(self, entity_type: str):
        """
        Initialize sync handler.

        Args:
            entity_type: Type of entity this handler syncs
        """
        self.entity_type = entity_type

    @abstractmethod
    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch entity from MongoDB.

        Args:
            entity_id: ID of entity to fetch
            mongo_db: MongoDB database instance

        Returns:
            Entity document or None if not found
        """

    @abstractmethod
    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names
        """

    @abstractmethod
    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that all required fields are present.

        Args:
            entity: Entity document from MongoDB

        Returns:
            True if all required fields present, False otherwise
        """

    @abstractmethod
    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform field value for PostgreSQL.

        Args:
            field_name: Name of the (MongoDB) field
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL
        """

    @abstractmethod
    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert entity to PostgreSQL.

        Args:
            entity: Entity document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True if upsert successful, False otherwise
        """

    async def sync(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase,
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Sync entity from MongoDB to PostgreSQL.

        This is the main sync method that orchestrates the sync process:
        1. Fetch entity from MongoDB
        2. Validate required fields
        3. Upsert to PostgreSQL

        Field mapping/transformation is not performed here; it is left to
        the ``upsert_to_postgres`` implementation (which may use
        ``map_fields``).

        Args:
            entity_id: ID of entity to sync
            mongo_db: MongoDB database instance
            pg_conn: PostgreSQL connection

        Returns:
            True if sync successful, False if the entity was not found,
            failed validation, or the upsert reported failure

        Raises:
            Exception: Any exception raised by the fetch/validate/upsert
                steps is logged and re-raised unchanged.
        """
        try:
            # Fetch from MongoDB
            entity = await self.fetch_from_mongodb(entity_id, mongo_db)

            if entity is None:
                logger.warning(
                    f"{self.entity_type} not found in MongoDB",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
                return False

            # Validate required fields
            if not self.validate_required_fields(entity):
                logger.error(
                    f"{self.entity_type} missing required fields",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id,
                        "entity": entity
                    }
                )
                return False

            # Upsert to PostgreSQL
            success = await self.upsert_to_postgres(entity, pg_conn)

            if success:
                logger.debug(
                    f"{self.entity_type} synced successfully",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )

            return success

        except Exception as e:
            # Log with context, then let the caller decide how to handle it.
            logger.error(
                f"Error syncing {self.entity_type}",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            raise

    def map_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Map and transform fields from MongoDB to PostgreSQL format.

        Every column named in ``get_field_mapping()`` appears in the result.
        Fields that are missing from ``entity`` or explicitly ``None`` map to
        ``None`` and are NOT passed through ``transform_field_value``.

        Args:
            entity: Entity document from MongoDB

        Returns:
            Dictionary with PostgreSQL column names and transformed values
        """
        mapped_entity: Dict[str, Any] = {}

        for mongo_field, pg_column in self.get_field_mapping().items():
            value = entity.get(mongo_field)

            # Only non-null values are transformed; None is stored as-is.
            if value is not None:
                value = self.transform_field_value(mongo_field, value)

            mapped_entity[pg_column] = value

        return mapped_entity

    @staticmethod
    def _to_naive_utc(value: Any) -> Any:
        """Return *value* as a naive datetime, converting aware values to UTC first."""
        if getattr(value, "tzinfo", None) is None:
            # Already naive (or not a datetime at all): leave untouched.
            return value

        if value.utcoffset() is None:
            # tzinfo is set but yields no offset, so the value is effectively
            # naive; astimezone() would wrongly treat it as local time.
            return value.replace(tzinfo=None)

        # Normalize to UTC *before* dropping tzinfo so that values carrying
        # different offsets remain comparable.
        return value.astimezone(timezone.utc).replace(tzinfo=None)

    async def check_timestamp_conflict(
        self,
        entity_id: str,
        mongo_updated_at: Union[datetime, str, None],
        pg_conn: AsyncConnection,
        table_name: str,
        id_column: str
    ) -> bool:
        """
        Check if MongoDB timestamp is at least as new as PostgreSQL timestamp.

        Timezone-aware values on either side are converted to UTC and then
        compared as naive datetimes. Naive values are compared as-is
        (NOTE(review): presumably both stores keep naive timestamps in UTC -
        confirm).

        Args:
            entity_id: ID of entity
            mongo_updated_at: Updated timestamp from MongoDB (ISO-8601 string,
                datetime, or None)
            pg_conn: PostgreSQL connection
            table_name: PostgreSQL table name
            id_column: Primary key column name

        Returns:
            True if the update should proceed: the MongoDB timestamp is newer
            than or equal to the PostgreSQL one, the row does not exist (or
            its ``updated_at`` is NULL), the MongoDB timestamp is missing or
            unparseable, or any error occurred while checking. False only
            when the PostgreSQL timestamp is strictly newer.
        """
        try:
            # Convert mongo_updated_at to datetime if it's a string
            if isinstance(mongo_updated_at, str):
                try:
                    mongo_updated_at = date_parser.isoparse(mongo_updated_at)
                except (ValueError, OverflowError) as e:
                    logger.warning(
                        f"Failed to parse MongoDB timestamp: {mongo_updated_at}",
                        exc_info=e
                    )
                    return True  # Proceed with update on parse error

            # If mongo_updated_at is None, proceed with update
            if mongo_updated_at is None:
                return True

            mongo_updated_at = self._to_naive_utc(mongo_updated_at)

            # table_name / id_column are interpolated directly into the SQL
            # (identifiers cannot be bound as parameters), so they must come
            # from trusted handler code, never from user input. The entity id
            # is passed as a bound parameter.
            query = text(f"SELECT updated_at FROM {table_name} WHERE {id_column} = :id")
            result = await pg_conn.execute(query, {"id": entity_id})
            pg_updated_at = result.scalar()

            if pg_updated_at is None:
                # Record doesn't exist (or has no timestamp), should insert
                return True

            pg_updated_at = self._to_naive_utc(pg_updated_at)

            # Compare timestamps
            return mongo_updated_at >= pg_updated_at

        except Exception as e:
            logger.warning(
                "Error checking timestamp conflict, proceeding with update",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            # On error, proceed with update (deliberate fail-open)
            return True