# Source: commit 0d1ff6a by MukeshKapoor25
# feat: add delete_from_postgres methods to employee, warehouse, and uom handlers
"""
Employee sync handler for MongoDB to PostgreSQL synchronization.
"""
from typing import Dict, Any, Optional
from sqlalchemy.ext.asyncio import AsyncConnection
from sqlalchemy import text
from motor.motor_asyncio import AsyncIOMotorDatabase
from datetime import datetime
from dateutil import parser
from app.core.logging import get_logger
from app.sync.common.handler import SyncHandler
from app.sync.employees.models import EMPLOYEE_FIELD_MAPPING, EMPLOYEE_REQUIRED_FIELDS
from app.constants import SCM_EMPLOYEES_COLLECTION
logger = get_logger(__name__)
class EmployeeSyncHandler(SyncHandler):
    """
    Handler for syncing employee data from MongoDB to PostgreSQL.

    Implements entity-specific logic for employee synchronization including
    field mapping, validation, boolean type conversion, and upsert/delete
    operations against the ``trans.employees_ref`` table.

    The MongoDB ``user_id`` is used as the PostgreSQL ``employee_id``.
    """

    def __init__(self):
        # The base class receives the entity type; this class later reads it
        # back as ``self.entity_type`` for structured log context.
        super().__init__(entity_type="employee")

    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch employee from MongoDB by user_id.

        Args:
            entity_id: user_id to fetch (employee_id in PostgreSQL)
            mongo_db: MongoDB database instance

        Returns:
            Employee document or None if not found

        Raises:
            Exception: Any error raised by the lookup is logged and re-raised.
        """
        try:
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            return await collection.find_one({"user_id": entity_id})
        except Exception as e:
            logger.error(
                "Error fetching employee from MongoDB",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            raise

    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names
        """
        return EMPLOYEE_FIELD_MAPPING

    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that all required fields are present in employee document.

        A field counts as missing when its key is absent or its value is None.
        The required names come from EMPLOYEE_REQUIRED_FIELDS (historically
        documented as user_id, employee_code, first_name, designation, status
        -- verify against the models module).

        Args:
            entity: Employee document from MongoDB

        Returns:
            True if all required fields present, False otherwise
        """
        missing_fields = [
            field for field in EMPLOYEE_REQUIRED_FIELDS
            if entity.get(field) is None
        ]
        if not missing_fields:
            return True

        logger.error(
            "Employee missing required fields",
            extra={
                "entity_type": self.entity_type,
                "entity_id": entity.get("user_id"),
                "missing_fields": missing_fields
            }
        )
        return False

    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform field value for PostgreSQL.

        Handles type conversions including:
        - Enum to string conversion for designation and status
        - Datetime handling (converts ISO strings to datetime objects)
        - Boolean conversion for system_login_enabled

        Args:
            field_name: Name of the field
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL; values that need no conversion
            are returned unchanged.
        """
        # Enum members expose their raw value via `.value`; plain strings
        # have no such attribute and fall through untouched.
        if field_name in ("designation", "status") and hasattr(value, "value"):
            return value.value

        if field_name in ("created_at", "updated_at"):
            if isinstance(value, datetime):
                return value
            if isinstance(value, str):
                try:
                    return parser.isoparse(value)
                except (ValueError, TypeError):
                    logger.warning(f"Could not parse datetime string: {value}")
                    return datetime.utcnow()
            # Missing or unsupported type: fall back to the current (naive UTC) time.
            return datetime.utcnow()

        if field_name == "system_login_enabled":
            # bool must be tested before int because bool is an int subclass.
            if isinstance(value, bool):
                return value
            if isinstance(value, str):
                return value.lower() in ('true', '1', 'yes')
            if isinstance(value, int):
                return bool(value)

        return value

    def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract and compute derived fields from employee document.

        Computes:
        - full_name from first_name and last_name (None/missing parts are
          treated as empty instead of raising or rendering as "None")
        - system_login_enabled from app_access.has_mobile_app, normalized
          through transform_field_value so string/int flags become booleans;
          a missing or None flag is treated as False
        - department (passed through; None when absent)
        - created_at/updated_at defaults when missing or None

        Args:
            entity: Employee document from MongoDB

        Returns:
            Dictionary with computed fields
        """
        computed: Dict[str, Any] = {}

        # `or ""` covers both a missing key and an explicit None value.
        name_parts = (
            str(entity.get("first_name") or "").strip(),
            str(entity.get("last_name") or "").strip(),
        )
        computed["full_name"] = " ".join(part for part in name_parts if part)

        app_access = entity.get("app_access")
        login_flag = (
            app_access.get("has_mobile_app")
            if isinstance(app_access, dict)
            else None
        )
        if login_flag is None:
            login_flag = False
        # upsert_to_postgres writes this value over the mapped one, so it
        # must already be in its PostgreSQL-ready form here.
        computed["system_login_enabled"] = self.transform_field_value(
            "system_login_enabled", login_flag
        )

        # Department field (optional, may not exist in current model)
        computed["department"] = entity.get("department")

        # Only supply timestamp defaults when the document lacks them, so
        # existing values are not overwritten when the dicts are merged.
        now = datetime.utcnow()
        for stamp in ("created_at", "updated_at"):
            if entity.get(stamp) is None:
                computed[stamp] = now

        return computed

    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert employee to PostgreSQL trans.employees_ref table.

        Note: employees_ref table does not have updated_at column,
        so we skip timestamp conflict resolution and always upsert.

        Args:
            entity: Employee document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True if the upsert statement executed successfully

        Raises:
            Exception: Any failure (including a missing ``user_id`` key) is
                logged and re-raised; this method never returns False.
        """
        try:
            employee_id = entity["user_id"]  # MongoDB uses user_id

            # Computed values take precedence over raw document values.
            computed_fields = self.extract_nested_fields(entity)
            entity_with_computed = {**entity, **computed_fields}

            # map_fields is inherited; presumably it applies the field
            # mapping and transform_field_value -- verify in SyncHandler.
            mapped_entity = self.map_fields(entity_with_computed)

            # Always write the derived columns, whether or not the mapping
            # includes them.
            mapped_entity["full_name"] = computed_fields["full_name"]
            mapped_entity["system_login_enabled"] = computed_fields["system_login_enabled"]

            # Column names come from the mapping, not from user input; the
            # values themselves are passed as bound parameters.
            columns = list(mapped_entity.keys())
            placeholders = [f":{col}" for col in columns]

            # The conflict target (primary key) must not appear in SET.
            update_clause = ", ".join(
                f"{col} = EXCLUDED.{col}" for col in columns if col != "employee_id"
            )

            query = text(f"""
                INSERT INTO trans.employees_ref ({', '.join(columns)})
                VALUES ({', '.join(placeholders)})
                ON CONFLICT (employee_id)
                DO UPDATE SET {update_clause}
            """)
            await pg_conn.execute(query, mapped_entity)

            logger.debug(
                "Employee upserted to PostgreSQL",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": employee_id
                }
            )
            return True
        except Exception as e:
            logger.error(
                "Error upserting employee to PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("user_id"),
                    "error": str(e)
                }
            )
            raise

    async def delete_from_postgres(
        self,
        entity_id: str,
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Delete employee from PostgreSQL trans.employees_ref table.

        The delete is idempotent: a row that does not exist is logged as a
        warning but still reported as success.

        Args:
            entity_id: user_id to delete (employee_id in PostgreSQL)
            pg_conn: PostgreSQL connection

        Returns:
            True when the statement executed, whether or not a row matched

        Raises:
            Exception: Any database error is logged and re-raised; this
                method never returns False.
        """
        try:
            # employee_id column holds the MongoDB user_id.
            query = text("DELETE FROM trans.employees_ref WHERE employee_id = :id")
            result = await pg_conn.execute(query, {"id": entity_id})
            rows_deleted = result.rowcount

            if rows_deleted > 0:
                logger.info(
                    "Employee deleted from PostgreSQL",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id,
                        "rows_deleted": rows_deleted
                    }
                )
            else:
                logger.warning(
                    "Employee not found in PostgreSQL for deletion",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id
                    }
                )
            return True
        except Exception as e:
            logger.error(
                "Error deleting employee from PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            raise