# EZOFISOCR — backend/app/openrouter_client.py
# RunPod serverless OCR client: PDF/image preprocessing, OCR invocation,
# and heuristic parsing of model output into structured fields.
import os
import base64
import json
import re
import time
import asyncio
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple
import httpx
try:
import fitz # PyMuPDF
from PIL import Image
PDF_SUPPORT = True
except ImportError as e:
PDF_SUPPORT = False
print(f"[WARNING] PDF support libraries not available: {e}. PDF conversion will not work.")
# RunPod Serverless OCR Configuration
RUNPOD_ENDPOINT = os.environ.get("RUNPOD_ENDPOINT", "https://api.runpod.ai/v2/j2jvf8t6n0rk5c/run")
RUNPOD_API_KEY = os.environ.get("RUNPOD_API_KEY", "rpa_0UJOK33ZO7SID9B3ASFSKKPUHNPBQC5Z2128RB4O4qi9ts")
# Extract endpoint ID from endpoint URL for status polling
# URL format: https://api.runpod.ai/v2/{endpoint_id}/run
_endpoint_id = RUNPOD_ENDPOINT.split("/v2/")[1].split("/")[0] if "/v2/" in RUNPOD_ENDPOINT else None
RUNPOD_STATUS_ENDPOINT = f"https://api.runpod.ai/v2/{_endpoint_id}/status" if _endpoint_id else None
def _pdf_to_images(pdf_bytes: bytes) -> List[bytes]:
    """
    Render every page of a PDF as a JPEG-compressed image.

    Returns one bytes object per page. Raises RuntimeError when PyMuPDF
    is not installed.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PyMuPDF not installed. Cannot convert PDF to images.")
    pdf_doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    pages: List[bytes] = []
    print(f"[INFO] PDF has {len(pdf_doc)} page(s)")
    zoom = fitz.Matrix(2.0, 2.0)  # render at 2x zoom for sharper OCR input
    for idx in range(len(pdf_doc)):
        pix = pdf_doc[idx].get_pixmap(matrix=zoom)
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        w, h = img.size
        limit = 1920  # cap the longest side to limit GPU memory use downstream
        if max(w, h) > limit:
            scale = limit / max(w, h)
            if w > h:
                new_w, new_h = limit, int(h * scale)
            else:
                new_w, new_h = int(w * scale), limit
            img = img.resize((new_w, new_h), Image.LANCZOS)
            print(f"[INFO] Resized page {idx + 1} from {w}x{h} to {new_w}x{new_h}")
        else:
            print(f"[INFO] Converted page {idx + 1} to image ({w}x{h})")
        # JPEG gives much smaller payloads than PNG for scanned pages.
        buf = BytesIO()
        img.save(buf, format="JPEG", quality=95)
        pages.append(buf.getvalue())
    pdf_doc.close()
    return pages
def _image_bytes_to_base64(image_bytes: bytes) -> str:
"""Convert image bytes to base64 data URL (JPEG format)."""
b64 = base64.b64encode(image_bytes).decode("utf-8")
data_url = f"data:image/jpeg;base64,{b64}"
print(f"[DEBUG] Base64 encoded image: {len(image_bytes)} bytes -> {len(data_url)} chars")
return data_url
def _parse_markdown_table(text: str) -> Optional[Tuple[List[str], List[List[str]]]]:
    """
    Parse the first markdown-style table found in *text*.

    Returns (headers, rows) on success, or None when no plausible table is
    found. Tolerates malformed tables: separator rows (|---|---|) are
    skipped, and data rows may be one column short/long of the header
    (they are padded with '' or trimmed to match).
    """
    lines = [line.strip() for line in text.split('\n')]
    # Locate a plausible table start: a line with >= 2 '|' characters that
    # carries real cell text, not a separator row.
    table_start = None
    for i, line in enumerate(lines):
        if '|' in line and line.count('|') >= 2:
            # Skip separator lines (only |, -, :, spaces)
            if re.match(r'^[\s\|\-:]+$', line):
                continue
            cells = [cell.strip() for cell in line.split('|')]
            # Drop the empty leading/trailing cells produced by |...| borders.
            if cells and not cells[0]:
                cells = cells[1:]
            if cells and not cells[-1]:
                cells = cells[:-1]
            # Must have at least 2 columns with some text
            meaningful_cells = [c for c in cells if len(c) > 0]
            if len(meaningful_cells) >= 2:
                table_start = i
                break
    if table_start is None:
        return None
    # The table ends at the first non-empty line without '|' after the
    # start; blank lines inside the table are tolerated.
    table_end = None
    for i in range(table_start + 1, len(lines)):
        line = lines[i]
        if not line:  # Empty line, continue
            continue
        if '|' not in line:
            # Non-empty line without | means table ended
            table_end = i
            break
    if table_end is None:
        table_end = len(lines)
    table_lines = lines[table_start:table_end]
    # Find the header row: needs >= 3 columns, of which >= 3 contain text
    # longer than one character (filters out rows of stray '|' noise).
    headers = None
    header_idx = None
    for i, line in enumerate(table_lines):
        if not line or '|' not in line:
            continue
        # Skip separator lines (lines with only |, -, :, spaces)
        if re.match(r'^[\s\|\-:]+$', line):
            continue
        cells = [cell.strip() for cell in line.split('|')]
        # Remove empty cells at start/end
        if cells and not cells[0]:
            cells = cells[1:]
        if cells and not cells[-1]:
            cells = cells[:-1]
        # Header should have at least 3 columns and meaningful text
        if len(cells) >= 3:
            # Cells must contain actual text (not just empty or single char)
            meaningful_cells = [c for c in cells if len(c) > 1]
            if len(meaningful_cells) >= 3:
                headers = cells
                header_idx = i
                break
    if not headers or header_idx is None:
        return None
    # Collect data rows below the header, skipping separators, stopping at
    # the first non-table line.
    rows = []
    num_columns = len(headers)
    for i in range(header_idx + 1, len(table_lines)):
        line = table_lines[i]
        if not line:
            continue
        # Skip separator lines
        if re.match(r'^[\s\|\-:]+$', line):
            continue
        if '|' not in line:
            # No more table rows
            break
        cells = [cell.strip() for cell in line.split('|')]
        # Remove empty cells at start/end
        if cells and not cells[0]:
            cells = cells[1:]
        if cells and not cells[-1]:
            cells = cells[:-1]
        # Accept rows within one column of the header width...
        if len(cells) == num_columns or (len(cells) >= num_columns - 1 and len(cells) <= num_columns + 1):
            # ...then pad or trim so every row matches the header exactly.
            if len(cells) < num_columns:
                cells.extend([''] * (num_columns - len(cells)))
            elif len(cells) > num_columns:
                cells = cells[:num_columns]
            # Only keep rows that contain at least one non-empty cell.
            if any(cell for cell in cells):
                rows.append(cells)
    if not rows:
        return None
    return (headers, rows)
def _extract_metadata(text: str) -> Dict[str, str]:
"""
Extract metadata from document header text.
Looks for title, office, notice number, and description.
"""
metadata = {
"title": "",
"office": "",
"notice_no": "",
"description": ""
}
lines = [line.strip() for line in text.split('\n') if line.strip()]
# Extract office (usually first non-empty line)
if lines:
metadata["office"] = lines[0]
# Look for notice number pattern (like "पत्रक सं- 1239" or "सं- 1239")
notice_pattern = r'(?:पत्रक\s+)?सं[-\s:]*(\d+)'
for line in lines[:10]: # Check first 10 lines
match = re.search(notice_pattern, line)
if match:
metadata["notice_no"] = match.group(1)
break
# Look for title - usually in quotes or contains specific keywords
# Check for quoted text first
quoted_title = re.search(r'["""]([^"""]+)["""]', text[:1000])
if quoted_title:
metadata["title"] = quoted_title.group(1).strip()
else:
# Look for title patterns
title_keywords = ['सम्पत्ति', 'सूचना', 'विज्ञप्ति', 'नाम परिवर्तन']
for line in lines[:5]:
if any(keyword in line for keyword in title_keywords):
# Extract the title phrase
title_match = re.search(r'(सम्पत्ति[^।]*|सूचना[^।]*|विज्ञप्ति[^।]*)', line)
if title_match:
metadata["title"] = title_match.group(1).strip()
break
# Extract description (text before table, usually contains key phrases)
description_keywords = ['नाम परिवर्तन', 'अधिनियम', 'धारा', 'प्रकाशन', 'आवेदन']
description_parts = []
for i, line in enumerate(lines[:15]): # Check first 15 lines
if any(keyword in line for keyword in description_keywords):
description_parts.append(line)
# Get a few surrounding lines for context
if i > 0:
description_parts.insert(0, lines[i-1])
if i < len(lines) - 1:
description_parts.append(lines[i+1])
break
if description_parts:
description = ' '.join(description_parts).strip()
if len(description) > 30: # Only if substantial
# Clean up and limit length
description = re.sub(r'\s+', ' ', description)
metadata["description"] = description[:300] # Limit length
return metadata
def _parse_model_response(response_text: str) -> Tuple[str, Dict[str, Any]]:
"""
Parse model response to extract text and metadata.
The model may return text and metadata in various formats.
Returns: (extracted_text, metadata_dict)
"""
metadata = {}
text = response_text
# Try to find JSON metadata section
# Look for METADATA: or metadata: section
metadata_patterns = [
r'METADATA:\s*\n?\s*({.*?})(?:\n\n|\nTEXT|$)',
r'metadata:\s*\n?\s*({.*?})(?:\n\n|\nTEXT|$)',
r'METADATA:\s*\n?\s*```json\s*({.*?})\s*```',
r'METADATA:\s*\n?\s*```\s*({.*?})\s*```',
]
for pattern in metadata_patterns:
match = re.search(pattern, response_text, re.DOTALL | re.IGNORECASE)
if match:
try:
metadata_json = match.group(1).strip()
metadata = json.loads(metadata_json)
# Remove metadata section from text
text = response_text[:match.start()] + response_text[match.end():]
break
except (json.JSONDecodeError, IndexError):
continue
# If no JSON found, try to extract metadata from structured text format
if not metadata:
# Look for key-value pairs in METADATA section
metadata_section = re.search(r'METADATA:\s*\n(.*?)(?:\n\n|\nTEXT|$)', response_text, re.DOTALL | re.IGNORECASE)
if metadata_section:
metadata_text = metadata_section.group(1)
# Parse key-value pairs
for line in metadata_text.split('\n'):
if ':' in line:
parts = line.split(':', 1)
if len(parts) == 2:
key = parts[0].strip().lower().replace(' ', '_')
value = parts[1].strip()
if value:
metadata[key] = value
# Extract TEXT section if present
text_match = re.search(r'TEXT:\s*\n(.*?)(?:\n\nMETADATA|$)', response_text, re.DOTALL | re.IGNORECASE)
if text_match:
text = text_match.group(1).strip()
else:
# If no TEXT section, remove METADATA section if found
text = re.sub(r'METADATA:.*', '', response_text, flags=re.DOTALL | re.IGNORECASE).strip()
# Clean up text
text = text.strip()
# Clean up metadata - remove empty values
metadata = {k: v for k, v in metadata.items() if v and str(v).strip()}
return text, metadata
def _extract_footer_notes(text: str) -> List[str]:
"""
Extract footer notes from document.
Usually appears after the table.
"""
notes = []
# Find table end
lines = text.split('\n')
table_end_idx = len(lines)
for i, line in enumerate(lines):
if '|' in line:
# Find last table line
j = i + 1
while j < len(lines) and ('|' in lines[j] or re.match(r'^[\s\|\-:]+$', lines[j])):
j += 1
table_end_idx = j
break
# Extract footer text (after table)
footer_lines = lines[table_end_idx:]
footer_text = '\n'.join(footer_lines).strip()
# Split into sentences/notes
# Look for sentences ending with period, exclamation, or specific keywords
sentences = re.split(r'[।\.!]\s+', footer_text)
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) > 20: # Only substantial notes
# Clean up
sentence = re.sub(r'\s+', ' ', sentence)
if sentence:
notes.append(sentence)
# Limit to most relevant notes (usually 2-4)
return notes[:5]
def _parse_text_with_tables(text: str, page_metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Build a structured view of OCR text: raw text, metadata, table rows
    keyed row_1..row_N, and footer notes.

    Uses *page_metadata* when supplied, otherwise falls back to
    _extract_metadata(). Table cells are keyed by sanitized header text
    (word characters and Devanagari kept, spaces -> underscores).

    Fixes vs. the previous version: header keys are sanitized BEFORE
    duplicate detection, so two distinct raw headers that sanitize to the
    same key no longer collide (previously the later column silently
    overwrote the earlier one). The annotation for page_metadata is now
    Optional, matching its None default.
    """
    result: Dict[str, Any] = {
        "text": text,  # keep the original text verbatim
        "metadata": page_metadata if page_metadata else {},
        "table": [],
        "footer_notes": [],
    }
    table_data = _parse_markdown_table(text)
    if not table_data:
        # No table: still provide basic metadata and footer notes.
        result["metadata"] = _extract_metadata(text)
        result["footer_notes"] = _extract_footer_notes(text)
        return result
    headers, rows = table_data
    print(f"[INFO] Found table with {len(headers)} columns and {len(rows)} rows")
    if not result["metadata"]:
        result["metadata"] = _extract_metadata(text)
    # Map column index -> JSON-safe key derived from the original header.
    header_mapping: Dict[int, str] = {}
    seen: Dict[str, int] = {}
    for i, header in enumerate(headers):
        # Sanitize first: keep word chars and Devanagari (U+0900-U+097F),
        # collapse whitespace runs into underscores.
        key = re.sub(r'[^\w\s\u0900-\u097F]', '', header.strip())
        key = re.sub(r'\s+', '_', key)
        if not key:
            key = f"column_{i+1}"  # fall back to positional name
        # Deduplicate on the sanitized form with a numeric suffix.
        seen[key] = seen.get(key, 0) + 1
        if seen[key] > 1:
            key = f"{key}_{seen[key]}"
        header_mapping[i] = key
    # Each table row becomes its own section: row_1, row_2, ...
    table_rows: Dict[str, Dict[str, str]] = {}
    for idx, row in enumerate(rows, start=1):
        row_dict = {header_mapping[i]: row[i].strip() for i in header_mapping if i < len(row)}
        if row_dict:
            table_rows[f"row_{idx}"] = row_dict
    result["table"] = table_rows
    result["footer_notes"] = _extract_footer_notes(text)
    return result
async def _poll_runpod_job(job_id: str, client: httpx.AsyncClient, max_wait_time: int = 300) -> Dict[str, Any]:
    """
    Poll a RunPod job's status endpoint until it completes.

    Returns the final status payload on COMPLETED. Raises RuntimeError on
    FAILED status or when *max_wait_time* seconds elapse.
    """
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {RUNPOD_API_KEY}",
    }
    deadline = time.time() + max_wait_time
    poll_interval = 2  # seconds between status checks
    while True:
        if time.time() > deadline:
            raise RuntimeError(f"Job {job_id} timed out after {max_wait_time} seconds")
        response = await client.get(f"{RUNPOD_STATUS_ENDPOINT}/{job_id}", headers=headers)
        response.raise_for_status()
        payload = response.json()
        status = payload.get("status", "").upper()
        if status == "COMPLETED":
            print(f"[INFO] Job {job_id} completed successfully")
            return payload
        if status == "FAILED":
            error_msg = payload.get("error", "Unknown error")
            raise RuntimeError(f"Job {job_id} failed: {error_msg}")
        # IN_QUEUE / IN_PROGRESS / anything unrecognized: wait and re-poll.
        print(f"[INFO] Job {job_id} status: {status}, waiting...")
        await asyncio.sleep(poll_interval)
def _extract_output_text(result: Dict[str, Any]) -> str:
    """Best-effort extraction of the OCR text from a RunPod response dict."""
    if "output" in result:
        output = result["output"]
        if isinstance(output, str):
            return output
        if isinstance(output, dict):
            # Try the common payload field names in order.
            value = output.get("text", output.get("result", output.get("content", "")))
            return value if value else ""
        if isinstance(output, list) and len(output) > 0:
            return str(output[0])
        return ""
    if "result" in result:
        return str(result["result"])
    if "text" in result:
        return str(result["text"])
    # Fallback: stringify the whole response so nothing is silently lost.
    return str(result)


def _infer_doc_type(metadata: Dict[str, Any]) -> str:
    """Derive a coarse document type from model metadata title keywords."""
    doc_type = metadata.get("document_type", "other")
    if doc_type == "other" and metadata.get("title"):
        title_lower = metadata.get("title", "").lower()
        if any(kw in title_lower for kw in ("tender", "bid", "quotation")):
            doc_type = "tender"
        elif any(kw in title_lower for kw in ("recruitment", "appointment", "vacancy")):
            doc_type = "recruitment"
        elif any(kw in title_lower for kw in ("notice", "notification", "circular")):
            doc_type = "notice"
    return doc_type


async def _extract_text_with_ocr(image_bytes: bytes, page_num: int, total_pages: int, custom_prompt: Optional[str] = None) -> Dict[str, Any]:
    """
    Run one page image through the RunPod serverless OCR endpoint.

    Args:
        image_bytes: Encoded image bytes of the page.
        page_num: 1-based page number (used for logging/errors).
        total_pages: Total page count (used for logging).
        custom_prompt: Optional prompt override (e.g. targeted field
            extraction); defaults to plain full-text extraction.

    Returns:
        Dict with keys doc_type, confidence, full_text, and fields (the
        metadata parsed out of the model reply).

    Raises:
        RuntimeError: on HTTP errors, job failure, or polling timeout.

    Changes vs. previous version: removed a dead re-assignment branch in
    output handling (it could never change the value), chained exceptions
    with `from e` for complete tracebacks, fixed the custom_prompt
    annotation to Optional, and split response parsing / doc-type
    inference into helpers.
    """
    image_base64 = base64.b64encode(image_bytes).decode("utf-8")
    print(f"[INFO] OCR: Processing page {page_num}/{total_pages} with RunPod endpoint")
    try:
        prompt = custom_prompt if custom_prompt else "Extract all text from this image."
        # RunPod serverless workers accept image_base64 / image_url / image_path.
        payload = {
            "input": {
                "prompt": prompt,
                "image_base64": image_base64,
            }
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {RUNPOD_API_KEY}",
        }
        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(RUNPOD_ENDPOINT, headers=headers, json=payload)
            response.raise_for_status()
            result = response.json()
            # /run may answer synchronously or hand back an async job id.
            job_id = result.get("id")
            status = result.get("status", "").upper()
            if job_id and status in ("IN_QUEUE", "IN_PROGRESS"):
                print(f"[INFO] Job submitted with ID: {job_id}, status: {status}")
                if not RUNPOD_STATUS_ENDPOINT:
                    raise RuntimeError("RunPod status endpoint not configured. Cannot poll async job.")
                result = await _poll_runpod_job(job_id, client)
        extracted_text = _extract_output_text(result)
        print(f"[INFO] OCR: Extracted {len(extracted_text)} characters from page {page_num}")
        parsed_text, parsed_metadata = _parse_model_response(extracted_text)
        # _calculate_ocr_confidence historically took an OpenAI-style response
        # object; build a minimal stand-in for interface compatibility.
        mock_response = type('obj', (object,), {
            'choices': [type('obj', (object,), {'finish_reason': 'stop'})()],
            'usage': type('obj', (object,), {'completion_tokens': len(parsed_text.split())})()
        })()
        confidence = _calculate_ocr_confidence(mock_response, parsed_text)
        doc_type = _infer_doc_type(parsed_metadata)
        return {
            "doc_type": doc_type,
            "confidence": confidence,
            "full_text": parsed_text,
            "fields": parsed_metadata if parsed_metadata else {},
        }
    except httpx.HTTPStatusError as e:
        error_msg = f"HTTP {e.response.status_code}: {e.response.text}"
        print(f"[ERROR] OCR API HTTP error for page {page_num}: {error_msg}")
        # Chain the original exception so the full traceback survives.
        raise RuntimeError(f"OCR API error for page {page_num}: {error_msg}") from e
    except Exception as e:
        error_msg = str(e)
        print(f"[ERROR] OCR API error for page {page_num}: {error_msg}")
        raise RuntimeError(f"OCR API error for page {page_num}: {error_msg}") from e
def _calculate_ocr_confidence(response, extracted_text: str) -> float:
"""
Calculate confidence score based on OCR response quality.
Returns a score from 0-100, with higher scores for better extraction quality.
"""
# Start with a higher base confidence for successful extractions
base_confidence = 92.0
# Adjust confidence based on text quality heuristics
text_length = len(extracted_text.strip())
if text_length == 0:
return 0.0
elif text_length < 10:
# Very short text - might be error or empty
return max(30.0, base_confidence - 40.0)
elif text_length < 50:
# Short text - might be incomplete
return max(60.0, base_confidence - 20.0)
elif text_length > 1000:
# Long text - likely good extraction
confidence = min(100.0, base_confidence + 5.0)
elif text_length > 500:
# Medium-long text - good extraction
confidence = min(100.0, base_confidence + 3.0)
else:
confidence = base_confidence
# Check for structured content (tables, etc.) - indicates good extraction
if '|' in extracted_text and extracted_text.count('|') > 5:
# Table detected - boost confidence significantly
confidence = min(100.0, confidence + 6.0)
# Check for meaningful content (non-whitespace ratio)
non_whitespace = len([c for c in extracted_text if not c.isspace()])
if text_length > 0:
content_ratio = non_whitespace / text_length
if content_ratio > 0.85:
# Very high content ratio - excellent extraction
confidence = min(100.0, confidence + 5.0)
elif content_ratio > 0.75:
# High content ratio - good extraction
confidence = min(100.0, confidence + 3.0)
elif content_ratio > 0.6:
# Moderate content ratio - decent extraction
confidence = min(100.0, confidence + 1.0)
elif content_ratio < 0.3:
# Low content ratio - mostly whitespace
confidence = max(60.0, confidence - 15.0)
# Check for common OCR quality indicators
# Presence of numbers, dates, and structured patterns indicates good extraction
has_numbers = any(c.isdigit() for c in extracted_text)
has_letters = any(c.isalpha() for c in extracted_text)
has_punctuation = any(c in '.,;:!?()[]{}' for c in extracted_text)
if has_numbers and has_letters and has_punctuation:
# Well-structured text with mixed content - high confidence
confidence = min(100.0, confidence + 2.0)
# Cap at 100% and ensure minimum quality threshold
return round(min(100.0, max(0.0, confidence)), 1)
async def extract_fields_from_document(
    file_bytes: bytes,
    content_type: str,
    filename: str,
    key_fields: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Run the full OCR pipeline over one uploaded document.

    PDFs are rasterized to one JPEG per page; other inputs are re-encoded
    as RGB JPEG (downscaled to 1920px on the longest side). Each page is
    OCR'd independently so one failure does not sink the whole document.

    Args:
        file_bytes: Raw uploaded file contents.
        content_type: MIME type; anything ending in "/pdf" is treated as PDF.
        filename: Original filename (currently unused; kept for API shape).
        key_fields: Optional comma-separated field names; when given, a
            second OCR pass on page 1 asks the model for just those fields.

    Returns:
        Dict with doc_type, average confidence, combined full_text,
        structured per-page fields (tables, footer notes), raw page
        results, and — only when key_fields extraction succeeded — a
        root-level "Fields" dict.

    Raises:
        RuntimeError: when a PDF is supplied but PyMuPDF is missing.
    """
    # --- Normalize input into a list of JPEG page images --------------------
    if content_type == "application/pdf" or content_type.endswith("/pdf"):
        if not PDF_SUPPORT:
            raise RuntimeError("PDF support requires PyMuPDF. Please install it.")
        # For PDFs, convert to images
        pdf_images = _pdf_to_images(file_bytes)
        image_bytes_list = pdf_images
    else:
        # Regular images: re-encode as JPEG for consistency with PDF pages.
        try:
            img = Image.open(BytesIO(file_bytes))
            if img.mode != "RGB":
                img = img.convert("RGB")
            # Resize if too large (max 1920px on longest side)
            max_size = 1920
            w, h = img.size
            if w > max_size or h > max_size:
                if w > h:
                    new_w = max_size
                    new_h = int(h * (max_size / w))
                else:
                    new_h = max_size
                    new_w = int(w * (max_size / h))
                img = img.resize((new_w, new_h), Image.LANCZOS)
                print(f"[INFO] Resized image from {w}x{h} to {new_w}x{new_h}")
            # Convert to JPEG bytes
            img_bytes = BytesIO()
            img.save(img_bytes, format="JPEG", quality=95)
            image_bytes_list = [img_bytes.getvalue()]
        except Exception as e:
            # Best-effort fallback: send the original bytes to the OCR model.
            print(f"[WARNING] Could not process image with PIL: {e}. Using original bytes.")
            image_bytes_list = [file_bytes]
    total_pages = len(image_bytes_list)
    print(f"[INFO] Processing {total_pages} page(s) with OCR model...")
    # --- OCR each page independently ----------------------------------------
    page_results = []
    for page_num, img_bytes in enumerate(image_bytes_list):
        print(f"[INFO] Processing page {page_num + 1}/{total_pages}...")
        try:
            page_result = await _extract_text_with_ocr(img_bytes, page_num + 1, total_pages, None)
            page_results.append({
                "page_number": page_num + 1,
                "text": page_result.get("full_text", ""),
                "fields": page_result.get("fields", {}),
                "confidence": page_result.get("confidence", 0),
                "doc_type": page_result.get("doc_type", "other"),
            })
            print(f"[INFO] Page {page_num + 1} processed successfully")
        except Exception as e:
            # Record the failure but keep processing the remaining pages.
            print(f"[ERROR] Failed to process page {page_num + 1}: {e}")
            page_results.append({
                "page_number": page_num + 1,
                "text": "",
                "fields": {},
                "confidence": 0,
                "error": str(e)
            })
    # Combine page texts with explicit page markers.
    combined_full_text = "\n\n".join([f"=== PAGE {p['page_number']} ===\n\n{p['text']}" for p in page_results if p.get("text")])
    # --- Optional second pass: user-requested key fields ---------------------
    extracted_fields = {}
    if key_fields and key_fields.strip():
        # "Invoice Number, Invoice Date" -> ['Invoice Number', 'Invoice Date']
        field_list = [f.strip() for f in key_fields.split(',') if f.strip()]
        if field_list:
            print(f"[INFO] Extracting user-specified fields: {field_list}")
            # Format fields as a JSON array string for the prompt.
            fields_json = json.dumps(field_list)
            custom_prompt = f"Extract the following fields from this image and return as JSON: {fields_json}. Return only a valid JSON object with the field names as keys and their extracted values."
            # Re-OCR the first page only (it usually carries the metadata).
            if image_bytes_list and len(image_bytes_list) > 0:
                try:
                    print("[INFO] Running second OCR pass for field extraction...")
                    field_result = await _extract_text_with_ocr(image_bytes_list[0], 1, 1, custom_prompt)
                    field_text = field_result.get("full_text", "")
                    try:
                        # Find a JSON object (one nesting level deep) in the reply.
                        json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', field_text, re.DOTALL)
                        if json_match:
                            extracted_fields = json.loads(json_match.group(0))
                            print(f"[INFO] Successfully extracted {len(extracted_fields)} fields from second OCR pass")
                        else:
                            # Try parsing the entire response as JSON
                            extracted_fields = json.loads(field_text)
                            print(f"[INFO] Successfully extracted {len(extracted_fields)} fields from second OCR pass")
                    except json.JSONDecodeError:
                        print(f"[WARNING] Could not parse JSON from field extraction response: {field_text[:200]}")
                        extracted_fields = {}
                except Exception as e:
                    # Field extraction is best-effort; never fail the request.
                    print(f"[WARNING] Field extraction failed: {e}")
                    extracted_fields = {}
    # --- Structure each page's text (tables, footer notes) -------------------
    structured_pages = {}
    for page_result in page_results:
        if page_result.get("text"):
            page_num = page_result.get("page_number", 1)
            page_text = page_result.get("text", "")
            # Parse text for tables and structure
            parsed_data = _parse_text_with_tables(page_text, {})
            # Pages are keyed page_1, page_2, ... even for single-page docs.
            page_key = f"page_{page_num}"
            structured_pages[page_key] = {
                "text": parsed_data["text"],
                "table": parsed_data["table"],
                "footer_notes": parsed_data["footer_notes"],
                "confidence": page_result.get("confidence", 0),
                "doc_type": page_result.get("doc_type", "other")
            }
    # Use structured pages when available; otherwise leave fields empty.
    if structured_pages:
        combined_fields = structured_pages
    else:
        combined_fields = {}
    # Average confidence over pages that produced a positive score.
    confidences = [p.get("confidence", 0) for p in page_results if p.get("confidence", 0) > 0]
    avg_confidence = sum(confidences) / len(confidences) if confidences else 0
    # First non-"other" page doc_type wins.
    doc_type = "other"
    for page_result in page_results:
        if page_result.get("doc_type") and page_result["doc_type"] != "other":
            doc_type = page_result["doc_type"]
            break
    return_obj = {
        "doc_type": doc_type,
        "confidence": avg_confidence,
        "full_text": combined_full_text,
        "fields": combined_fields,  # structured per-page data incl. tables
        "pages": page_results
    }
    # Root-level "Fields" only when the user asked and extraction succeeded.
    if extracted_fields:
        return_obj["Fields"] = extracted_fields
    return return_obj