"""OCR document extraction pipeline.

Converts PDFs/images to page images, sends each page to a vision OCR model
(OpenAI-compatible endpoint), then parses the returned text into structured
metadata, markdown tables, and footer notes.
"""

import asyncio
import base64
import json
import os
import re
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple

from openai import OpenAI

try:
    import fitz  # PyMuPDF
    from PIL import Image

    PDF_SUPPORT = True
except ImportError as e:
    PDF_SUPPORT = False
    print(f"[WARNING] PDF support libraries not available: {e}. PDF conversion will not work.")

# OCR Model Configuration (from sample code)
# NOTE(review): hard-coded credential defaults should be removed in favor of
# required environment variables or a secrets store — confirm with deployment.
OCR_BASE_URL = os.environ.get("OCR_BASE_URL", "https://od5yev2behke5u-8000.proxy.runpod.net/v1")
OCR_API_KEY = os.environ.get("OCR_API_KEY", "Ezofis@123")
OCR_MODEL_NAME = os.environ.get("OCR_MODEL_NAME", "EZOFISOCR")

# Initialize OpenAI client with OCR endpoint
ocr_client = OpenAI(
    base_url=OCR_BASE_URL,
    api_key=OCR_API_KEY,
)

# Markdown table separator lines contain only |, -, : and whitespace.
# Compiled once instead of at every call site.
_SEPARATOR_RE = re.compile(r'^[\s\|\-:]+$')


def _pdf_to_images(pdf_bytes: bytes) -> List[bytes]:
    """
    Convert PDF pages to images.

    Returns a list of JPEG image bytes, one per page.

    Raises:
        RuntimeError: if PyMuPDF is not installed.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PyMuPDF not installed. Cannot convert PDF to images.")

    pdf_doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    images: List[bytes] = []
    print(f"[INFO] PDF has {len(pdf_doc)} page(s)")

    for page_num in range(len(pdf_doc)):
        page = pdf_doc[page_num]
        # Render page to image (zoom factor 2 for better quality)
        mat = fitz.Matrix(2.0, 2.0)
        pix = page.get_pixmap(matrix=mat)
        # Convert to PIL Image then to JPEG bytes (better compression)
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        img_bytes = BytesIO()
        img.save(img_bytes, format="JPEG", quality=95)
        images.append(img_bytes.getvalue())
        print(f"[INFO] Converted page {page_num + 1} to image ({pix.width}x{pix.height})")

    pdf_doc.close()
    return images


def _image_bytes_to_base64(image_bytes: bytes) -> str:
    """Convert image bytes to a base64 data URL (JPEG format assumed)."""
    b64 = base64.b64encode(image_bytes).decode("utf-8")
    data_url = f"data:image/jpeg;base64,{b64}"
    print(f"[DEBUG] Base64 encoded image: {len(image_bytes)} bytes -> {len(data_url)} chars")
    return data_url


def _split_table_row(line: str) -> List[str]:
    """Split a markdown table line on '|' into stripped cells, dropping the
    empty leading/trailing cells produced by boundary '|' characters."""
    cells = [cell.strip() for cell in line.split('|')]
    if cells and not cells[0]:
        cells = cells[1:]
    if cells and not cells[-1]:
        cells = cells[:-1]
    return cells


def _parse_markdown_table(text: str) -> Optional[Tuple[List[str], List[List[str]]]]:
    """
    Parse a markdown table from text.

    Returns (headers, rows) if a table is found, None otherwise.
    Handles various table formats including malformed ones (missing
    separators, ragged row widths within +/-1 column of the header).
    """
    lines = [line.strip() for line in text.split('\n')]

    # Find potential table start (line with multiple | and actual text content)
    table_start = None
    for i, line in enumerate(lines):
        if '|' in line and line.count('|') >= 2:
            # Skip separator lines (only |, -, :, spaces)
            if _SEPARATOR_RE.match(line):
                continue
            cells = _split_table_row(line)
            # Must have at least 2 columns with some text
            meaningful_cells = [c for c in cells if len(c) > 0]
            if len(meaningful_cells) >= 2:
                table_start = i
                break

    if table_start is None:
        return None

    # Find table end (first non-empty line without | after table start)
    table_end = None
    for i in range(table_start + 1, len(lines)):
        line = lines[i]
        if not line:
            # Empty line, continue
            continue
        if '|' not in line:
            # Non-empty line without | means table ended
            table_end = i
            break
    if table_end is None:
        table_end = len(lines)

    table_lines = lines[table_start:table_end]

    # Find the actual header row (meaningful text, not just | or separators)
    headers = None
    header_idx = None
    for i, line in enumerate(table_lines):
        if not line or '|' not in line:
            continue
        if _SEPARATOR_RE.match(line):
            continue
        cells = _split_table_row(line)
        # Header should have at least 3 columns and meaningful text
        if len(cells) >= 3:
            # Cells must have actual text (more than a single character)
            meaningful_cells = [c for c in cells if len(c) > 1]
            if len(meaningful_cells) >= 3:
                headers = cells
                header_idx = i
                break

    if not headers or header_idx is None:
        return None

    # Parse data rows (skip separator line after header if present)
    rows: List[List[str]] = []
    num_columns = len(headers)
    for i in range(header_idx + 1, len(table_lines)):
        line = table_lines[i]
        if not line:
            continue
        if _SEPARATOR_RE.match(line):
            continue
        if '|' not in line:
            # No more table rows
            break
        cells = _split_table_row(line)
        # Only accept rows within one column of the header width
        if len(cells) == num_columns or (len(cells) >= num_columns - 1 and len(cells) <= num_columns + 1):
            # Pad or trim to match header count
            if len(cells) < num_columns:
                cells.extend([''] * (num_columns - len(cells)))
            elif len(cells) > num_columns:
                cells = cells[:num_columns]
            # Only add if row has at least one non-empty cell
            if any(cell for cell in cells):
                rows.append(cells)

    if not rows:
        return None
    return (headers, rows)


def _extract_metadata(text: str) -> Dict[str, str]:
    """
    Extract metadata from document header text.

    Looks for title, office, notice number, and description. The keyword
    lists and regexes target Hindi government notices (Devanagari script).
    """
    metadata = {
        "title": "",
        "office": "",
        "notice_no": "",
        "description": ""
    }
    lines = [line.strip() for line in text.split('\n') if line.strip()]

    # Extract office (usually first non-empty line)
    if lines:
        metadata["office"] = lines[0]

    # Look for notice number pattern (like "पत्रक सं- 1239" or "सं- 1239")
    notice_pattern = r'(?:पत्रक\s+)?सं[-\s:]*(\d+)'
    for line in lines[:10]:  # Check first 10 lines
        match = re.search(notice_pattern, line)
        if match:
            metadata["notice_no"] = match.group(1)
            break

    # Look for title - usually in quotes or contains specific keywords
    # Check for quoted text first
    quoted_title = re.search(r'["""]([^"""]+)["""]', text[:1000])
    if quoted_title:
        metadata["title"] = quoted_title.group(1).strip()
    else:
        # Look for title patterns
        title_keywords = ['सम्पत्ति', 'सूचना', 'विज्ञप्ति', 'नाम परिवर्तन']
        for line in lines[:5]:
            if any(keyword in line for keyword in title_keywords):
                # Extract the title phrase
                title_match = re.search(r'(सम्पत्ति[^।]*|सूचना[^।]*|विज्ञप्ति[^।]*)', line)
                if title_match:
                    metadata["title"] = title_match.group(1).strip()
                break

    # Extract description (text before table, usually contains key phrases)
    description_keywords = ['नाम परिवर्तन', 'अधिनियम', 'धारा', 'प्रकाशन', 'आवेदन']
    description_parts: List[str] = []
    for i, line in enumerate(lines[:15]):  # Check first 15 lines
        if any(keyword in line for keyword in description_keywords):
            description_parts.append(line)
            # Get a few surrounding lines for context
            if i > 0:
                description_parts.insert(0, lines[i - 1])
            if i < len(lines) - 1:
                description_parts.append(lines[i + 1])
            break

    if description_parts:
        description = ' '.join(description_parts).strip()
        if len(description) > 30:  # Only if substantial
            # Clean up and limit length
            description = re.sub(r'\s+', ' ', description)
            metadata["description"] = description[:300]

    return metadata


def _extract_footer_notes(text: str) -> List[str]:
    """
    Extract footer notes from the document text after the table.

    Splits the post-table text on Devanagari danda, period, or exclamation
    and keeps up to five substantial (>20 char) sentences.
    """
    notes: List[str] = []

    # Find table end: scan to the first table line, then past all table lines
    lines = text.split('\n')
    table_end_idx = len(lines)
    for i, line in enumerate(lines):
        if '|' in line:
            j = i + 1
            while j < len(lines) and ('|' in lines[j] or _SEPARATOR_RE.match(lines[j])):
                j += 1
            table_end_idx = j
            break

    # Extract footer text (after table)
    footer_lines = lines[table_end_idx:]
    footer_text = '\n'.join(footer_lines).strip()

    # Split into sentences/notes on sentence-ending punctuation
    sentences = re.split(r'[।\.!]\s+', footer_text)
    for sentence in sentences:
        sentence = sentence.strip()
        if len(sentence) > 20:  # Only substantial notes
            sentence = re.sub(r'\s+', ' ', sentence)
            if sentence:
                notes.append(sentence)

    # Limit to most relevant notes (usually 2-4)
    return notes[:5]


def _parse_text_with_tables(text: str) -> Dict[str, Any]:
    """
    Parse OCR text into structured data, including any markdown table.

    Returns a dict with keys: "text" (original), "metadata", "table"
    (row_N -> {header_key: cell}), and "footer_notes".
    """
    result: Dict[str, Any] = {
        "text": text,  # Keep original text
        "metadata": {},
        "table": [],
        "footer_notes": []
    }

    table_data = _parse_markdown_table(text)
    if table_data:
        headers, rows = table_data
        print(f"[INFO] Found table with {len(headers)} columns and {len(rows)} rows")

        result["metadata"] = _extract_metadata(text)

        # Map headers to JSON-safe keys, keeping the original language and
        # disambiguating duplicate headers with a numeric suffix.
        header_mapping: Dict[int, str] = {}
        header_counts: Dict[str, int] = {}  # Track occurrences of each header
        for i, header in enumerate(headers):
            header_key = header.strip()

            # Track how many times we've seen this exact header; duplicates
            # get a "_N" suffix (counted before cleaning, as in original data)
            if header_key not in header_counts:
                header_counts[header_key] = 0
            header_counts[header_key] += 1
            if header_counts[header_key] > 1:
                header_key = f"{header_key}_{header_counts[header_key]}"

            # Make the key JSON-safe: keep word chars, spaces, and Devanagari,
            # then collapse spaces to underscores
            header_key = re.sub(r'[^\w\s\u0900-\u097F]', '', header_key)
            header_key = re.sub(r'\s+', '_', header_key)

            # If key is empty after cleaning, fall back to column index
            if not header_key:
                header_key = f"column_{i + 1}"
            header_mapping[i] = header_key

        # Parse table rows - each row becomes a separate "row_N" section
        table_rows_dict: Dict[str, Dict[str, str]] = {}
        for idx, row in enumerate(rows, start=1):
            row_dict = {}
            for i, key in enumerate(header_mapping.values()):
                if i < len(row):
                    row_dict[key] = row[i].strip()
            if row_dict:
                table_rows_dict[f"row_{idx}"] = row_dict

        result["table"] = table_rows_dict
        result["footer_notes"] = _extract_footer_notes(text)
    else:
        # No table found, just extract basic metadata
        result["metadata"] = _extract_metadata(text)
        result["footer_notes"] = _extract_footer_notes(text)

    return result


async def _extract_text_with_ocr(image_bytes: bytes, page_num: int, total_pages: int) -> Dict[str, Any]:
    """
    Extract text from a single page image using the OCR model.

    Returns a dict with "full_text" (extracted text), "confidence",
    "doc_type" (always "other" for now), and empty "fields".

    Raises:
        RuntimeError: wrapping any error from the OCR API call.
    """
    data_url = _image_bytes_to_base64(image_bytes)
    print(f"[INFO] OCR: Processing page {page_num}/{total_pages} with model {OCR_MODEL_NAME}")

    try:
        # The OpenAI client is synchronous; run it in the default executor so
        # the event loop is not blocked. get_running_loop() is the supported
        # way to obtain the loop inside a coroutine (get_event_loop() is
        # deprecated in this context).
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: ocr_client.chat.completions.create(
                model=OCR_MODEL_NAME,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": "Extract all text from this image"},
                            {
                                "type": "image_url",
                                "image_url": {"url": data_url}
                            }
                        ]
                    }
                ],
            )
        )

        extracted_text = response.choices[0].message.content
        if not extracted_text:
            extracted_text = ""
        print(f"[INFO] OCR: Extracted {len(extracted_text)} characters from page {page_num}")

        confidence = _calculate_ocr_confidence(response, extracted_text)

        # Return text in full_text, keep fields empty for now
        return {
            "doc_type": "other",
            "confidence": confidence,
            "full_text": extracted_text,
            "fields": {}  # Keep fields empty for now
        }
    except Exception as e:
        error_msg = str(e)
        print(f"[ERROR] OCR API error for page {page_num}: {error_msg}")
        raise RuntimeError(f"OCR API error for page {page_num}: {error_msg}") from e


def _calculate_ocr_confidence(response, extracted_text: str) -> float:
    """
    Calculate a heuristic confidence score (0-100) for an OCR response.

    Bases the score on the completion finish_reason, then adjusts for text
    length, presence of table markup, and non-whitespace content ratio.
    """
    # Base confidence from how the model finished the completion
    try:
        if hasattr(response.choices[0], 'finish_reason'):
            finish_reason = response.choices[0].finish_reason
            if finish_reason == "stop":
                # Normal completion - good sign
                base_confidence = 85.0
            elif finish_reason == "length":
                # Response was truncated - lower confidence
                base_confidence = 70.0
            else:
                base_confidence = 75.0
        else:
            base_confidence = 85.0
    except Exception:
        base_confidence = 85.0

    # Adjust confidence based on text quality heuristics
    text_length = len(extracted_text.strip())
    if text_length == 0:
        return 0.0
    elif text_length < 10:
        # Very short text - might be error or empty
        return max(30.0, base_confidence - 30.0)
    elif text_length < 50:
        # Short text
        return max(50.0, base_confidence - 15.0)
    elif text_length > 1000:
        # Long text - likely good extraction
        confidence = min(95.0, base_confidence + 10.0)
    else:
        confidence = base_confidence

    # Check for structured content (tables, etc.) - indicates good extraction
    if '|' in extracted_text and extracted_text.count('|') > 5:
        # Table detected - boost confidence
        confidence = min(95.0, confidence + 5.0)

    # Check for meaningful content (non-whitespace ratio)
    non_whitespace = len([c for c in extracted_text if not c.isspace()])
    if text_length > 0:
        content_ratio = non_whitespace / text_length
        if content_ratio > 0.8:
            # High content ratio - good
            confidence = min(95.0, confidence + 3.0)
        elif content_ratio < 0.3:
            # Low content ratio - mostly whitespace
            confidence = max(50.0, confidence - 10.0)

    return round(confidence, 1)


async def extract_fields_from_document(
    file_bytes: bytes,
    content_type: str,
    filename: str,
) -> Dict[str, Any]:
    """
    Extract text from a document (PDF or image) using the OCR model.

    Processes pages separately for better reliability, then parses each
    page's text for tables/metadata. Returns a dict with "doc_type",
    "confidence" (average over pages), "full_text", "fields" (structured
    per-page data keyed page_N), and "pages" (raw per-page results).

    Raises:
        RuntimeError: if a PDF is supplied but PyMuPDF is unavailable.
    """
    # Get raw image bytes for processing. Guard against a None content_type.
    if content_type and content_type.endswith("/pdf"):
        if not PDF_SUPPORT:
            raise RuntimeError("PDF support requires PyMuPDF. Please install it.")
        # For PDFs, convert to images
        image_bytes_list = _pdf_to_images(file_bytes)
    else:
        # For regular images, normalize to JPEG for consistency
        try:
            img = Image.open(BytesIO(file_bytes))
            if img.mode != "RGB":
                img = img.convert("RGB")
            # Resize if too large (max 1920px on longest side)
            max_size = 1920
            w, h = img.size
            if w > max_size or h > max_size:
                if w > h:
                    new_w = max_size
                    new_h = int(h * (max_size / w))
                else:
                    new_h = max_size
                    new_w = int(w * (max_size / h))
                img = img.resize((new_w, new_h), Image.LANCZOS)
                print(f"[INFO] Resized image from {w}x{h} to {new_w}x{new_h}")
            # Convert to JPEG bytes
            img_bytes = BytesIO()
            img.save(img_bytes, format="JPEG", quality=95)
            image_bytes_list = [img_bytes.getvalue()]
        except Exception as e:
            # Fallback: use original file bytes
            print(f"[WARNING] Could not process image with PIL: {e}. Using original bytes.")
            image_bytes_list = [file_bytes]

    total_pages = len(image_bytes_list)
    print(f"[INFO] Processing {total_pages} page(s) with OCR model...")

    # Process each page separately; a failed page records its error but does
    # not abort the remaining pages.
    page_results: List[Dict[str, Any]] = []
    for page_num, img_bytes in enumerate(image_bytes_list):
        print(f"[INFO] Processing page {page_num + 1}/{total_pages}...")
        try:
            page_result = await _extract_text_with_ocr(img_bytes, page_num + 1, total_pages)
            page_results.append({
                "page_number": page_num + 1,
                "text": page_result.get("full_text", ""),
                "fields": page_result.get("fields", {}),
                "confidence": page_result.get("confidence", 0),
                "doc_type": page_result.get("doc_type", "other"),
            })
            print(f"[INFO] Page {page_num + 1} processed successfully")
        except Exception as e:
            print(f"[ERROR] Failed to process page {page_num + 1}: {e}")
            page_results.append({
                "page_number": page_num + 1,
                "text": "",
                "fields": {},
                "confidence": 0,
                "error": str(e)
            })

    # Combine results from all pages
    combined_full_text = "\n\n".join(
        [f"=== PAGE {p['page_number']} ===\n\n{p['text']}" for p in page_results if p.get("text")]
    )

    # Parse each page for tables and structure the output
    structured_pages: Dict[str, Any] = {}
    for page_result in page_results:
        if page_result.get("text"):
            page_num = page_result.get("page_number", 1)
            parsed_data = _parse_text_with_tables(page_result.get("text", ""))
            structured_pages[f"page_{page_num}"] = {
                "text": parsed_data["text"],
                "metadata": parsed_data["metadata"],
                "table": parsed_data["table"],
                "footer_notes": parsed_data["footer_notes"],
                "confidence": page_result.get("confidence", 0),
                "doc_type": page_result.get("doc_type", "other")
            }

    # Always return pages with page_X keys (even for single page)
    combined_fields = structured_pages if structured_pages else {}

    # Calculate average confidence over pages that produced a score
    confidences = [p.get("confidence", 0) for p in page_results if p.get("confidence", 0) > 0]
    avg_confidence = sum(confidences) / len(confidences) if confidences else 0

    # Determine doc_type from first successful page
    doc_type = "other"
    for page_result in page_results:
        if page_result.get("doc_type") and page_result["doc_type"] != "other":
            doc_type = page_result["doc_type"]
            break

    return {
        "doc_type": doc_type,
        "confidence": avg_confidence,
        "full_text": combined_full_text,
        "fields": combined_fields,  # Structured per-page data with tables
        "pages": page_results
    }