"""
PDF Processing Module - Layer 1: PDF Understanding

Handles multimodal extraction: text, images, tables.
"""

import asyncio
import io
import itertools
import logging
import re
from typing import Any, Dict, List, Optional

import fitz  # PyMuPDF
import numpy as np
import PyPDF2
import pytesseract
from pdf2image import convert_from_path
from PIL import Image

logger = logging.getLogger(__name__)

# Output key -> key used by PyMuPDF's ``Document.metadata`` dictionary.
_METADATA_FIELDS = {
    "title": "title",
    "author": "author",
    "subject": "subject",
    "creator": "creator",
    "producer": "producer",
    "creation_date": "creationDate",
    "modification_date": "modDate",
}

# Common section headers in medical reports.
_SECTION_HEADERS = (
    "HISTORY", "PHYSICAL EXAMINATION", "ASSESSMENT", "PLAN",
    "CHIEF COMPLAINT", "DIAGNOSIS", "FINDINGS", "IMPRESSION",
    "RECOMMENDATIONS", "LAB RESULTS", "MEDICATIONS", "ALLERGIES",
    "VITAL SIGNS", "PAST MEDICAL HISTORY", "FAMILY HISTORY",
    "SOCIAL HISTORY", "REVIEW OF SYSTEMS",
)

# Longest alternatives come first so that "PAST MEDICAL HISTORY" is not
# swallowed by the shorter "HISTORY"; the word boundaries stop "PLAN" from
# matching inside words such as "transplant".
_SECTION_HEADER_RE = re.compile(
    r"\b(?:"
    + "|".join(
        re.escape(header)
        for header in sorted(_SECTION_HEADERS, key=len, reverse=True)
    )
    + r")\b",
    re.IGNORECASE,
)

# Only lines shorter than this are considered as section headers.
_MAX_HEADER_LINE_LENGTH = 50

# A run of table-like lines must have at least this many rows
# (header + 1 row) to be reported as a table.
_MIN_TABLE_ROWS = 2


class PDFProcessor:
    """
    Comprehensive PDF processing for medical documents.

    Implements hybrid extraction: native text + OCR fallback.
    """

    def __init__(self):
        self.supported_formats = ['.pdf']
        logger.info("PDF Processor initialized")

    async def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract multimodal content from a PDF.

        Args:
            file_path: Path of the PDF file to process.

        Returns:
            Dict with:
            - text: extracted text of all pages, joined by blank lines
            - images: list of image descriptors (page, index, xref, size)
            - tables: detected tabular content
            - metadata: document metadata
            - page_count: number of pages
            - extraction_method: "hybrid", or "hybrid_with_ocr" when at
              least one page had no native text and was sent to OCR
            - sections: report sections keyed by header name

        Raises:
            Exception: any error raised while opening or reading the
                document is logged and re-raised unchanged.
        """
        doc = None
        try:
            logger.info(f"Starting PDF extraction: {file_path}")

            # Initialize result structure
            result = {
                "text": "",
                "images": [],
                "tables": [],
                "metadata": {},
                "page_count": 0,
                "extraction_method": "hybrid",
                "sections": {},
            }

            # Open PDF with PyMuPDF for robust extraction
            doc = fitz.open(file_path)
            result["page_count"] = len(doc)
            result["metadata"] = self._extract_metadata(doc)

            page_texts = []
            for page_num, page in enumerate(doc):
                page_text = page.get_text()

                # If native text extraction yields nothing, fall back to OCR
                if not page_text.strip():
                    logger.info(f"Page {page_num + 1}: Using OCR (no native text)")
                    page_text = await self._ocr_page(file_path, page_num)
                    result["extraction_method"] = "hybrid_with_ocr"

                page_texts.append(page_text)
                result["images"].extend(
                    self._extract_images_from_page(page, page_num)
                )
                # Detect tables (simplified detection)
                result["tables"].extend(self._detect_tables(page_text))

            result["text"] = "\n\n".join(page_texts)
            result["sections"] = self._extract_sections(result["text"])

            logger.info(f"PDF extraction complete: {result['page_count']} pages, "
                        f"{len(result['images'])} images, {len(result['tables'])} tables")
            return result
        except Exception as e:
            logger.error(f"PDF extraction failed: {str(e)}")
            raise
        finally:
            # Release the document handle on the failure path as well.
            if doc is not None:
                doc.close()

    def _extract_metadata(self, doc: fitz.Document) -> Dict[str, Any]:
        """
        Extract PDF metadata.

        Returns a dict with the keys title, author, subject, creator,
        producer, creation_date and modification_date. Missing or ``None``
        values are reported as empty strings. If the metadata cannot be
        read at all, a warning is logged and an empty dict is returned.
        """
        try:
            pdf_metadata = doc.metadata
            return {
                name: pdf_metadata.get(source_key) or ""
                for name, source_key in _METADATA_FIELDS.items()
            }
        except Exception as e:
            logger.warning(f"Metadata extraction failed: {str(e)}")
            return {}

    async def _ocr_page(self, file_path: str, page_num: int, dpi: int = 300) -> str:
        """
        Perform OCR on a single page.

        The page rendering and the Tesseract call are blocking, so they run
        in the event loop's default executor instead of stalling the loop.

        Args:
            file_path: Path of the PDF file.
            page_num: Zero-based page index.
            dpi: Resolution used to rasterise the page before OCR.

        Returns:
            The recognised text, or "" when rendering produced no image or
            OCR failed (failures are logged as warnings, not raised).
        """
        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(
                None, self._ocr_page_blocking, file_path, page_num, dpi
            )
        except Exception as e:
            logger.warning(f"OCR failed for page {page_num + 1}: {str(e)}")
            return ""

    @staticmethod
    def _ocr_page_blocking(file_path: str, page_num: int, dpi: int) -> str:
        """Rasterise one page and run Tesseract on it (blocking)."""
        # pdf2image numbers pages from 1
        images = convert_from_path(
            file_path,
            first_page=page_num + 1,
            last_page=page_num + 1,
            dpi=dpi,
        )
        if not images:
            return ""
        return pytesseract.image_to_string(images[0])

    def _extract_images_from_page(self, page: fitz.Page, page_num: int) -> List[Dict[str, Any]]:
        """
        Describe the images referenced by a PDF page.

        Only descriptors are returned (1-based page number, index on the
        page, xref, width, height); no pixel data is read. On failure a
        warning is logged and an empty list is returned.
        """
        try:
            return [
                {
                    "page": page_num + 1,
                    "index": img_index,
                    "xref": img_info[0],
                    "width": img_info[2],
                    "height": img_info[3],
                }
                for img_index, img_info in enumerate(page.get_images(full=True))
            ]
        except Exception as e:
            logger.warning(f"Image extraction failed for page {page_num + 1}: {str(e)}")
            return []

    @staticmethod
    def _looks_like_table_row(line: str) -> bool:
        """Return True when a line contains a tab, a pipe or more than 3 spaces."""
        # NOTE(review): counting single spaces also matches ordinary prose
        # lines of five or more words; confirm whether a multi-space column
        # separator was intended here.
        return '\t' in line or '|' in line or line.count(' ') > 3

    def _detect_tables(self, text: str) -> List[Dict[str, Any]]:
        """
        Detect tabular content in text.

        Simplified heuristic-based detection: every run of consecutive
        table-like lines with at least two rows (header + 1 row) is
        reported, including a run that reaches the end of the text.

        Returns:
            List of dicts with "rows" (the raw lines) and "row_count".
        """
        tables = []
        runs = itertools.groupby(text.split('\n'), key=self._looks_like_table_row)
        for is_table_run, run in runs:
            if not is_table_run:
                continue
            rows = list(run)
            if len(rows) >= _MIN_TABLE_ROWS:
                tables.append({"rows": rows, "row_count": len(rows)})
        return tables

    def _extract_sections(self, text: str) -> Dict[str, str]:
        """
        Extract common medical report sections.

        A line shorter than 50 characters (after stripping) that contains
        one of the known headers as whole words starts a new section; the
        leftmost, longest header on the line wins. Text before the first
        header is stored under "GENERAL". Content that follows a header on
        the same line after a colon ("ALLERGIES: Penicillin") is kept, and
        a header that occurs more than once accumulates its content instead
        of overwriting it. Sections without any content are omitted.

        Returns:
            Dict mapping section name to its newline-joined content.
        """
        buckets: Dict[str, List[str]] = {"GENERAL": []}
        current = buckets["GENERAL"]

        for line in text.split('\n'):
            stripped = line.strip()
            match = None
            if len(stripped) < _MAX_HEADER_LINE_LENGTH:
                match = _SECTION_HEADER_RE.search(stripped)

            if match is None:
                current.append(line)
                continue

            current = buckets.setdefault(match.group(0).upper(), [])

            # Keep a value written on the header line itself.
            trailing = stripped[match.end():].lstrip()
            if trailing.startswith(':'):
                inline_value = trailing[1:].strip()
                if inline_value:
                    current.append(inline_value)

        return {
            name: '\n'.join(lines)
            for name, lines in buckets.items()
            if lines
        }