"""Vision-assisted labelling of PDF form fields.

A page's fields are split into batches. Each batch is rendered to an image,
sent to an OpenAI vision model together with textual spatial hints, and the
model's answer is merged with each field's geometry.
"""

import asyncio
import base64
import json
import random
import time
import uuid

from openai import AsyncOpenAI, RateLimitError

import config
import utils_geometry as utils
from schema_definitions import VALID_FIELD_TYPES

client = AsyncOpenAI(api_key=config.OPENAI_API_KEY)

# Retry policy for rate-limited vision calls: exponential backoff with jitter.
_MAX_RETRIES = 5
_BASE_DELAY_SECONDS = 2


# ==========================================
# HELPERS
# ==========================================
def _format_prompt_items(batch_fields):
    """Return one textual hint entry per field (box id, anchors, PDF type)."""
    items = []
    for field in batch_fields:
        anchors = field["debug_anchors"]
        items.append(
            f"- Box ID {field['temp_id']}:\n"
            f" Spatial Hints -> Left: '{anchors['left']}' | Above: '{anchors['above']}'\n"
            f" PDF Type Hint: {field.get('ft', 'text')}"
        )
    return items


def _build_system_prompt(batch_fields, global_context):
    """Assemble the instruction prompt for one non-empty batch of fields."""
    valid_types_str = ", ".join(VALID_FIELD_TYPES)
    input_data = "\n".join(_format_prompt_items(batch_fields))
    first_id = batch_fields[0]["temp_id"]
    last_id = batch_fields[-1]["temp_id"]
    # NOTE(review): rule 4 says "Choose ONLY from" a list that contains
    # neither "Tenant" nor "System", yet its sub-bullets ask for exactly those
    # values. Wording is kept verbatim; confirm the intended role vocabulary.
    return f"""
    You are an expert Legal Document Processor.
    CONTEXT: Real Estate Contract.
    Global Context: "{global_context}"

    TASK: Analyze the Neon Green Boxes (IDs {first_id} to {last_id}).

    OUTPUT RULES:
    For each box, return JSON with:
    1. "visual_evidence": Text closest to the box.
    2. "section_context": The BOLD HEADER or SECTION TITLE this field belongs to (e.g. "Purchase Price", "Property Description", "Closing Date").
    3. "final_label": Precise natural label (e.g. "Purchase Price", "Seller Signature").
    4. "role": Who fills this out? Choose ONLY from: [Buyer, Seller, Agent, Broker, President, Reviewer, Disclosing Party, Receiving Party, N/A].
       - If ambiguous, infer from section header (e.g. "Tenant's Signature" -> "Tenant").
       - If strictly administrative (e.g. "Office Use Only"), return "System".
    5. "detected_type": MUST be one of [{valid_types_str}].
       - If it looks like money ($), use "dollar".
       - If it looks like a date, use "date".
       - If it's a signature, use "signature".

    INPUT DATA:
    {input_data}

    Return JSON: {{ "fields": [ {{ "box_id": 1, ... }} ] }}
    """


def _index_vision_fields(parsed):
    """Map integer box id -> model item, skipping malformed entries.

    The model may return ``box_id`` as a string or omit it on some items.
    Coercing to ``int`` and skipping bad items keeps one bad entry from
    discarding the whole batch or from silently missing every lookup
    (``temp_id`` values are ints).
    """
    raw_items = parsed.get("fields") if isinstance(parsed, dict) else None
    indexed = {}
    for item in raw_items or []:
        if not isinstance(item, dict):
            continue
        try:
            indexed[int(item["box_id"])] = item
        except (KeyError, TypeError, ValueError):
            continue
    return indexed


def _build_field_result(field, vision, page_num, page_height):
    """Merge one field's geometry with the model's answer for that field.

    ``vision`` is the model item for this field, or an empty dict when the
    model returned nothing usable; defaults then apply to every key.
    """
    label = vision.get("final_label", "")
    if not label or label == "Unknown":
        # Geometry fallback: prefer the text to the left, then the text above.
        anchors = field["debug_anchors"]
        label = anchors["left"] or anchors["above"] or "Unknown Field"

    norm_rect = utils.normalize_bbox_to_top_left(field["bbox"], page_height)
    return {
        "id": field.get("name", str(uuid.uuid4())[:8]),
        "temp_id": field["temp_id"],
        "label": label,
        "section": vision.get("section_context", "General Information"),
        "role": vision.get("role", "System"),
        "detected_type": vision.get("detected_type", "shortText"),
        "uiType": "checkbox" if field.get("ft") == "/Btn" else "text",
        "page": page_num,
        "rect": {
            "x": norm_rect["x0"],
            "y": norm_rect["y0"],
            "width": norm_rect["x1"] - norm_rect["x0"],
            "height": norm_rect["y1"] - norm_rect["y0"],
        },
        "debug_evidence": vision.get("visual_evidence", "N/A"),
    }


# ==========================================
# WORKER: PROCESS SINGLE BATCH (With Retries)
# ==========================================
async def process_single_batch(semaphore, doc, page_num, batch_idx, batch_fields, global_context):
    """Label one batch of fields on a page using the vision model.

    Args:
        semaphore: Async context manager limiting concurrent batches; it is
            held for the whole batch, including backoff sleeps.
        doc: Document indexable by page number; the page must expose
            ``rect.height`` (presumably a PyMuPDF document -- confirm).
        page_num: Index of the page inside ``doc``.
        batch_idx: Position of this batch on the page; used in debug tags.
        batch_fields: Field dicts carrying ``temp_id``, ``debug_anchors`` and
            ``bbox``; ``ft`` and ``name`` are optional.
        global_context: Free text interpolated into the prompt.

    Returns:
        One result dict per field in ``batch_fields`` on success. An empty
        list when the batch is empty, no image could be rendered, the call
        failed with a non-rate-limit error, or all retries were rate limited.
    """
    if not batch_fields:
        return []

    async with semaphore:
        system_prompt = _build_system_prompt(batch_fields, global_context)

        # Rendering is synchronous, so it runs in a worker thread.
        img_bytes = await asyncio.to_thread(
            utils.render_hollow_debug_image, doc, page_num, batch_fields
        )
        if not img_bytes:
            return []

        # Save debug artifact
        tag = f"{page_num}_batch_{batch_idx}"
        utils.save_debug_image(img_bytes, tag)

        b64_img = base64.b64encode(img_bytes).decode("utf-8")
        page_height = doc[page_num].rect.height

        # NOTE(review): the data URL declares image/jpeg; confirm that
        # render_hollow_debug_image actually emits JPEG bytes.
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": system_prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{b64_img}"},
                    },
                ],
            }
        ]

        for attempt in range(_MAX_RETRIES):
            try:
                response = await client.chat.completions.create(
                    model="gpt-4o",
                    response_format={"type": "json_object"},
                    messages=messages,
                    temperature=0.0,
                )
                parsed = json.loads(response.choices[0].message.content)
                utils.save_debug_json(parsed, f"{tag}_vision_response")

                vision_by_id = _index_vision_fields(parsed)
                # Built in one expression so a failure part-way through can
                # never leak a half-filled result list to the caller.
                return [
                    _build_field_result(
                        field,
                        vision_by_id.get(field["temp_id"], {}),
                        page_num,
                        page_height,
                    )
                    for field in batch_fields
                ]
            except Exception as exc:  # worker boundary: log and degrade to []
                error_msg = str(exc)
                rate_limited = (
                    isinstance(exc, RateLimitError)
                    or "429" in error_msg
                    or "Rate limit" in error_msg
                )
                if not rate_limited:
                    print(f"Error {tag}: {exc}")
                    break
                if attempt == _MAX_RETRIES - 1:
                    # No further attempt follows, so sleeping would only delay
                    # the (empty) result.
                    print(f"Rate Limit ({tag}). Giving up after {_MAX_RETRIES} attempts.")
                    break
                wait_time = (_BASE_DELAY_SECONDS * (2 ** attempt)) + (random.random() * 0.5)
                print(f"Rate Limit ({tag}). Waiting {wait_time:.2f}s...")
                await asyncio.sleep(wait_time)

        return []


# ==========================================
# ORCHESTRATOR: PROCESS PAGE
# ==========================================
async def process_page_smart(semaphore, doc, page_num, fields, global_context):
    """Label every field on one page by fanning out vision batches.

    Mutates each dict in ``fields`` in place, adding ``temp_id`` (1-based
    position on the page) and ``debug_anchors``.

    Args:
        semaphore: Passed through to each batch worker to cap concurrency.
        doc: Document indexable by page number.
        page_num: Index of the page inside ``doc``.
        fields: Field dicts for this page; each must carry ``bbox``.
        global_context: Free text forwarded to the prompt of every batch.

    Returns:
        Flat list of per-field result dicts, in batch order. Fields of a
        failed batch are absent.
    """
    page = doc[page_num]
    page_words = utils.get_words_from_page(page)
    page_height = page.rect.height

    # 1. Pre-calc anchors
    for idx, field in enumerate(fields):
        field["temp_id"] = idx + 1
        field["debug_anchors"] = utils.calculate_smart_anchors(
            field["bbox"], page_words, page_height
        )

    # 2. Create Batches
    batch_size = config.VISION_BATCH_SIZE
    batches = [fields[i:i + batch_size] for i in range(0, len(fields), batch_size)]

    print(f"📄 Page {page_num}: Queuing {len(batches)} batches for {len(fields)} fields...")

    # 3. Spawn Parallel Tasks
    tasks = [
        asyncio.create_task(
            process_single_batch(semaphore, doc, page_num, batch_idx, batch_fields, global_context)
        )
        for batch_idx, batch_fields in enumerate(batches)
    ]

    # 4. Gather Results
    results = await asyncio.gather(*tasks)
    return [item for sublist in results for item in sublist]