"""
UOM sync handler for MongoDB to PostgreSQL synchronization.
"""
import json
from typing import Dict, Any, Optional
from sqlalchemy.ext.asyncio import AsyncConnection
from sqlalchemy import text
from motor.motor_asyncio import AsyncIOMotorDatabase
from datetime import datetime
from app.core.logging import get_logger
from app.sync.common.handler import SyncHandler
from app.sync.uom.models import UOM_FIELD_MAPPING, UOM_REQUIRED_FIELDS, SCM_UOM_COLLECTION
logger = get_logger(__name__)
class UOMSyncHandler(SyncHandler):
    """
    Handler for syncing UOM (Unit of Measure) data from MongoDB to PostgreSQL.

    Implements entity-specific logic for UOM synchronization including
    field mapping, validation, and upsert operations.
    """

    def __init__(self):
        super().__init__(entity_type="uom")

    async def fetch_from_mongodb(
        self,
        entity_id: str,
        mongo_db: AsyncIOMotorDatabase
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch UOM group from MongoDB by uom_group_id.

        Args:
            entity_id: uom_group_id to fetch
            mongo_db: MongoDB database instance

        Returns:
            UOM group document or None if not found

        Raises:
            Exception: re-raises any Motor/driver error after logging it.
        """
        try:
            collection = mongo_db[SCM_UOM_COLLECTION]
            return await collection.find_one({"uom_group_id": entity_id})
        except Exception as e:
            logger.error(
                "Error fetching UOM group from MongoDB",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id
                }
            )
            raise

    def get_field_mapping(self) -> Dict[str, str]:
        """
        Get field mapping from MongoDB to PostgreSQL.

        Returns:
            Dictionary mapping MongoDB field names to PostgreSQL column names
        """
        return UOM_FIELD_MAPPING

    def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
        """
        Validate that all required fields are present in UOM document.

        Required fields: uom_group_id, name, base_unit

        Args:
            entity: UOM group document from MongoDB

        Returns:
            True if all required fields present, False otherwise
        """
        # A field counts as missing when absent OR explicitly null.
        missing_fields = [
            field for field in UOM_REQUIRED_FIELDS
            if entity.get(field) is None
        ]
        if missing_fields:
            logger.error(
                "UOM group missing required fields",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("uom_group_id"),
                    "missing_fields": missing_fields
                }
            )
            return False
        return True

    def transform_field_value(self, field_name: str, value: Any) -> Any:
        """
        Transform field value for PostgreSQL.

        Handles type conversions including:
        - Units array as JSON (always serialized, never NULL)
        - Status enum to string conversion

        Args:
            field_name: Name of the field
            value: Value from MongoDB

        Returns:
            Transformed value for PostgreSQL
        """
        # Units must always land in PostgreSQL as a JSON array, even when
        # the source value is missing — so handle it BEFORE the generic
        # None short-circuit below.  (Previously the None check came first,
        # making the empty-array fallback for units unreachable and sending
        # SQL NULL instead of '[]'.)
        if field_name == "units":
            if isinstance(value, list):
                return json.dumps(value)
            if isinstance(value, str):
                # Value may already be a JSON-encoded string; round-trip it
                # to normalize and validate.
                try:
                    return json.dumps(json.loads(value))
                except (json.JSONDecodeError, ValueError):
                    logger.warning(
                        "Invalid units JSON, setting to empty array",
                        extra={
                            "entity_type": self.entity_type,
                            "field_name": field_name,
                            "value": value
                        }
                    )
                    return json.dumps([])
            # None or any unexpected type: default to an empty array.
            return json.dumps([])

        # Generic None passthrough for all non-units fields.
        if value is None:
            return None

        # Convert status enum to its string value if needed.
        if field_name == "status" and hasattr(value, 'value'):
            return value.value

        # Datetimes and everything else pass through unchanged; the DB
        # driver handles datetime binding natively.
        return value

    def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract nested fields from UOM document.

        For UOM groups, most fields are at root level, so minimal extraction
        is needed.  Ensures timestamp defaults if missing and a default
        status of "active".

        Args:
            entity: UOM group document from MongoDB

        Returns:
            Dictionary with any extracted/normalized fields
        """
        flattened: Dict[str, Any] = {}
        # NOTE(review): naive-UTC timestamps kept for consistency with the
        # existing rows; datetime.utcnow() is deprecated in 3.12+ — migrate
        # to datetime.now(timezone.utc) once column tz-awareness is confirmed.
        now = datetime.utcnow()
        flattened["created_at"] = entity.get("created_at") or now
        flattened["updated_at"] = entity.get("updated_at") or now
        # Default status when absent or explicitly null.
        if entity.get("status") is None:
            flattened["status"] = "active"
        return flattened

    async def upsert_to_postgres(
        self,
        entity: Dict[str, Any],
        pg_conn: AsyncConnection
    ) -> bool:
        """
        Upsert UOM group to PostgreSQL trans.scm_uom_group_ref table.

        Uses a check-then-UPDATE-or-INSERT pattern (rather than ON CONFLICT)
        to avoid primary key constraint issues.

        Args:
            entity: UOM group document from MongoDB
            pg_conn: PostgreSQL connection

        Returns:
            True if upsert successful (or deliberately skipped due to a
            timestamp conflict), False otherwise

        Raises:
            Exception: re-raises any database error after logging it.
        """
        try:
            uom_group_id = entity["uom_group_id"]
            # Prefer updated_at, fall back to created_at, then "now".
            updated_at = entity.get("updated_at") or entity.get("created_at") or datetime.utcnow()

            # Skip the write when PostgreSQL already holds a newer version.
            should_update = await self.check_timestamp_conflict(
                entity_id=uom_group_id,
                mongo_updated_at=updated_at,
                pg_conn=pg_conn,
                table_name="trans.scm_uom_group_ref",
                id_column="uom_group_id"
            )
            if not should_update:
                logger.debug(
                    "Skipping UOM group sync due to timestamp conflict",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": uom_group_id,
                        "mongo_updated_at": updated_at
                    }
                )
                return True  # Not an error, just skipped

            # Normalize/extract fields, then map MongoDB names to columns.
            nested_fields = self.extract_nested_fields(entity)
            entity_with_nested = {**entity, **nested_fields}
            mapped_entity = self.map_fields(entity_with_nested)

            # Determine whether the row already exists.
            check_query = text("""
                SELECT uom_group_id FROM trans.scm_uom_group_ref
                WHERE uom_group_id = :uom_group_id
            """)
            result = await pg_conn.execute(check_query, {"uom_group_id": uom_group_id})
            exists = result.scalar_one_or_none()

            # Column names below are interpolated into SQL, but they come
            # from the trusted UOM_FIELD_MAPPING — never from user input.
            # All VALUES are bound as parameters.
            if exists:
                update_columns = {col: mapped_entity[col] for col in mapped_entity if col != "uom_group_id"}
                update_clause = ", ".join(f"{col} = :{col}" for col in update_columns)
                update_query = text(f"""
                    UPDATE trans.scm_uom_group_ref
                    SET {update_clause}
                    WHERE uom_group_id = :uom_group_id
                """)
                update_params = {**update_columns, "uom_group_id": uom_group_id}
                await pg_conn.execute(update_query, update_params)
            else:
                columns = list(mapped_entity.keys())
                placeholders = [f":{col}" for col in columns]
                insert_query = text(f"""
                    INSERT INTO trans.scm_uom_group_ref ({', '.join(columns)})
                    VALUES ({', '.join(placeholders)})
                """)
                await pg_conn.execute(insert_query, mapped_entity)

            logger.debug(
                "UOM group upserted to PostgreSQL",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": uom_group_id,
                    "action": "update" if exists else "insert"
                }
            )
            return True
        except Exception as e:
            logger.error(
                "Error upserting UOM group to PostgreSQL",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity.get("uom_group_id"),
                    "error": str(e)
                }
            )
            raise
|