# MukeshKapoor25's picture
# feat: Implement UOM sync module for MongoDB to PostgreSQL synchronization
# d5aa7b8
"""
UOM sync handler for MongoDB to PostgreSQL synchronization.
"""
import json
from datetime import datetime, timezone
from typing import Dict, Any, Optional

from motor.motor_asyncio import AsyncIOMotorDatabase
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection

from app.core.logging import get_logger
from app.sync.common.handler import SyncHandler
from app.sync.uom.models import UOM_FIELD_MAPPING, UOM_REQUIRED_FIELDS, SCM_UOM_COLLECTION
# Module-level logger, obtained from the project's get_logger helper using this module's name.
logger = get_logger(__name__)
class UOMSyncHandler(SyncHandler):
    """
    Handler for syncing UOM (Unit of Measure) data from MongoDB to PostgreSQL.

    Implements entity-specific logic for UOM synchronization including
    field mapping, validation, and upsert operations.

    NOTE(review): ``map_fields`` and ``check_timestamp_conflict`` are called
    on ``self`` but are not defined in this class; presumably they are
    provided by ``SyncHandler`` - confirm against the base class.
    """

    def __init__(self):
        super().__init__(entity_type="uom")

    @staticmethod
    def _utc_now() -> datetime:
        """
        Return the current UTC time as a naive datetime.

        Drop-in replacement for the deprecated ``datetime.utcnow()``: the
        time is taken in UTC and the tzinfo is stripped, so the result has
        the same shape (naive, UTC) as the value this handler used before.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch UOM group from MongoDB by uom_group_id.

        Args:
            entity_id: uom_group_id to fetch
            mongo_db: MongoDB database instance

        Returns:
            UOM group document or None if not found

        Raises:
            Exception: any error raised by the lookup is logged and re-raised
        """
        try:
            collection = mongo_db[SCM_UOM_COLLECTION]
            return await collection.find_one({"uom_group_id": entity_id})
        except Exception as e:
            logger.error(
                "Error fetching UOM group from MongoDB",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            raise

    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names.
            This is the shared module-level UOM_FIELD_MAPPING object itself,
            not a copy, so callers should treat it as read-only.
        """
        return UOM_FIELD_MAPPING

    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that all required fields are present in UOM document.

        A field counts as missing when its key is absent or its value is None.
        The required field names come from UOM_REQUIRED_FIELDS (described by
        the original author as uom_group_id, name, base_unit - verify against
        that constant).

        Args:
            entity: UOM group document from MongoDB

        Returns:
            True if all required fields present, False otherwise
        """
        missing_fields = [
            field
            for field in UOM_REQUIRED_FIELDS
            if entity.get(field) is None
        ]
        if missing_fields:
            logger.error(
                "UOM group missing required fields",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("uom_group_id"),
                    "missing_fields": missing_fields
                }
            )
            return False
        return True

    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform field value for PostgreSQL.

        Handles type conversions including:
        - None is passed through unchanged for every field (including units)
        - Units are serialized to a JSON string
        - Status values exposing a ``.value`` attribute (e.g. enums) are
          unwrapped to that attribute

        All other values, datetimes included, are returned unchanged.

        Args:
            field_name: Name of the field
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL
        """
        # None short-circuits before any field-specific handling.
        if value is None:
            return None

        # Handle units as a JSON-encoded array
        if field_name == "units":
            if isinstance(value, list):
                return json.dumps(value)
            if isinstance(value, str):
                try:
                    # Round-trip to validate and normalize the JSON text.
                    return json.dumps(json.loads(value))
                except (json.JSONDecodeError, ValueError):
                    logger.warning(
                        "Invalid units JSON, setting to empty array",
                        extra={
                            "entity_type": self.entity_type,
                            "field_name": field_name,
                            "value": value
                        }
                    )
                    return json.dumps([])
            # Any other type is replaced by an empty array.
            return json.dumps([])

        # Convert status enum to string if needed
        if field_name == "status" and hasattr(value, "value"):
            return value.value

        return value

    def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build default/normalized values to overlay on a UOM document.

        For UOM groups, most fields are at root level, so minimal extraction
        is needed. Falsy or missing timestamps default to the current naive
        UTC time, and a missing/None status defaults to "active".

        Args:
            entity: UOM group document from MongoDB

        Returns:
            Dictionary with created_at and updated_at always set, plus
            status only when the document had none
        """
        flattened: Dict[str, Any] = {}

        # Ensure timestamps have default values if missing
        now = self._utc_now()
        flattened["created_at"] = entity.get("created_at") or now
        flattened["updated_at"] = entity.get("updated_at") or now

        # Set default status if missing
        if entity.get("status") is None:
            flattened["status"] = "active"

        return flattened

    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert UOM group to PostgreSQL trans.scm_uom_group_ref table.

        Uses simple UPDATE or INSERT pattern (rather than ON CONFLICT)
        to avoid primary key constraint issues.

        NOTE(review): the existence check and the following write are
        separate statements, so they are only atomic if the caller runs them
        inside a transaction with suitable isolation - confirm at call site.
        NOTE(review): column names are interpolated into the SQL text from
        the keys returned by ``map_fields``; this assumes those keys come
        from a trusted mapping - confirm.

        Args:
            entity: UOM group document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True when the row was written, or when the write was skipped
            because ``check_timestamp_conflict`` returned a falsy value

        Raises:
            Exception: any error (including a missing uom_group_id key) is
                logged and re-raised
        """
        try:
            uom_group_id = entity["uom_group_id"]

            # Get updated_at or created_at
            updated_at = (
                entity.get("updated_at")
                or entity.get("created_at")
                or self._utc_now()
            )

            # Check timestamp conflict
            should_update = await self.check_timestamp_conflict(
                entity_id=uom_group_id,
                mongo_updated_at=updated_at,
                pg_conn=pg_conn,
                table_name="trans.scm_uom_group_ref",
                id_column="uom_group_id"
            )
            if not should_update:
                logger.debug(
                    "Skipping UOM group sync due to timestamp conflict",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": uom_group_id,
                        "mongo_updated_at": updated_at
                    }
                )
                return True  # Not an error, just skipped

            # Overlay defaults on the document, then map to column names.
            nested_fields = self.extract_nested_fields(entity)
            entity_with_nested = {**entity, **nested_fields}
            mapped_entity = self.map_fields(entity_with_nested)

            # First check whether the record exists
            check_query = text("""
                SELECT uom_group_id FROM trans.scm_uom_group_ref
                WHERE uom_group_id = :uom_group_id
            """)
            result = await pg_conn.execute(check_query, {"uom_group_id": uom_group_id})
            # Compare against None: a matched row whose id is falsy ("" or 0)
            # must still be treated as existing.
            exists = result.scalar_one_or_none() is not None

            if exists:
                # Update existing record (every mapped column except the key)
                update_columns = {
                    col: val
                    for col, val in mapped_entity.items()
                    if col != "uom_group_id"
                }
                # An empty SET clause is invalid SQL; with nothing to change
                # the existing row is simply left as-is.
                if update_columns:
                    update_clause = ", ".join(f"{col} = :{col}" for col in update_columns)
                    update_query = text(f"""
                        UPDATE trans.scm_uom_group_ref
                        SET {update_clause}
                        WHERE uom_group_id = :uom_group_id
                    """)
                    update_params = {**update_columns, "uom_group_id": uom_group_id}
                    await pg_conn.execute(update_query, update_params)
            else:
                # Insert new record
                columns = list(mapped_entity)
                placeholders = [f":{col}" for col in columns]
                insert_query = text(f"""
                    INSERT INTO trans.scm_uom_group_ref ({', '.join(columns)})
                    VALUES ({', '.join(placeholders)})
                """)
                await pg_conn.execute(insert_query, mapped_entity)

            logger.debug(
                "UOM group upserted to PostgreSQL",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": uom_group_id,
                    "action": "update" if exists else "insert"
                }
            )
            return True
        except Exception as e:
            logger.error(
                "Error upserting UOM group to PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("uom_group_id"),
                    "error": str(e)
                }
            )
            raise