|
|
""" |
|
|
Lightweight Agentic OCR Document Extraction (Tesseract) |
|
|
|
|
|
A lightweight, agentic OCR pipeline to extract text and structured fields from document images. |
|
|
|
|
|
Key features: |
|
|
- Multiple preprocessing variants (grayscale, thresholding, sharpening, denoise, resize) |
|
|
- Multiple Tesseract page segmentation modes (PSM) |
|
|
- Candidate scoring via average OCR confidence |
|
|
- Simple rule-based field extraction (DOI, title, authors, abstract, keywords) |
|
|
""" |
|
|
|
|
|
import argparse
import json
import os
import re
import sys
import unicodedata
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, asdict
from typing import Dict, List, Tuple, Optional, Any

import numpy as np
import cv2
from PIL import Image

import pytesseract
from pytesseract import Output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _ensure_uint8(img: np.ndarray) -> np.ndarray: |
|
|
"""Ensure image is uint8 dtype, clipping values if needed.""" |
|
|
if img.dtype == np.uint8: |
|
|
return img |
|
|
return np.clip(img, 0, 255).astype(np.uint8) |
|
|
|
|
|
|
|
|
def preprocess_variants(rgb_img: np.ndarray, scale_factor: float = 1.5) -> Dict[str, np.ndarray]:
    """Build a dict of preprocessed versions of *rgb_img* to feed the OCR engine.

    Variant keys: raw, upscaled, gray, otsu, adaptive, denoise, sharpen,
    clahe, morph_close. Every output is 3-channel RGB so downstream OCR can
    consume them uniformly.
    """
    out: Dict[str, np.ndarray] = {}

    # Untouched input as the baseline.
    out['raw'] = rgb_img

    # Cubic upscaling: more pixels per glyph.
    height, width = rgb_img.shape[:2]
    new_size = (int(width * scale_factor), int(height * scale_factor))
    out['upscaled'] = cv2.resize(rgb_img, new_size, interpolation=cv2.INTER_CUBIC)

    # Plain grayscale, re-expanded to RGB.
    gray = cv2.cvtColor(rgb_img, cv2.COLOR_RGB2GRAY)
    out['gray'] = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)

    # Global Otsu binarization.
    _, otsu_bin = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    out['otsu'] = cv2.cvtColor(otsu_bin, cv2.COLOR_GRAY2RGB)

    # Locally adaptive (Gaussian) threshold for uneven illumination.
    adaptive_bin = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 35, 11
    )
    out['adaptive'] = cv2.cvtColor(adaptive_bin, cv2.COLOR_GRAY2RGB)

    # Non-local-means denoising of the grayscale image.
    denoised = cv2.fastNlMeansDenoising(gray, None, h=15, templateWindowSize=7, searchWindowSize=21)
    out['denoise'] = cv2.cvtColor(denoised, cv2.COLOR_GRAY2RGB)

    # 3x3 sharpening kernel (center-weighted Laplacian style).
    sharpen_kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]], dtype=np.float32)
    sharpened = cv2.filter2D(gray, -1, sharpen_kernel)
    out['sharpen'] = cv2.cvtColor(_ensure_uint8(sharpened), cv2.COLOR_GRAY2RGB)

    # Contrast-limited adaptive histogram equalization.
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    out['clahe'] = cv2.cvtColor(clahe.apply(gray), cv2.COLOR_GRAY2RGB)

    # Morphological closing of the Otsu image to bridge broken strokes.
    close_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    closed = cv2.morphologyEx(otsu_bin, cv2.MORPH_CLOSE, close_kernel)
    out['morph_close'] = cv2.cvtColor(closed, cv2.COLOR_GRAY2RGB)

    return out
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ocr_with_confidence(rgb_img: np.ndarray, psm: int = 6) -> Tuple[str, float, int]:
    """OCR *rgb_img* and return (text, mean word confidence, word count).

    Confidence is averaged over per-word `conf` values reported by Tesseract;
    entries that are non-numeric, blank, or negative are skipped.
    """
    config = f'--oem 3 --psm {psm}'

    data = pytesseract.image_to_data(rgb_img, output_type=Output.DICT, config=config)

    confidences: List[float] = []
    n_words = 0
    for raw_conf, raw_text in zip(data.get('conf', []), data.get('text', [])):
        try:
            value = float(raw_conf)
        except (ValueError, TypeError):
            continue
        # Skip blank tokens and negative "no confidence" markers.
        if not raw_text or not str(raw_text).strip() or value < 0:
            continue
        confidences.append(value)
        n_words += 1

    mean_conf = float(np.mean(confidences)) if confidences else 0.0
    # Separate pass for the text itself: image_to_string preserves layout.
    full_text = pytesseract.image_to_string(rgb_img, config=config)
    return full_text, mean_conf, n_words
|
|
|
|
|
|
|
|
@dataclass
class OcrCandidate:
    """Container for OCR candidate results."""
    # Name of the preprocessing variant this candidate came from.
    variant: str
    # Tesseract page segmentation mode used for this run.
    psm: int
    # Mean per-word confidence reported by the OCR pass (0.0 when no words).
    avg_conf: float
    # Raw OCR text output for this variant/PSM combination.
    text: str
    # Number of non-blank words with a usable confidence value.
    word_count: int
    # Ranking score: avg_conf adjusted for text length and word count.
    score: float
|
|
|
|
|
|
|
|
def compute_score(avg_conf: float, text: str, word_count: int) -> float:
    """Score an OCR result: confidence penalized for short text, boosted for many words."""
    stripped_len = len(text.strip())
    result = avg_conf

    # Very short outputs are usually failed reads; penalize them hard.
    if stripped_len < 40:
        result *= 0.5
    elif stripped_len < 100:
        result *= 0.8

    # Mild bonus once a reasonable number of words was recognized.
    if word_count > 20:
        result *= 1.1

    return result
|
|
|
|
|
|
|
|
def _process_variant(args: Tuple[str, np.ndarray, int]) -> OcrCandidate:
    """Worker: OCR one (variant name, image, psm) task and wrap it as a candidate."""
    name, image, psm = args
    text, avg_conf, n_words = ocr_with_confidence(image, psm=psm)
    return OcrCandidate(
        variant=name,
        psm=psm,
        avg_conf=avg_conf,
        text=text,
        word_count=n_words,
        score=compute_score(avg_conf, text, n_words),
    )
|
|
|
|
|
|
|
|
def run_agent(
    rgb_img: np.ndarray,
    psms: Optional[List[int]] = None,
    scale_factor: float = 1.5,
    parallel: bool = True,
    top_k: int = 10,
    verbose: bool = True
) -> OcrCandidate:
    """Run agentic OCR over all preprocessing variants and PSMs.

    Args:
        rgb_img: Input image as an RGB numpy array.
        psms: Tesseract page segmentation modes to try; defaults to [3, 4, 6, 11].
        scale_factor: Upscale factor forwarded to preprocess_variants.
        parallel: Run variant/PSM combinations in a thread pool.
        top_k: Number of top candidates to print when verbose.
        verbose: Print a ranking table of candidates.

    Returns:
        The highest-scoring OcrCandidate.

    Raises:
        ValueError: If *psms* is an empty list (no work to do).
    """
    if psms is None:
        psms = [3, 4, 6, 11]

    variants = preprocess_variants(rgb_img, scale_factor=scale_factor)

    # One task per (variant, psm) combination.
    tasks = [(vname, vimg, psm) for vname, vimg in variants.items() for psm in psms]
    if not tasks:
        # Guard: an empty psms list would otherwise crash with
        # ThreadPoolExecutor(max_workers=0) or IndexError on candidates[0].
        raise ValueError('No OCR tasks to run: psms must not be empty')

    candidates: List[OcrCandidate] = []

    if parallel:
        with ThreadPoolExecutor(max_workers=min(8, len(tasks))) as executor:
            futures = [executor.submit(_process_variant, task) for task in tasks]
            for future in as_completed(futures):
                candidates.append(future.result())
    else:
        for task in tasks:
            candidates.append(_process_variant(task))

    # Best-scoring candidate first.
    candidates.sort(key=lambda c: c.score, reverse=True)

    if verbose:
        print(f'Top {top_k} OCR candidates:')
        print('-' * 90)
        for c in candidates[:top_k]:
            preview = c.text.strip().replace('\n', ' ')[:60]
            print(f"{c.variant:12s} psm={c.psm:<2d} conf={c.avg_conf:5.1f} "
                  f"words={c.word_count:3d} score={c.score:5.1f} '{preview}...'")
        print('-' * 90)

    return candidates[0]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clean_text(text: str) -> str:
    """Normalize line endings and whitespace in OCR output."""
    # Unify CRLF / lone CR line endings to LF.
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    # Collapse runs of horizontal whitespace (everything but newlines).
    text = re.sub(r'[^\S\n]+', ' ', text)
    # Trim spaces at the start and end of every line.
    text = re.sub(r'^ +| +$', '', text, flags=re.MULTILINE)
    # Squash stacks of blank lines down to a single blank line.
    text = re.sub(r'\n\s*\n+', '\n\n', text)
    return text.strip()
|
|
|
|
|
|
|
|
def fix_ocr_artifacts(text: str) -> str:
    """Apply a fixed list of regex substitutions for common OCR misreads."""
    substitutions = (
        # Character confusions: lone 'l' read for 'I', digits inside words.
        (r'\bl\b', 'I'),
        (r'(?<=[a-z])0(?=[a-z])', 'o'),
        (r'(?<=[a-z])1(?=[a-z])', 'l'),
        (r'\bll\b', 'II'),
        # Rejoin words hyphenated across a line break.
        (r'(\w)-\n(\w)', r'\1\2'),
        # Drop lines holding a single stray non-word character.
        (r'\n[^\w\n]\n', '\n'),
        # Normalize long runs of dots to an ellipsis.
        (r'\.{2,}', '...'),
        # Tighten spacing around punctuation.
        (r'\s+([.,;:!?])', r'\1'),
        (r'([.,;:!?])(?=[A-Za-z])', r'\1 '),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text
|
|
|
|
|
|
|
|
def normalize_unicode(text: str) -> str:
    """Normalize Unicode to ASCII-friendly equivalents for downstream regexes."""
    # NFKC folds compatibility characters (ligatures, fullwidth forms, ...).
    text = unicodedata.normalize('NFKC', text)

    # Explicit mappings for punctuation NFKC leaves alone (smart quotes,
    # dashes); a few ligature/space entries are kept as a safety net.
    ascii_map = {
        '\u2018': "'", '\u2019': "'",
        '\u201c': '"', '\u201d': '"',
        '\u2013': '-', '\u2014': '-',
        '\u2026': '...',
        '\ufb01': 'fi', '\ufb02': 'fl',
        '\u00a0': ' ',
    }
    for src, dst in ascii_map.items():
        text = text.replace(src, dst)

    return text
|
|
|
|
|
|
|
|
def process_ocr_text(text: str, fix_artifacts: bool = True, normalize: bool = True) -> str:
    """Full text post-processing pipeline: normalize, clean, then de-artifact."""
    if normalize:
        text = normalize_unicode(text)
    text = clean_text(text)
    return fix_ocr_artifacts(text) if fix_artifacts else text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _first_match(pattern: str, text: str, flags: int = 0) -> Optional[str]: |
|
|
"""Return first regex capture group match, or None.""" |
|
|
m = re.search(pattern, text, flags) |
|
|
return m.group(1).strip() if m else None |
|
|
|
|
|
|
|
|
def _all_matches(pattern: str, text: str, flags: int = 0) -> List[str]: |
|
|
"""Return all regex capture group matches.""" |
|
|
return [m.strip() for m in re.findall(pattern, text, flags) if m.strip()] |
|
|
|
|
|
|
|
|
@dataclass
class ExtractedFields:
    """Structured container for extracted document fields.

    All fields default to None; to_dict() drops unset fields so serialized
    output only contains what was actually found.
    """
    doi: Optional[str] = None           # Digital Object Identifier
    issn: Optional[str] = None          # Journal ISSN
    volume: Optional[str] = None        # Journal volume number
    issue: Optional[str] = None         # Journal issue number
    year: Optional[str] = None          # Publication year
    pages: Optional[str] = None         # Page range
    received: Optional[str] = None      # Date manuscript received
    accepted: Optional[str] = None      # Date manuscript accepted
    published: Optional[str] = None     # Date published
    title: Optional[str] = None         # Paper title
    authors: Optional[List[str]] = None         # Author names
    affiliations: Optional[List[str]] = None    # Author affiliations
    abstract: Optional[str] = None      # Abstract text
    keywords: Optional[List[str]] = None        # Keyword list
    email: Optional[str] = None         # Contact email address

    def to_dict(self) -> Dict[str, Any]:
        """Return only the fields that were populated (non-None)."""
        return {k: v for k, v in asdict(self).items() if v is not None}
|
|
|
|
|
|
|
|
def extract_doi(text: str) -> Optional[str]:
    """Extract a DOI, trying URL, labelled, and bare-identifier patterns.

    Returns the DOI string with trailing punctuation stripped, or None.

    Note: the labelled pattern previously matched the mojibake byte sequence
    '\u00ef\u00bc\u009a' (a UTF-8-misdecoded fullwidth colon) and duplicated
    ':' inside the character class; it now matches ':' or the actual
    fullwidth colon U+FF1A as intended.
    """
    patterns = [
        r'(?:https?://)?(?:dx\.)?doi\.org/\s*(10\.[^\s]+)',   # doi.org URL
        r'DOI\s*[:\uff1a]\s*(10\.[^\s]+)',                    # "DOI: 10. ..."
        r'\b(10\.\d{4,}/[^\s]+)',                             # bare identifier
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            doi = match.group(1).strip()
            # Drop punctuation picked up from the surrounding sentence.
            return re.sub(r'[.,;:)\]]+$', '', doi)
    return None
|
|
|
|
|
|
|
|
def extract_identifiers(text: str) -> Dict[str, Optional[str]]:
    """Extract ISSN/ISBN/PMID/arXiv identifiers (value is None when absent)."""
    specs = {
        'issn': r'ISSN\s*[::\u00ef\u00bc\u009a]?\s*([0-9]{4}-[0-9]{3}[0-9Xx])',
        'isbn': r'ISBN\s*[::\u00ef\u00bc\u009a]?\s*([\d-]{10,17})',
        'pmid': r'PMID\s*[::\u00ef\u00bc\u009a]?\s*(\d+)',
        'arxiv': r'arXiv\s*[::\u00ef\u00bc\u009a]?\s*(\d+\.\d+)',
    }
    return {key: _first_match(rx, text, re.IGNORECASE) for key, rx in specs.items()}
|
|
|
|
|
|
|
|
def extract_publication_info(text: str) -> Dict[str, Optional[str]]:
    """Extract volume, issue, pages, and the first plausible year."""
    specs = [
        ('volume', r'Vol(?:ume)?\.?\s*[::\u00ef\u00bc\u009a]?\s*(\d{1,4})', re.IGNORECASE),
        ('issue', r'(?:Issue|No\.?|Number)\s*[::\u00ef\u00bc\u009a]?\s*(\d{1,4})', re.IGNORECASE),
        ('pages', r'(?:pp?\.?|pages?)\s*[::\u00ef\u00bc\u009a]?\s*(\d+\s*[-\u2013]\s*\d+)', re.IGNORECASE),
        # Year: first 19xx/20xx token anywhere in the text (heuristic).
        ('year', r'\b((?:19|20)\d{2})\b', 0),
    ]
    return {name: _first_match(rx, text, flags) for name, rx, flags in specs}
|
|
|
|
|
|
|
|
def extract_dates(text: str) -> Dict[str, Optional[str]]:
    """Extract received/accepted/published dates in several common formats."""
    # Matches "Month D, YYYY", "D/M/YY(YY)", and "YYYY-M-D" style dates.
    date_pattern = r'[::\u00ef\u00bc\u009a]?\s*([A-Za-z]+\.?\s+\d{1,2},?\s+\d{4}|\d{1,2}[-/]\d{1,2}[-/]\d{2,4}|\d{4}[-/]\d{1,2}[-/]\d{1,2})'
    labels = ('received', 'accepted', 'published')
    return {
        label: _first_match(label.capitalize() + date_pattern, text, re.IGNORECASE)
        for label in labels
    }
|
|
|
|
|
|
|
|
def extract_abstract(text: str) -> Optional[str]:
    """Extract the abstract section; returns cleaned text or None."""
    # First try to stop at a known following section; fall back to a blank line.
    patterns = (
        r'Abstract\s*[::\u00ef\u00bc\u009a]?\s*(.*?)(?=\n\s*(?:Keywords?|Key\s*words|Introduction|1\.|1\s))',
        r'Abstract\s*[::\u00ef\u00bc\u009a]?\s*(.*?)(?=\n\n)',
    )
    for pattern in patterns:
        found = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
        if not found:
            continue
        body = found.group(1).strip()
        # Reject tiny matches, which are almost certainly false positives.
        if len(body) > 50:
            return clean_text(body)
    return None
|
|
|
|
|
|
|
|
def extract_keywords(text: str) -> Optional[List[str]]: |
|
|
"""Extract keywords list.""" |
|
|
pattern = r'(?:Keywords?|Key\s*words)\s*[::\u00ef\u00bc\u009a]?\s*(.*?)(?=\n\n|\n\s*[A-Z][a-z]+:|\Z)' |
|
|
match = re.search(pattern, text, re.IGNORECASE | re.DOTALL) |
|
|
if match: |
|
|
kw_text = match.group(1).strip() |
|
|
|
|
|
parts = re.split(r'[;,\u2022\u00b7]|\s{2,}', kw_text) |
|
|
keywords = [p.strip().strip('.-') for p in parts if p.strip() and len(p.strip()) > 2] |
|
|
return keywords if keywords else None |
|
|
return None |
|
|
|
|
|
|
|
|
def extract_title(lines: List[str]) -> Optional[str]:
    """Pick the most title-like of the first 15 lines, or None.

    Heuristics: skip lines containing journal/metadata markers, require a
    plausible length, word count, and letter ratio, then rank by position,
    capitalization, and length.
    """
    markers = {
        'journal', 'issn', 'isbn', 'volume', 'issue', 'article', 'research article',
        'department', 'university', 'corresponding', 'received', 'accepted',
        'abstract', 'keywords', 'http', 'doi', 'email', '@', 'copyright'
    }

    scored: List[Tuple[int, str]] = []
    for pos, line in enumerate(lines[:15]):
        lowered = line.lower()

        # Reject lines that look like metadata rather than a title.
        if any(marker in lowered for marker in markers):
            continue
        # Reject implausibly short or long lines.
        if len(line) < 25 or len(line) > 200:
            continue
        # Titles have at least a few words.
        if len(line.split()) < 4:
            continue
        # Require mostly letters (filters out tables, numbers, rules).
        if sum(ch.isalpha() for ch in line) / max(1, len(line)) < 0.6:
            continue

        # Earlier lines score higher; bonuses for capitalization and
        # mid-range length.
        rank = 100 - pos * 5
        if line[0].isupper():
            rank += 10
        if 50 < len(line) < 150:
            rank += 10
        scored.append((rank, line))

    scored.sort(reverse=True)
    return scored[0][1] if scored else None
|
|
|
|
|
|
|
|
def extract_authors(text: str, lines: List[str], title: Optional[str]) -> Optional[List[str]]:
    """Look for an author line in the few lines directly below the title.

    Note: *text* is currently unused; kept for interface compatibility.
    """
    if not title or title not in lines:
        return None

    start = lines.index(title) + 1
    for candidate in lines[start:min(start + 3, len(lines))]:
        # Author lines usually join names with 'and'/'&' or commas.
        has_joiner = bool(re.search(r'\b(?:and|&)\b', candidate, re.IGNORECASE))
        if not has_joiner and candidate.count(',') < 1:
            continue
        # Require at least two capitalized words (name-like tokens).
        if len(re.findall(r'\b[A-Z][a-z]+\b', candidate)) < 2:
            continue
        names = re.split(r',\s*(?:and\s+)?|\s+and\s+|\s*&\s*', candidate)
        names = [n.strip() for n in names if n.strip() and len(n.strip()) > 2]
        if names:
            return names
    return None
|
|
|
|
|
|
|
|
def extract_email(text: str) -> Optional[str]:
    """Return the first email address found in *text*, or None."""
    match = re.search(r'[\w.-]+@[\w.-]+\.\w+', text)
    return match.group(0) if match else None
|
|
|
|
|
|
|
|
def extract_fields(text: str) -> ExtractedFields:
    """Run every field extractor against *text* and assemble the result."""
    nonblank_lines = [line.strip() for line in text.split('\n') if line.strip()]

    identifiers = extract_identifiers(text)
    pub_info = extract_publication_info(text)
    dates = extract_dates(text)
    title = extract_title(nonblank_lines)

    return ExtractedFields(
        doi=extract_doi(text),
        issn=identifiers.get('issn'),
        volume=pub_info.get('volume'),
        issue=pub_info.get('issue'),
        pages=pub_info.get('pages'),
        year=pub_info.get('year'),
        received=dates.get('received'),
        accepted=dates.get('accepted'),
        published=dates.get('published'),
        title=title,
        authors=extract_authors(text, nonblank_lines, title),
        abstract=extract_abstract(text),
        keywords=extract_keywords(text),
        email=extract_email(text),
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_image(
    image_path: str,
    output_text_path: Optional[str] = None,
    output_json_path: Optional[str] = None,
    scale_factor: float = 1.5,
    psms: Optional[List[int]] = None,
    verbose: bool = True
) -> Tuple[str, ExtractedFields, OcrCandidate]:
    """
    Process a document image and extract text and structured fields.

    Args:
        image_path: Path to the input image file
        output_text_path: Optional path to save extracted text
        output_json_path: Optional path to save extracted fields as JSON
        scale_factor: Scale factor for image upscaling
        psms: List of Tesseract page segmentation modes to try
              (defaults to [3, 4, 6, 11])
        verbose: Whether to print progress information

    Returns:
        Tuple of (cleaned_text, extracted_fields, best_ocr_candidate)

    Raises:
        ValueError: If the image cannot be read.
    """
    if psms is None:
        psms = [3, 4, 6, 11]

    # OpenCV reads BGR; convert to RGB for the OCR pipeline.
    bgr = cv2.imread(image_path)
    if bgr is None:
        raise ValueError(f'Failed to read image: {image_path}')
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

    if verbose:
        print(f'Processing image: {image_path}')
        print(f'Image size: {rgb.shape[1]}x{rgb.shape[0]}')

    # Try every preprocessing variant / PSM combination, keep the best.
    best = run_agent(rgb, psms=psms, scale_factor=scale_factor, verbose=verbose)

    if verbose:
        print(f'\nSelected: {best.variant} | PSM={best.psm} | conf={best.avg_conf:.1f} | score={best.score:.1f}')

    # Post-process text, then run rule-based field extraction on it.
    cleaned_text = process_ocr_text(best.text)
    fields = extract_fields(cleaned_text)

    if output_text_path:
        with open(output_text_path, 'w', encoding='utf-8') as f:
            f.write(cleaned_text)
        if verbose:
            print(f'Saved text: {output_text_path}')

    if output_json_path:
        with open(output_json_path, 'w', encoding='utf-8') as f:
            json.dump(fields.to_dict(), f, indent=2, ensure_ascii=False)
        if verbose:
            print(f'Saved JSON: {output_json_path}')

    return cleaned_text, fields, best
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Command-line interface for the OCR extractor.

    Returns a process exit code: 0 on success, 1 on error.
    """
    parser = argparse.ArgumentParser(
        description='Lightweight Agentic OCR Document Extraction',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  python agentic_ocr_extractor.py document.jpg
  python agentic_ocr_extractor.py document.png -o output.txt -j fields.json
  python agentic_ocr_extractor.py scan.jpg --scale 2.0 --psm 3 6 11
'''
    )

    parser.add_argument('image', help='Path to the input image file')
    parser.add_argument('-o', '--output-text', help='Path to save extracted text')
    parser.add_argument('-j', '--output-json', help='Path to save extracted fields as JSON')
    parser.add_argument('--scale', type=float, default=1.5, help='Scale factor for upscaling (default: 1.5)')
    parser.add_argument('--psm', type=int, nargs='+', default=[3, 4, 6, 11],
                        help='Tesseract PSM modes to try (default: 3 4 6 11)')
    parser.add_argument('-q', '--quiet', action='store_true', help='Suppress progress output')

    args = parser.parse_args()

    if not os.path.exists(args.image):
        # Errors go to stderr so stdout stays clean for piped JSON output.
        print(f'Error: Image file not found: {args.image}', file=sys.stderr)
        return 1

    try:
        cleaned_text, fields, best = process_image(
            args.image,
            output_text_path=args.output_text,
            output_json_path=args.output_json,
            scale_factor=args.scale,
            psms=args.psm,
            verbose=not args.quiet
        )

        print('\nExtracted Fields:')
        print(json.dumps(fields.to_dict(), indent=2, ensure_ascii=False))

        return 0

    except Exception as e:
        print(f'Error: {e}', file=sys.stderr)
        return 1
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # sys.exit is the documented way to set the exit code; the bare exit()
    # builtin is a site-module convenience and not guaranteed to exist.
    sys.exit(main())
|
|
|