"""
UOM sync handler for MongoDB to PostgreSQL synchronization.
"""
import json
from datetime import datetime
from typing import Dict, Any, Optional

from motor.motor_asyncio import AsyncIOMotorDatabase
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection

from app.core.logging import get_logger
from app.sync.common.handler import SyncHandler
from app.sync.uom.models import UOM_FIELD_MAPPING, UOM_REQUIRED_FIELDS, SCM_UOM_COLLECTION

logger = get_logger(__name__)


class UOMSyncHandler(SyncHandler):
    """
    Handler for syncing UOM (Unit of Measure) group data from MongoDB to PostgreSQL.

    Implements the entity-specific hooks for UOM synchronization: fetching the
    source document, field mapping, required-field validation, per-field value
    transformation, default filling, and the upsert into
    ``trans.scm_uom_group_ref``.
    """

    def __init__(self):
        super().__init__(entity_type="uom")

    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch a UOM group document from MongoDB by ``uom_group_id``.

        Args:
            entity_id: ``uom_group_id`` value to look up.
            mongo_db: MongoDB database instance.

        Returns:
            The first matching document from ``SCM_UOM_COLLECTION``, or None
            if no document matches.

        Raises:
            Any exception raised by the driver is logged and re-raised.
        """
        try:
            collection = mongo_db[SCM_UOM_COLLECTION]
            return await collection.find_one({"uom_group_id": entity_id})
        except Exception as e:
            logger.error(
                "Error fetching UOM group from MongoDB",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            raise

    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get the field mapping from MongoDB to PostgreSQL.

        Returns:
            ``UOM_FIELD_MAPPING``: MongoDB field names mapped to PostgreSQL
            column names.
        """
        return UOM_FIELD_MAPPING

    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that every field in ``UOM_REQUIRED_FIELDS`` is present and non-None.

        The authoritative list is ``UOM_REQUIRED_FIELDS``. Earlier docs
        describe it as uom_group_id, name and base_unit; confirm against
        app.sync.uom.models.

        Args:
            entity: UOM group document from MongoDB.

        Returns:
            True if all required fields are present and not None. Otherwise
            False, after logging the missing field names.
        """
        # A key that is absent and a key whose value is None are both treated
        # as missing.
        missing_fields = [
            field for field in UOM_REQUIRED_FIELDS if entity.get(field) is None
        ]

        if missing_fields:
            logger.error(
                "UOM group missing required fields",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("uom_group_id"),
                    "missing_fields": missing_fields
                }
            )
            return False

        return True

    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform a single field value for PostgreSQL.

        - ``units`` is always serialized to a JSON string:
            - a list is dumped as-is;
            - a JSON string is re-parsed and re-dumped;
            - None, invalid JSON and any other type become ``"[]"``.
        - Any other None value is returned as None.
        - ``status`` values exposing a ``.value`` attribute (e.g. an enum) are
          reduced to that value.
        - Everything else, including datetimes, is passed through unchanged.

        Args:
            field_name: Name of the field.
            value: Value from MongoDB.

        Returns:
            Transformed value for PostgreSQL.
        """
        # Handle units before the generic None check. This ensures a null
        # units value is stored as an empty JSON array instead of NULL.
        if field_name == "units":
            if isinstance(value, list):
                return json.dumps(value)
            if isinstance(value, str):
                try:
                    return json.dumps(json.loads(value))
                except ValueError:  # json.JSONDecodeError subclasses ValueError
                    logger.warning(
                        "Invalid units JSON, setting to empty array",
                        extra={
                            "entity_type": self.entity_type,
                            "field_name": field_name,
                            "value": value
                        }
                    )
                    return json.dumps([])
            # None or an unsupported type.
            return json.dumps([])

        if value is None:
            return None

        # Convert a status enum to its underlying value if needed.
        if field_name == "status" and hasattr(value, "value"):
            return value.value

        return value

    def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Produce default/normalized fields to merge over a UOM group document.

        No nested extraction is performed here. The method only fills in
        defaults:
        - ``created_at`` and ``updated_at`` fall back to the current naive UTC
          time when missing or falsy;
        - ``status`` defaults to ``"active"`` when missing or None.

        Args:
            entity: UOM group document from MongoDB.

        Returns:
            Dictionary of fields to overlay onto the entity before mapping.
        """
        now = datetime.utcnow()
        flattened: Dict[str, Any] = {
            "created_at": entity.get("created_at") or now,
            "updated_at": entity.get("updated_at") or now,
        }

        if entity.get("status") is None:
            flattened["status"] = "active"

        return flattened

    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert a UOM group into the PostgreSQL ``trans.scm_uom_group_ref`` table.

        Steps:
        1. Ask ``check_timestamp_conflict`` whether the row should be
           written; skip it if not.
        2. SELECT by ``uom_group_id``.
        3. UPDATE if a row exists, otherwise INSERT.

        This deliberately avoids ``ON CONFLICT``. Note that the
        SELECT-then-INSERT sequence is not atomic on its own.

        Args:
            entity: UOM group document from MongoDB. ``uom_group_id`` is
                required.
            pg_conn: PostgreSQL connection.

        Returns:
            True when the row was written or the sync was skipped due to a
            timestamp conflict. Failures are not reported via the return
            value.

        Raises:
            Any exception (including ``KeyError`` for a missing
            ``uom_group_id``) is logged and re-raised.
        """
        try:
            uom_group_id = entity["uom_group_id"]

            # NOTE(review): this fallback (updated_at -> created_at -> now)
            # differs from extract_nested_fields, which stores
            # updated_at -> now. Confirm which value check_timestamp_conflict
            # should see.
            updated_at = entity.get("updated_at") or entity.get("created_at") or datetime.utcnow()

            should_update = await self.check_timestamp_conflict(
                entity_id=uom_group_id,
                mongo_updated_at=updated_at,
                pg_conn=pg_conn,
                table_name="trans.scm_uom_group_ref",
                id_column="uom_group_id"
            )

            if not should_update:
                logger.debug(
                    "Skipping UOM group sync due to timestamp conflict",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": uom_group_id,
                        "mongo_updated_at": updated_at
                    }
                )
                return True  # Not an error, just skipped

            # Overlay defaults, then map Mongo field names to PG columns.
            entity_with_nested = {**entity, **self.extract_nested_fields(entity)}
            mapped_entity = self.map_fields(entity_with_nested)

            check_query = text("""
                SELECT uom_group_id
                FROM trans.scm_uom_group_ref
                WHERE uom_group_id = :uom_group_id
            """)
            result = await pg_conn.execute(check_query, {"uom_group_id": uom_group_id})
            # Compare against None rather than relying on truthiness, so a
            # falsy-but-present id (e.g. "") is still recognized as existing.
            exists = result.scalar_one_or_none() is not None

            # Column names are interpolated into the SQL text; values are
            # bound parameters. This assumes map_fields only emits trusted
            # column names (presumably from UOM_FIELD_MAPPING) - confirm in
            # SyncHandler.
            if exists:
                update_columns = {
                    col: val for col, val in mapped_entity.items() if col != "uom_group_id"
                }
                # With no columns to set, the statement would be invalid SQL
                # ("SET  WHERE ..."), so there is nothing to update.
                if update_columns:
                    update_clause = ", ".join(f"{col} = :{col}" for col in update_columns)
                    update_query = text(f"""
                        UPDATE trans.scm_uom_group_ref
                        SET {update_clause}
                        WHERE uom_group_id = :uom_group_id
                    """)
                    update_params = {**update_columns, "uom_group_id": uom_group_id}
                    await pg_conn.execute(update_query, update_params)
            else:
                columns = list(mapped_entity)
                placeholders = [f":{col}" for col in columns]
                insert_query = text(f"""
                    INSERT INTO trans.scm_uom_group_ref ({', '.join(columns)})
                    VALUES ({', '.join(placeholders)})
                """)
                await pg_conn.execute(insert_query, mapped_entity)

            logger.debug(
                "UOM group upserted to PostgreSQL",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": uom_group_id,
                    "action": "update" if exists else "insert"
                }
            )

            return True

        except Exception as e:
            logger.error(
                "Error upserting UOM group to PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("uom_group_id"),
                    "error": str(e)
                }
            )
            raise