Spaces:
Runtime error
Runtime error
"""
Employee sync handler for MongoDB to PostgreSQL synchronization.
"""
| from typing import Dict, Any, Optional | |
| from sqlalchemy.ext.asyncio import AsyncConnection | |
| from sqlalchemy import text | |
| from motor.motor_asyncio import AsyncIOMotorDatabase | |
| from datetime import datetime | |
| from dateutil import parser | |
| from app.core.logging import get_logger | |
| from app.sync.common.handler import SyncHandler | |
| from app.sync.employees.models import EMPLOYEE_FIELD_MAPPING, EMPLOYEE_REQUIRED_FIELDS | |
| from app.constants import SCM_EMPLOYEES_COLLECTION | |
logger = get_logger(__name__)


class EmployeeSyncHandler(SyncHandler):
    """
    Handler for syncing employee data from MongoDB to PostgreSQL.

    Implements entity-specific logic for employee synchronization including
    field mapping, validation, boolean type conversion, and upsert operations.
    """

    # String spellings that are interpreted as True (compared after
    # stripping surrounding whitespace and lower-casing).
    _TRUTHY_STRINGS = frozenset({"true", "1", "yes"})

    def __init__(self) -> None:
        super().__init__(entity_type="employee")

    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch employee from MongoDB by user_id.

        Args:
            entity_id: user_id to fetch (employee_id in PostgreSQL)
            mongo_db: MongoDB database instance

        Returns:
            Employee document or None if not found

        Raises:
            Exception: any error raised by the lookup is logged and re-raised.
        """
        try:
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            return await collection.find_one({"user_id": entity_id})
        except Exception as e:
            logger.error(
                "Error fetching employee from MongoDB",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            raise

    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names
            (the shared EMPLOYEE_FIELD_MAPPING object, not a copy).
        """
        return EMPLOYEE_FIELD_MAPPING

    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that all required fields are present in employee document.

        A field counts as missing when its key is absent or its value is None.
        The required names come from EMPLOYEE_REQUIRED_FIELDS (described by the
        original author as user_id, employee_code, first_name, designation,
        status -- confirm against the models module).

        Args:
            entity: Employee document from MongoDB

        Returns:
            True if all required fields present, False otherwise
        """
        missing_fields = [
            field for field in EMPLOYEE_REQUIRED_FIELDS
            if entity.get(field) is None
        ]
        if missing_fields:
            logger.error(
                "Employee missing required fields",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("user_id"),
                    "missing_fields": missing_fields
                }
            )
            return False
        return True

    @classmethod
    def _to_bool(cls, value: Any) -> Any:
        """
        Coerce a loosely typed flag to bool.

        bool is returned as-is, strings are matched against _TRUTHY_STRINGS
        (whitespace and case ignored), numbers use their truthiness. Any other
        value, including None, is returned unchanged.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.strip().lower() in cls._TRUTHY_STRINGS
        if isinstance(value, (int, float)):
            return bool(value)
        return value

    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform field value for PostgreSQL.

        Handles type conversions including:
        - Boolean conversion for system_login_enabled
        - Enum to string conversion for designation and status
        - Datetime handling (converts ISO strings to datetime objects)

        Args:
            field_name: Name of the field
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL
        """
        # Enum members carry their payload on ``.value``; plain strings do not
        # have that attribute and pass through untouched.
        if field_name in ("designation", "status") and hasattr(value, "value"):
            return value.value

        # Timestamps: keep datetimes, parse ISO strings, and fall back to the
        # current (naive) UTC time for anything unparseable or of another type.
        if field_name in ("created_at", "updated_at"):
            if isinstance(value, datetime):
                return value
            if isinstance(value, str):
                try:
                    return parser.isoparse(value)
                except (ValueError, TypeError):
                    logger.warning(f"Could not parse datetime string: {value}")
            return datetime.utcnow()

        if field_name == "system_login_enabled":
            return self._to_bool(value)

        return value

    def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract and compute derived fields from employee document.

        Computes:
        - full_name from first_name and last_name (null-safe)
        - system_login_enabled from app_access.has_mobile_app, coerced to bool
        - department (copied as-is, None when absent)
        - created_at/updated_at defaults, only when missing or None

        Args:
            entity: Employee document from MongoDB

        Returns:
            Dictionary with computed fields
        """
        computed: Dict[str, Any] = {}

        # Join whichever name parts are present; a None or blank part is
        # dropped instead of crashing on .strip() or rendering as "None".
        name_parts = (
            str(entity[key]).strip()
            for key in ("first_name", "last_name")
            if entity.get(key) is not None
        )
        computed["full_name"] = " ".join(part for part in name_parts if part)

        # The flag is written to PostgreSQL directly by upsert_to_postgres, so
        # it must already be a real boolean here. Missing/None means False.
        app_access = entity.get("app_access", {})
        raw_flag = (
            app_access.get("has_mobile_app")
            if isinstance(app_access, dict)
            else None
        )
        computed["system_login_enabled"] = (
            False if raw_flag is None else self._to_bool(raw_flag)
        )

        # Department field (optional, may not exist in current model)
        computed["department"] = entity.get("department")

        # Ensure timestamps have default values if missing
        now = datetime.utcnow()
        for stamp in ("created_at", "updated_at"):
            if entity.get(stamp) is None:
                computed[stamp] = now

        return computed

    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert employee to PostgreSQL trans.employees_ref table.

        No timestamp conflict resolution is performed; an existing row with
        the same employee_id is always overwritten. (Original author's note:
        employees_ref has no updated_at column.)

        Args:
            entity: Employee document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True when the statement executed.

        Raises:
            Exception: any failure (including a missing user_id) is logged
                and re-raised; this method never returns False.
        """
        try:
            employee_id = entity["user_id"]  # MongoDB uses user_id

            # Derived fields are merged over the raw document before mapping.
            computed_fields = self.extract_nested_fields(entity)
            mapped_entity = self.map_fields({**entity, **computed_fields})

            # These two are always written, whatever map_fields produced.
            mapped_entity["full_name"] = computed_fields["full_name"]
            mapped_entity["system_login_enabled"] = computed_fields["system_login_enabled"]

            # Column names are interpolated into the SQL text; values are
            # bound as named parameters.
            columns = list(mapped_entity.keys())
            placeholders = [f":{col}" for col in columns]

            # Build UPDATE clause (exclude primary key)
            update_clause = ", ".join(
                f"{col} = EXCLUDED.{col}" for col in columns if col != "employee_id"
            )

            query = text(f"""
                INSERT INTO trans.employees_ref ({', '.join(columns)})
                VALUES ({', '.join(placeholders)})
                ON CONFLICT (employee_id)
                DO UPDATE SET {update_clause}
            """)
            await pg_conn.execute(query, mapped_entity)

            logger.debug(
                "Employee upserted to PostgreSQL",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": employee_id
                }
            )
            return True
        except Exception as e:
            logger.error(
                "Error upserting employee to PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("user_id"),
                    "error": str(e)
                }
            )
            raise

    async def delete_from_postgres(
        self,
        entity_id: str,
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Delete employee from PostgreSQL trans.employees_ref table.

        Args:
            entity_id: user_id to delete (employee_id in PostgreSQL)
            pg_conn: PostgreSQL connection

        Returns:
            True whether or not a row was removed; a missing row only
            produces a warning.

        Raises:
            Exception: any database error is logged and re-raised; this
                method never returns False.
        """
        try:
            # employee_id column holds the MongoDB user_id
            query = text("DELETE FROM trans.employees_ref WHERE employee_id = :id")
            result = await pg_conn.execute(query, {"id": entity_id})
            rows_deleted = result.rowcount

            if rows_deleted > 0:
                logger.info(
                    "Employee deleted from PostgreSQL",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id,
                        "rows_deleted": rows_deleted
                    }
                )
            else:
                logger.warning(
                    "Employee not found in PostgreSQL for deletion",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
            return True
        except Exception as e:
            logger.error(
                "Error deleting employee from PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            raise