Spaces:
Running
Running
| """ | |
| UOM sync handler for MongoDB to PostgreSQL synchronization. | |
| """ | |
| import json | |
| from typing import Dict, Any, Optional | |
| from sqlalchemy.ext.asyncio import AsyncConnection | |
| from sqlalchemy import text | |
| from motor.motor_asyncio import AsyncIOMotorDatabase | |
| from datetime import datetime | |
| from app.core.logging import get_logger | |
| from app.sync.common.handler import SyncHandler | |
| from app.sync.uom.models import UOM_FIELD_MAPPING, UOM_REQUIRED_FIELDS, SCM_UOM_COLLECTION | |
| logger = get_logger(__name__) | |
class UOMSyncHandler(SyncHandler):
    """
    Handler for syncing UOM (Unit of Measure) data from MongoDB to PostgreSQL.

    Implements entity-specific logic for UOM synchronization including
    field mapping, validation, and upsert operations.
    """

    def __init__(self) -> None:
        super().__init__(entity_type="uom")

    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch UOM group from MongoDB by uom_group_id.

        Args:
            entity_id: uom_group_id to fetch
            mongo_db: MongoDB database instance

        Returns:
            UOM group document or None if not found

        Raises:
            Exception: re-raises any MongoDB driver error after logging it.
        """
        try:
            collection = mongo_db[SCM_UOM_COLLECTION]
            return await collection.find_one({"uom_group_id": entity_id})
        except Exception as e:
            logger.error(
                "Error fetching UOM group from MongoDB",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            raise

    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names
        """
        return UOM_FIELD_MAPPING

    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that all required fields are present in UOM document.

        Required fields come from UOM_REQUIRED_FIELDS; a field that is
        present but set to None counts as missing.

        Args:
            entity: UOM group document from MongoDB

        Returns:
            True if all required fields present, False otherwise
        """
        # entity.get(field) is None covers both "absent" and "explicitly None".
        missing_fields = [
            field for field in UOM_REQUIRED_FIELDS
            if entity.get(field) is None
        ]
        if missing_fields:
            logger.error(
                "UOM group missing required fields",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("uom_group_id"),
                    "missing_fields": missing_fields
                }
            )
            return False
        return True

    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform field value for PostgreSQL.

        Handles type conversions including:
        - Units array serialized as JSON (None/invalid becomes an empty array)
        - Status enum to string conversion
        - All other values (including datetimes) pass through unchanged

        Args:
            field_name: Name of the field
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL
        """
        # Handle "units" BEFORE the generic None short-circuit: a None units
        # value must still serialize to an empty JSON array. (Previously the
        # generic `if value is None: return None` ran first, which made the
        # units None-to-[] branch unreachable and stored SQL NULL instead.)
        if field_name == "units":
            if value is None:
                return json.dumps([])
            if isinstance(value, list):
                return json.dumps(value)
            if isinstance(value, str):
                # Round-trip through loads/dumps to validate and normalize.
                try:
                    return json.dumps(json.loads(value))
                except (json.JSONDecodeError, ValueError):
                    logger.warning(
                        "Invalid units JSON, setting to empty array",
                        extra={
                            "entity_type": self.entity_type,
                            "field_name": field_name,
                            "value": value
                        }
                    )
                    return json.dumps([])
            # Any other type is unexpected; coerce to an empty array.
            return json.dumps([])

        # All other None values map to SQL NULL.
        if value is None:
            return None

        # Convert status enum to its string value if needed.
        if field_name == "status" and hasattr(value, 'value'):
            return value.value

        # Everything else (including datetime objects) passes through as-is.
        return value

    def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract nested fields from UOM document.

        For UOM groups, most fields are at root level, so minimal extraction
        is needed; this only supplies defaults for timestamps and status.

        Args:
            entity: UOM group document from MongoDB

        Returns:
            Dictionary with any extracted/normalized fields
        """
        # NOTE(review): naive UTC timestamps kept for compatibility with
        # existing rows; datetime.utcnow() is deprecated since Python 3.12 —
        # consider datetime.now(timezone.utc) if the columns are tz-aware.
        now = datetime.utcnow()
        flattened: Dict[str, Any] = {
            # `or now` also replaces falsy stored values, matching the
            # original behavior.
            "created_at": entity.get("created_at") or now,
            "updated_at": entity.get("updated_at") or now,
        }
        # Default status when missing or explicitly None.
        if entity.get("status") is None:
            flattened["status"] = "active"
        return flattened

    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert UOM group to PostgreSQL trans.scm_uom_group_ref table.

        Uses a SELECT-then-UPDATE/INSERT pattern (rather than ON CONFLICT)
        to avoid primary key constraint issues, and skips the write when the
        timestamp-conflict check says the stored row should win.

        Args:
            entity: UOM group document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True if the upsert succeeded or the sync was deliberately skipped.

        Raises:
            KeyError: if the document lacks uom_group_id.
            Exception: re-raises any database error after logging it.
        """
        try:
            uom_group_id = entity["uom_group_id"]
            # Effective modification time: updated_at, then created_at,
            # then "now" as a last resort.
            updated_at = (
                entity.get("updated_at")
                or entity.get("created_at")
                or datetime.utcnow()
            )

            # Skip the write when the stored row wins the timestamp conflict.
            should_update = await self.check_timestamp_conflict(
                entity_id=uom_group_id,
                mongo_updated_at=updated_at,
                pg_conn=pg_conn,
                table_name="trans.scm_uom_group_ref",
                id_column="uom_group_id"
            )
            if not should_update:
                logger.debug(
                    "Skipping UOM group sync due to timestamp conflict",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": uom_group_id,
                        "mongo_updated_at": updated_at
                    }
                )
                return True  # Not an error, just skipped

            # Normalize defaults, then map MongoDB fields to PG columns.
            nested_fields = self.extract_nested_fields(entity)
            entity_with_nested = {**entity, **nested_fields}
            mapped_entity = self.map_fields(entity_with_nested)

            # Determine whether the row already exists.
            check_query = text("""
                SELECT uom_group_id FROM trans.scm_uom_group_ref
                WHERE uom_group_id = :uom_group_id
            """)
            result = await pg_conn.execute(check_query, {"uom_group_id": uom_group_id})
            exists = result.scalar_one_or_none()

            if exists:
                # Update existing record. Column names come from the handler's
                # own field mapping (not user input), so interpolating them
                # into the SQL text is acceptable; values stay bound params.
                update_columns = {
                    col: mapped_entity[col]
                    for col in mapped_entity
                    if col != "uom_group_id"
                }
                update_clause = ", ".join(f"{col} = :{col}" for col in update_columns)
                update_query = text(f"""
                    UPDATE trans.scm_uom_group_ref
                    SET {update_clause}
                    WHERE uom_group_id = :uom_group_id
                """)
                update_params = {**update_columns, "uom_group_id": uom_group_id}
                await pg_conn.execute(update_query, update_params)
            else:
                # Insert new record.
                columns = list(mapped_entity.keys())
                placeholders = [f":{col}" for col in columns]
                insert_query = text(f"""
                    INSERT INTO trans.scm_uom_group_ref ({', '.join(columns)})
                    VALUES ({', '.join(placeholders)})
                """)
                await pg_conn.execute(insert_query, mapped_entity)

            logger.debug(
                "UOM group upserted to PostgreSQL",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": uom_group_id,
                    "action": "update" if exists else "insert"
                }
            )
            return True
        except Exception as e:
            logger.error(
                "Error upserting UOM group to PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("uom_group_id"),
                    "error": str(e)
                }
            )
            raise