import json
import re

from openai import AsyncOpenAI

import config
import utils_geometry as utils
from schema_definitions import REAL_ESTATE_SCHEMA_MAP

# You can use a different client here if you want a different key/model!
client = AsyncOpenAI(api_key=config.OPENAI_API_KEY)

# Maximum length of a generated camelCase alias for dynamic (non-schema) fields.
_MAX_SLUG_LENGTH = 20


def _camel_case_slug(label, max_length=_MAX_SLUG_LENGTH):
    """Build a camelCase alias from a free-form label: "Loan Amount" -> "loanAmount".

    Runs of non-alphanumeric characters act as word separators. Whole words
    are appended until adding the next one would exceed *max_length*.
    Returns "field" for labels with no alphanumeric content at all.
    """
    # Filter out empty parts so a leading/trailing separator (" Loan Amount")
    # doesn't produce an empty first word and accidental PascalCase.
    parts = [p for p in re.split(r"[^a-zA-Z0-9]+", label.lower()) if p]
    if not parts:
        return "field"
    result = parts[0]
    for word in parts[1:]:
        piece = word.capitalize()
        # BUG FIX: compare against the *current* slug length each iteration.
        # The original kept a stale `current_len` from before the loop, so the
        # length cap was never actually enforced past the first word.
        if len(result) + len(piece) > max_length:
            break
        result += piece
    return result


async def map_fields_to_schema(extracted_fields):
    """Phase 2: map extracted form fields onto the real-estate schema via an LLM.

    Sends the fields to the model in batches of config.MAPPING_BATCH_SIZE and
    mutates each field dict in place, setting:
      - "aliasId":      canonical schema key on a match, else a generated slug
      - "semanticType": schema "type" on a match, else the field's detected_type
      - "isDynamic":    False for canonical matches, True for generated slugs

    A batch that errors is logged and skipped (best-effort): its fields simply
    fall through to the dynamic branch. Returns the same list.
    """
    print(f"\nPhase 2: Mapping {len(extracted_fields)} fields to Schema...")

    inputs = [
        {
            "uuid": f["id"],
            "label": f["label"],
            "role": f["role"],
            "detected_type": f["detected_type"],
        }
        for f in extracted_fields
    ]

    mapped_updates = {}
    batches = [
        inputs[i:i + config.MAPPING_BATCH_SIZE]
        for i in range(0, len(inputs), config.MAPPING_BATCH_SIZE)
    ]
    # indent=0 keeps the JSON compact-ish while still one key per line.
    schema_str = json.dumps(REAL_ESTATE_SCHEMA_MAP, indent=0)

    for i, batch in enumerate(batches):
        print(f" Mapping Batch {i+1}/{len(batches)}...")
        input_str = json.dumps(batch, indent=0)
        system_prompt = f"""
You are a Data Schema Mapper for Real Estate Contracts.

TASK: Map the input fields to the provided "Schema Definition".

RULES:
1. Match based on meaning: "Sale Price" -> "purchasePrice".
2. STRICTLY CHECK TYPES:
   - Input "detected_type" MUST be compatible with Schema "type".
3. Use "role" to disambiguate: "Name" + "Buyer" -> "buyerName".
4. If no good match exists, return null for "schema_key".

SCHEMA DEFINITION:
{schema_str}

Return JSON:
{{ "mappings": [ {{ "uuid": "...", "schema_key": "..." }} ] }}
"""
        try:
            response = await client.chat.completions.create(
                model="gpt-4o",
                response_format={"type": "json_object"},
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"INPUTS:\n{input_str}"},
                ],
                temperature=0.0,
            )
            data = json.loads(response.choices[0].message.content)
            utils.save_debug_json(data, f"mapping_batch_{i}_llm")
            for m in data.get("mappings", []):
                mapped_updates[m["uuid"]] = m.get("schema_key")
        except Exception as e:
            # Best-effort: log and continue; fields from a failed batch stay
            # unmapped and are handled by the dynamic branch below.
            print(f"Mapping Error Batch {i}: {e}")

    # Apply updates to every field in place.
    for f in extracted_fields:
        matched_key = mapped_updates.get(f["id"])

        if matched_key and matched_key in REAL_ESTATE_SCHEMA_MAP:
            # 1. Canonical match: the key exists in the strict schema.
            f["aliasId"] = matched_key
            f["semanticType"] = REAL_ESTATE_SCHEMA_MAP[matched_key]["type"]
            f["isDynamic"] = False
        else:
            # 2. Dynamic match: valid data, but not in the schema yet.
            # Generate a safe camelCase key from the label.
            f["aliasId"] = _camel_case_slug(f["label"])
            f["semanticType"] = f.get("detected_type", "shortText")
            f["isDynamic"] = True

    return extracted_fields