# feat(superadmin): Add superadmin module with sync API endpoints
# (commit d21a3bd, author MukeshKapoor25)
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from dateutil import parser as date_parser
from motor.motor_asyncio import AsyncIOMotorDatabase
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection

from app.core.logging import get_logger
# Module-level logger scoped to this module's import path.
logger = get_logger(__name__)
class SyncHandler(ABC):
    """
    Abstract base class for entity-specific sync handlers.

    Provides the shared MongoDB -> PostgreSQL sync orchestration (fetch,
    validate, upsert, timestamp conflict detection) and defines the
    interface that entity-specific implementations must fill in.
    """

    def __init__(self, entity_type: str):
        """
        Initialize sync handler.

        Args:
            entity_type: Type of entity this handler syncs (used in log output)
        """
        self.entity_type = entity_type

    @abstractmethod
    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch entity from MongoDB.

        Args:
            entity_id: ID of entity to fetch
            mongo_db: MongoDB database instance

        Returns:
            Entity document or None if not found
        """
        pass

    @abstractmethod
    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names
        """
        pass

    @abstractmethod
    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that all required fields are present.

        Args:
            entity: Entity document from MongoDB

        Returns:
            True if all required fields present, False otherwise
        """
        pass

    @abstractmethod
    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform field value for PostgreSQL.

        Args:
            field_name: Name of the field (MongoDB field name)
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL
        """
        pass

    @abstractmethod
    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert entity to PostgreSQL.

        Args:
            entity: Entity document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True if upsert successful, False otherwise
        """
        pass

    async def sync(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase,
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Sync entity from MongoDB to PostgreSQL.

        This is the main sync method that orchestrates the sync process:
        1. Fetch entity from MongoDB
        2. Validate required fields
        3. Upsert to PostgreSQL (subclasses perform field mapping and
           transformation inside upsert_to_postgres, typically via
           map_fields)

        Args:
            entity_id: ID of entity to sync
            mongo_db: MongoDB database instance
            pg_conn: PostgreSQL connection

        Returns:
            True if sync successful, False otherwise

        Raises:
            Exception: Any unexpected error is logged and re-raised.
        """
        try:
            # Fetch from MongoDB
            entity = await self.fetch_from_mongodb(entity_id, mongo_db)
            if entity is None:
                logger.warning(
                    f"{self.entity_type} not found in MongoDB",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
                return False

            # Validate required fields
            if not self.validate_required_fields(entity):
                logger.error(
                    f"{self.entity_type} missing required fields",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id,
                        "entity": entity
                    }
                )
                return False

            # Upsert to PostgreSQL
            success = await self.upsert_to_postgres(entity, pg_conn)
            if success:
                logger.debug(
                    f"{self.entity_type} synced successfully",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
            else:
                # Fix: a failed upsert previously returned False with no log
                # at all, making partial syncs hard to diagnose.
                logger.warning(
                    f"{self.entity_type} upsert to PostgreSQL failed",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
            return success
        except Exception as e:
            logger.error(
                f"Error syncing {self.entity_type}",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            raise

    def map_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Map and transform fields from MongoDB to PostgreSQL format.

        Fields absent from the document map to None; only non-None values
        are passed through transform_field_value.

        Args:
            entity: Entity document from MongoDB

        Returns:
            Dictionary with PostgreSQL column names and transformed values
        """
        mapped_entity: Dict[str, Any] = {}
        for mongo_field, pg_column in self.get_field_mapping().items():
            value = entity.get(mongo_field)
            # Transform the value if needed
            if value is not None:
                value = self.transform_field_value(mongo_field, value)
            mapped_entity[pg_column] = value
        return mapped_entity

    @staticmethod
    def _to_naive_utc(ts: datetime) -> datetime:
        """
        Normalize a timestamp to naive UTC for safe comparison.

        Aware datetimes are converted to UTC *before* the tzinfo is
        dropped; naive datetimes are returned unchanged (assumed to
        already be UTC). Non-datetime values pass through untouched.
        """
        if getattr(ts, "tzinfo", None) is not None:
            ts = ts.astimezone(timezone.utc).replace(tzinfo=None)
        return ts

    async def check_timestamp_conflict(
        self,
        entity_id: str,
        mongo_updated_at: datetime,
        pg_conn: AsyncConnection,
        table_name: str,
        id_column: str
    ) -> bool:
        """
        Check if MongoDB timestamp is newer than PostgreSQL timestamp.

        Args:
            entity_id: ID of entity
            mongo_updated_at: Updated timestamp from MongoDB (can be string
                or datetime, aware or naive)
            pg_conn: PostgreSQL connection
            table_name: PostgreSQL table name — must come from trusted code,
                never from user input (it is interpolated into the SQL)
            id_column: Primary key column name (trusted, see table_name)

        Returns:
            True if should update (MongoDB is newer, record doesn't exist,
            or the check itself failed — fail open), False otherwise
        """
        try:
            # Convert mongo_updated_at to datetime if it's a string
            if isinstance(mongo_updated_at, str):
                try:
                    mongo_updated_at = date_parser.isoparse(mongo_updated_at)
                except Exception as e:
                    logger.warning(
                        f"Failed to parse MongoDB timestamp: {mongo_updated_at}",
                        exc_info=e
                    )
                    return True  # Proceed with update on parse error

            # If mongo_updated_at is None, proceed with update
            if mongo_updated_at is None:
                return True

            # Bug fix: convert aware timestamps to UTC before dropping the
            # tzinfo. The previous replace(tzinfo=None) discarded the UTC
            # offset, so a non-UTC timestamp compared as its wall-clock time.
            mongo_updated_at = self._to_naive_utc(mongo_updated_at)

            # Identifiers (table/column names) cannot be bound parameters,
            # so they are interpolated from trusted handler constants;
            # entity_id is bound via :id to avoid SQL injection.
            query = text(
                f"SELECT updated_at FROM {table_name} WHERE {id_column} = :id"
            )
            result = await pg_conn.execute(query, {"id": entity_id})
            pg_updated_at = result.scalar()

            if pg_updated_at is None:
                # No row (or NULL updated_at): record should be written.
                return True

            pg_updated_at = self._to_naive_utc(pg_updated_at)

            # Compare timestamps (both are now naive UTC)
            return mongo_updated_at >= pg_updated_at
        except Exception as e:
            logger.warning(
                "Error checking timestamp conflict, proceeding with update",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            # Fail open: on error, proceed with update
            return True