"""Abstract base sync handler for MongoDB → PostgreSQL entity synchronization."""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, Optional

from dateutil import parser as date_parser
from motor.motor_asyncio import AsyncIOMotorDatabase
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection

from app.core.logging import get_logger

# Module-level logger tagged with this module's import path.
logger = get_logger(__name__)
class SyncHandler(ABC):
    """
    Abstract base class for entity-specific sync handlers.

    Provides the common sync pipeline (fetch -> validate -> upsert), generic
    field mapping, and a timestamp-based conflict check. Entity-specific
    behavior is supplied by subclasses through the abstract methods below.
    """

    def __init__(self, entity_type: str):
        """
        Initialize sync handler.

        Args:
            entity_type: Type of entity this handler syncs (used in log output)
        """
        self.entity_type = entity_type

    @abstractmethod
    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: "AsyncIOMotorDatabase",
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch entity from MongoDB.

        Args:
            entity_id: ID of entity to fetch
            mongo_db: MongoDB database instance

        Returns:
            Entity document or None if not found
        """

    @abstractmethod
    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names
        """

    @abstractmethod
    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that all required fields are present.

        Args:
            entity: Entity document from MongoDB

        Returns:
            True if all required fields present, False otherwise
        """

    @abstractmethod
    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform field value for PostgreSQL.

        Args:
            field_name: Name of the field
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL
        """

    @abstractmethod
    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: "AsyncConnection",
    ) -> bool:
        """
        Upsert entity to PostgreSQL.

        Args:
            entity: Entity document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True if upsert successful, False otherwise
        """

    async def sync(
        self,
        entity_id: str,
        mongo_db: "AsyncIOMotorDatabase",
        pg_conn: "AsyncConnection",
    ) -> bool:
        """
        Sync entity from MongoDB to PostgreSQL.

        This is the main sync method that orchestrates the sync process:
          1. Fetch entity from MongoDB
          2. Validate required fields
          3. Upsert to PostgreSQL (field transformation is the subclass's
             responsibility inside ``upsert_to_postgres``, typically via
             ``map_fields``)

        Args:
            entity_id: ID of entity to sync
            mongo_db: MongoDB database instance
            pg_conn: PostgreSQL connection

        Returns:
            True if sync successful, False otherwise (entity missing or
            invalid is a soft failure, not an exception)

        Raises:
            Exception: any unexpected error is logged and re-raised so the
                caller can retry or dead-letter the event.
        """
        try:
            # Fetch from MongoDB; a missing document is a soft failure.
            entity = await self.fetch_from_mongodb(entity_id, mongo_db)
            if entity is None:
                logger.warning(
                    f"{self.entity_type} not found in MongoDB",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
                return False

            # Validate required fields before touching PostgreSQL.
            if not self.validate_required_fields(entity):
                logger.error(
                    f"{self.entity_type} missing required fields",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id,
                        "entity": entity
                    }
                )
                return False

            # Upsert to PostgreSQL
            success = await self.upsert_to_postgres(entity, pg_conn)
            if success:
                logger.debug(
                    f"{self.entity_type} synced successfully",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
            else:
                # Previously a failed upsert returned silently; log it so
                # operators can see why an entity did not sync.
                logger.warning(
                    f"{self.entity_type} upsert failed",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
            return success
        except Exception as e:
            logger.error(
                f"Error syncing {self.entity_type}",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            raise

    def map_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Map and transform fields from MongoDB to PostgreSQL format.

        Fields missing from the document map to None (SQL NULL) and skip
        transformation.

        Args:
            entity: Entity document from MongoDB

        Returns:
            Dictionary with PostgreSQL column names and transformed values
        """
        field_mapping = self.get_field_mapping()
        mapped_entity: Dict[str, Any] = {}
        for mongo_field, pg_column in field_mapping.items():
            value = entity.get(mongo_field)
            # Only transform present values; None passes through as NULL.
            if value is not None:
                value = self.transform_field_value(mongo_field, value)
            mapped_entity[pg_column] = value
        return mapped_entity

    async def check_timestamp_conflict(
        self,
        entity_id: str,
        mongo_updated_at: datetime,
        pg_conn: "AsyncConnection",
        table_name: str,
        id_column: str
    ) -> bool:
        """
        Check if MongoDB timestamp is newer than PostgreSQL timestamp.

        Args:
            entity_id: ID of entity
            mongo_updated_at: Updated timestamp from MongoDB (can be an ISO
                string or a datetime)
            pg_conn: PostgreSQL connection
            table_name: PostgreSQL table name. Must be a trusted identifier,
                never user input — it is interpolated into the SQL text.
            id_column: Primary key column name (same trust requirement)

        Returns:
            True if should update (MongoDB is newer-or-equal, the record
            doesn't exist, or the check itself failed), False otherwise
        """
        try:
            # Convert mongo_updated_at to datetime if it's a string.
            if isinstance(mongo_updated_at, str):
                try:
                    mongo_updated_at = date_parser.isoparse(mongo_updated_at)
                except Exception as e:
                    logger.warning(
                        f"Failed to parse MongoDB timestamp: {mongo_updated_at}",
                        exc_info=e
                    )
                    return True  # Proceed with update on parse error

            # If mongo_updated_at is None, proceed with update.
            if mongo_updated_at is None:
                return True

            # Normalize to timezone-naive so the comparison below cannot
            # raise on mixing offset-naive and offset-aware datetimes.
            if hasattr(mongo_updated_at, 'tzinfo') and mongo_updated_at.tzinfo is not None:
                mongo_updated_at = mongo_updated_at.replace(tzinfo=None)

            # SQL identifiers cannot be bound parameters, hence the f-string;
            # only the entity id travels as a bound parameter (:id).
            query = text(f"SELECT updated_at FROM {table_name} WHERE {id_column} = :id")
            result = await pg_conn.execute(query, {"id": entity_id})
            pg_updated_at = result.scalar()

            if pg_updated_at is None:
                # Record doesn't exist, should insert.
                return True

            # Normalize the PostgreSQL timestamp the same way.
            if hasattr(pg_updated_at, 'tzinfo') and pg_updated_at.tzinfo is not None:
                pg_updated_at = pg_updated_at.replace(tzinfo=None)

            # >= (not >) so an equal timestamp still updates; replays of the
            # same change event stay idempotent rather than being dropped.
            return mongo_updated_at >= pg_updated_at
        except Exception as e:
            logger.warning(
                "Error checking timestamp conflict, proceeding with update",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            # On error, proceed with update.
            return True