| """OCR extraction for scanned / image-based PDF pages.
|
|
|
| Uses pytesseract with OpenCV preprocessing. Falls back gracefully
|
| if Tesseract is not installed.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import logging
|
| import re
|
| from pathlib import Path
|
|
|
| import fitz
|
| from PIL import Image
|
|
|
| from app.schemas.extraction import (
|
| BlockType,
|
| ContentBlock,
|
| HeadingLevel,
|
| ListItem,
|
| PageResult,
|
| TableBlock,
|
| TableCell,
|
| )
|
| from app.services.preprocessing import (
|
| is_mostly_blank,
|
| preprocess_for_ocr,
|
| )
|
|
|
logger = logging.getLogger(__name__)

# Probe once, at import time, for a usable pytesseract + Tesseract binary.
# Any failure (missing package, missing binary, broken install) leaves OCR
# disabled instead of breaking the import of this module.
try:
    import pytesseract

    pytesseract.get_tesseract_version()
except Exception:
    _TESSERACT_AVAILABLE = False
    logger.warning("pytesseract / Tesseract not available; OCR will be disabled")
else:
    _TESSERACT_AVAILABLE = True
|
|
|
|
|
def tesseract_available() -> bool:
    """Report whether OCR can currently be performed.

    Returns the module-level availability flag, which is set when this
    module is imported and may later be updated by ``configure_tesseract``.
    """
    return _TESSERACT_AVAILABLE
|
|
|
|
|
def configure_tesseract(cmd: str) -> None:
    """Override the tesseract binary path at runtime.

    Points pytesseract at ``cmd``, re-probes the binary and stores the
    outcome in the module-level availability flag. An empty ``cmd`` is a
    no-op that leaves the current state untouched.

    Args:
        cmd: Path to the tesseract executable.
    """
    global _TESSERACT_AVAILABLE
    if not cmd:
        return

    try:
        import pytesseract as _pt
    except ImportError:
        # Without the Python binding no binary path can help. Stay disabled
        # rather than raising, in line with the import-time fallback.
        logger.warning(
            "pytesseract is not installed; cannot use tesseract binary %s", cmd
        )
        _TESSERACT_AVAILABLE = False
        return

    _pt.pytesseract.tesseract_cmd = cmd
    try:
        # Raises when the configured binary cannot be executed.
        _pt.get_tesseract_version()
    except Exception:
        _TESSERACT_AVAILABLE = False
    else:
        _TESSERACT_AVAILABLE = True
|
|
|
|
|
|
|
|
|
|
|
def rasterize_page(pdf_path: str | Path, page_num: int, dpi: int = 300) -> Image.Image:
    """Render one page of a PDF into an RGB PIL image.

    ``page_num`` is used directly as an index into the opened document, and
    ``dpi`` is turned into a uniform zoom factor of ``dpi / 72``.
    """
    scale = dpi / 72.0
    with fitz.open(str(pdf_path)) as pdf:
        pixmap = pdf[page_num].get_pixmap(
            matrix=fitz.Matrix(scale, scale),
            alpha=False,
        )
        size = (pixmap.width, pixmap.height)
        return Image.frombytes("RGB", size, pixmap.samples)
|
|
|
|
|
|
|
|
|
# Lines that open with a structural keyword followed by a number,
# e.g. "Chapter 3" or "SECTION 12" (case-insensitive).
_HEADING_PATTERN = re.compile(r"^(?:chapter|section|part|article)\s+\d+", re.IGNORECASE)

# Lines that open with a bullet glyph, or with a number / single letter
# followed by "." or ")" and then whitespace.
_LIST_PATTERN = re.compile(
    r"^\s*(?:[•\-–—○■□►▸●]|\d+[.)]\s|[a-z][.)]\s)",
    re.IGNORECASE,
)
|
|
|
|
|
def _classify_ocr_lines(lines: list[str]) -> list[ContentBlock]:
    """Heuristically turn OCR text lines into content blocks.

    Each non-empty line is classified, in this order, as:

    * a heading (level H2) when it is a short all-caps line or matches
      ``_HEADING_PATTERN``;
    * a single-item list block when it matches ``_LIST_PATTERN``;
    * otherwise paragraph text, accumulated until a blank line, a heading
      or a list item ends the paragraph.

    Args:
        lines: Text lines as produced by OCR.

    Returns:
        Blocks in input order, all tagged with ``source="ocr"``.
    """
    blocks: list[ContentBlock] = []
    para_lines: list[str] = []

    def flush() -> None:
        """Emit the pending paragraph lines as one block, then reset."""
        text = " ".join(para_lines).strip()
        if text:
            blocks.append(ContentBlock(
                block_type=BlockType.PARAGRAPH,
                text=text,
                source="ocr",
            ))
        para_lines.clear()

    for line in lines:
        stripped = line.strip()
        if not stripped:
            # A blank line terminates the current paragraph.
            flush()
            continue

        # str.isupper() requires at least one cased character, so lines
        # without letters (page numbers, dates, rules of dashes, uncased
        # scripts) are not mistaken for all-caps headings.
        looks_all_caps = 3 < len(stripped) < 80 and stripped.isupper()
        if looks_all_caps or _HEADING_PATTERN.match(stripped):
            flush()
            blocks.append(ContentBlock(
                block_type=BlockType.HEADING,
                text=stripped,
                heading_level=HeadingLevel.H2,
                source="ocr",
            ))
            continue

        if _LIST_PATTERN.match(stripped):
            flush()
            # Drop a leading bullet glyph and/or numeric marker; if nothing
            # is left, keep the line as it was.
            # NOTE(review): lettered markers ("a)", "b.") are matched by
            # _LIST_PATTERN but are not stripped here — confirm intent.
            clean = re.sub(r"^\s*[•\-–—○■□►▸●]\s*", "", stripped)
            clean = re.sub(r"^\s*\d+[.)]\s*", "", clean) or stripped
            blocks.append(ContentBlock(
                block_type=BlockType.LIST,
                list_items=[ListItem(text=clean)],
                source="ocr",
            ))
            continue

        para_lines.append(stripped)

    flush()
    return blocks
|
|
|
|
|
def _mean_confidence(raw_confs: list) -> float:
    """Average per-word OCR confidences and scale the result to 0.0-1.0.

    Values are coerced with ``float()`` because, depending on the
    pytesseract version, the ``conf`` column may hold strings such as
    ``"96.5"`` or ``"-1"`` rather than numbers. Entries that cannot be
    parsed, and negative entries, are ignored.
    """
    usable: list[float] = []
    for raw in raw_confs:
        try:
            value = float(raw)
        except (TypeError, ValueError):
            continue
        if value >= 0:
            usable.append(value)
    return sum(usable) / len(usable) / 100.0 if usable else 0.0


def ocr_page(
    pdf_path: str | Path,
    page_num: int,
    dpi: int = 300,
    lang: str = "eng",
) -> PageResult:
    """Run OCR on a single PDF page and return structured blocks.

    Args:
        pdf_path: Path of the PDF file.
        page_num: Index of the page inside the document; the result reports
            it as ``page_num + 1``.
        dpi: Resolution used to rasterize the page before OCR.
        lang: Language code passed to Tesseract.

    Returns:
        A ``PageResult`` flagged ``is_scanned=True``. When Tesseract is
        unavailable it holds a single placeholder paragraph with confidence
        0.0; when the page is mostly blank it holds no blocks and
        confidence 1.0; otherwise it holds the classified text blocks, any
        detected table blocks, the page size and the mean OCR confidence.
    """
    if not _TESSERACT_AVAILABLE:
        return PageResult(
            page_number=page_num + 1,
            blocks=[ContentBlock(
                block_type=BlockType.PARAGRAPH,
                text="[OCR unavailable — Tesseract not installed]",
                source="ocr",
                confidence=0.0,
            )],
            plain_text="[OCR unavailable]",
            is_scanned=True,
            ocr_confidence=0.0,
        )

    import pytesseract

    image = rasterize_page(pdf_path, page_num, dpi)

    # Skip the OCR passes entirely for pages with nothing on them.
    if is_mostly_blank(image):
        return PageResult(
            page_number=page_num + 1,
            is_scanned=True,
            ocr_confidence=1.0,
            plain_text="",
            blocks=[],
        )

    processed = preprocess_for_ocr(image)

    # Word-level data: used for the confidence estimate and table detection.
    ocr_data = pytesseract.image_to_data(
        processed, lang=lang, output_type=pytesseract.Output.DICT,
    )
    avg_conf = _mean_confidence(ocr_data.get("conf", []))

    # Plain text pass: drives the line-based block classification.
    plain_text = pytesseract.image_to_string(processed, lang=lang).strip()
    blocks = _classify_ocr_lines(plain_text.split("\n"))

    # Text blocks share the page-level average confidence; table blocks
    # appended afterwards are left as returned by the detector.
    for block in blocks:
        block.confidence = avg_conf
    blocks.extend(_detect_tables_from_ocr(ocr_data))

    with fitz.open(str(pdf_path)) as doc:
        rect = doc[page_num].rect

    return PageResult(
        page_number=page_num + 1,
        width=rect.width,
        height=rect.height,
        blocks=blocks,
        plain_text=plain_text,
        is_scanned=True,
        ocr_confidence=round(avg_conf, 3),
    )
|
|
|
|
|
def _detect_tables_from_ocr(ocr_data: dict) -> list[ContentBlock]:
    """Basic table detection from OCR word data.

    Groups the non-empty words into lines keyed by their
    ``(block_num, line_num)`` pair. No column analysis is performed on
    those lines in this function, so it currently always returns an empty
    list.

    Args:
        ocr_data: Column-oriented OCR result: parallel lists stored under
            ``"text"``, ``"left"``, ``"top"``, ``"width"``, ``"height"``,
            ``"block_num"`` and ``"line_num"``. Missing or short columns
            are tolerated.

    Returns:
        Detected table blocks; always empty at present.
    """
    blocks: list[ContentBlock] = []

    tops = ocr_data.get("top") or []
    # Fewer than four entries are not considered at all.
    if len(tops) < 4:
        return blocks

    columns = (
        ocr_data.get("text") or [],
        ocr_data.get("left") or [],
        tops,
        ocr_data.get("width") or [],
        ocr_data.get("height") or [],
        ocr_data.get("block_num") or [],
        ocr_data.get("line_num") or [],
    )

    line_groups: dict[tuple[int, int], list[dict]] = {}
    # zip() stops at the shortest column, so ragged or missing columns give
    # fewer (or no) words instead of raising IndexError.
    for text, left, top, width, height, block_num, line_num in zip(*columns):
        txt = (text or "").strip()
        if not txt:
            continue
        line_groups.setdefault((block_num, line_num), []).append({
            "text": txt,
            "left": left,
            "top": top,
            "width": width,
            "height": height,
        })

    # NOTE(review): line_groups is built but not consumed — turning the
    # grouped lines into table blocks is not implemented here.
    return blocks
|
|
|