Spaces:
Runtime error
Runtime error
| import json | |
| from openai import AsyncOpenAI | |
| import config | |
| import utils_geometry as utils | |
| from schema_definitions import REAL_ESTATE_SCHEMA_MAP | |
| import re | |
| # Module-level async OpenAI client used by map_fields_to_schema; swap in a different client here to use another API key or model. | |
| client = AsyncOpenAI(api_key=config.OPENAI_API_KEY) | |
def _build_mapping_prompt(schema_str):
    """Return the system prompt telling the LLM how to map fields to the schema.

    Args:
        schema_str: JSON text of the schema definition to embed in the prompt.
    """
    return f"""
You are a Data Schema Mapper for Real Estate Contracts.
TASK:
Map the input fields to the provided "Schema Definition".
RULES:
1. Match based on meaning: "Sale Price" -> "purchasePrice".
2. STRICTLY CHECK TYPES:
- Input "detected_type" MUST be compatible with Schema "type".
3. Use "role" to disambiguate: "Name" + "Buyer" -> "buyerName".
4. If no good match exists, return null for "schema_key".
SCHEMA DEFINITION:
{schema_str}
Return JSON: {{ "mappings": [ {{ "uuid": "...", "schema_key": "..." }} ] }}
"""


def _camel_case_alias(label, max_length=20):
    """Build a camelCase alias from a free-text label, capped at ``max_length``.

    The label is lower-cased and split on runs of non-ASCII-alphanumeric
    characters; whole words are appended until the next one would exceed the
    cap. Example: "Loan Amount" -> "loanAmount".

    Args:
        label: Field label; ``None`` is treated as an empty label.
        max_length: Maximum number of characters in the returned alias.

    Returns:
        The alias, or "" when the label contains no ASCII letters or digits.
    """
    # Drop empty fragments so a label that starts with punctuation (e.g.
    # "(Buyer) Name") still yields a lower-case first word.
    words = [w for w in re.split(r'[^a-zA-Z0-9]+', str(label or "").lower()) if w]
    if not words:
        return ""

    alias = words[0][:max_length]
    for word in words[1:]:
        # Measure against the alias built so far, not just the first word,
        # so the cap holds for the whole result.
        if len(alias) + len(word) > max_length:
            break
        alias += word.capitalize()
    return alias


async def map_fields_to_schema(extracted_fields, *, model="gpt-4o"):
    """Map extracted form fields onto REAL_ESTATE_SCHEMA_MAP keys via the LLM.

    Fields are sent to the chat-completions API in batches of
    ``config.MAPPING_BATCH_SIZE``. Each field dict is then updated in place:

    * canonical match (the returned key exists in the schema): ``aliasId`` is
      the schema key, ``semanticType`` the schema type, ``isDynamic`` False;
    * otherwise: ``aliasId`` is a camelCase alias derived from the label,
      ``semanticType`` falls back to the field's ``detected_type`` (or
      "shortText"), and ``isDynamic`` is True.

    A batch whose request or response parsing fails is reported with a print
    and skipped, so its fields end up on the dynamic path.

    Args:
        extracted_fields: List of dicts with "id", "label", "role" and
            "detected_type" keys.
        model: Chat model name passed to the API.

    Returns:
        The same ``extracted_fields`` list, with every dict updated.
    """
    print(f"\nPhase 2: Mapping {len(extracted_fields)} fields to Schema...")
    if not extracted_fields:
        # Nothing to map: skip prompt construction and API calls entirely.
        return extracted_fields

    inputs = [
        {
            "uuid": f["id"],
            "label": f["label"],
            "role": f["role"],
            "detected_type": f["detected_type"],
        }
        for f in extracted_fields
    ]
    batch_size = config.MAPPING_BATCH_SIZE
    batches = [inputs[i:i + batch_size] for i in range(0, len(inputs), batch_size)]

    # The prompt depends only on the schema, so build it once for all batches.
    system_prompt = _build_mapping_prompt(json.dumps(REAL_ESTATE_SCHEMA_MAP, indent=0))

    mapped_updates = {}
    for i, batch in enumerate(batches):
        print(f" Mapping Batch {i+1}/{len(batches)}...")
        input_str = json.dumps(batch, indent=0)
        try:
            response = await client.chat.completions.create(
                model=model,
                response_format={"type": "json_object"},
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"INPUTS:\n{input_str}"},
                ],
                temperature=0.0,
            )
            data = json.loads(response.choices[0].message.content)
            utils.save_debug_json(data, f"mapping_batch_{i}_llm")
            for m in data.get("mappings") or []:
                # Skip malformed entries instead of losing the rest of the batch.
                if not isinstance(m, dict) or m.get("uuid") is None:
                    continue
                mapped_updates[m["uuid"]] = m.get("schema_key")
        except Exception as e:
            # Best effort: a failed batch leaves its fields unmapped.
            print(f"Mapping Error Batch {i}: {e}")

    # Apply updates
    for f in extracted_fields:
        matched_key = mapped_updates.get(f["id"])
        # The isinstance guard keeps an unhashable LLM value from raising here.
        if isinstance(matched_key, str) and matched_key in REAL_ESTATE_SCHEMA_MAP:
            # 1. Canonical match: the key exists in the strict schema.
            f["aliasId"] = matched_key
            f["semanticType"] = REAL_ESTATE_SCHEMA_MAP[matched_key]["type"]
            f["isDynamic"] = False
        else:
            # 2. Dynamic match: valid data, but not in the schema yet.
            f["aliasId"] = _camel_case_alias(f["label"])
            f["semanticType"] = f.get("detected_type", "shortText")
            f["isDynamic"] = True
    return extracted_fields