Spaces:
Running
Running
| """ | |
| pdf_processor.py | |
| ================= | |
| Production-ready PDF preprocessing module using Docling. | |
| What this module does: | |
| 1. Loads a PDF using Docling's document converter | |
| 2. Extracts text sections with their heading hierarchy | |
| 3. Filters out noise pages (cover, TOC, disclaimers, legal boilerplate) | |
| 4. Extracts tables as structured data (markdown + row/col data) | |
| 5. Cleans and normalises text (whitespace, encoding issues) | |
| 6. Attaches rich metadata to every element | |
| 7. Saves the structured output as JSON | |
| Why Docling over PyPDF / pdfplumber? | |
| - PyPDF gives raw text dump — tables become garbled single lines | |
| - pdfplumber is better but still struggles with multi-column layouts | |
| - Docling runs an AI layout model (DocLayNet) that understands the | |
| visual structure of the page: columns, tables, headings, captions | |
| - For financial documents with income statements and data tables | |
| this structural understanding is non-negotiable | |
| Usage (as a module): | |
| from src.pdf_processor import PDFProcessor | |
| processor = PDFProcessor() | |
| result = processor.process("data/raw/morningstar/ptc01302411420.pdf") | |
| Usage (as a script): | |
| python src/pdf_processor.py | |
| """ | |
| import re | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime, timezone | |
| # ── Logging ──────────────────────────────────────────────────────────────────── | |
# Configure logging once at import time: INFO and above, with a timestamp
# and a left-aligned, 8-character level name on every record.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
)

# Module-level logger used by the functions in this file.
log = logging.getLogger(__name__)
| # ── Paths ────────────────────────────────────────────────────────────────────── | |
# Two directory levels above this file.
BASE_DIR = Path(__file__).parent.parent
# Input PDFs are read from here; processed JSON is written to PROCESSED_DIR.
RAW_DIR = BASE_DIR.joinpath("data", "raw", "morningstar")
PROCESSED_DIR = BASE_DIR.joinpath("data", "processed", "morningstar")
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # PREPROCESSING STEP 1 — Build the Docling Converter | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # We configure Docling with specific pipeline options before parsing. | |
| # These options control which AI models run during parsing. | |
| # | |
| # Options we set: | |
| # do_table_structure = True | |
| # → Runs TableFormer model to reconstruct table rows/columns | |
| # → Without this, table cells are extracted as unordered text | |
| # | |
| # do_ocr = False | |
| # → These PDFs are digital (not scanned images), so OCR is off | |
| # → Turning OCR on for digital PDFs wastes time and adds noise | |
| # | |
| # generate_picture_images = False | |
| # → We don't need embedded chart/figure images | |
| # → Skipping this speeds up parsing significantly | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def build_converter():
    """
    Create the Docling DocumentConverter used for every PDF.

    The PDF pipeline is configured with table-structure reconstruction
    enabled, and with OCR and picture-image generation disabled.

    Returns:
        A DocumentConverter whose PDF format option carries those settings.
    """
    # Docling is imported lazily so importing this module stays cheap.
    from docling.datamodel.base_models import InputFormat
    from docling.datamodel.pipeline_options import PdfPipelineOptions
    from docling.document_converter import DocumentConverter, PdfFormatOption

    pipeline = PdfPipelineOptions()
    pipeline.do_table_structure = True        # reconstruct table rows/columns
    pipeline.do_ocr = False                   # digital PDFs only, no OCR pass
    pipeline.generate_picture_images = False  # figures are not needed

    pdf_option = PdfFormatOption(pipeline_options=pipeline)
    docling_converter = DocumentConverter(format_options={InputFormat.PDF: pdf_option})

    log.info("Docling converter initialised (table_structure=ON, OCR=OFF)")
    return docling_converter
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # PREPROCESSING STEP 2 — Noise Page Filter | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Not every page in a financial PDF is useful. Pages we actively remove: | |
| # | |
| # Cover / Title pages | |
| # → Just document title, author name, date | |
| # → Zero retrieval value — no financial content | |
| # | |
| # Table of Contents / Index pages | |
| # → Lists section names and page numbers | |
| # → Section names are already captured in real section headers | |
| # → Page numbers refer to printed pages, useless in RAG | |
| # | |
| # Disclaimer / Legal pages | |
| # → "Important Disclosure", "General Disclosure", "Risk Warning", | |
| # "Conflicts of Interest", copyright notices | |
| # → ACTIVELY HARMFUL: contains terms like "investment", "securities", | |
| # "risk" that match financial queries but return legal boilerplate | |
| # → Query "what are the risks?" should return risk analysis, NOT this | |
| # | |
| # Detection strategy: | |
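| # → A page is noise if it is blank or near-blank (< 50 chars of text), | |
| # OR if ALL of its headers are known boilerplate titles (any text length), | |
| # OR if boilerplate headers outnumber content headers AND its body | |
| # text is below a minimum meaningful length threshold (300 chars) | |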
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # Known boilerplate section titles (after normalisation — no ® / ™ symbols) | |
| # Used for exact-match check in is_noise_header() | |
NOISE_HEADERS = {
    # Navigation pages
    "contents",
    "table of contents",
    "index",
    # Disclosure boilerplate
    "important disclosure",
    "important disclosures",
    "general disclosure",
    "general disclosures",
    # Risk / conflict notices
    # NOTE(review): a bare "risks" heading may also title genuine risk
    # analysis in some reports — confirm it should be treated as noise.
    "risk warning",
    "risks",
    "conflicts of interest",
    # Distributor-specific sections
    "third-party distribution",
    "vaneck disclosures",
    # Disclaimers
    "legal disclaimer",
    "disclaimer",
    "disclaimers",
    # Publisher "about" blurbs
    "about morningstar indexes",
    "about morningstar equity research",
}
| # Regex: street address line e.g. "22 West Washington Street Chicago, IL 60602 USA" | |
| _ADDRESS_RE = re.compile( | |
| r"^\d+\s+\w+.*\b(street|st|avenue|ave|boulevard|blvd|road|rd|drive|dr|lane|ln|way)\b", | |
| re.IGNORECASE, | |
| ) | |
| def _normalize_header(text: str) -> str: | |
| """ | |
| Strip trademark symbols and collapse whitespace so that | |
| "About Morningstar® Equity Research TM" normalises to | |
| "about morningstar equity research". | |
| """ | |
| t = text.strip().lower() | |
| t = t.replace("®", "").replace("™", "").replace("℠", "") | |
| # Remove standalone " tm" / "(tm)" suffixes | |
| t = re.sub(r"\s*\(?\btm\b\)?$", "", t) | |
| # Collapse runs of whitespace left by symbol removal | |
| t = re.sub(r"\s{2,}", " ", t).strip() | |
| return t | |
def _is_noise_header(raw_header: str) -> bool:
    """
    Decide whether a single header line is boilerplate.

    A header counts as noise when any of the following holds:
      * its normalised form is listed in NOISE_HEADERS;
      * its normalised form ends with "disclosure(s)" or "disclaimer(s)",
        which catches document-specific titles such as
        "Wide Moat Focus Index Disclosures";
      * the raw header looks like a postal address, e.g.
        "22 West Washington Street Chicago, IL 60602 USA".

    Args:
        raw_header: Header text as extracted from the document.

    Returns:
        True if the header is boilerplate, False otherwise.
    """
    normalised = _normalize_header(raw_header)
    return (
        normalised in NOISE_HEADERS
        or re.search(r"\b(disclosures?|disclaimers?)\s*$", normalised) is not None
        # The address check runs on the raw (stripped) text, not the normalised form.
        or _ADDRESS_RE.match(raw_header.strip()) is not None
    )
def is_noise_page(page_sections: list[dict]) -> bool:
    """
    Decide whether all sections of one page are boilerplate.

    Args:
        page_sections: Section dicts for a single page; each has a
            "type" ("header" or "text") and a "text" entry.

    Returns:
        True when the page should be dropped:
          * there are no sections, or fewer than 50 characters of text
            in total (blank / cover page);
          * every header on the page is a noise header, whatever the
            amount of text (multi-paragraph legal pages);
          * noise headers outnumber content headers and the body text is
            shorter than 300 characters (mixed cover pages).
        False otherwise, including any page of 50+ characters without
        headers.
    """
    if not page_sections:
        return True  # blank page

    combined = " ".join(sec["text"] for sec in page_sections).strip()
    if len(combined) < 50:
        return True  # near-blank page

    header_texts = [sec["text"] for sec in page_sections if sec["type"] == "header"]
    if not header_texts:
        return False  # header-less pages are treated as content

    body_text = " ".join(
        sec["text"] for sec in page_sections if sec["type"] == "text"
    ).strip()

    # Classify each header once; the remainder are content headers.
    noise_count = sum(1 for header in header_texts if _is_noise_header(header))
    content_count = len(header_texts) - noise_count

    if content_count == 0:
        return True  # Case A: only boilerplate headers
    # Case B: mostly boilerplate headers and hardly any body text
    return noise_count > content_count and len(body_text) < 300
def filter_noise_pages(sections: list[dict]) -> tuple[list[dict], list[int]]:
    """
    Remove sections that belong to noise pages.

    Sections are grouped by their "page_num" (a missing or None page
    number is grouped under page 0) and each group is judged with
    is_noise_page(). Surviving sections are returned in their original
    input order, so reading order is preserved even when a section has
    no page number.

    Args:
        sections: Section dicts, each with "type" and "text" and
            optionally "page_num".

    Returns:
        filtered_sections : sections with noise pages removed
        removed_pages     : sorted list of page numbers that were filtered out
    """
    from collections import defaultdict

    def page_key(section: dict) -> int:
        # Sections without provenance share the pseudo-page 0.
        return section.get("page_num") or 0

    # Group sections by page so each page is judged as a whole.
    by_page = defaultdict(list)
    for section in sections:
        by_page[page_key(section)].append(section)

    removed_pages = [pg for pg in sorted(by_page) if is_noise_page(by_page[pg])]

    # Filter in input order instead of re-emitting page by page.
    noise = set(removed_pages)
    kept_sections = [s for s in sections if page_key(s) not in noise]

    if removed_pages:
        log.info(f" Filtered {len(removed_pages)} noise pages: {removed_pages}")
    return kept_sections, removed_pages
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # PREPROCESSING STEP 3 — Text Cleaning | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Raw text from PDFs often contains: | |
| # - Extra whitespace and blank lines between words | |
| # - Hyphenated line breaks ("competi-\ntive" → "competitive") | |
| # - Unicode noise characters (soft hyphens, zero-width spaces) | |
| # - Repeated whitespace inside sentences | |
| # | |
| # We apply a simple cleaning pipeline to fix these before chunking. | |
| # Why clean BEFORE chunking? | |
| # → If we chunk first, each chunk inherits the noise | |
| # → The embedding model will encode noise as part of the meaning | |
| # → Clean text produces cleaner, more accurate embeddings | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def clean_text(text: str) -> str:
    """
    Clean raw text extracted from a PDF.

    Steps:
      1. Re-join words split across lines by a hyphen or a soft hyphen
         (U+00AD) followed by a line break (LF or CRLF).
      2. Remove remaining soft hyphens and invisible characters:
         zero-width space (U+200B), word joiner (U+2060) and
         zero-width no-break space / BOM (U+FEFF).
      3. Collapse runs of spaces and tabs into one space.
      4. Collapse three or more consecutive newlines into two.
      5. Strip leading and trailing whitespace.

    Args:
        text: Raw text; a falsy value (empty string or None) is accepted.

    Returns:
        The cleaned text, or "" for falsy input.
    """
    if not text:
        return ""
    # Step 1: a soft hyphen at a line end marks the same kind of break as
    # a hard hyphen, so both are joined ("competi-" + newline + "tive").
    text = re.sub(r"[-\u00ad]\r?\n", "", text)
    # Step 2: drop invisible characters that would pollute embeddings.
    # ZWJ / ZWNJ are left alone because they carry meaning in some scripts.
    text = re.sub(r"[\u00ad\u200b\u2060\ufeff]", "", text)
    # Step 3: collapse horizontal whitespace only; newlines are kept.
    text = re.sub(r"[ \t]+", " ", text)
    # Step 4: keep at most one blank line between paragraphs.
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # PREPROCESSING STEP 3 — Section Extraction | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Docling's document model organises content as a tree of items. | |
| # We iterate over it and separate items into two types: | |
| # | |
| # SectionHeaderItem → A heading (H1, H2, H3 etc.) | |
| # TextItem → A paragraph of body text | |
| # | |
| # Why capture heading level? | |
| # → Heading level tells us where we are in the document hierarchy | |
| # → "Net Income" under H1 "Financial Statements" is different from | |
| # "Net Income" under H2 "Non-GAAP Reconciliation" | |
| # → We store this in metadata so retrieval can filter by section | |
| # | |
| # Why separate headers from text? | |
| # → Headers are short and don't chunk well alone | |
| # → We prefix each text chunk with its parent header for context | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def extract_sections(doc) -> list[dict]:
    """
    Collect headers and text blocks from a parsed Docling document.

    Every item yielded by doc.iterate_items() that has non-blank text
    becomes one dict. SectionHeaderItem instances are tagged "header";
    every other item with text is tagged "text" and records the most
    recently seen header as its parent. Noise pages are removed with
    filter_noise_pages() before returning.

    Args:
        doc: Parsed Docling document.

    Returns:
        A list of dicts with the keys
        {type, level, text, cleaned_text, page_num, parent_header}.
    """
    from docling.datamodel.document import TextItem, SectionHeaderItem

    collected = []
    last_header = ""  # most recent heading, used as context for text blocks

    for item, level in doc.iterate_items():
        raw = getattr(item, "text", None)
        if not raw or not raw.strip():
            continue  # items without usable text are skipped

        stripped = raw.strip()
        # Items without provenance get no page number.
        page_num = item.prov[0].page_no if item.prov else None
        is_header = isinstance(item, SectionHeaderItem)
        if is_header:
            last_header = stripped

        collected.append({
            "type": "header" if is_header else "text",
            "level": level,
            "text": stripped,
            "cleaned_text": clean_text(raw),
            "page_num": page_num,
            # Headers carry no parent; text blocks inherit the last heading.
            "parent_header": "" if is_header else last_header,
        })

    header_count = sum(1 for s in collected if s["type"] == "header")
    text_count = sum(1 for s in collected if s["type"] == "text")
    log.info(f" Extracted {len(collected)} sections "
             f"({header_count} headers, "
             f"{text_count} text blocks)")

    # Remove cover, TOC, and disclaimer pages; the page list is not needed here.
    collected, _removed_pages = filter_noise_pages(collected)
    log.info(f" After noise filter: {len(collected)} sections remain")
    return collected
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # PREPROCESSING STEP 4 — Table Extraction | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Tables are the most important element in financial documents. | |
| # Docling's TableFormer model reconstructs the row/column structure. | |
| # | |
| # For each table we extract: | |
| # markdown → Human-readable, good for LLM context | |
| # data → Raw list of lists for programmatic access | |
| # headers → Column names for metadata tagging | |
| # | |
| # Why keep tables ATOMIC (never split)? | |
| # → A revenue table split across two chunks loses column alignment | |
| # → LLM receiving half a table gives wrong or hallucinated answers | |
| # → Each table is stored as ONE complete chunk, regardless of size | |
| # | |
| # Why convert to markdown? | |
| # → Markdown tables are easy for LLMs to read and parse | |
| # → They preserve column-row relationships in plain text | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
| def extract_tables(doc, skip_pages: set = None) -> list[dict]: | |
| """ | |
| Extract all tables from a parsed Docling document. | |
| Args: | |
| skip_pages: set of page numbers to skip (noise pages) | |
| Returns a list of dicts: | |
| {index, page_num, markdown, headers, rows, cols, data, is_atomic} | |
| """ | |
| skip_pages = skip_pages or set() | |
| tables = [] | |
| for i, table in enumerate(doc.tables): | |
| try: | |
| df = table.export_to_dataframe(doc) | |
| markdown = table.export_to_markdown(doc) | |
| page_num = table.prov[0].page_no if table.prov else None | |
| # Skip tables on noise pages (cover, TOC, disclaimer) | |
| if page_num in skip_pages: | |
| continue | |
| # Skip empty or trivially small tables (1 row = probably a label) | |
| if df.empty or len(df) < 2: | |
| continue | |
| tables.append({ | |
| "index" : i, | |
| "page_num" : page_num, | |
| "markdown" : markdown, | |
| "headers" : list(df.columns.astype(str)), | |
| "rows" : len(df), | |
| "cols" : len(df.columns), | |
| "data" : df.fillna("").values.tolist(), | |
| "is_atomic": True, # NEVER split this chunk | |
| }) | |
| except Exception as e: | |
| log.warning(f" Table {i} could not be extracted: {e}") | |
| log.info(f" Extracted {len(tables)} tables") | |
| return tables | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # PREPROCESSING STEP 5 — Metadata Tagging | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Every element (section or table) gets a metadata dict attached. | |
| # This metadata is stored alongside the vector in ChromaDB. | |
| # | |
| # Why metadata matters: | |
| # → Allows FILTERED retrieval ("only search 2024 10-K documents") | |
| # → Enables source citation ("found on page 12 of PTC report") | |
| # → Supports temporal queries ("Apple revenue in fiscal 2024") | |
| # | |
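| # Fields we tag: | |
| # file_name / file_path → which file this came from | |
| # source → data provider (set to "morningstar" by build_metadata) | |
| # doc_type → "research_report" by default; pass 10-K / 10-Q / 8-K via `extra` | |
| # license, parser, parsed_at → provenance of the parse | |
| # company → Apple / PTC / etc. (supplied by the caller via `extra`) | |
| # fiscal_year → for time-aware retrieval (supplied by the caller via `extra`) | |
| # page_num → for citations (stored on each section / table dict) | |
| # section_title → which section a chunk belongs to (stored as `parent_header` on text sections) | |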
| # ══════════════════════════════════════════════════════════════════════════════ | |
def build_metadata(pdf_path: Path, extra: dict = None) -> dict:
    """
    Assemble the document-level metadata dict for one PDF.

    Args:
        pdf_path: Path of the source PDF; its name and string form are recorded.
        extra: Optional additional fields. They are merged last, so they
            override the defaults on key collisions.

    Returns:
        A dict with file_name, file_path, source, doc_type, license,
        parsed_at (current UTC time, ISO 8601) and parser, plus any
        entries from `extra`.
    """
    metadata = dict(
        file_name=pdf_path.name,
        file_path=str(pdf_path),
        source="morningstar",
        doc_type="research_report",
        license="proprietary",
        parsed_at=datetime.now(timezone.utc).isoformat(),
        parser="docling",
    )
    if not extra:
        return metadata
    metadata.update(extra)
    return metadata
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # PREPROCESSING STEP 6 — Full Document Export | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # After extracting sections and tables, we also export the full document | |
| # as a single markdown string. | |
| # | |
| # Why? | |
| # → Useful for quick inspection and debugging | |
| # → Can be used as a fallback if section-level chunking fails | |
| # → Gives the LLM a complete document view when needed | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def export_full_markdown(doc) -> str:
    """
    Render the whole document as one markdown string.

    Args:
        doc: Parsed document object exposing export_to_markdown().

    Returns:
        Whatever doc.export_to_markdown() produces.
    """
    markdown = doc.export_to_markdown()
    return markdown
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # MAIN PROCESSOR CLASS | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
class PDFProcessor:
    """
    End-to-end PDF processor using Docling.

    Combines all preprocessing steps into a single callable interface.
    Idempotent — skips files that have already been processed (checks cache).
    """

    def __init__(self, output_dir: Path = PROCESSED_DIR):
        """
        Args:
            output_dir: Directory for processed JSON; created if missing.
                A plain string is accepted and converted to a Path.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._converter = None  # lazy load — only initialise when first needed

    @property
    def converter(self):
        """Docling converter, built on first access and reused afterwards."""
        # Must be a property: process() uses `self.converter.convert(...)`.
        if self._converter is None:
            log.info("Loading Docling converter (first use) ...")
            self._converter = build_converter()
        return self._converter

    @staticmethod
    def _find_noise_pages(doc) -> list[int]:
        """Return the page numbers that the noise filter removes from `doc`."""
        from docling.datamodel.document import SectionHeaderItem

        raw_sections = []
        for item, _level in doc.iterate_items():
            text = getattr(item, "text", None)
            # Same blank-text rule as extract_sections(), so both passes
            # see identical page contents and agree on what is noise.
            if not text or not text.strip():
                continue
            raw_sections.append({
                "type": "header" if isinstance(item, SectionHeaderItem) else "text",
                "text": text.strip(),
                "page_num": item.prov[0].page_no if item.prov else None,
            })
        _, removed_pages = filter_noise_pages(raw_sections)
        return removed_pages

    def process(self, pdf_path: str | Path, extra_meta: dict = None,
                force: bool = False) -> dict:
        """
        Process a single PDF file through all preprocessing steps.

        Args:
            pdf_path   : Path to the PDF file
            extra_meta : Optional extra metadata (company, fiscal_year, etc.)
            force      : If True, re-process even if output already exists

        Returns:
            Parsed document dict with the keys metadata, sections, tables
            and full_markdown. When a cached JSON file exists and `force`
            is False, the dict is loaded from that file instead.
        """
        pdf_path = Path(pdf_path)
        out_path = self.output_dir / f"{pdf_path.stem}.json"

        # Check cache — skip if already processed
        if out_path.exists() and not force:
            log.info(f"SKIP {pdf_path.name} (already processed → {out_path.name})")
            # The file is written as UTF-8 below, so read it back the same way.
            with open(out_path, encoding="utf-8") as f:
                return json.load(f)

        log.info(f"Processing: {pdf_path.name}")

        # ── Step 1: Parse with Docling ────────────────────────────────────
        result = self.converter.convert(str(pdf_path))
        doc = result.document
        log.info(" Docling parse complete")

        # ── Step 2 + 3: Extract sections (text cleaning happens inside) ───
        sections = extract_sections(doc)

        # ── Step 4: Extract tables ────────────────────────────────────────
        # extract_sections() does not report which pages it dropped, so the
        # noise pages are recomputed from the raw document to skip their tables.
        removed_pages = self._find_noise_pages(doc)
        tables = extract_tables(doc, skip_pages=set(removed_pages))

        # ── Step 5: Build metadata ────────────────────────────────────────
        metadata = build_metadata(pdf_path, extra_meta)
        metadata["total_sections"] = len(sections)
        metadata["total_tables"] = len(tables)
        # Count removed pages too: `sections` is already filtered, so trailing
        # noise pages would otherwise be missing from the total.
        seen_pages = [s["page_num"] for s in sections if s["page_num"]]
        seen_pages.extend(pg for pg in removed_pages if pg)
        metadata["total_pages"] = max(seen_pages, default=0)

        # ── Step 6: Full markdown export ──────────────────────────────────
        full_markdown = export_full_markdown(doc)

        # ── Assemble final output ─────────────────────────────────────────
        metadata["removed_pages"] = sorted(removed_pages)  # used by chunker
        parsed = {
            "metadata": metadata,
            "sections": sections,
            "tables": tables,
            "full_markdown": full_markdown,
        }

        # ── Save custom processed JSON ────────────────────────────────────
        # ensure_ascii=False emits raw non-ASCII characters, so the encoding
        # is pinned to UTF-8 instead of the platform default.
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(parsed, f, indent=2, ensure_ascii=False, default=str)

        # ── Save native DoclingDocument (for HybridChunker in Phase 3) ────
        # HybridChunker needs the original DoclingDocument object.
        # Docling's native format preserves full structural metadata
        # (heading hierarchy, table cell positions, reading order) that
        # our custom JSON does not capture.
        docling_path = out_path.with_name(out_path.stem + "_docling.json")
        with open(docling_path, "w", encoding="utf-8") as f:
            f.write(doc.model_dump_json())
        log.info(f" Saved DoclingDocument → {docling_path.name} "
                 f"({docling_path.stat().st_size / 1024:.1f} KB)")

        size_kb = out_path.stat().st_size / 1024
        log.info(f" Saved → {out_path.name} ({size_kb:.1f} KB)")
        log.info(f" Summary: {metadata['total_pages']} pages | "
                 f"{metadata['total_sections']} sections | "
                 f"{metadata['total_tables']} tables")
        return parsed

    def process_all(self, pdf_dir: Path = RAW_DIR,
                    force: bool = False) -> list[dict]:
        """
        Process all PDFs in a directory.

        Args:
            pdf_dir : Directory searched (non-recursively) for "*.pdf" files
            force   : Passed through to process() for every file

        Returns:
            One parsed-document dict per PDF, in sorted filename order.
        """
        pdfs = sorted(Path(pdf_dir).glob("*.pdf"))
        log.info(f"Found {len(pdfs)} PDFs in {pdf_dir}")
        results = []
        for pdf in pdfs:
            results.append(self.process(pdf, force=force))
        log.info(f"Processing complete — {len(results)} documents")
        return results
| # ── Entry point ──────────────────────────────────────────────────────────────── | |
if __name__ == "__main__":
    # Process every PDF in the default raw directory, then print a
    # per-document report.
    documents = PDFProcessor().process_all()

    rule = "=" * 55
    print("\n" + rule)
    print("PROCESSING SUMMARY")
    print(rule)

    for document in documents:
        meta = document["metadata"]
        print(f"\nFile : {meta['file_name']}")
        print(f"Pages : {meta['total_pages']}")
        print(f"Sections: {meta['total_sections']}")
        print(f"Tables : {meta['total_tables']}")

        if not document["tables"]:
            continue
        print("Tables found:")
        for table in document["tables"]:
            # Only the first three column names are shown to keep lines short.
            preview = table["headers"][:3]
            print(f" Page {table['page_num']} — "
                  f"{table['rows']} rows × {table['cols']} cols | "
                  f"Headers: {preview}")