import json
from openai import AsyncOpenAI
import config
import utils_geometry as utils
from schema_definitions import REAL_ESTATE_SCHEMA_MAP
import re
# Module-level async OpenAI client shared by all mapping calls.
# Swap in a different client here if you want a different key/model.
client = AsyncOpenAI(api_key=config.OPENAI_API_KEY)
def _label_to_camel_alias(label, max_length=20):
    """Build a camelCase alias from a free-form field label.

    "Loan Amount" -> "loanAmount". Words are appended one at a time and
    the alias stops growing once adding the next word would exceed
    *max_length* characters (the first word is always kept).

    Args:
        label: Human-readable field label.
        max_length: Soft cap on the alias length (default 20).

    Returns:
        A non-empty camelCase identifier string.
    """
    # Split on runs of non-alphanumerics and drop empty fragments so that
    # leading/trailing punctuation ("  Loan Amount:") cannot produce an
    # empty or mis-capitalized first word.
    parts = [p for p in re.split(r'[^a-zA-Z0-9]+', label.lower()) if p]
    if not parts:
        # Label was entirely punctuation/whitespace; fall back to a stable
        # placeholder rather than returning an empty alias.
        return "customField"
    alias = parts[0]
    length = len(alias)
    for word in parts[1:]:
        piece = word.capitalize()
        if length + len(piece) > max_length:
            break
        alias += piece
        # Keep the running length current so the cap applies to the whole
        # alias, not just "first word + next word".
        length += len(piece)
    return alias


async def map_fields_to_schema(extracted_fields):
    """Phase 2: map extracted form fields onto the real-estate schema.

    Sends field metadata to the LLM in batches and asks it to match each
    field to a key of REAL_ESTATE_SCHEMA_MAP. Mutates *extracted_fields*
    in place, setting on every field:
      - "aliasId": the matched schema key, or a generated camelCase slug
      - "semanticType": the schema type, or the field's detected type
      - "isDynamic": False for canonical schema matches, True otherwise

    Args:
        extracted_fields: List of dicts with at least "id", "label",
            "role" and "detected_type" keys.

    Returns:
        The same *extracted_fields* list, updated in place.
    """
    print(f"\nPhase 2: Mapping {len(extracted_fields)} fields to Schema...")
    # Slim payload for the LLM: only what mapping needs.
    inputs = [
        {
            "uuid": f["id"],
            "label": f["label"],
            "role": f["role"],
            "detected_type": f["detected_type"],
        }
        for f in extracted_fields
    ]
    mapped_updates = {}
    batches = [inputs[i:i + config.MAPPING_BATCH_SIZE] for i in range(0, len(inputs), config.MAPPING_BATCH_SIZE)]
    schema_str = json.dumps(REAL_ESTATE_SCHEMA_MAP, indent=0)
    for i, batch in enumerate(batches):
        print(f" Mapping Batch {i+1}/{len(batches)}...")
        input_str = json.dumps(batch, indent=0)
        system_prompt = f"""
You are a Data Schema Mapper for Real Estate Contracts.
TASK:
Map the input fields to the provided "Schema Definition".
RULES:
1. Match based on meaning: "Sale Price" -> "purchasePrice".
2. STRICTLY CHECK TYPES:
- Input "detected_type" MUST be compatible with Schema "type".
3. Use "role" to disambiguate: "Name" + "Buyer" -> "buyerName".
4. If no good match exists, return null for "schema_key".
SCHEMA DEFINITION:
{schema_str}
Return JSON: {{ "mappings": [ {{ "uuid": "...", "schema_key": "..." }} ] }}
"""
        try:
            response = await client.chat.completions.create(
                model="gpt-4o",
                response_format={"type": "json_object"},
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"INPUTS:\n{input_str}"}
                ],
                # Deterministic output for reproducible mappings.
                temperature=0.0
            )
            data = json.loads(response.choices[0].message.content)
            utils.save_debug_json(data, f"mapping_batch_{i}_llm")
            for m in data.get("mappings", []):
                mapped_updates[m["uuid"]] = m.get("schema_key")
        except Exception as e:
            # Best-effort: a failed batch leaves its fields unmapped, and
            # they fall through to the dynamic-alias path below.
            print(f"Mapping Error Batch {i}: {e}")
    # Apply updates to the original field dicts.
    for f in extracted_fields:
        matched_key = mapped_updates.get(f["id"])
        if matched_key and matched_key in REAL_ESTATE_SCHEMA_MAP:
            # 1. Canonical match: the key exists in the strict schema.
            f["aliasId"] = matched_key
            f["semanticType"] = REAL_ESTATE_SCHEMA_MAP[matched_key]["type"]
            f["isDynamic"] = False
        else:
            # 2. Dynamic match: valid data not (yet) in the schema — derive
            # a safe camelCase key from the label: "Loan Amount" -> "loanAmount".
            f["aliasId"] = _label_to_camel_alias(f["label"])
            f["semanticType"] = f.get("detected_type", "shortText")
            f["isDynamic"] = True
    return extracted_fields