|
|
""" |
|
|
PDF Processing Module - Layer 1: PDF Understanding |
|
|
Handles multimodal extraction: text, images, tables |
|
|
""" |
|
|
|
|
|
import PyPDF2 |
|
|
import fitz |
|
|
from pdf2image import convert_from_path |
|
|
from PIL import Image |
|
|
import pytesseract |
|
|
import logging |
|
|
from typing import Dict, List, Any, Optional |
|
|
import io |
|
|
import numpy as np |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class PDFProcessor:
    """
    Comprehensive PDF processing for medical documents.

    Implements hybrid extraction: the native text layer is read with PyMuPDF
    (fitz); pages with no extractable text (scanned images) fall back to OCR
    via pdf2image + pytesseract. Images and heuristic table/section detection
    are layered on top of the extracted text.
    """

    def __init__(self):
        # Only PDF input is supported; kept as a list for easy extension.
        self.supported_formats = ['.pdf']
        logger.info("PDF Processor initialized")

    async def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract multimodal content from a PDF.

        Args:
            file_path: Path to the PDF file on disk.

        Returns:
            Dict with:
            - text: extracted text content (pages joined by blank lines)
            - images: list of per-image descriptors (page, index, xref, size)
            - tables: heuristically detected tabular content
            - metadata: document metadata (best-effort; may be empty)
            - page_count: number of pages
            - extraction_method: "hybrid", or "hybrid_with_ocr" if any page
              required OCR
            - sections: mapping of detected report section names to their text

        Raises:
            Re-raises any exception from the underlying PDF libraries after
            logging it.
        """
        try:
            logger.info(f"Starting PDF extraction: {file_path}")

            result = {
                "text": "",
                "images": [],
                "tables": [],
                "metadata": {},
                "page_count": 0,
                "extraction_method": "hybrid"
            }

            doc = fitz.open(file_path)
            try:
                result["page_count"] = len(doc)
                result["metadata"] = self._extract_metadata(doc)

                all_text = []
                all_images = []

                for page_num in range(len(doc)):
                    page = doc[page_num]

                    # Prefer the native text layer; fall back to OCR only when
                    # the page has no extractable text (i.e. a scanned image).
                    page_text = page.get_text()
                    if not page_text.strip():
                        logger.info(f"Page {page_num + 1}: Using OCR (no native text)")
                        page_text = await self._ocr_page(file_path, page_num)
                        result["extraction_method"] = "hybrid_with_ocr"

                    all_text.append(page_text)
                    all_images.extend(self._extract_images_from_page(page, page_num))
                    result["tables"].extend(self._detect_tables(page_text))
            finally:
                # Fix: close the document even when extraction fails mid-page;
                # previously the file handle leaked on any per-page exception.
                doc.close()

            result["text"] = "\n\n".join(all_text)
            result["images"] = all_images
            result["sections"] = self._extract_sections(result["text"])

            logger.info(f"PDF extraction complete: {result['page_count']} pages, "
                        f"{len(result['images'])} images, {len(result['tables'])} tables")

            return result

        except Exception as e:
            logger.error(f"PDF extraction failed: {str(e)}")
            raise

    def _extract_metadata(self, doc: "fitz.Document") -> Dict[str, Any]:
        """
        Extract PDF document metadata (title, author, dates, etc.).

        Best-effort: returns an empty dict if the metadata cannot be read.
        """
        metadata = {}
        try:
            pdf_metadata = doc.metadata
            metadata = {
                "title": pdf_metadata.get("title", ""),
                "author": pdf_metadata.get("author", ""),
                "subject": pdf_metadata.get("subject", ""),
                "creator": pdf_metadata.get("creator", ""),
                "producer": pdf_metadata.get("producer", ""),
                "creation_date": pdf_metadata.get("creationDate", ""),
                "modification_date": pdf_metadata.get("modDate", "")
            }
        except Exception as e:
            logger.warning(f"Metadata extraction failed: {str(e)}")

        return metadata

    async def _ocr_page(self, file_path: str, page_num: int) -> str:
        """
        Perform OCR on a single page (0-based page_num).

        Renders the page at 300 DPI and runs Tesseract on it. Degrades to an
        empty string on any failure so a bad page does not abort extraction.
        """
        try:
            # convert_from_path uses 1-based page numbers.
            images = convert_from_path(
                file_path,
                first_page=page_num + 1,
                last_page=page_num + 1,
                dpi=300
            )

            if images:
                return pytesseract.image_to_string(images[0])

            return ""

        except Exception as e:
            logger.warning(f"OCR failed for page {page_num + 1}: {str(e)}")
            return ""

    def _extract_images_from_page(self, page: "fitz.Page", page_num: int) -> List[Dict[str, Any]]:
        """
        List image descriptors (page, index, xref, width, height) for a page.

        Best-effort: returns whatever was collected before any failure.
        """
        images = []
        try:
            # get_images(full=True) yields tuples whose first element is the
            # xref and whose third/fourth are pixel width/height.
            for img_index, img_info in enumerate(page.get_images(full=True)):
                images.append({
                    "page": page_num + 1,
                    "index": img_index,
                    "xref": img_info[0],
                    "width": img_info[2],
                    "height": img_info[3]
                })
        except Exception as e:
            logger.warning(f"Image extraction failed for page {page_num + 1}: {str(e)}")

        return images

    def _detect_tables(self, text: str) -> List[Dict[str, Any]]:
        """
        Detect tabular content in text with a simple heuristic.

        A line containing a tab, a pipe, or more than three spaces is treated
        as a table row; a run of at least two consecutive such lines counts
        as one table.
        """
        tables = []
        potential_table = []

        def flush():
            # A real table needs at least two consecutive row-like lines.
            if len(potential_table) >= 2:
                tables.append({
                    "rows": list(potential_table),
                    "row_count": len(potential_table)
                })
            potential_table.clear()

        for line in text.split('\n'):
            if '\t' in line or '|' in line or line.count(' ') > 3:
                potential_table.append(line)
            elif potential_table:
                flush()

        # Fix: flush a table that runs to the end of the text; previously a
        # trailing table was silently dropped because it was only emitted
        # when a later non-table line appeared.
        flush()

        return tables

    def _extract_sections(self, text: str) -> Dict[str, str]:
        """
        Extract common medical report sections from plain text.

        A stripped line shorter than 50 characters that contains a known
        header keyword starts a new section; subsequent lines belong to it
        until the next header. Text before the first header is stored under
        "GENERAL". Header lines themselves are not included in the content.
        """
        section_headers = [
            "HISTORY", "PHYSICAL EXAMINATION", "ASSESSMENT", "PLAN",
            "CHIEF COMPLAINT", "DIAGNOSIS", "FINDINGS", "IMPRESSION",
            "RECOMMENDATIONS", "LAB RESULTS", "MEDICATIONS", "ALLERGIES",
            "VITAL SIGNS", "PAST MEDICAL HISTORY", "FAMILY HISTORY",
            "SOCIAL HISTORY", "REVIEW OF SYSTEMS"
        ]

        # Fix: match longer headers first so e.g. "FAMILY HISTORY" is not
        # mis-classified as plain "HISTORY" by the substring test.
        headers_by_length = sorted(section_headers, key=len, reverse=True)

        sections = {}
        current_section = "GENERAL"
        current_content = []

        for line in text.split('\n'):
            line_upper = line.strip().upper()

            matched = None
            if len(line.strip()) < 50:
                for header in headers_by_length:
                    if header in line_upper:
                        matched = header
                        break

            if matched:
                # Close out the previous section before starting the new one.
                if current_content:
                    sections[current_section] = '\n'.join(current_content)
                current_section = matched
                current_content = []
            else:
                current_content.append(line)

        # Save the final section.
        if current_content:
            sections[current_section] = '\n'.join(current_content)

        return sections
|
|
|