# handbook-ocr-engine/app/services/ocr_extractor.py
# internationalscholarsprogram's picture
# Initial deploy: ISP Handbook OCR Engine
# b12284c verified
"""OCR extraction for scanned / image-based PDF pages.
Uses pytesseract with OpenCV preprocessing. Falls back gracefully
if Tesseract is not installed.
"""
from __future__ import annotations
import logging
import re
from pathlib import Path
import fitz # PyMuPDF — to rasterize pages
from PIL import Image
from app.schemas.extraction import (
BlockType,
ContentBlock,
HeadingLevel,
ListItem,
PageResult,
TableBlock,
TableCell,
)
from app.services.preprocessing import (
is_mostly_blank,
preprocess_for_ocr,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ── Tesseract availability ──
# Probed once at import time. Any failure (missing Python wrapper, missing
# binary, broken install) leaves OCR disabled instead of failing the import.
try:
    import pytesseract

    # Quick sanity check: raises if the Tesseract version cannot be queried.
    pytesseract.get_tesseract_version()
except Exception:
    _TESSERACT_AVAILABLE = False
    logger.warning("pytesseract / Tesseract not available; OCR will be disabled")
else:
    _TESSERACT_AVAILABLE = True
def tesseract_available() -> bool:
    """Return the current value of the module-level Tesseract availability flag."""
    available = _TESSERACT_AVAILABLE
    return available
def configure_tesseract(cmd: str) -> None:
    """Override the tesseract binary path at runtime.

    Points pytesseract at ``cmd``, re-runs the version probe and updates the
    module-level availability flag with the outcome. This function never
    raises: a missing wrapper or an unusable binary simply marks OCR as
    unavailable.

    Args:
        cmd: Path to the tesseract executable. An empty value is a no-op and
            leaves the current availability flag untouched.
    """
    global _TESSERACT_AVAILABLE
    if not cmd:
        return

    try:
        import pytesseract as _pt
    except ImportError:
        # The Python wrapper itself is missing, so no binary path can help.
        _TESSERACT_AVAILABLE = False
        return

    _pt.pytesseract.tesseract_cmd = cmd
    try:
        _pt.get_tesseract_version()
    except Exception:
        _TESSERACT_AVAILABLE = False
    else:
        _TESSERACT_AVAILABLE = True
# ── Rasterize a PDF page to PIL Image ──
def rasterize_page(pdf_path: str | Path, page_num: int, dpi: int = 300) -> Image.Image:
    """Rasterize one page of a PDF into an RGB PIL image.

    ``page_num`` is used directly as an index into the opened document, and
    ``dpi`` is turned into a uniform zoom factor relative to 72.
    """
    scale = dpi / 72.0
    with fitz.open(str(pdf_path)) as pdf:
        pixmap = pdf[page_num].get_pixmap(
            matrix=fitz.Matrix(scale, scale),
            alpha=False,
        )
        size = (pixmap.width, pixmap.height)
        return Image.frombytes("RGB", size, pixmap.samples)
# ── OCR a single page ──
# Matches lines that open with a structural keyword followed by whitespace
# and a number (e.g. "Chapter 3"), case-insensitively.
_HEADING_PATTERN = re.compile(
    r"^(?:chapter|section|part|article)\s+\d+",
    re.IGNORECASE,
)
# Matches lines that open (after optional whitespace) with a bullet glyph, or
# with a numeric ("1." / "1)") or single-letter ("a." / "a)") marker that is
# followed by a whitespace character.
_LIST_PATTERN = re.compile(r"^\s*(?:[•\-–—○■□►▸●]|\d+[.)]\s|[a-z][.)]\s)", re.IGNORECASE)
# Leading bullet glyph plus any whitespace that follows it.
_BULLET_PREFIX = re.compile(r"^\s*[•\-–—○■□►▸●]\s*")
# Leading "1." / "1)" / "a." / "a)" marker. The marker must be followed by
# whitespace or the end of the line so decimals such as "3.5" stay intact.
_ENUM_PREFIX = re.compile(r"^\s*(?:\d+|[a-z])[.)](?:\s+|$)", re.IGNORECASE)


def _strip_list_marker(line: str) -> str:
    """Return *line* without its leading bullet and/or enumerator marker.

    Falls back to the unmodified line when stripping would leave nothing.
    """
    text = _BULLET_PREFIX.sub("", line)
    text = _ENUM_PREFIX.sub("", text)
    return text or line


def _looks_like_heading(line: str) -> bool:
    """Tell whether a stripped line should be treated as a heading."""
    # ``isupper`` needs at least one cased character, so purely numeric or
    # punctuation-only lines ("12345", "2023-01-15") no longer qualify.
    short_all_caps = 3 < len(line) < 80 and line.isupper()
    return short_all_caps or bool(_HEADING_PATTERN.match(line))


def _classify_ocr_lines(lines: list[str]) -> list[ContentBlock]:
    """Heuristic classification of OCR text lines into content blocks.

    Each line is stripped and then handled as follows:

    * blank line: ends the paragraph being accumulated;
    * short all-caps line, or a "Chapter N"-style line: an H2 heading;
    * line starting with a bullet / numeric / lettered marker: a LIST block
      holding a single item, with the marker removed from the item text;
    * anything else: appended to the current paragraph, whose lines are
      joined with single spaces.

    Args:
        lines: Raw OCR text lines, in reading order.

    Returns:
        The content blocks in the order they were encountered, all tagged
        with ``source="ocr"``.
    """
    blocks: list[ContentBlock] = []
    para_lines: list[str] = []

    def flush() -> None:
        # Emit the pending paragraph (if any) and reset the accumulator.
        text = " ".join(para_lines).strip()
        para_lines.clear()
        if text:
            blocks.append(ContentBlock(
                block_type=BlockType.PARAGRAPH,
                text=text,
                source="ocr",
            ))

    for line in lines:
        stripped = line.strip()
        if not stripped:
            flush()
        elif _looks_like_heading(stripped):
            flush()
            blocks.append(ContentBlock(
                block_type=BlockType.HEADING,
                text=stripped,
                heading_level=HeadingLevel.H2,
                source="ocr",
            ))
        elif _LIST_PATTERN.match(stripped):
            flush()
            blocks.append(ContentBlock(
                block_type=BlockType.LIST,
                list_items=[ListItem(text=_strip_list_marker(stripped))],
                source="ocr",
            ))
        else:
            para_lines.append(stripped)

    flush()
    return blocks
def _mean_confidence(raw_confs) -> float:
    """Average Tesseract word confidences, scaled to the 0.0-1.0 range.

    Negative entries (the ``-1`` rows Tesseract emits for non-word elements)
    and entries that cannot be parsed as numbers are skipped. Values are
    coerced with ``float`` because, depending on the pytesseract version,
    they may arrive as ints, floats or numeric strings.
    """
    scores: list[float] = []
    for raw in raw_confs:
        try:
            score = float(raw)
        except (TypeError, ValueError):
            continue
        if score >= 0:
            scores.append(score)
    return sum(scores) / len(scores) / 100.0 if scores else 0.0


def ocr_page(
    pdf_path: str | Path,
    page_num: int,
    dpi: int = 300,
    lang: str = "eng",
) -> PageResult:
    """Run OCR on a single PDF page and return structured blocks.

    Args:
        pdf_path: Path of the PDF file to read.
        page_num: Zero-based page index; the result reports ``page_num + 1``.
        dpi: Resolution used to rasterize the page before OCR.
        lang: Tesseract language code(s) passed through to pytesseract.

    Returns:
        A ``PageResult`` flagged ``is_scanned=True``. When Tesseract is not
        available the result holds a single placeholder paragraph with zero
        confidence. When the rendered page is mostly blank the result has no
        blocks and confidence 1.0. Otherwise it holds the classified text
        blocks (each carrying the page-average confidence), followed by any
        detected table blocks.
    """
    if not _TESSERACT_AVAILABLE:
        return PageResult(
            page_number=page_num + 1,
            blocks=[ContentBlock(
                block_type=BlockType.PARAGRAPH,
                text="[OCR unavailable — Tesseract not installed]",
                source="ocr",
                confidence=0.0,
            )],
            plain_text="[OCR unavailable]",
            is_scanned=True,
            ocr_confidence=0.0,
        )

    import pytesseract

    image = rasterize_page(pdf_path, page_num, dpi)

    # Read the page size up front so that every result built from a rendered
    # page (blank or not) reports the same geometry.
    with fitz.open(str(pdf_path)) as doc:
        rect = doc[page_num].rect
        page_width, page_height = rect.width, rect.height

    if is_mostly_blank(image):
        return PageResult(
            page_number=page_num + 1,
            width=page_width,
            height=page_height,
            is_scanned=True,
            ocr_confidence=1.0,
            plain_text="",
            blocks=[],
        )

    # Preprocess for OCR
    processed = preprocess_for_ocr(image)

    # Run Tesseract with full data for confidence scores
    ocr_data = pytesseract.image_to_data(
        processed, lang=lang, output_type=pytesseract.Output.DICT,
    )
    avg_conf = _mean_confidence(ocr_data.get("conf", []))

    # Also get plain text
    plain_text = pytesseract.image_to_string(processed, lang=lang).strip()
    blocks = _classify_ocr_lines(plain_text.split("\n"))

    # Text blocks all share the page-level average confidence.
    for b in blocks:
        b.confidence = avg_conf

    # Attempt table detection via Tesseract TSV data
    blocks.extend(_detect_tables_from_ocr(ocr_data))

    return PageResult(
        page_number=page_num + 1,
        width=page_width,
        height=page_height,
        blocks=blocks,
        plain_text=plain_text,
        is_scanned=True,
        ocr_confidence=round(avg_conf, 3),
    )
def _detect_tables_from_ocr(ocr_data: dict) -> list[ContentBlock]:
    """Basic table detection from OCR bounding-box alignment.

    Groups words by their Tesseract ``(block_num, line_num)`` so that rows
    and column gaps can be analysed. This is a heuristic first pass.

    NOTE: the alignment analysis itself is not implemented yet, so this
    function currently always returns an empty list.

    Args:
        ocr_data: Dict of parallel lists as produced by
            ``pytesseract.image_to_data(..., output_type=Output.DICT)``.
            Missing or short columns are tolerated.

    Returns:
        Detected table blocks (currently always empty).
    """
    blocks: list[ContentBlock] = []

    # Too little OCR data to be worth analysing.
    if len(ocr_data.get("top", [])) < 4:
        return blocks

    columns = [
        ocr_data.get(name, [])
        for name in ("text", "left", "top", "width", "height", "block_num", "line_num")
    ]

    # Build lines: group words with same (block, line). ``zip`` stops at the
    # shortest column, so a missing or truncated column cannot raise.
    line_groups: dict[tuple[int, int], list[dict]] = {}
    for raw_text, left, top, width, height, block_num, line_num in zip(*columns):
        txt = str(raw_text).strip() if raw_text is not None else ""
        if not txt:
            continue
        line_groups.setdefault((block_num, line_num), []).append({
            "text": txt,
            "left": left,
            "top": top,
            "width": width,
            "height": height,
        })

    # TODO: look for blocks where multiple lines in ``line_groups`` share
    # consistent tab-stop alignment (indicates tabular layout) and emit table
    # blocks for them. For production, consider using opencv line-detection
    # on the original image.
    return blocks