| """ |
| PDF → Markdown extraction using Docling. |
| |
| Docling preserves complex financial table structures by converting |
| PDF content into clean Markdown, solving cell-merging and missing-row |
| issues that plague basic text extractors like PyPDF2. |
| """ |
|
|
| import os |
| from typing import Optional |
|
|
| from docling.document_converter import DocumentConverter |
|
|
|
|
| |
# Process-wide cache for the Docling converter; remains None until
# _get_converter() constructs the instance on first use.
_converter: Optional[DocumentConverter] = None


def _get_converter() -> DocumentConverter:
    """Return the shared DocumentConverter, building it on the first call."""
    global _converter

    # Fast path: a converter was already built by an earlier call.
    if _converter is not None:
        return _converter

    print(" [Docling] Initializing DocumentConverter...")
    _converter = DocumentConverter()
    print(" [Docling] Converter ready.")
    return _converter
|
|
|
|
def extract_text_from_pdf(pdf_path: str) -> str:
    """
    Convert a single PDF file to a Markdown string using Docling.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        The Markdown text exported from the converted document.

    Raises:
        FileNotFoundError: If pdf_path does not exist or is not a regular
            file (for example, it points at a directory).
        ValueError: If the exported Markdown is empty or whitespace-only.
    """
    # isfile() rather than exists(): a directory path "exists" but cannot be
    # converted, so reject it here before the converter is initialized.
    if not os.path.isfile(pdf_path):
        raise FileNotFoundError(f"PDF not found: {pdf_path}")

    converter = _get_converter()

    print(f" [Docling] Converting: {os.path.basename(pdf_path)} → Markdown")
    result = converter.convert(pdf_path)
    markdown_text = result.document.export_to_markdown()

    # Whitespace-only output is treated the same as no output at all.
    if not markdown_text.strip():
        raise ValueError(
            f"No content extracted from {pdf_path}. "
            "The PDF may be image-only or corrupted."
        )

    print(f" [Docling] Extracted {len(markdown_text):,} characters of Markdown.")
    return markdown_text
|
|
|
|
def extract_text_from_directory(dir_path: str, max_files: Optional[int] = None) -> list:
    """
    Extract Markdown from the PDFs found directly inside a directory.

    Entries are matched by a case-insensitive ".pdf" suffix and processed in
    sorted filename order. A file whose extraction raises is reported on
    stdout and skipped; it does not abort the batch.

    Args:
        dir_path: Path to the directory containing PDFs.
        max_files: Maximum number of files to process. None (the default)
            means no limit; zero or a negative value processes no files.

    Returns:
        List of dicts, one per successfully extracted PDF, with the keys
        'filename', 'text' and 'char_count'.
    """
    # os.listdir() returns entries in arbitrary order; sort so that the
    # max_files cut-off selects the same files on every run.
    pdf_files = sorted(
        name for name in os.listdir(dir_path) if name.lower().endswith(".pdf")
    )

    # Compare against None explicitly: a plain truthiness test would treat
    # max_files=0 as "no limit". Negative values are clamped to zero so they
    # cannot act as a from-the-end slice.
    if max_files is not None:
        pdf_files = pdf_files[:max(max_files, 0)]

    results = []
    for filename in pdf_files:
        filepath = os.path.join(dir_path, filename)
        try:
            text = extract_text_from_pdf(filepath)
        except Exception as e:
            # Deliberate best-effort: one unreadable PDF must not stop the
            # rest of the directory from being processed.
            print(f" [SKIP] {filename}: {e}")
        else:
            results.append({
                "filename": filename,
                "text": text,
                "char_count": len(text),
            })

    print(f"Successfully extracted text from {len(results)}/{len(pdf_files)} PDFs")
    return results
|
|