"""
Employee sync handler for MongoDB to PostgreSQL synchronization.
"""

from datetime import datetime
from typing import Any, Dict, Optional

from dateutil import parser
from motor.motor_asyncio import AsyncIOMotorDatabase
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection

from app.core.logging import get_logger
from app.sync.common.handler import SyncHandler
from app.sync.employees.models import EMPLOYEE_FIELD_MAPPING, EMPLOYEE_REQUIRED_FIELDS
from app.constants import SCM_EMPLOYEES_COLLECTION

logger = get_logger(__name__)


class EmployeeSyncHandler(SyncHandler):
    """
    Handler for syncing employee data from MongoDB to PostgreSQL.

    Implements entity-specific logic for employee synchronization including
    field mapping, validation, boolean type conversion, and upsert operations.
    """

    def __init__(self):
        super().__init__(entity_type="employee")

    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch employee from MongoDB by user_id.

        Args:
            entity_id: user_id to fetch (employee_id in PostgreSQL)
            mongo_db: MongoDB database instance

        Returns:
            Employee document or None if not found

        Raises:
            Exception: Any error raised by the lookup is logged and re-raised.
        """
        try:
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            return await collection.find_one({"user_id": entity_id})
        except Exception as e:
            logger.error(
                "Error fetching employee from MongoDB",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            raise

    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names
        """
        return EMPLOYEE_FIELD_MAPPING

    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that all required fields are present in employee document.

        A field counts as missing when its key is absent or its value is None.
        The set of required fields is defined by EMPLOYEE_REQUIRED_FIELDS.

        Args:
            entity: Employee document from MongoDB

        Returns:
            True if all required fields present, False otherwise
        """
        missing_fields = [
            field for field in EMPLOYEE_REQUIRED_FIELDS
            if entity.get(field) is None
        ]

        if missing_fields:
            logger.error(
                "Employee missing required fields",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("user_id"),
                    "missing_fields": missing_fields
                }
            )
            return False

        return True

    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform field value for PostgreSQL.

        Handles type conversions including:
        - Enum to string conversion for designation and status
        - Datetime handling (converts ISO strings to datetime objects)
        - Boolean conversion for system_login_enabled

        Args:
            field_name: Name of the field
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL; values of any other field (or of
            an unhandled type) are returned unchanged
        """
        # Enum members expose their raw value through `.value`
        if field_name in ("designation", "status") and hasattr(value, "value"):
            return value.value

        # Handle datetime fields - convert ISO strings to datetime objects
        if field_name in ("created_at", "updated_at"):
            if isinstance(value, datetime):
                return value
            if isinstance(value, str):
                try:
                    return parser.isoparse(value)
                except (ValueError, TypeError):
                    logger.warning(f"Could not parse datetime string: {value}")
            # Missing, unparseable or unsupported value: fall back to "now".
            # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
            # it is kept so the values written stay naive - confirm the column
            # type before switching to timezone-aware datetimes.
            return datetime.utcnow()

        # Handle boolean conversion for system_login_enabled
        if field_name == "system_login_enabled":
            if isinstance(value, bool):
                return value
            if isinstance(value, str):
                # Tolerate surrounding whitespace such as " true "
                return value.strip().lower() in ("true", "1", "yes")
            if isinstance(value, int):
                return bool(value)

        return value

    def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract and compute derived fields from employee document.

        Computes:
        - full_name from first_name and last_name
        - system_login_enabled from app_access.has_mobile_app
        - department (copied from the document, None when absent)
        - created_at/updated_at defaults when missing or None

        Args:
            entity: Employee document from MongoDB

        Returns:
            Dictionary with computed fields
        """
        computed: Dict[str, Any] = {}

        # Build full_name from the parts that are actually present. A None
        # name must neither crash on .strip() nor leak the text "None" into
        # the stored name.
        name_parts = (
            str(part).strip()
            for part in (entity.get("first_name"), entity.get("last_name"))
            if part is not None
        )
        computed["full_name"] = " ".join(part for part in name_parts if part)

        # Extract system_login_enabled from app_access. The raw value is
        # coerced here because upsert_to_postgres writes this computed value
        # straight into the row.
        app_access = entity.get("app_access")
        if isinstance(app_access, dict):
            raw_login_flag = app_access.get("has_mobile_app", False)
        else:
            raw_login_flag = False
        computed["system_login_enabled"] = self.transform_field_value(
            "system_login_enabled", raw_login_flag
        )

        # Department field (optional, may not exist in current model)
        computed["department"] = entity.get("department")

        # Ensure timestamps have default values if missing; both defaults
        # share the same instant.
        now = datetime.utcnow()
        for timestamp_field in ("created_at", "updated_at"):
            if entity.get(timestamp_field) is None:
                computed[timestamp_field] = now

        return computed

    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert employee to PostgreSQL trans.employees_ref table.

        Note: employees_ref table does not have updated_at column, so we skip
        timestamp conflict resolution and always upsert.

        Args:
            entity: Employee document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True once the upsert statement has executed

        Raises:
            Exception: Any failure (including a missing user_id) is logged and
                re-raised; this method never returns False.
        """
        try:
            employee_id = entity["user_id"]  # MongoDB uses user_id

            # Extract computed fields and merge them into the entity so they
            # take part in field mapping
            computed_fields = self.extract_nested_fields(entity)
            entity_with_computed = {**entity, **computed_fields}

            # Map fields
            mapped_entity = self.map_fields(entity_with_computed)

            # Add full_name and system_login_enabled to mapped entity
            mapped_entity["full_name"] = computed_fields["full_name"]
            mapped_entity["system_login_enabled"] = computed_fields["system_login_enabled"]

            # Build UPSERT query. Column names are interpolated into the SQL
            # text, so they must only ever come from the field mapping and the
            # literals above; values are always passed as bound parameters.
            columns = list(mapped_entity.keys())
            placeholders = [f":{col}" for col in columns]

            # Build UPDATE clause (exclude primary key)
            update_clause = ", ".join(
                f"{col} = EXCLUDED.{col}" for col in columns if col != "employee_id"
            )

            query = text(f"""
                INSERT INTO trans.employees_ref ({', '.join(columns)})
                VALUES ({', '.join(placeholders)})
                ON CONFLICT (employee_id)
                DO UPDATE SET {update_clause}
            """)

            await pg_conn.execute(query, mapped_entity)

            logger.debug(
                "Employee upserted to PostgreSQL",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": employee_id
                }
            )

            return True

        except Exception as e:
            logger.error(
                "Error upserting employee to PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("user_id"),
                    "error": str(e)
                }
            )
            raise

    async def delete_from_postgres(
        self,
        entity_id: str,
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Delete employee from PostgreSQL trans.employees_ref table.

        Args:
            entity_id: user_id to delete (employee_id in PostgreSQL)
            pg_conn: PostgreSQL connection

        Returns:
            True once the delete statement has executed, whether or not a row
            matched (a missing row is only logged as a warning)

        Raises:
            Exception: Any database error is logged and re-raised; this method
                never returns False.
        """
        try:
            # Delete from PostgreSQL (using employee_id column which maps to user_id)
            query = text("DELETE FROM trans.employees_ref WHERE employee_id = :id")
            result = await pg_conn.execute(query, {"id": entity_id})

            rows_deleted = result.rowcount

            if rows_deleted > 0:
                logger.info(
                    "Employee deleted from PostgreSQL",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id,
                        "rows_deleted": rows_deleted
                    }
                )
            else:
                logger.warning(
                    "Employee not found in PostgreSQL for deletion",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )

            return True

        except Exception as e:
            logger.error(
                "Error deleting employee from PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            raise