Spaces:
Sleeping
Sleeping
| import os | |
| import base64 | |
| import json | |
| import re | |
| from io import BytesIO | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from openai import OpenAI | |
# Optional PDF support: PyMuPDF (fitz) renders pages, Pillow re-encodes them.
try:
    import fitz  # PyMuPDF
    from PIL import Image
    PDF_SUPPORT = True
except ImportError as e:
    PDF_SUPPORT = False
    print(f"[WARNING] PDF support libraries not available: {e}. PDF conversion will not work.")

# OCR Model Configuration (from sample code)
# SECURITY NOTE(review): the hard-coded URL/key fallbacks below leak
# credentials in source control; prefer requiring the environment variables
# and removing the defaults in production.
OCR_BASE_URL = os.environ.get("OCR_BASE_URL", "https://od5yev2behke5u-8000.proxy.runpod.net/v1")
OCR_API_KEY = os.environ.get("OCR_API_KEY", "Ezofis@123")
OCR_MODEL_NAME = os.environ.get("OCR_MODEL_NAME", "EZOFISOCR")

# OpenAI-compatible client pointed at the OCR endpoint.
ocr_client = OpenAI(
    base_url=OCR_BASE_URL,
    api_key=OCR_API_KEY,
)
def _pdf_to_images(pdf_bytes: bytes) -> List[bytes]:
    """
    Render each page of a PDF to a JPEG image.

    Args:
        pdf_bytes: Raw PDF file contents.

    Returns:
        A list of JPEG image bytes, one entry per page.

    Raises:
        RuntimeError: If PyMuPDF is not installed.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PyMuPDF not installed. Cannot convert PDF to images.")
    pdf_doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    images = []
    try:
        print(f"[INFO] PDF has {len(pdf_doc)} page(s)")
        for page_num in range(len(pdf_doc)):
            page = pdf_doc[page_num]
            # 2x zoom renders at double resolution for better OCR quality.
            mat = fitz.Matrix(2.0, 2.0)
            pix = page.get_pixmap(matrix=mat)
            # Convert to a PIL Image, then re-encode as JPEG (better
            # compression than raw pixmap data; quality=95 keeps text legible).
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            img_bytes = BytesIO()
            img.save(img_bytes, format="JPEG", quality=95)
            images.append(img_bytes.getvalue())
            print(f"[INFO] Converted page {page_num + 1} to image ({pix.width}x{pix.height})")
    finally:
        # Release the document handle even if rendering a page fails.
        pdf_doc.close()
    return images
| def _image_bytes_to_base64(image_bytes: bytes) -> str: | |
| """Convert image bytes to base64 data URL (JPEG format).""" | |
| b64 = base64.b64encode(image_bytes).decode("utf-8") | |
| data_url = f"data:image/jpeg;base64,{b64}" | |
| print(f"[DEBUG] Base64 encoded image: {len(image_bytes)} bytes -> {len(data_url)} chars") | |
| return data_url | |
| def _parse_markdown_table(text: str) -> Optional[Tuple[List[str], List[List[str]]]]: | |
| """ | |
| Parse a markdown table from text. | |
| Returns (headers, rows) if table found, None otherwise. | |
| Handles various table formats including malformed ones. | |
| """ | |
| lines = [line.strip() for line in text.split('\n')] | |
| # Find potential table start (line with multiple | and actual text content) | |
| table_start = None | |
| for i, line in enumerate(lines): | |
| if '|' in line and line.count('|') >= 2: | |
| # Skip separator lines (only |, -, :, spaces) | |
| if re.match(r'^[\s\|\-:]+$', line): | |
| continue | |
| # Check if line has meaningful text (not just | characters) | |
| cells = [cell.strip() for cell in line.split('|')] | |
| if cells and not cells[0]: | |
| cells = cells[1:] | |
| if cells and not cells[-1]: | |
| cells = cells[:-1] | |
| # Must have at least 2 columns with some text | |
| meaningful_cells = [c for c in cells if len(c) > 0] | |
| if len(meaningful_cells) >= 2: | |
| table_start = i | |
| break | |
| if table_start is None: | |
| return None | |
| # Find table end (first non-empty line without | after table start) | |
| table_end = None | |
| for i in range(table_start + 1, len(lines)): | |
| line = lines[i] | |
| if not line: # Empty line, continue | |
| continue | |
| if '|' not in line: | |
| # Non-empty line without | means table ended | |
| table_end = i | |
| break | |
| if table_end is None: | |
| table_end = len(lines) | |
| table_lines = lines[table_start:table_end] | |
| # Find the actual header row (should have meaningful text, not just | or separators) | |
| headers = None | |
| header_idx = None | |
| for i, line in enumerate(table_lines): | |
| if not line or '|' not in line: | |
| continue | |
| # Skip separator lines (lines with only |, -, :, spaces) | |
| if re.match(r'^[\s\|\-:]+$', line): | |
| continue | |
| # Check if this line has meaningful content (not just | characters) | |
| cells = [cell.strip() for cell in line.split('|')] | |
| # Remove empty cells at start/end | |
| if cells and not cells[0]: | |
| cells = cells[1:] | |
| if cells and not cells[-1]: | |
| cells = cells[:-1] | |
| # Header should have at least 3 columns and meaningful text | |
| if len(cells) >= 3: | |
| # Check if cells have actual text (not just empty or single char) | |
| meaningful_cells = [c for c in cells if len(c) > 1] | |
| if len(meaningful_cells) >= 3: | |
| headers = cells | |
| header_idx = i | |
| break | |
| if not headers or header_idx is None: | |
| return None | |
| # Parse data rows (skip separator line after header if present) | |
| rows = [] | |
| num_columns = len(headers) | |
| for i in range(header_idx + 1, len(table_lines)): | |
| line = table_lines[i] | |
| if not line: | |
| continue | |
| # Skip separator lines | |
| if re.match(r'^[\s\|\-:]+$', line): | |
| continue | |
| if '|' not in line: | |
| # No more table rows | |
| break | |
| cells = [cell.strip() for cell in line.split('|')] | |
| # Remove empty cells at start/end | |
| if cells and not cells[0]: | |
| cells = cells[1:] | |
| if cells and not cells[-1]: | |
| cells = cells[:-1] | |
| # Only add rows that match header column count (allow some flexibility) | |
| if len(cells) == num_columns or (len(cells) >= num_columns - 1 and len(cells) <= num_columns + 1): | |
| # Pad or trim to match header count | |
| if len(cells) < num_columns: | |
| cells.extend([''] * (num_columns - len(cells))) | |
| elif len(cells) > num_columns: | |
| cells = cells[:num_columns] | |
| # Only add if row has at least one non-empty cell | |
| if any(cell for cell in cells): | |
| rows.append(cells) | |
| if not rows: | |
| return None | |
| return (headers, rows) | |
| def _extract_metadata(text: str) -> Dict[str, str]: | |
| """ | |
| Extract metadata from document header text. | |
| Looks for title, office, notice number, and description. | |
| """ | |
| metadata = { | |
| "title": "", | |
| "office": "", | |
| "notice_no": "", | |
| "description": "" | |
| } | |
| lines = [line.strip() for line in text.split('\n') if line.strip()] | |
| # Extract office (usually first non-empty line) | |
| if lines: | |
| metadata["office"] = lines[0] | |
| # Look for notice number pattern (like "पत्रक सं- 1239" or "सं- 1239") | |
| notice_pattern = r'(?:पत्रक\s+)?सं[-\s:]*(\d+)' | |
| for line in lines[:10]: # Check first 10 lines | |
| match = re.search(notice_pattern, line) | |
| if match: | |
| metadata["notice_no"] = match.group(1) | |
| break | |
| # Look for title - usually in quotes or contains specific keywords | |
| # Check for quoted text first | |
| quoted_title = re.search(r'["""]([^"""]+)["""]', text[:1000]) | |
| if quoted_title: | |
| metadata["title"] = quoted_title.group(1).strip() | |
| else: | |
| # Look for title patterns | |
| title_keywords = ['सम्पत्ति', 'सूचना', 'विज्ञप्ति', 'नाम परिवर्तन'] | |
| for line in lines[:5]: | |
| if any(keyword in line for keyword in title_keywords): | |
| # Extract the title phrase | |
| title_match = re.search(r'(सम्पत्ति[^।]*|सूचना[^।]*|विज्ञप्ति[^।]*)', line) | |
| if title_match: | |
| metadata["title"] = title_match.group(1).strip() | |
| break | |
| # Extract description (text before table, usually contains key phrases) | |
| description_keywords = ['नाम परिवर्तन', 'अधिनियम', 'धारा', 'प्रकाशन', 'आवेदन'] | |
| description_parts = [] | |
| for i, line in enumerate(lines[:15]): # Check first 15 lines | |
| if any(keyword in line for keyword in description_keywords): | |
| description_parts.append(line) | |
| # Get a few surrounding lines for context | |
| if i > 0: | |
| description_parts.insert(0, lines[i-1]) | |
| if i < len(lines) - 1: | |
| description_parts.append(lines[i+1]) | |
| break | |
| if description_parts: | |
| description = ' '.join(description_parts).strip() | |
| if len(description) > 30: # Only if substantial | |
| # Clean up and limit length | |
| description = re.sub(r'\s+', ' ', description) | |
| metadata["description"] = description[:300] # Limit length | |
| return metadata | |
| def _extract_footer_notes(text: str) -> List[str]: | |
| """ | |
| Extract footer notes from document. | |
| Usually appears after the table. | |
| """ | |
| notes = [] | |
| # Find table end | |
| lines = text.split('\n') | |
| table_end_idx = len(lines) | |
| for i, line in enumerate(lines): | |
| if '|' in line: | |
| # Find last table line | |
| j = i + 1 | |
| while j < len(lines) and ('|' in lines[j] or re.match(r'^[\s\|\-:]+$', lines[j])): | |
| j += 1 | |
| table_end_idx = j | |
| break | |
| # Extract footer text (after table) | |
| footer_lines = lines[table_end_idx:] | |
| footer_text = '\n'.join(footer_lines).strip() | |
| # Split into sentences/notes | |
| # Look for sentences ending with period, exclamation, or specific keywords | |
| sentences = re.split(r'[।\.!]\s+', footer_text) | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 20: # Only substantial notes | |
| # Clean up | |
| sentence = re.sub(r'\s+', ' ', sentence) | |
| if sentence: | |
| notes.append(sentence) | |
| # Limit to most relevant notes (usually 2-4) | |
| return notes[:5] | |
def _parse_text_with_tables(text: str) -> Dict[str, Any]:
    """
    Parse OCR text into a structured dict with keys:
    text (the original input), metadata, table, and footer_notes.

    NOTE(review): "table" is an empty list when no table is detected but a
    dict of row_N sections when one is; callers appear to tolerate both.
    """
    result = {
        "text": text,
        "metadata": {},
        "table": [],
        "footer_notes": []
    }

    parsed = _parse_markdown_table(text)
    if parsed is None:
        # No table: still extract header metadata and footer notes.
        result["metadata"] = _extract_metadata(text)
        result["footer_notes"] = _extract_footer_notes(text)
        return result

    headers, rows = parsed
    print(f"[INFO] Found table with {len(headers)} columns and {len(rows)} rows")
    result["metadata"] = _extract_metadata(text)

    # Build JSON-safe keys from the original header text: number duplicate
    # headers first, then strip characters outside word chars, whitespace
    # and the Devanagari block, and turn spaces into underscores.
    header_mapping = {}
    header_counts = {}
    for i, header in enumerate(headers):
        key = header.strip()
        header_counts[key] = header_counts.get(key, 0) + 1
        if header_counts[key] > 1:
            key = f"{key}_{header_counts[key]}"
        key = re.sub(r'[^\w\s\u0900-\u097F]', '', key)
        key = re.sub(r'\s+', '_', key)
        if not key:
            # Nothing survived sanitising; fall back to a positional name.
            key = f"column_{i+1}"
        header_mapping[i] = key

    # Each data row becomes its own section: row_1, row_2, ...
    table_rows = {}
    for idx, row in enumerate(rows, start=1):
        row_dict = {
            header_mapping[col]: row[col].strip()
            for col in header_mapping
            if col < len(row)
        }
        if row_dict:
            table_rows[f"row_{idx}"] = row_dict
    result["table"] = table_rows

    result["footer_notes"] = _extract_footer_notes(text)
    return result
async def _extract_text_with_ocr(image_bytes: bytes, page_num: int, total_pages: int) -> Dict[str, Any]:
    """
    Extract text from a single page image using the OCR model.

    Args:
        image_bytes: JPEG-encoded page image.
        page_num: 1-based page number (used for logging).
        total_pages: Total page count (used for logging).

    Returns:
        Dict with doc_type, confidence, full_text and (currently empty) fields.

    Raises:
        RuntimeError: If the OCR API call fails.
    """
    import asyncio

    data_url = _image_bytes_to_base64(image_bytes)
    print(f"[INFO] OCR: Processing page {page_num}/{total_pages} with model {OCR_MODEL_NAME}")
    try:
        # The OpenAI client is synchronous; run it in the default executor so
        # the event loop is not blocked. get_running_loop() is the correct
        # call inside a coroutine (get_event_loop() is deprecated there).
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: ocr_client.chat.completions.create(
                model=OCR_MODEL_NAME,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": "Extract all text from this image"},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": data_url
                                }
                            }
                        ]
                    }
                ],
            )
        )
        # None content (e.g. refused/empty response) is normalised to "".
        extracted_text = response.choices[0].message.content or ""
        print(f"[INFO] OCR: Extracted {len(extracted_text)} characters from page {page_num}")
        confidence = _calculate_ocr_confidence(response, extracted_text)
        return {
            "doc_type": "other",
            "confidence": confidence,
            "full_text": extracted_text,
            "fields": {}  # structured fields intentionally left empty for now
        }
    except Exception as e:
        error_msg = str(e)
        print(f"[ERROR] OCR API error for page {page_num}: {error_msg}")
        raise RuntimeError(f"OCR API error for page {page_num}: {error_msg}") from e
| def _calculate_ocr_confidence(response, extracted_text: str) -> float: | |
| """ | |
| Calculate confidence score based on OCR response quality. | |
| Checks for explicit confidence in response, or calculates based on heuristics. | |
| """ | |
| # Check if response has explicit confidence score | |
| try: | |
| # Check response object for confidence-related fields | |
| if hasattr(response, 'usage'): | |
| # Some models provide usage info that might indicate quality | |
| usage = response.usage | |
| if hasattr(usage, 'completion_tokens') and usage.completion_tokens > 0: | |
| # More tokens might indicate better extraction | |
| pass | |
| # Check if finish_reason indicates quality | |
| if hasattr(response.choices[0], 'finish_reason'): | |
| finish_reason = response.choices[0].finish_reason | |
| if finish_reason == "stop": | |
| # Normal completion - good sign | |
| base_confidence = 85.0 | |
| elif finish_reason == "length": | |
| # Response was truncated - lower confidence | |
| base_confidence = 70.0 | |
| else: | |
| base_confidence = 75.0 | |
| else: | |
| base_confidence = 85.0 | |
| except Exception: | |
| base_confidence = 85.0 | |
| # Adjust confidence based on text quality heuristics | |
| text_length = len(extracted_text.strip()) | |
| if text_length == 0: | |
| return 0.0 | |
| elif text_length < 10: | |
| # Very short text - might be error or empty | |
| return max(30.0, base_confidence - 30.0) | |
| elif text_length < 50: | |
| # Short text | |
| return max(50.0, base_confidence - 15.0) | |
| elif text_length > 1000: | |
| # Long text - likely good extraction | |
| confidence = min(95.0, base_confidence + 10.0) | |
| else: | |
| confidence = base_confidence | |
| # Check for structured content (tables, etc.) - indicates good extraction | |
| if '|' in extracted_text and extracted_text.count('|') > 5: | |
| # Table detected - boost confidence | |
| confidence = min(95.0, confidence + 5.0) | |
| # Check for meaningful content (non-whitespace ratio) | |
| non_whitespace = len([c for c in extracted_text if not c.isspace()]) | |
| if text_length > 0: | |
| content_ratio = non_whitespace / text_length | |
| if content_ratio > 0.8: | |
| # High content ratio - good | |
| confidence = min(95.0, confidence + 3.0) | |
| elif content_ratio < 0.3: | |
| # Low content ratio - mostly whitespace | |
| confidence = max(50.0, confidence - 10.0) | |
| return round(confidence, 1) | |
async def extract_fields_from_document(
    file_bytes: bytes,
    content_type: str,
    filename: str,
) -> Dict[str, Any]:
    """
    Run OCR over a document (PDF or image) and return the extracted text.

    Pages are processed one at a time for reliability. The result contains
    the combined full_text, per-page results, structured fields (page_N
    sections with any detected tables), an average confidence, and doc_type.

    Raises:
        RuntimeError: For PDF input when PyMuPDF is unavailable.
    """
    # Normalise the input into a list of JPEG page images.
    if content_type == "application/pdf" or content_type.endswith("/pdf"):
        if not PDF_SUPPORT:
            raise RuntimeError("PDF support requires PyMuPDF. Please install it.")
        image_bytes_list = _pdf_to_images(file_bytes)
    else:
        try:
            img = Image.open(BytesIO(file_bytes))
            if img.mode != "RGB":
                img = img.convert("RGB")
            # Cap the longest side at 1920px to keep request payloads small.
            max_size = 1920
            w, h = img.size
            if w > max_size or h > max_size:
                if w > h:
                    new_w = max_size
                    new_h = int(h * (max_size / w))
                else:
                    new_h = max_size
                    new_w = int(w * (max_size / h))
                img = img.resize((new_w, new_h), Image.LANCZOS)
                print(f"[INFO] Resized image from {w}x{h} to {new_w}x{new_h}")
            buf = BytesIO()
            img.save(buf, format="JPEG", quality=95)
            image_bytes_list = [buf.getvalue()]
        except Exception as e:
            # Best effort: fall back to raw bytes if PIL cannot read them.
            print(f"[WARNING] Could not process image with PIL: {e}. Using original bytes.")
            image_bytes_list = [file_bytes]

    total_pages = len(image_bytes_list)
    print(f"[INFO] Processing {total_pages} page(s) with OCR model...")

    # OCR each page independently so one failing page does not sink the rest.
    page_results = []
    for idx, img_bytes in enumerate(image_bytes_list, start=1):
        print(f"[INFO] Processing page {idx}/{total_pages}...")
        try:
            ocr_out = await _extract_text_with_ocr(img_bytes, idx, total_pages)
            page_results.append({
                "page_number": idx,
                "text": ocr_out.get("full_text", ""),
                "fields": ocr_out.get("fields", {}),
                "confidence": ocr_out.get("confidence", 0),
                "doc_type": ocr_out.get("doc_type", "other"),
            })
            print(f"[INFO] Page {idx} processed successfully")
        except Exception as e:
            print(f"[ERROR] Failed to process page {idx}: {e}")
            page_results.append({
                "page_number": idx,
                "text": "",
                "fields": {},
                "confidence": 0,
                "error": str(e)
            })

    # Combine page texts with page markers.
    combined_full_text = "\n\n".join(
        f"=== PAGE {p['page_number']} ===\n\n{p['text']}"
        for p in page_results if p.get("text")
    )

    # Parse each page's text for tables/metadata into page_N sections.
    structured_pages = {}
    for p in page_results:
        if not p.get("text"):
            continue
        parsed = _parse_text_with_tables(p.get("text", ""))
        structured_pages[f"page_{p.get('page_number', 1)}"] = {
            "text": parsed["text"],
            "metadata": parsed["metadata"],
            "table": parsed["table"],
            "footer_notes": parsed["footer_notes"],
            "confidence": p.get("confidence", 0),
            "doc_type": p.get("doc_type", "other")
        }

    # Pages with structure become the fields payload; otherwise stay empty.
    combined_fields = structured_pages if structured_pages else {}

    # Average confidence over pages that produced a positive score.
    confidences = [p.get("confidence", 0) for p in page_results if p.get("confidence", 0) > 0]
    avg_confidence = sum(confidences) / len(confidences) if confidences else 0

    # doc_type: first page that reported something other than "other".
    doc_type = "other"
    for p in page_results:
        if p.get("doc_type") and p["doc_type"] != "other":
            doc_type = p["doc_type"]
            break

    return {
        "doc_type": doc_type,
        "confidence": avg_confidence,
        "full_text": combined_full_text,
        "fields": combined_fields,  # structured data with tables, keyed page_N
        "pages": page_results
    }