# EZOFISAIOCR / backend/app/openrouter_client.py
# (Hugging Face upload metadata: Seth0330, "Update backend/app/openrouter_client.py",
#  commit a3239f4, verified)
import os
import base64
import json
import re
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple
from openai import OpenAI
# Optional PDF support: PyMuPDF (fitz) rasterizes PDF pages and Pillow
# re-encodes them. Without these libraries, only plain image input works.
try:
    import fitz # PyMuPDF
    from PIL import Image
    PDF_SUPPORT = True
except ImportError as e:
    PDF_SUPPORT = False
    print(f"[WARNING] PDF support libraries not available: {e}. PDF conversion will not work.")
# OCR Model Configuration (from sample code)
# NOTE(review): the defaults below embed a live endpoint URL and an API key.
# Hard-coded credentials in source are a security risk — prefer requiring the
# environment variables and failing fast when they are missing.
OCR_BASE_URL = os.environ.get("OCR_BASE_URL", "https://od5yev2behke5u-8000.proxy.runpod.net/v1")
OCR_API_KEY = os.environ.get("OCR_API_KEY", "Ezofis@123")
OCR_MODEL_NAME = os.environ.get("OCR_MODEL_NAME", "EZOFISOCR")
# Initialize OpenAI client with OCR endpoint.
# The OpenAI SDK is used only as a transport; base_url points at an
# OpenAI-compatible OCR server.
ocr_client = OpenAI(
    base_url=OCR_BASE_URL,
    api_key=OCR_API_KEY,
)
def _pdf_to_images(pdf_bytes: bytes) -> List[bytes]:
    """
    Convert PDF pages to JPEG images.

    Each page is rendered at 2x zoom for better OCR quality and re-encoded
    as JPEG (quality 95) via Pillow.

    Args:
        pdf_bytes: Raw bytes of the PDF document.

    Returns:
        A list of JPEG image bytes, one entry per page, in page order.

    Raises:
        RuntimeError: If PyMuPDF/Pillow are not installed.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PyMuPDF not installed. Cannot convert PDF to images.")
    pdf_doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        images = []
        print(f"[INFO] PDF has {len(pdf_doc)} page(s)")
        for page_num in range(len(pdf_doc)):
            page = pdf_doc[page_num]
            # Render page to image (zoom factor 2 for better quality).
            mat = fitz.Matrix(2.0, 2.0)
            # alpha=False guarantees 3-channel RGB samples, so the
            # Image.frombytes("RGB", ...) call below cannot mismatch on
            # PDFs that would otherwise render with an alpha channel.
            pix = page.get_pixmap(matrix=mat, alpha=False)
            # Convert to PIL Image then to JPEG bytes (better compression).
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            img_bytes = BytesIO()
            img.save(img_bytes, format="JPEG", quality=95)
            images.append(img_bytes.getvalue())
            print(f"[INFO] Converted page {page_num + 1} to image ({pix.width}x{pix.height})")
        return images
    finally:
        # Always release the document, even if rendering a page fails.
        pdf_doc.close()
def _image_bytes_to_base64(image_bytes: bytes) -> str:
"""Convert image bytes to base64 data URL (JPEG format)."""
b64 = base64.b64encode(image_bytes).decode("utf-8")
data_url = f"data:image/jpeg;base64,{b64}"
print(f"[DEBUG] Base64 encoded image: {len(image_bytes)} bytes -> {len(data_url)} chars")
return data_url
def _parse_markdown_table(text: str) -> Optional[Tuple[List[str], List[List[str]]]]:
"""
Parse a markdown table from text.
Returns (headers, rows) if table found, None otherwise.
Handles various table formats including malformed ones.
"""
lines = [line.strip() for line in text.split('\n')]
# Find potential table start (line with multiple | and actual text content)
table_start = None
for i, line in enumerate(lines):
if '|' in line and line.count('|') >= 2:
# Skip separator lines (only |, -, :, spaces)
if re.match(r'^[\s\|\-:]+$', line):
continue
# Check if line has meaningful text (not just | characters)
cells = [cell.strip() for cell in line.split('|')]
if cells and not cells[0]:
cells = cells[1:]
if cells and not cells[-1]:
cells = cells[:-1]
# Must have at least 2 columns with some text
meaningful_cells = [c for c in cells if len(c) > 0]
if len(meaningful_cells) >= 2:
table_start = i
break
if table_start is None:
return None
# Find table end (first non-empty line without | after table start)
table_end = None
for i in range(table_start + 1, len(lines)):
line = lines[i]
if not line: # Empty line, continue
continue
if '|' not in line:
# Non-empty line without | means table ended
table_end = i
break
if table_end is None:
table_end = len(lines)
table_lines = lines[table_start:table_end]
# Find the actual header row (should have meaningful text, not just | or separators)
headers = None
header_idx = None
for i, line in enumerate(table_lines):
if not line or '|' not in line:
continue
# Skip separator lines (lines with only |, -, :, spaces)
if re.match(r'^[\s\|\-:]+$', line):
continue
# Check if this line has meaningful content (not just | characters)
cells = [cell.strip() for cell in line.split('|')]
# Remove empty cells at start/end
if cells and not cells[0]:
cells = cells[1:]
if cells and not cells[-1]:
cells = cells[:-1]
# Header should have at least 3 columns and meaningful text
if len(cells) >= 3:
# Check if cells have actual text (not just empty or single char)
meaningful_cells = [c for c in cells if len(c) > 1]
if len(meaningful_cells) >= 3:
headers = cells
header_idx = i
break
if not headers or header_idx is None:
return None
# Parse data rows (skip separator line after header if present)
rows = []
num_columns = len(headers)
for i in range(header_idx + 1, len(table_lines)):
line = table_lines[i]
if not line:
continue
# Skip separator lines
if re.match(r'^[\s\|\-:]+$', line):
continue
if '|' not in line:
# No more table rows
break
cells = [cell.strip() for cell in line.split('|')]
# Remove empty cells at start/end
if cells and not cells[0]:
cells = cells[1:]
if cells and not cells[-1]:
cells = cells[:-1]
# Only add rows that match header column count (allow some flexibility)
if len(cells) == num_columns or (len(cells) >= num_columns - 1 and len(cells) <= num_columns + 1):
# Pad or trim to match header count
if len(cells) < num_columns:
cells.extend([''] * (num_columns - len(cells)))
elif len(cells) > num_columns:
cells = cells[:num_columns]
# Only add if row has at least one non-empty cell
if any(cell for cell in cells):
rows.append(cells)
if not rows:
return None
return (headers, rows)
def _extract_metadata(text: str) -> Dict[str, str]:
    """
    Extract metadata from document header text.

    Heuristic parser aimed at Hindi (Devanagari) notice documents: pulls a
    title, issuing office, notice number, and short description out of the
    lines preceding the table.

    Args:
        text: Full OCR text of one page.

    Returns:
        Dict with keys "title", "office", "notice_no", "description";
        values default to "" when a field cannot be located.
    """
    metadata = {
        "title": "",
        "office": "",
        "notice_no": "",
        "description": ""
    }
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    # Extract office (usually first non-empty line)
    if lines:
        metadata["office"] = lines[0]
    # Look for notice number pattern (like "पत्रक सं- 1239" or "सं- 1239")
    notice_pattern = r'(?:पत्रक\s+)?सं[-\s:]*(\d+)'
    for line in lines[:10]: # Check first 10 lines
        match = re.search(notice_pattern, line)
        if match:
            metadata["notice_no"] = match.group(1)
            break
    # Look for title - usually in quotes or contains specific keywords
    # Check for quoted text first
    # NOTE(review): the character class below contains only ASCII double
    # quotes (curly quotes may have been lost in an encoding pass) — confirm
    # it still matches the quote marks the OCR model actually emits.
    quoted_title = re.search(r'["""]([^"""]+)["""]', text[:1000])
    if quoted_title:
        metadata["title"] = quoted_title.group(1).strip()
    else:
        # Look for title patterns (keywords: property, notice, announcement,
        # name change)
        title_keywords = ['सम्पत्ति', 'सूचना', 'विज्ञप्ति', 'नाम परिवर्तन']
        for line in lines[:5]:
            if any(keyword in line for keyword in title_keywords):
                # Extract the title phrase up to the danda (।) terminator
                title_match = re.search(r'(सम्पत्ति[^।]*|सूचना[^।]*|विज्ञप्ति[^।]*)', line)
                if title_match:
                    metadata["title"] = title_match.group(1).strip()
                    break
    # Extract description (text before table, usually contains key phrases:
    # name change, act, section, publication, application)
    description_keywords = ['नाम परिवर्तन', 'अधिनियम', 'धारा', 'प्रकाशन', 'आवेदन']
    description_parts = []
    for i, line in enumerate(lines[:15]): # Check first 15 lines
        if any(keyword in line for keyword in description_keywords):
            description_parts.append(line)
            # Get a few surrounding lines for context
            if i > 0:
                description_parts.insert(0, lines[i-1])
            if i < len(lines) - 1:
                description_parts.append(lines[i+1])
            break
    if description_parts:
        description = ' '.join(description_parts).strip()
        if len(description) > 30: # Only if substantial
            # Clean up (collapse whitespace) and limit length
            description = re.sub(r'\s+', ' ', description)
            metadata["description"] = description[:300] # Limit length
    return metadata
def _extract_footer_notes(text: str) -> List[str]:
"""
Extract footer notes from document.
Usually appears after the table.
"""
notes = []
# Find table end
lines = text.split('\n')
table_end_idx = len(lines)
for i, line in enumerate(lines):
if '|' in line:
# Find last table line
j = i + 1
while j < len(lines) and ('|' in lines[j] or re.match(r'^[\s\|\-:]+$', lines[j])):
j += 1
table_end_idx = j
break
# Extract footer text (after table)
footer_lines = lines[table_end_idx:]
footer_text = '\n'.join(footer_lines).strip()
# Split into sentences/notes
# Look for sentences ending with period, exclamation, or specific keywords
sentences = re.split(r'[।\.!]\s+', footer_text)
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) > 20: # Only substantial notes
# Clean up
sentence = re.sub(r'\s+', ' ', sentence)
if sentence:
notes.append(sentence)
# Limit to most relevant notes (usually 2-4)
return notes[:5]
def _parse_text_with_tables(text: str) -> Dict[str, Any]:
    """
    Parse OCR text into a structured document.

    Args:
        text: Raw OCR text of one page.

    Returns:
        Dict with:
            text: the original text, untouched
            metadata: header fields (see _extract_metadata)
            table: {"row_1": {...}, "row_2": {...}, ...} mapping sanitized
                   header keys to cell values, or [] when no table is found
            footer_notes: sentences found after the table
    """
    result = {
        "text": text,  # Keep original text
        "metadata": {},
        "table": [],
        "footer_notes": []
    }
    table_data = _parse_markdown_table(text)
    if table_data is None:
        # No table found: still extract basic metadata and footer notes.
        result["metadata"] = _extract_metadata(text)
        result["footer_notes"] = _extract_footer_notes(text)
        return result

    headers, rows = table_data
    print(f"[INFO] Found table with {len(headers)} columns and {len(rows)} rows")
    result["metadata"] = _extract_metadata(text)

    # Build JSON-safe keys from the original header text. The original
    # language is preserved (Devanagari kept via the \u0900-\u097F range).
    # BUGFIX: de-duplication now runs on the *sanitized* key. Previously the
    # counter ran on the raw header text, so two distinct raw headers that
    # sanitized to the same key silently collided and overwrote each other.
    header_mapping = {}
    key_counts = {}
    for i, header in enumerate(headers):
        key = re.sub(r'[^\w\s\u0900-\u097F]', '', header.strip())  # drop JSON-hostile chars
        key = re.sub(r'\s+', '_', key)  # spaces -> underscores
        if not key:
            # Header sanitized to nothing: fall back to positional name.
            key = f"column_{i+1}"
        key_counts[key] = key_counts.get(key, 0) + 1
        if key_counts[key] > 1:
            # Repeated header: suffix with its occurrence count.
            key = f"{key}_{key_counts[key]}"
        header_mapping[i] = key

    # Each table row becomes its own section: row_1, row_2, ...
    table_rows_dict = {}
    for idx, row in enumerate(rows, start=1):
        row_dict = {}
        for col, key in header_mapping.items():
            if col < len(row):
                row_dict[key] = row[col].strip()
        if row_dict:
            table_rows_dict[f"row_{idx}"] = row_dict
    # Store rows as named sections instead of an array.
    result["table"] = table_rows_dict

    result["footer_notes"] = _extract_footer_notes(text)
    return result
async def _extract_text_with_ocr(image_bytes: bytes, page_num: int, total_pages: int) -> Dict[str, Any]:
    """
    Run the OCR model over a single page image.

    Args:
        image_bytes: JPEG-encoded image bytes for one page.
        page_num: 1-based page number (used for logging only).
        total_pages: Total page count (used for logging only).

    Returns:
        Dict with doc_type ("other"), confidence (0-100 heuristic score),
        full_text (the raw model output), and fields (always {} for now).

    Raises:
        RuntimeError: If the OCR API call fails; the original exception is
        chained as __cause__.
    """
    import asyncio

    # Convert image bytes to base64 data URL for the chat-completions payload.
    data_url = _image_bytes_to_base64(image_bytes)
    print(f"[INFO] OCR: Processing page {page_num}/{total_pages} with model {OCR_MODEL_NAME}")
    try:
        # The OpenAI client is synchronous; run it in the default executor so
        # the event loop is not blocked. get_running_loop() is the supported
        # call inside a coroutine (get_event_loop() is deprecated here).
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: ocr_client.chat.completions.create(
                model=OCR_MODEL_NAME,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": "Extract all text from this image"},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": data_url
                                }
                            }
                        ]
                    }
                ],
            )
        )
        # Extract text from response; normalize None/empty to "".
        extracted_text = response.choices[0].message.content or ""
        print(f"[INFO] OCR: Extracted {len(extracted_text)} characters from page {page_num}")
        # Heuristic confidence derived from finish_reason + text quality.
        confidence = _calculate_ocr_confidence(response, extracted_text)
        return {
            "doc_type": "other",
            "confidence": confidence,
            "full_text": extracted_text,
            "fields": {}  # Keep fields empty for now
        }
    except Exception as e:
        error_msg = str(e)
        print(f"[ERROR] OCR API error for page {page_num}: {error_msg}")
        # Chain the original exception so upstream logs keep the real cause.
        raise RuntimeError(f"OCR API error for page {page_num}: {error_msg}") from e
def _calculate_ocr_confidence(response, extracted_text: str) -> float:
"""
Calculate confidence score based on OCR response quality.
Checks for explicit confidence in response, or calculates based on heuristics.
"""
# Check if response has explicit confidence score
try:
# Check response object for confidence-related fields
if hasattr(response, 'usage'):
# Some models provide usage info that might indicate quality
usage = response.usage
if hasattr(usage, 'completion_tokens') and usage.completion_tokens > 0:
# More tokens might indicate better extraction
pass
# Check if finish_reason indicates quality
if hasattr(response.choices[0], 'finish_reason'):
finish_reason = response.choices[0].finish_reason
if finish_reason == "stop":
# Normal completion - good sign
base_confidence = 85.0
elif finish_reason == "length":
# Response was truncated - lower confidence
base_confidence = 70.0
else:
base_confidence = 75.0
else:
base_confidence = 85.0
except Exception:
base_confidence = 85.0
# Adjust confidence based on text quality heuristics
text_length = len(extracted_text.strip())
if text_length == 0:
return 0.0
elif text_length < 10:
# Very short text - might be error or empty
return max(30.0, base_confidence - 30.0)
elif text_length < 50:
# Short text
return max(50.0, base_confidence - 15.0)
elif text_length > 1000:
# Long text - likely good extraction
confidence = min(95.0, base_confidence + 10.0)
else:
confidence = base_confidence
# Check for structured content (tables, etc.) - indicates good extraction
if '|' in extracted_text and extracted_text.count('|') > 5:
# Table detected - boost confidence
confidence = min(95.0, confidence + 5.0)
# Check for meaningful content (non-whitespace ratio)
non_whitespace = len([c for c in extracted_text if not c.isspace()])
if text_length > 0:
content_ratio = non_whitespace / text_length
if content_ratio > 0.8:
# High content ratio - good
confidence = min(95.0, confidence + 3.0)
elif content_ratio < 0.3:
# Low content ratio - mostly whitespace
confidence = max(50.0, confidence - 10.0)
return round(confidence, 1)
def _prepare_page_images(file_bytes: bytes, content_type: str) -> List[bytes]:
    """Normalize an uploaded document into a list of JPEG page images.

    PDFs are rasterized (one image per page); single images are converted to
    RGB JPEG and downscaled to at most 1920px on the longest side. If PIL
    cannot read the bytes, the original bytes are passed through unchanged.

    Raises:
        RuntimeError: For PDF input when PyMuPDF is not installed.
    """
    if content_type == "application/pdf" or content_type.endswith("/pdf"):
        if not PDF_SUPPORT:
            raise RuntimeError("PDF support requires PyMuPDF. Please install it.")
        # For PDFs, convert to images.
        return _pdf_to_images(file_bytes)
    try:
        img = Image.open(BytesIO(file_bytes))
        if img.mode != "RGB":
            img = img.convert("RGB")
        # Cap the longest side at 1920px to keep request payloads small.
        max_size = 1920
        w, h = img.size
        if w > max_size or h > max_size:
            if w > h:
                new_w = max_size
                new_h = int(h * (max_size / w))
            else:
                new_h = max_size
                new_w = int(w * (max_size / h))
            img = img.resize((new_w, new_h), Image.LANCZOS)
            print(f"[INFO] Resized image from {w}x{h} to {new_w}x{new_h}")
        # Re-encode as JPEG for consistency with the PDF path.
        img_bytes = BytesIO()
        img.save(img_bytes, format="JPEG", quality=95)
        return [img_bytes.getvalue()]
    except Exception as e:
        # Best-effort fallback: forward the raw bytes if PIL cannot decode
        # them (also covers the case where Pillow is not installed).
        print(f"[WARNING] Could not process image with PIL: {e}. Using original bytes.")
        return [file_bytes]


async def extract_fields_from_document(
    file_bytes: bytes,
    content_type: str,
    filename: str,
) -> Dict[str, Any]:
    """
    Extract text from a document using the OCR model.

    Pages are processed independently so a single failing page does not lose
    the rest of the document.

    Args:
        file_bytes: Raw uploaded file contents.
        content_type: MIME type; anything ending in "/pdf" is rasterized.
        filename: Original file name (currently unused; kept for API
            compatibility with callers).

    Returns:
        Dict with doc_type, average confidence, combined full_text, fields
        (per-page structured data keyed "page_N"), and per-page raw results.
    """
    image_bytes_list = _prepare_page_images(file_bytes, content_type)
    total_pages = len(image_bytes_list)
    print(f"[INFO] Processing {total_pages} page(s) with OCR model...")

    # Process each page separately; failures are recorded per page, not fatal.
    page_results = []
    for page_num, img_bytes in enumerate(image_bytes_list):
        print(f"[INFO] Processing page {page_num + 1}/{total_pages}...")
        try:
            page_result = await _extract_text_with_ocr(img_bytes, page_num + 1, total_pages)
            page_results.append({
                "page_number": page_num + 1,
                "text": page_result.get("full_text", ""),
                "fields": page_result.get("fields", {}),
                "confidence": page_result.get("confidence", 0),
                "doc_type": page_result.get("doc_type", "other"),
            })
            print(f"[INFO] Page {page_num + 1} processed successfully")
        except Exception as e:
            print(f"[ERROR] Failed to process page {page_num + 1}: {e}")
            page_results.append({
                "page_number": page_num + 1,
                "text": "",
                "fields": {},
                "confidence": 0,
                "error": str(e)
            })

    # Combine text from all successful pages into one annotated blob.
    combined_full_text = "\n\n".join([f"=== PAGE {p['page_number']} ===\n\n{p['text']}" for p in page_results if p.get("text")])

    # Parse each successful page for tables/metadata/footer structure.
    structured_pages = {}
    for page_result in page_results:
        if page_result.get("text"):
            page_num = page_result.get("page_number", 1)
            parsed_data = _parse_text_with_tables(page_result.get("text", ""))
            # Pages are always keyed "page_N", even for single-page docs.
            structured_pages[f"page_{page_num}"] = {
                "text": parsed_data["text"],
                "metadata": parsed_data["metadata"],
                "table": parsed_data["table"],
                "footer_notes": parsed_data["footer_notes"],
                "confidence": page_result.get("confidence", 0),
                "doc_type": page_result.get("doc_type", "other")
            }

    # If we have structured pages, use them; otherwise keep fields empty.
    combined_fields = structured_pages if structured_pages else {}

    # Average confidence over pages that produced a positive score.
    confidences = [p.get("confidence", 0) for p in page_results if p.get("confidence", 0) > 0]
    avg_confidence = sum(confidences) / len(confidences) if confidences else 0

    # Use the first page that reports a specific (non-"other") doc_type.
    doc_type = "other"
    for page_result in page_results:
        if page_result.get("doc_type") and page_result["doc_type"] != "other":
            doc_type = page_result["doc_type"]
            break

    return {
        "doc_type": doc_type,
        "confidence": avg_confidence,
        "full_text": combined_full_text,
        "fields": combined_fields,  # Structured per-page data incl. tables
        "pages": page_results
    }