| import os |
| import base64 |
| import json |
| import re |
| import time |
| import asyncio |
| from io import BytesIO |
| from typing import Any, Dict, List, Optional, Tuple |
| import httpx |
|
|
# PyMuPDF (fitz) and Pillow are optional dependencies: when they are missing,
# PDF-to-image conversion is disabled but the rest of the module still loads.
try:
    import fitz
    from PIL import Image
    PDF_SUPPORT = True
except ImportError as e:
    PDF_SUPPORT = False
    print(f"[WARNING] PDF support libraries not available: {e}. PDF conversion will not work.")
|
|
|
|
| |
# RunPod serverless configuration. The /run endpoint submits OCR jobs.
RUNPOD_ENDPOINT = os.environ.get("RUNPOD_ENDPOINT", "https://api.runpod.ai/v2/j2jvf8t6n0rk5c/run")
# SECURITY: the API key must come from the environment only. The previous
# code shipped a real key as the fallback default, leaking the credential in
# source control. An empty default fails fast with a 401 instead.
RUNPOD_API_KEY = os.environ.get("RUNPOD_API_KEY", "")


# Derive the /status endpoint from the /run endpoint so async jobs can be
# polled; None when the endpoint URL does not follow the /v2/<id>/ pattern.
_endpoint_id = RUNPOD_ENDPOINT.split("/v2/")[1].split("/")[0] if "/v2/" in RUNPOD_ENDPOINT else None
RUNPOD_STATUS_ENDPOINT = f"https://api.runpod.ai/v2/{_endpoint_id}/status" if _endpoint_id else None
|
|
|
|
def _pdf_to_images(pdf_bytes: bytes) -> List[bytes]:
    """
    Convert PDF pages to images for OCR.

    Each page is rendered at 2x zoom, downscaled so its longest side fits
    within 1920px, and encoded as JPEG (quality 95).

    Args:
        pdf_bytes: Raw PDF file contents.

    Returns:
        A list of JPEG image bytes, one per page.

    Raises:
        RuntimeError: If PyMuPDF/Pillow failed to import.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PyMuPDF not installed. Cannot convert PDF to images.")
    
    pdf_doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    images = []
    
    print(f"[INFO] PDF has {len(pdf_doc)} page(s)")
    
    for page_num in range(len(pdf_doc)):
        page = pdf_doc[page_num]
        
        # Render at 2x zoom for better OCR accuracy on small text.
        mat = fitz.Matrix(2.0, 2.0)
        pix = page.get_pixmap(matrix=mat)
        
        # NOTE(review): assumes the pixmap is 3-channel RGB (the default for
        # get_pixmap without alpha) — confirm behavior for CMYK/alpha PDFs.
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        
        # Cap the longest side at 1920px, preserving aspect ratio.
        max_size = 1920
        w, h = img.size
        if w > max_size or h > max_size:
            if w > h:
                new_w = max_size
                new_h = int(h * (max_size / w))
            else:
                new_h = max_size
                new_w = int(w * (max_size / h))
            img = img.resize((new_w, new_h), Image.LANCZOS)
            print(f"[INFO] Resized page {page_num + 1} from {w}x{h} to {new_w}x{new_h}")
        else:
            print(f"[INFO] Converted page {page_num + 1} to image ({w}x{h})")
        
        # JPEG (quality 95) keeps the base64 payload to the OCR API small.
        img_bytes = BytesIO()
        img.save(img_bytes, format="JPEG", quality=95)
        images.append(img_bytes.getvalue())
    
    pdf_doc.close()
    return images
|
|
|
|
| def _image_bytes_to_base64(image_bytes: bytes) -> str: |
| """Convert image bytes to base64 data URL (JPEG format).""" |
| b64 = base64.b64encode(image_bytes).decode("utf-8") |
| data_url = f"data:image/jpeg;base64,{b64}" |
| print(f"[DEBUG] Base64 encoded image: {len(image_bytes)} bytes -> {len(data_url)} chars") |
| return data_url |
|
|
|
|
| def _parse_markdown_table(text: str) -> Optional[Tuple[List[str], List[List[str]]]]: |
| """ |
| Parse a markdown table from text. |
| Returns (headers, rows) if table found, None otherwise. |
| Handles various table formats including malformed ones. |
| """ |
| lines = [line.strip() for line in text.split('\n')] |
| |
| |
| table_start = None |
| for i, line in enumerate(lines): |
| if '|' in line and line.count('|') >= 2: |
| |
| if re.match(r'^[\s\|\-:]+$', line): |
| continue |
| |
| cells = [cell.strip() for cell in line.split('|')] |
| if cells and not cells[0]: |
| cells = cells[1:] |
| if cells and not cells[-1]: |
| cells = cells[:-1] |
| |
| meaningful_cells = [c for c in cells if len(c) > 0] |
| if len(meaningful_cells) >= 2: |
| table_start = i |
| break |
| |
| if table_start is None: |
| return None |
| |
| |
| table_end = None |
| for i in range(table_start + 1, len(lines)): |
| line = lines[i] |
| if not line: |
| continue |
| if '|' not in line: |
| |
| table_end = i |
| break |
| |
| if table_end is None: |
| table_end = len(lines) |
| |
| table_lines = lines[table_start:table_end] |
| |
| |
| headers = None |
| header_idx = None |
| |
| for i, line in enumerate(table_lines): |
| if not line or '|' not in line: |
| continue |
| |
| |
| if re.match(r'^[\s\|\-:]+$', line): |
| continue |
| |
| |
| cells = [cell.strip() for cell in line.split('|')] |
| |
| if cells and not cells[0]: |
| cells = cells[1:] |
| if cells and not cells[-1]: |
| cells = cells[:-1] |
| |
| |
| if len(cells) >= 3: |
| |
| meaningful_cells = [c for c in cells if len(c) > 1] |
| if len(meaningful_cells) >= 3: |
| headers = cells |
| header_idx = i |
| break |
| |
| if not headers or header_idx is None: |
| return None |
| |
| |
| rows = [] |
| num_columns = len(headers) |
| |
| for i in range(header_idx + 1, len(table_lines)): |
| line = table_lines[i] |
| |
| if not line: |
| continue |
| |
| |
| if re.match(r'^[\s\|\-:]+$', line): |
| continue |
| |
| if '|' not in line: |
| |
| break |
| |
| cells = [cell.strip() for cell in line.split('|')] |
| |
| if cells and not cells[0]: |
| cells = cells[1:] |
| if cells and not cells[-1]: |
| cells = cells[:-1] |
| |
| |
| if len(cells) == num_columns or (len(cells) >= num_columns - 1 and len(cells) <= num_columns + 1): |
| |
| if len(cells) < num_columns: |
| cells.extend([''] * (num_columns - len(cells))) |
| elif len(cells) > num_columns: |
| cells = cells[:num_columns] |
| |
| |
| if any(cell for cell in cells): |
| rows.append(cells) |
| |
| if not rows: |
| return None |
| |
| return (headers, rows) |
|
|
|
|
| def _extract_metadata(text: str) -> Dict[str, str]: |
| """ |
| Extract metadata from document header text. |
| Looks for title, office, notice number, and description. |
| """ |
| metadata = { |
| "title": "", |
| "office": "", |
| "notice_no": "", |
| "description": "" |
| } |
| |
| lines = [line.strip() for line in text.split('\n') if line.strip()] |
| |
| |
| if lines: |
| metadata["office"] = lines[0] |
| |
| |
| notice_pattern = r'(?:पत्रक\s+)?सं[-\s:]*(\d+)' |
| for line in lines[:10]: |
| match = re.search(notice_pattern, line) |
| if match: |
| metadata["notice_no"] = match.group(1) |
| break |
| |
| |
| |
| quoted_title = re.search(r'["""]([^"""]+)["""]', text[:1000]) |
| if quoted_title: |
| metadata["title"] = quoted_title.group(1).strip() |
| else: |
| |
| title_keywords = ['सम्पत्ति', 'सूचना', 'विज्ञप्ति', 'नाम परिवर्तन'] |
| for line in lines[:5]: |
| if any(keyword in line for keyword in title_keywords): |
| |
| title_match = re.search(r'(सम्पत्ति[^।]*|सूचना[^।]*|विज्ञप्ति[^।]*)', line) |
| if title_match: |
| metadata["title"] = title_match.group(1).strip() |
| break |
| |
| |
| description_keywords = ['नाम परिवर्तन', 'अधिनियम', 'धारा', 'प्रकाशन', 'आवेदन'] |
| description_parts = [] |
| for i, line in enumerate(lines[:15]): |
| if any(keyword in line for keyword in description_keywords): |
| description_parts.append(line) |
| |
| if i > 0: |
| description_parts.insert(0, lines[i-1]) |
| if i < len(lines) - 1: |
| description_parts.append(lines[i+1]) |
| break |
| |
| if description_parts: |
| description = ' '.join(description_parts).strip() |
| if len(description) > 30: |
| |
| description = re.sub(r'\s+', ' ', description) |
| metadata["description"] = description[:300] |
| |
| return metadata |
|
|
|
|
| def _parse_model_response(response_text: str) -> Tuple[str, Dict[str, Any]]: |
| """ |
| Parse model response to extract text and metadata. |
| The model may return text and metadata in various formats. |
| Returns: (extracted_text, metadata_dict) |
| """ |
| metadata = {} |
| text = response_text |
| |
| |
| |
| metadata_patterns = [ |
| r'METADATA:\s*\n?\s*({.*?})(?:\n\n|\nTEXT|$)', |
| r'metadata:\s*\n?\s*({.*?})(?:\n\n|\nTEXT|$)', |
| r'METADATA:\s*\n?\s*```json\s*({.*?})\s*```', |
| r'METADATA:\s*\n?\s*```\s*({.*?})\s*```', |
| ] |
| |
| for pattern in metadata_patterns: |
| match = re.search(pattern, response_text, re.DOTALL | re.IGNORECASE) |
| if match: |
| try: |
| metadata_json = match.group(1).strip() |
| metadata = json.loads(metadata_json) |
| |
| text = response_text[:match.start()] + response_text[match.end():] |
| break |
| except (json.JSONDecodeError, IndexError): |
| continue |
| |
| |
| if not metadata: |
| |
| metadata_section = re.search(r'METADATA:\s*\n(.*?)(?:\n\n|\nTEXT|$)', response_text, re.DOTALL | re.IGNORECASE) |
| if metadata_section: |
| metadata_text = metadata_section.group(1) |
| |
| for line in metadata_text.split('\n'): |
| if ':' in line: |
| parts = line.split(':', 1) |
| if len(parts) == 2: |
| key = parts[0].strip().lower().replace(' ', '_') |
| value = parts[1].strip() |
| if value: |
| metadata[key] = value |
| |
| |
| text_match = re.search(r'TEXT:\s*\n(.*?)(?:\n\nMETADATA|$)', response_text, re.DOTALL | re.IGNORECASE) |
| if text_match: |
| text = text_match.group(1).strip() |
| else: |
| |
| text = re.sub(r'METADATA:.*', '', response_text, flags=re.DOTALL | re.IGNORECASE).strip() |
| |
| |
| text = text.strip() |
| |
| |
| metadata = {k: v for k, v in metadata.items() if v and str(v).strip()} |
| |
| return text, metadata |
|
|
|
|
| def _extract_footer_notes(text: str) -> List[str]: |
| """ |
| Extract footer notes from document. |
| Usually appears after the table. |
| """ |
| notes = [] |
| |
| |
| lines = text.split('\n') |
| table_end_idx = len(lines) |
| |
| for i, line in enumerate(lines): |
| if '|' in line: |
| |
| j = i + 1 |
| while j < len(lines) and ('|' in lines[j] or re.match(r'^[\s\|\-:]+$', lines[j])): |
| j += 1 |
| table_end_idx = j |
| break |
| |
| |
| footer_lines = lines[table_end_idx:] |
| footer_text = '\n'.join(footer_lines).strip() |
| |
| |
| |
| sentences = re.split(r'[।\.!]\s+', footer_text) |
| |
| for sentence in sentences: |
| sentence = sentence.strip() |
| if len(sentence) > 20: |
| |
| sentence = re.sub(r'\s+', ' ', sentence) |
| if sentence: |
| notes.append(sentence) |
| |
| |
| return notes[:5] |
|
|
|
|
def _parse_text_with_tables(text: str, page_metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Parse text and extract structured data including tables.

    Uses model-extracted metadata if provided, otherwise falls back to basic
    extraction via ``_extract_metadata``.

    Args:
        text: OCR text for one page.
        page_metadata: Metadata already extracted by the model, if any.

    Returns:
        Dict with keys ``text`` (raw input), ``metadata``, ``table`` (a
        {"row_N": {header_key: cell}} dict when a markdown table is found,
        otherwise an empty list), and ``footer_notes``.
    """
    result = {
        "text": text,
        "metadata": page_metadata if page_metadata else {},
        "table": [],
        "footer_notes": []
    }
    
    # Look for a markdown table anywhere in the page text.
    table_data = _parse_markdown_table(text)
    
    if table_data:
        headers, rows = table_data
        print(f"[INFO] Found table with {len(headers)} columns and {len(rows)} rows")
        
        # Only run heuristic metadata extraction when the caller supplied none.
        if not result["metadata"]:
            result["metadata"] = _extract_metadata(text)
        
        # Build a column-index -> JSON-key mapping from the header cells,
        # de-duplicating repeated headers with a numeric suffix.
        header_mapping = {}
        header_counts = {}
        
        for i, header in enumerate(headers):
            header_clean = header.strip()
            
            
            
            header_key = header_clean
            
            # Track occurrences so duplicates become "name_2", "name_3", ...
            if header_key not in header_counts:
                header_counts[header_key] = 0
            
            header_counts[header_key] += 1
            
            
            if header_counts[header_key] > 1:
                header_key = f"{header_key}_{header_counts[header_key]}"
            
            # Strip punctuation (keeping word chars, whitespace, and the
            # Devanagari block U+0900-U+097F) and turn whitespace runs into
            # underscores to form a JSON-friendly key.
            # NOTE(review): sanitization happens AFTER de-duplication, so two
            # headers differing only in punctuation can still collide here.
            header_key = re.sub(r'[^\w\s\u0900-\u097F]', '', header_key)
            header_key = re.sub(r'\s+', '_', header_key)
            
            # Fallback name for headers that were entirely punctuation.
            if not header_key:
                header_key = f"column_{i+1}"
            
            header_mapping[i] = header_key
        
        # Convert each data row into a {header_key: cell_value} dict.
        table_rows_dict = {}
        for idx, row in enumerate(rows, start=1):
            row_dict = {}
            for i, header_idx in header_mapping.items():
                if i < len(row):
                    row_dict[header_idx] = row[i].strip()
            
            if row_dict:
                # Keys are "row_1", "row_2", ... in table order.
                table_rows_dict[f"row_{idx}"] = row_dict
        
        
        result["table"] = table_rows_dict
        
        
        result["footer_notes"] = _extract_footer_notes(text)
    else:
        # No table found: fall back to heuristic metadata and footer notes.
        # NOTE(review): this overwrites any caller-supplied page_metadata,
        # unlike the table branch above — confirm whether that is intended.
        result["metadata"] = _extract_metadata(text)
        result["footer_notes"] = _extract_footer_notes(text)
    
    return result
|
|
|
|
async def _poll_runpod_job(job_id: str, client: httpx.AsyncClient, max_wait_time: int = 300) -> Dict[str, Any]:
    """
    Poll RunPod job status until completion.

    Args:
        job_id: Job identifier returned by the /run endpoint.
        client: Open httpx.AsyncClient used for the status requests.
        max_wait_time: Maximum seconds to wait before giving up.

    Returns:
        The final status payload (including the job "output") once the job
        reports COMPLETED.

    Raises:
        RuntimeError: If the job reports FAILED or the deadline is exceeded.
        httpx.HTTPStatusError: If a status request returns an error code.
    """
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {RUNPOD_API_KEY}"
    }
    
    start_time = time.time()
    poll_interval = 2  # seconds between status checks
    
    while True:
        # Enforce the overall deadline before each status request.
        elapsed = time.time() - start_time
        if elapsed > max_wait_time:
            raise RuntimeError(f"Job {job_id} timed out after {max_wait_time} seconds")
        
        # NOTE(review): assumes RUNPOD_STATUS_ENDPOINT is not None — the
        # caller (_extract_text_with_ocr) verifies this before polling.
        status_url = f"{RUNPOD_STATUS_ENDPOINT}/{job_id}"
        response = await client.get(status_url, headers=headers)
        response.raise_for_status()
        status_result = response.json()
        
        status = status_result.get("status", "").upper()
        
        if status == "COMPLETED":
            print(f"[INFO] Job {job_id} completed successfully")
            return status_result
        elif status == "FAILED":
            error_msg = status_result.get("error", "Unknown error")
            raise RuntimeError(f"Job {job_id} failed: {error_msg}")
        elif status in ["IN_QUEUE", "IN_PROGRESS"]:
            print(f"[INFO] Job {job_id} status: {status}, waiting...")
            await asyncio.sleep(poll_interval)
        else:
            # Unknown status values are treated as still-running.
            print(f"[INFO] Job {job_id} status: {status}, waiting...")
            await asyncio.sleep(poll_interval)
|
|
|
|
async def _extract_text_with_ocr(image_bytes: bytes, page_num: int, total_pages: int, custom_prompt: Optional[str] = None) -> Dict[str, Any]:
    """
    Extract text and metadata from a single page/image using the RunPod serverless OCR model.

    Uses model-driven extraction to identify and extract metadata fields dynamically.
    Returns text output in full_text field and extracted metadata.

    Args:
        image_bytes: Image bytes to process
        page_num: Page number (1-based; used for logging/error messages)
        total_pages: Total number of pages (used for logging)
        custom_prompt: Optional custom prompt for field extraction

    Returns:
        Dict with keys: doc_type, confidence, full_text, fields.

    Raises:
        RuntimeError: On HTTP errors, job failure/timeout, or a missing
            status endpoint for async jobs.
    """
    # The RunPod handler expects the image as plain base64 (no data URL).
    image_base64 = base64.b64encode(image_bytes).decode("utf-8")
    
    print(f"[INFO] OCR: Processing page {page_num}/{total_pages} with RunPod endpoint")
    
    try:
        # Use the caller's prompt when given; default to plain transcription.
        if custom_prompt:
            metadata_prompt = custom_prompt
        else:
            
            metadata_prompt = """Extract all text from this image."""
        
        
        
        payload = {
            "input": {
                "prompt": metadata_prompt,
                "image_base64": image_base64
            }
        }
        
        
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {RUNPOD_API_KEY}"
        }
        
        async with httpx.AsyncClient(timeout=300.0) as client:
            # Submit the job to the /run endpoint.
            response = await client.post(
                RUNPOD_ENDPOINT,
                headers=headers,
                json=payload
            )
            response.raise_for_status()
            result = response.json()
            
            # /run may respond asynchronously with a job id to poll.
            job_id = result.get("id")
            status = result.get("status", "").upper()
            
            if job_id and status in ["IN_QUEUE", "IN_PROGRESS"]:
                
                print(f"[INFO] Job submitted with ID: {job_id}, status: {status}")
                if not RUNPOD_STATUS_ENDPOINT:
                    raise RuntimeError("RunPod status endpoint not configured. Cannot poll async job.")
                
                # Block until the job completes (or fails / times out).
                result = await _poll_runpod_job(job_id, client)
        
        
        # The handler's output shape varies; probe the common layouts in
        # order of likelihood and fall back to stringifying the payload.
        
        extracted_text = ""
        
        if "output" in result:
            output = result["output"]
            if isinstance(output, str):
                extracted_text = output
            elif isinstance(output, dict):
                # Try the usual keys for the text payload.
                extracted_text = output.get("text", output.get("result", output.get("content", "")))
                if not extracted_text and isinstance(output.get("text"), str):
                    extracted_text = output["text"]
            elif isinstance(output, list) and len(output) > 0:
                # Some handlers return a list; take the first element.
                extracted_text = str(output[0])
        elif "result" in result:
            extracted_text = str(result["result"])
        elif "text" in result:
            extracted_text = str(result["text"])
        else:
            # Last resort: keep the whole response as text for debugging.
            extracted_text = str(result)
        
        if not extracted_text:
            extracted_text = ""
        
        print(f"[INFO] OCR: Extracted {len(extracted_text)} characters from page {page_num}")
        
        # Separate the body text from any model-emitted metadata block.
        parsed_text, parsed_metadata = _parse_model_response(extracted_text)
        
        # HACK: _calculate_ocr_confidence only inspects the text argument,
        # but build a stand-in response object to satisfy its signature.
        mock_response = type('obj', (object,), {
            'choices': [type('obj', (object,), {'finish_reason': 'stop'})()],
            'usage': type('obj', (object,), {'completion_tokens': len(parsed_text.split())})()
        })()
        confidence = _calculate_ocr_confidence(mock_response, parsed_text)
        
        # Classify the document from the title keywords when the model did
        # not supply a document_type itself.
        doc_type = parsed_metadata.get("document_type", "other")
        if doc_type == "other" and parsed_metadata.get("title"):
            
            title_lower = parsed_metadata.get("title", "").lower()
            if any(kw in title_lower for kw in ["tender", "bid", "quotation"]):
                doc_type = "tender"
            elif any(kw in title_lower for kw in ["recruitment", "appointment", "vacancy"]):
                doc_type = "recruitment"
            elif any(kw in title_lower for kw in ["notice", "notification", "circular"]):
                doc_type = "notice"
        
        
        return {
            "doc_type": doc_type,
            "confidence": confidence,
            "full_text": parsed_text,
            "fields": parsed_metadata if parsed_metadata else {}
        }
    
    except httpx.HTTPStatusError as e:
        error_msg = f"HTTP {e.response.status_code}: {e.response.text}"
        print(f"[ERROR] OCR API HTTP error for page {page_num}: {error_msg}")
        raise RuntimeError(f"OCR API error for page {page_num}: {error_msg}")
    except Exception as e:
        # Wrap every other failure so callers see a uniform RuntimeError.
        error_msg = str(e)
        print(f"[ERROR] OCR API error for page {page_num}: {error_msg}")
        raise RuntimeError(f"OCR API error for page {page_num}: {error_msg}")
|
|
|
|
| def _calculate_ocr_confidence(response, extracted_text: str) -> float: |
| """ |
| Calculate confidence score based on OCR response quality. |
| Returns a score from 0-100, with higher scores for better extraction quality. |
| """ |
| |
| base_confidence = 92.0 |
| |
| |
| text_length = len(extracted_text.strip()) |
| |
| if text_length == 0: |
| return 0.0 |
| elif text_length < 10: |
| |
| return max(30.0, base_confidence - 40.0) |
| elif text_length < 50: |
| |
| return max(60.0, base_confidence - 20.0) |
| elif text_length > 1000: |
| |
| confidence = min(100.0, base_confidence + 5.0) |
| elif text_length > 500: |
| |
| confidence = min(100.0, base_confidence + 3.0) |
| else: |
| confidence = base_confidence |
| |
| |
| if '|' in extracted_text and extracted_text.count('|') > 5: |
| |
| confidence = min(100.0, confidence + 6.0) |
| |
| |
| non_whitespace = len([c for c in extracted_text if not c.isspace()]) |
| if text_length > 0: |
| content_ratio = non_whitespace / text_length |
| if content_ratio > 0.85: |
| |
| confidence = min(100.0, confidence + 5.0) |
| elif content_ratio > 0.75: |
| |
| confidence = min(100.0, confidence + 3.0) |
| elif content_ratio > 0.6: |
| |
| confidence = min(100.0, confidence + 1.0) |
| elif content_ratio < 0.3: |
| |
| confidence = max(60.0, confidence - 15.0) |
| |
| |
| |
| has_numbers = any(c.isdigit() for c in extracted_text) |
| has_letters = any(c.isalpha() for c in extracted_text) |
| has_punctuation = any(c in '.,;:!?()[]{}' for c in extracted_text) |
| |
| if has_numbers and has_letters and has_punctuation: |
| |
| confidence = min(100.0, confidence + 2.0) |
| |
| |
| return round(min(100.0, max(0.0, confidence)), 1) |
|
|
|
|
async def extract_fields_from_document(
    file_bytes: bytes,
    content_type: str,
    filename: str,
    key_fields: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Extract text from a document (PDF or image) using the OCR model.

    Pages are processed separately for better reliability; per-page errors
    are recorded rather than aborting the whole document.

    Args:
        file_bytes: Raw file contents.
        content_type: MIME type; PDFs are rendered to one image per page.
        filename: Original file name (currently unused; kept for interface).
        key_fields: Optional comma-separated field names; when given, a
            second OCR pass on the first page extracts them as JSON.

    Returns:
        Dict with doc_type, average confidence, combined full_text, per-page
        structured fields, the raw per-page results, and — when key_fields
        extraction succeeded — a "Fields" entry with the extracted values.
    """
    # Render PDFs to one JPEG per page; other content types are treated as
    # a single image.
    if content_type == "application/pdf" or content_type.endswith("/pdf"):
        if not PDF_SUPPORT:
            raise RuntimeError("PDF support requires PyMuPDF. Please install it.")
        
        pdf_images = _pdf_to_images(file_bytes)
        image_bytes_list = pdf_images
    else:
        # Normalize the image (RGB, max 1920px, JPEG) for the OCR API.
        # NOTE(review): Image/BytesIO come from the optional PIL import; if
        # that import failed, the NameError lands in the except below and the
        # original bytes are sent unmodified.
        try:
            img = Image.open(BytesIO(file_bytes))
            if img.mode != "RGB":
                img = img.convert("RGB")
            
            # Cap the longest side at 1920px, preserving aspect ratio.
            max_size = 1920
            w, h = img.size
            if w > max_size or h > max_size:
                if w > h:
                    new_w = max_size
                    new_h = int(h * (max_size / w))
                else:
                    new_h = max_size
                    new_w = int(w * (max_size / h))
                img = img.resize((new_w, new_h), Image.LANCZOS)
                print(f"[INFO] Resized image from {w}x{h} to {new_w}x{new_h}")
            
            # Re-encode as JPEG to bound the upload size.
            img_bytes = BytesIO()
            img.save(img_bytes, format="JPEG", quality=95)
            image_bytes_list = [img_bytes.getvalue()]
        except Exception as e:
            # Best effort: fall back to the unprocessed bytes.
            print(f"[WARNING] Could not process image with PIL: {e}. Using original bytes.")
            image_bytes_list = [file_bytes]


    total_pages = len(image_bytes_list)
    print(f"[INFO] Processing {total_pages} page(s) with OCR model...")


    # Run OCR sequentially per page; a failed page becomes an error record
    # instead of aborting the document.
    page_results = []
    for page_num, img_bytes in enumerate(image_bytes_list):
        print(f"[INFO] Processing page {page_num + 1}/{total_pages}...")
        try:
            page_result = await _extract_text_with_ocr(img_bytes, page_num + 1, total_pages, None)
            page_results.append({
                "page_number": page_num + 1,
                "text": page_result.get("full_text", ""),
                "fields": page_result.get("fields", {}),
                "confidence": page_result.get("confidence", 0),
                "doc_type": page_result.get("doc_type", "other"),
            })
            print(f"[INFO] Page {page_num + 1} processed successfully")
        except Exception as e:
            print(f"[ERROR] Failed to process page {page_num + 1}: {e}")
            page_results.append({
                "page_number": page_num + 1,
                "text": "",
                "fields": {},
                "confidence": 0,
                "error": str(e)
            })


    # Join successful pages with "=== PAGE N ===" banners.
    combined_full_text = "\n\n".join([f"=== PAGE {p['page_number']} ===\n\n{p['text']}" for p in page_results if p.get("text")])
    
    # Optional second OCR pass: extract user-requested fields as JSON from
    # the first page only.
    extracted_fields = {}
    if key_fields and key_fields.strip():
        # key_fields is a comma-separated list of field names.
        field_list = [f.strip() for f in key_fields.split(',') if f.strip()]
        if field_list:
            print(f"[INFO] Extracting user-specified fields: {field_list}")
            
            # Build a JSON-extraction prompt listing the requested fields.
            fields_json = json.dumps(field_list)
            custom_prompt = f"Extract the following fields from this image and return as JSON: {fields_json}. Return only a valid JSON object with the field names as keys and their extracted values."
            
            
            if image_bytes_list and len(image_bytes_list) > 0:
                try:
                    print("[INFO] Running second OCR pass for field extraction...")
                    field_result = await _extract_text_with_ocr(image_bytes_list[0], 1, 1, custom_prompt)
                    field_text = field_result.get("full_text", "")
                    
                    # Pull the first JSON object out of the response (the
                    # model may wrap it in extra prose).
                    try:
                        # Regex matches one level of nested braces.
                        json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', field_text, re.DOTALL)
                        if json_match:
                            extracted_fields = json.loads(json_match.group(0))
                            print(f"[INFO] Successfully extracted {len(extracted_fields)} fields from second OCR pass")
                        else:
                            # No object found; try the whole response as JSON.
                            extracted_fields = json.loads(field_text)
                            print(f"[INFO] Successfully extracted {len(extracted_fields)} fields from second OCR pass")
                    except json.JSONDecodeError:
                        print(f"[WARNING] Could not parse JSON from field extraction response: {field_text[:200]}")
                        extracted_fields = {}
                except Exception as e:
                    # Field extraction is best-effort; main OCR result stands.
                    print(f"[WARNING] Field extraction failed: {e}")
                    extracted_fields = {}
    
    # Build per-page structured output (tables, footer notes) from the text.
    structured_pages = {}
    for page_result in page_results:
        if page_result.get("text"):
            page_num = page_result.get("page_number", 1)
            page_text = page_result.get("text", "")
            
            
            parsed_data = _parse_text_with_tables(page_text, {})
            
            # Keyed as "page_1", "page_2", ...
            page_key = f"page_{page_num}"
            structured_pages[page_key] = {
                "text": parsed_data["text"],
                "table": parsed_data["table"],
                "footer_notes": parsed_data["footer_notes"],
                "confidence": page_result.get("confidence", 0),
                "doc_type": page_result.get("doc_type", "other")
            }
    
    
    if structured_pages:
        
        combined_fields = structured_pages
    else:
        combined_fields = {}
    
    # Average confidence across pages that produced any text.
    confidences = [p.get("confidence", 0) for p in page_results if p.get("confidence", 0) > 0]
    avg_confidence = sum(confidences) / len(confidences) if confidences else 0


    # Document type: first non-"other" classification wins.
    doc_type = "other"
    for page_result in page_results:
        if page_result.get("doc_type") and page_result["doc_type"] != "other":
            doc_type = page_result["doc_type"]
            break


    
    return_obj = {
        "doc_type": doc_type,
        "confidence": avg_confidence,
        "full_text": combined_full_text,
        "fields": combined_fields,
        "pages": page_results
    }
    
    # Only attach the second-pass fields when something was extracted.
    if extracted_fields:
        return_obj["Fields"] = extracted_fields
    
    return return_obj
|
|