Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Hybrid PDF extractor: | |
| 1) Text-based PDF via PyMuPDF/pdfplumber | |
| 2) Scan PDF via OCR (Tesseract first, PaddleOCR fallback) | |
| Output JSON to stdout. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import re | |
| import sys | |
| from typing import Optional | |
| from PIL import ImageFilter, ImageOps | |
def clean_text(text: str) -> str:
    """Normalize whitespace in extracted text.

    Applies, in order: convert CR LF / lone CR line endings to LF, collapse
    runs of two or more spaces/tabs into a single space, collapse runs of
    three or more newlines into exactly two, then strip leading and trailing
    whitespace. A falsy input (``None`` or an empty string) yields ``""``.
    """
    # Order matters: line endings must be unified before newline runs are
    # counted.
    substitutions = (
        (r"\r\n?", "\n"),
        (r"[ \t]{2,}", " "),
        (r"\n{3,}", "\n\n"),
    )
    normalized = text if text else ""
    for pattern, replacement in substitutions:
        normalized = re.sub(pattern, replacement, normalized)
    return normalized.strip()
def extract_with_pymupdf(path: str, max_pages: int) -> str:
    """Extract the text layer of a PDF with PyMuPDF.

    Args:
        path: Path of the PDF file to open.
        max_pages: Upper bound on the number of leading pages to read.

    Returns:
        The cleaned, newline-joined text of the pages read, or ``""`` when
        PyMuPDF cannot be imported or any step of the extraction raises.
    """
    try:
        import fitz  # PyMuPDF
    except Exception:
        return ""

    try:
        doc = fitz.open(path)
        try:
            page_count = min(len(doc), max_pages)
            texts = [
                doc.load_page(index).get_text("text") or ""
                for index in range(page_count)
            ]
        finally:
            # Close the document even when a page fails to load, so the
            # handle is not leaked on the error path.
            doc.close()
    except Exception:
        # Best-effort extractor: the caller falls back to other engines.
        return ""
    return clean_text("\n".join(texts))
def extract_with_pdfplumber(path: str, max_pages: int) -> str:
    """Extract the text layer of a PDF with pdfplumber.

    Args:
        path: Path of the PDF file to open.
        max_pages: Number of leading pages to read (applied as a slice).

    Returns:
        The cleaned, newline-joined page texts, or ``""`` when pdfplumber
        cannot be imported or the extraction raises.
    """
    try:
        import pdfplumber
    except Exception:
        return ""

    try:
        with pdfplumber.open(path) as pdf:
            # A page without extractable text contributes an empty string.
            page_texts = [
                page.extract_text() or "" for page in pdf.pages[:max_pages]
            ]
    except Exception:
        return ""
    return clean_text("\n".join(page_texts))
def preprocess_image_for_ocr(image):
    """Prepare a rendered page image for OCR.

    Steps, in order: convert to mode ``"L"`` (grayscale), apply
    autocontrast, apply a 3x3 median filter (light denoise), then apply
    the SHARPEN filter.

    Args:
        image: Object exposing ``convert`` (presumably a ``PIL.Image.Image``
            as produced by pdf2image; confirm against callers).

    Returns:
        The processed image.
    """
    grayscale = image.convert("L")
    contrasted = ImageOps.autocontrast(grayscale)
    denoised = contrasted.filter(ImageFilter.MedianFilter(size=3))
    return denoised.filter(ImageFilter.SHARPEN)
def ocr_with_tesseract(path: str, max_pages: int, lang: str) -> str:
    """OCR the first pages of a PDF with Tesseract.

    Each page is rendered at 250 dpi, preprocessed, and recognized in up to
    three passes; a later pass runs only while the cleaned result is shorter
    than 20 characters:

    1. requested ``lang`` with ``--psm 6``;
    2. ``lang`` plus ``eng`` (unless ``lang`` already contains ``+``) with
       ``--psm 11``;
    3. ``eng`` alone with ``--psm 6``.

    Args:
        path: Path of the PDF file.
        max_pages: Last page (1-based) to render.
        lang: Tesseract language specification, e.g. ``"ind+eng"``.

    Returns:
        The cleaned, newline-joined OCR text, or ``""`` when pdf2image or
        pytesseract cannot be imported, or when rendering/OCR fails in a way
        no fallback pass can recover from.
    """
    try:
        from pdf2image import convert_from_path
        import pytesseract
    except Exception:
        return ""

    def recognize(image, languages: str, config: str) -> str:
        """Run one OCR pass; a Tesseract failure counts as "no text"."""
        try:
            return pytesseract.image_to_string(
                image,
                lang=languages,
                config=config,
            ) or ""
        except pytesseract.TesseractError:
            # Raised when Tesseract exits with an error, e.g. missing
            # language data. Swallow it here so the next fallback pass can
            # still run instead of discarding the whole document.
            return ""

    texts = []
    try:
        images = convert_from_path(path, dpi=250, first_page=1, last_page=max_pages)
        for image in images:
            processed = preprocess_image_for_ocr(image)

            # First pass: general OCR
            text = recognize(processed, lang, "--oem 3 --psm 6")

            # Fallback pass if result is still too short
            if len(clean_text(text)) < 20:
                combined_lang = lang if "+" in lang else f"{lang}+eng"
                text = recognize(processed, combined_lang, "--oem 3 --psm 11") or text

            # Final fallback in case requested lang data is unavailable
            if len(clean_text(text)) < 20:
                text = recognize(processed, "eng", "--oem 3 --psm 6") or text

            texts.append(text)
    except Exception:
        # Anything else (missing tesseract binary, rendering failure, ...)
        # keeps the original best-effort contract.
        return ""
    return clean_text("\n".join(texts))
def ocr_with_paddle(path: str, max_pages: int) -> str:
    """OCR the first pages of a PDF with PaddleOCR (English model).

    Pages are rendered at 240 dpi, preprocessed, converted to a NumPy array
    and passed to ``PaddleOCR.ocr``. Only the first element of each result is
    inspected.

    Args:
        path: Path of the PDF file.
        max_pages: Last page (1-based) to render.

    Returns:
        The cleaned, newline-joined recognized lines, or ``""`` when a
        dependency cannot be imported or any step raises.
    """
    try:
        from pdf2image import convert_from_path
        from paddleocr import PaddleOCR
    except Exception:
        return ""

    def recognized_lines(detections) -> list:
        """Collect the text of every well-formed detection entry."""
        lines = []
        for entry in detections:
            if not isinstance(entry, (list, tuple)) or len(entry) < 2:
                continue
            # presumably entry[1] is (text, confidence); verify against the
            # installed PaddleOCR version
            payload = entry[1]
            if isinstance(payload, (list, tuple)) and payload:
                lines.append(str(payload[0]))
        return lines

    page_texts = []
    try:
        import numpy as np

        pages = convert_from_path(path, dpi=240, first_page=1, last_page=max_pages)
        engine = PaddleOCR(use_angle_cls=True, lang="en", show_log=False)
        for page in pages:
            prepared = preprocess_image_for_ocr(page)
            outcome = engine.ocr(np.array(prepared))
            if not outcome:
                continue
            lines = recognized_lines(outcome[0] or [])
            if lines:
                page_texts.append("\n".join(lines))
    except Exception:
        return ""
    return clean_text("\n".join(page_texts))
def looks_like_text_based(text: str) -> bool:
    """Decide whether an extraction result contains usable text.

    Returns:
        ``True`` when the cleaned text is at least 10 characters long and
        contains at least 6 alphanumeric characters; ``False`` otherwise.
    """
    cleaned = clean_text(text)
    if len(cleaned) < 10:
        return False
    # Summing booleans counts the alphanumeric characters.
    return sum(map(str.isalnum, cleaned)) >= 6
def run(path: str, max_pages: int, ocr_lang: str) -> dict:
    """Extract text from a PDF, trying engines from cheapest to costliest.

    Order: PyMuPDF, pdfplumber, Tesseract OCR, PaddleOCR. The first engine
    whose output passes ``looks_like_text_based`` wins. If none does, all
    outputs are merged into a combined fallback result.

    Args:
        path: Path of the PDF file.
        max_pages: Page limit forwarded to every engine.
        ocr_lang: Language specification forwarded to Tesseract.

    Returns:
        A dict with ``success``, ``mode``, ``engine`` and ``text``. OCR and
        fallback results also carry ``debug`` (cleaned length per engine);
        the fallback result additionally carries ``error``.
    """

    def cleaned_length(value: str) -> int:
        return len(clean_text(value))

    pymupdf_text = extract_with_pymupdf(path, max_pages)
    if looks_like_text_based(pymupdf_text):
        return {
            "success": True,
            "mode": "text-based",
            "engine": "pymupdf",
            "text": pymupdf_text,
        }

    plumber_text = extract_with_pdfplumber(path, max_pages)
    if looks_like_text_based(plumber_text):
        return {
            "success": True,
            "mode": "text-based",
            "engine": "pdfplumber",
            "text": plumber_text,
        }

    tesseract_text = ocr_with_tesseract(path, max_pages, ocr_lang)
    # Grown stage by stage so each result reports the engines tried so far.
    debug = {
        "len_pymupdf": cleaned_length(pymupdf_text),
        "len_pdfplumber": cleaned_length(plumber_text),
        "len_tesseract": cleaned_length(tesseract_text),
    }
    if looks_like_text_based(tesseract_text):
        return {
            "success": True,
            "mode": "scan-ocr",
            "engine": "tesseract",
            "text": tesseract_text,
            "debug": debug,
        }

    paddle_text = ocr_with_paddle(path, max_pages)
    debug["len_paddleocr"] = cleaned_length(paddle_text)
    if looks_like_text_based(paddle_text):
        return {
            "success": True,
            "mode": "scan-ocr",
            "engine": "paddleocr",
            "text": paddle_text,
            "debug": debug,
        }

    # No single engine was convincing: merge whatever each one produced.
    merged = clean_text(
        "\n\n".join([pymupdf_text, plumber_text, tesseract_text, paddle_text])
    )
    debug["len_merged"] = len(merged)
    too_short = len(merged) < 10
    return {
        "success": not too_short,
        "mode": "mixed-fallback" if merged else "none",
        "engine": "combined",
        "text": merged,
        "error": "Tidak ada teks yang dapat diekstrak dari PDF." if too_short else None,
        "debug": debug,
    }
def parse_args(argv: Optional[list] = None) -> argparse.Namespace:
    """Parse the command line of the extractor.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        Namespace with ``pdf_path`` (str), ``max_pages`` (int, default 20)
        and ``ocr_lang`` (str, default ``"ind+eng"``).
    """
    cli = argparse.ArgumentParser(
        description="Extract text from PDF (text-based + OCR)",
    )
    cli.add_argument("pdf_path", help="Path to PDF file")
    optional_flags = (
        ("--max-pages", {"type": int, "default": 20}),
        ("--ocr-lang", {"default": "ind+eng"}),
    )
    for flag, settings in optional_flags:
        cli.add_argument(flag, **settings)
    return cli.parse_args(argv)
def main(argv: Optional[list] = None) -> int:
    """Run the extractor and print its result as one JSON document.

    Args:
        argv: Argument list forwarded to ``parse_args``.

    Returns:
        Always ``0``; failures are reported inside the JSON payload
        (``success`` false, ``mode`` ``"error"``), not via the exit status.
    """
    args = parse_args(argv)
    try:
        # Clamp the page limit so a zero or negative value still reads one page.
        payload = run(args.pdf_path, max(1, args.max_pages), args.ocr_lang)
    except Exception as exc:
        payload = {
            "success": False,
            "mode": "error",
            "engine": "none",
            "text": "",
            "error": str(exc),
        }

    try:
        sys.stdout.write(json.dumps(payload, ensure_ascii=False))
    except UnicodeEncodeError:
        # stdout's encoding cannot represent some extracted character
        # (e.g. a legacy code page on a pipe). Emit the same document with
        # non-ASCII characters escaped so the consumer still gets valid JSON.
        # NOTE(review): relies on the failed write emitting nothing; confirm
        # for the stream type used in deployment.
        sys.stdout.write(json.dumps(payload, ensure_ascii=True))
    return 0
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status.
    sys.exit(main())