"""
pdf_processor.py
=================
Production-ready PDF preprocessing module using Docling.

What this module does:
  1. Loads a PDF using Docling's document converter
  2. Extracts text sections with their heading hierarchy
  3. Filters out noise pages (cover, TOC, disclaimers, legal boilerplate)
  4. Extracts tables as structured data (markdown + row/col data)
  5. Cleans and normalises text (whitespace, encoding issues)
  6. Attaches rich metadata to every element
  7. Saves the structured output as JSON

Why Docling over PyPDF / pdfplumber?
  - PyPDF gives raw text dump — tables become garbled single lines
  - pdfplumber is better but still struggles with multi-column layouts
  - Docling runs an AI layout model (DocLayNet) that understands the visual
    structure of the page: columns, tables, headings, captions
  - For financial documents with income statements and data tables this
    structural understanding is non-negotiable

Usage (as a module):
    from src.pdf_processor import PDFProcessor
    processor = PDFProcessor()
    result = processor.process("data/raw/morningstar/ptc01302411420.pdf")

Usage (as a script):
    python src/pdf_processor.py
"""

import re
import json
import logging
from pathlib import Path
from datetime import datetime, timezone

# ── Logging ────────────────────────────────────────────────────────────────────
# NOTE: basicConfig() runs at import time, so importing this module configures
# the root logger (it is a no-op if the root logger already has handlers).
logging.basicConfig(
    level = logging.INFO,
    format = "%(asctime)s %(levelname)-8s %(message)s"
)
log = logging.getLogger(__name__)

# ── Paths ──────────────────────────────────────────────────────────────────────
# BASE_DIR is two directory levels above this file; the raw and processed
# directories below are the defaults used by PDFProcessor.
BASE_DIR = Path(__file__).parent.parent
RAW_DIR = BASE_DIR / "data" / "raw" / "morningstar"
PROCESSED_DIR = BASE_DIR / "data" / "processed" / "morningstar"


# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 1 — Build the Docling Converter
# ──────────────────────────────────────────────────────────────────────────────
# We configure Docling with specific pipeline options before parsing.
# These options control which AI models run during parsing.
#
# Options we set:
#   do_table_structure = True
#     → Runs TableFormer model to reconstruct table rows/columns
#     → Without this, table cells are extracted as unordered text
#
#   do_ocr = False
#     → These PDFs are digital (not scanned images), so OCR is off
#     → Turning OCR on for digital PDFs wastes time and adds noise
#
#   generate_picture_images = False
#     → We don't need embedded chart/figure images
#     → Skipping this speeds up parsing significantly
# ══════════════════════════════════════════════════════════════════════════════

def build_converter():
    """Build and return a configured Docling DocumentConverter.

    Docling is imported lazily inside the function so that merely importing
    this module does not require (or pay the start-up cost of) Docling.
    """
    from docling.document_converter import DocumentConverter, PdfFormatOption
    from docling.datamodel.pipeline_options import PdfPipelineOptions
    from docling.datamodel.base_models import InputFormat

    opts = PdfPipelineOptions()
    opts.do_table_structure = True        # reconstruct table rows/columns
    opts.do_ocr = False                   # skip OCR — digital PDFs only
    opts.generate_picture_images = False  # skip figure image extraction

    converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=opts)
        }
    )
    log.info("Docling converter initialised (table_structure=ON, OCR=OFF)")
    return converter


# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 2 — Noise Page Filter
# ──────────────────────────────────────────────────────────────────────────────
# Not every page in a financial PDF is useful.
# Pages we actively remove:
#
#   Cover / Title pages
#     → Just document title, author name, date
#     → Zero retrieval value — no financial content
#
#   Table of Contents / Index pages
#     → Lists section names and page numbers
#     → Section names are already captured in real section headers
#     → Page numbers refer to printed pages, useless in RAG
#
#   Disclaimer / Legal pages
#     → "Important Disclosure", "General Disclosure", "Risk Warning",
#       "Conflicts of Interest", copyright notices
#     → ACTIVELY HARMFUL: contains terms like "investment", "securities",
#       "risk" that match financial queries but return legal boilerplate
#     → Query "what are the risks?" should return risk analysis, NOT this
#
# Detection strategy (see is_noise_page):
#   → A page is noise if it is blank / near-blank (< 50 chars of text), OR
#   → ALL of its headers are known boilerplate titles (any text length), OR
#   → boilerplate headers outnumber content headers AND the body text is
#     shorter than 300 chars
# ══════════════════════════════════════════════════════════════════════════════

# Known boilerplate section titles (after normalisation — no ® / ™ symbols)
# Used for exact-match check in _is_noise_header()
NOISE_HEADERS = {
    "contents",
    "table of contents",
    "index",
    "important disclosure",
    "important disclosures",
    "general disclosure",
    "general disclosures",
    "risk warning",
    "risks",
    "conflicts of interest",
    "third-party distribution",
    "vaneck disclosures",
    "legal disclaimer",
    "disclaimer",
    "disclaimers",
    "about morningstar indexes",
    "about morningstar equity research",
}

# Regex: street address line e.g. "22 West Washington Street Chicago, IL 60602 USA"
# NOTE(review): any header that starts with digits and contains one of these
# words (e.g. "way", "drive") also matches — confirm this is acceptable.
_ADDRESS_RE = re.compile(
    r"^\d+\s+\w+.*\b(street|st|avenue|ave|boulevard|blvd|road|rd|drive|dr|lane|ln|way)\b",
    re.IGNORECASE,
)

# Regex: header that ends with a disclosure/disclaimer keyword
_DISCLOSURE_SUFFIX_RE = re.compile(r"\b(disclosures?|disclaimers?)\s*$")


def _normalize_header(text: str) -> str:
    """
    Strip trademark symbols and collapse whitespace so that
    "About Morningstar® Equity Research TM" normalises to
    "about morningstar equity research".
    """
    t = text.strip().lower()
    t = t.replace("®", "").replace("™", "").replace("℠", "")
    # Remove standalone " tm" / "(tm)" suffixes
    t = re.sub(r"\s*\(?\btm\b\)?$", "", t)
    # Collapse runs of whitespace left by symbol removal
    t = re.sub(r"\s{2,}", " ", t).strip()
    return t


def _is_noise_header(raw_header: str) -> bool:
    """
    Return True if a single header line is boilerplate.

    Checks (in order):
      1. Exact match in NOISE_HEADERS after normalisation
      2. Header ends with 'disclosure', 'disclosures', 'disclaimer', or
         'disclaimers' → catches doc-specific titles like
         "Wide Moat Focus Index Disclosures"
      3. Header looks like a postal address
         → "22 West Washington Street Chicago, IL 60602 USA"
    """
    norm = _normalize_header(raw_header)

    if norm in NOISE_HEADERS:
        return True

    # Pattern: ends with a disclosure/disclaimer keyword
    if _DISCLOSURE_SUFFIX_RE.search(norm):
        return True

    # Pattern: street address (checked on the raw header, not the normalised one)
    if _ADDRESS_RE.match(raw_header.strip()):
        return True

    return False


def is_noise_page(page_sections: list[dict]) -> bool:
    """
    Return True if a page contains only boilerplate content.

    Args:
        page_sections: section dicts for ONE page; each must carry the keys
            "type" ("header" or "text") and "text".

    A page is considered noise if:
      - It has no sections, or fewer than 50 characters of text in total
        (blank / cover page), OR
      - Case A: ALL headers are noise → remove regardless of text length
                Catches multi-paragraph legal/disclaimer pages
      - Case B: Noise headers outnumber content headers AND body text is
                shorter than 300 chars
                Catches mixed cover pages with one content title + several
                disclaimer headers

    A page with no headers at all (and at least 50 chars of text) is kept.
    """
    if not page_sections:
        return True  # blank page

    total_text = " ".join(s["text"] for s in page_sections).strip()

    # Blank or near-blank page (cover pages often have <50 chars)
    if len(total_text) < 50:
        return True

    raw_headers = [s["text"] for s in page_sections if s["type"] == "header"]
    if not raw_headers:
        return False  # no headers — let content pages through

    # Classify each header exactly once
    noise_count = sum(1 for h in raw_headers if _is_noise_header(h))
    content_count = len(raw_headers) - noise_count

    # Case A: ALL headers on the page are noise
    if content_count == 0:
        return True

    # Case B: noise headers outnumber content headers AND little body text
    text_content = " ".join(
        s["text"] for s in page_sections if s["type"] == "text"
    ).strip()
    return noise_count > content_count and len(text_content) < 300


def filter_noise_pages(sections: list[dict]) -> tuple[list[dict], list[int]]:
    """
    Remove sections that belong to noise pages.

    Sections are grouped by their "page_num" (a missing / None page number is
    grouped under page 0) and each page is judged by is_noise_page().

    Returns:
        filtered_sections : sections with noise pages removed, ordered by
                            ascending page number
        removed_pages     : list of page numbers that were filtered out
    """
    from collections import defaultdict

    # Group sections by page
    by_page = defaultdict(list)
    for s in sections:
        by_page[s.get("page_num") or 0].append(s)

    removed_pages = []
    kept_sections = []

    for pg in sorted(by_page):
        if is_noise_page(by_page[pg]):
            removed_pages.append(pg)
        else:
            kept_sections.extend(by_page[pg])

    if removed_pages:
        log.info(f" Filtered {len(removed_pages)} noise pages: {removed_pages}")

    return kept_sections, removed_pages


# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 3 — Text Cleaning
# ──────────────────────────────────────────────────────────────────────────────
# Raw text from PDFs often contains:
#   - Extra whitespace and blank lines between words
#   - Hyphenated line breaks ("competi-\ntive" → "competitive")
#   - Unicode noise characters (soft hyphens, zero-width spaces)
#   - Repeated whitespace inside sentences
#
# We apply a simple cleaning pipeline to fix these before chunking.
# Why clean BEFORE chunking?
#   → If we chunk first, each chunk inherits the noise
#   → The embedding model will encode noise as part of the meaning
#   → Clean text produces cleaner, more accurate embeddings
# ══════════════════════════════════════════════════════════════════════════════

# A hard hyphen OR a soft hyphen (U+00AD) immediately followed by a newline
# marks a word that was split across two lines.
_LINE_BREAK_HYPHEN_RE = re.compile(r"[-\u00ad]\n")

# Invisible characters deleted outright: soft hyphen (U+00AD), zero-width
# space (U+200B), word joiner (U+2060) and zero-width no-break space / BOM
# (U+FEFF). ZWNJ / ZWJ (U+200C / U+200D) are deliberately kept because they
# affect how some scripts render.
_INVISIBLE_CHARS = dict.fromkeys(map(ord, "\u00ad\u200b\u2060\ufeff"))


def clean_text(text: str) -> str:
    """
    Clean raw text extracted from a PDF.

    Steps:
      1. Fix hyphenated line breaks ("competi-\\ntion" → "competition"),
         including breaks marked with a soft hyphen
      2. Remove soft hyphens and zero-width characters
      3. Collapse runs of spaces/tabs into one space
      4. Collapse 3+ consecutive newlines into 2
      5. Strip leading/trailing whitespace

    Returns "" for empty or None input.
    """
    if not text:
        return ""

    # Step 1: Fix hyphenated line breaks (common in PDFs). Must run BEFORE
    # step 2, otherwise a soft hyphen at a line end is deleted while the
    # newline that splits the word is left behind.
    text = _LINE_BREAK_HYPHEN_RE.sub("", text)

    # Step 2: Remove remaining soft hyphens and zero-width characters
    text = text.translate(_INVISIBLE_CHARS)

    # Step 3: Collapse multiple spaces/tabs into single space
    text = re.sub(r"[ \t]+", " ", text)

    # Step 4: Collapse more than 2 consecutive newlines into 2
    text = re.sub(r"\n{3,}", "\n\n", text)

    # Step 5: Strip leading/trailing whitespace
    return text.strip()


# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 3 — Section Extraction
# ──────────────────────────────────────────────────────────────────────────────
# Docling's document model organises content as a tree of items.
# We iterate over it and separate items into two types:
#
#   SectionHeaderItem → A heading (H1, H2, H3 etc.)
#   TextItem          → A paragraph of body text
#
# Why capture heading level?
#   → Heading level tells us where we are in the document hierarchy
#   → "Net Income" under H1 "Financial Statements" is different from
#     "Net Income" under H2 "Non-GAAP Reconciliation"
#   → We store this in metadata so retrieval can filter by section
#
# Why separate headers from text?
#   → Headers are short and don't chunk well alone
#   → We prefix each text chunk with its parent header for context
# ══════════════════════════════════════════════════════════════════════════════

def extract_sections(doc) -> list[dict]:
    """
    Extract all text sections from a parsed Docling document.

    Items without text (or with whitespace-only text) are skipped, and
    sections on noise pages are removed via filter_noise_pages() before
    returning.

    Returns a list of dicts:
      {type, level, text, cleaned_text, page_num, parent_header}

    "level" is the heading level for headers (falling back to the tree depth
    reported by iterate_items() if the item exposes no level) and the tree
    depth for text blocks. "parent_header" is the most recent heading seen
    before a text block ("" for headers themselves).
    """
    from docling.datamodel.document import SectionHeaderItem

    sections = []
    current_header = ""  # track the last seen heading for context

    for item, depth in doc.iterate_items():
        text = getattr(item, "text", None)
        if not text or not text.strip():
            continue

        stripped = text.strip()
        page_num = item.prov[0].page_no if item.prov else None

        if isinstance(item, SectionHeaderItem):
            current_header = stripped
            # iterate_items() yields the item's depth in the document tree,
            # not its heading level; prefer the header's own level attribute.
            heading_level = getattr(item, "level", None)
            sections.append({
                "type": "header",
                "level": heading_level if heading_level is not None else depth,
                "text": stripped,
                "cleaned_text": clean_text(text),
                "page_num": page_num,
                "parent_header": "",
            })
        else:
            sections.append({
                "type": "text",
                "level": depth,
                "text": stripped,
                "cleaned_text": clean_text(text),
                "page_num": page_num,
                "parent_header": current_header,  # context from last heading
            })

    header_count = sum(1 for s in sections if s["type"] == "header")
    text_count = sum(1 for s in sections if s["type"] == "text")
    log.info(f" Extracted {len(sections)} sections "
             f"({header_count} headers, "
             f"{text_count} text blocks)")

    # Remove cover, TOC, and disclaimer pages
    sections, _ = filter_noise_pages(sections)
    log.info(f" After noise filter: {len(sections)} sections remain")

    return sections


# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 4 — Table Extraction
# ──────────────────────────────────────────────────────────────────────────────
# Tables are the most important element in financial documents.
# Docling's TableFormer model reconstructs the row/column structure.
#
# For each table we extract:
#   markdown → Human-readable, good for LLM context
#   data     → Raw list of lists for programmatic access
#   headers  → Column names for metadata tagging
#
# Why keep tables ATOMIC (never split)?
#   → A revenue table split across two chunks loses column alignment
#   → LLM receiving half a table gives wrong or hallucinated answers
#   → Each table is stored as ONE complete chunk, regardless of size
#
# Why convert to markdown?
#   → Markdown tables are easy for LLMs to read and parse
#   → They preserve column-row relationships in plain text
# ══════════════════════════════════════════════════════════════════════════════

def extract_tables(doc, skip_pages: set | None = None) -> list[dict]:
    """
    Extract all tables from a parsed Docling document.

    Args:
        doc        : parsed Docling document exposing a ``tables`` sequence
        skip_pages : set of page numbers to skip (noise pages); None means
                     no page is skipped

    Returns a list of dicts:
      {index, page_num, markdown, headers, rows, cols, data, is_atomic}

    "index" is the table's position in ``doc.tables`` (skipped tables leave
    gaps). Tables with fewer than 2 data rows are dropped. A table whose
    export raises is logged as a warning and skipped — extraction is
    deliberately best-effort so one bad table does not lose the document.
    """
    skip_pages = skip_pages or set()
    tables = []

    for i, table in enumerate(doc.tables):
        try:
            page_num = table.prov[0].page_no if table.prov else None

            # Skip tables on noise pages (cover, TOC, disclaimer) BEFORE
            # running the exports, so no work (and no spurious warning) is
            # spent on tables that would be discarded anyway.
            if page_num in skip_pages:
                continue

            df = table.export_to_dataframe(doc)

            # Skip empty or trivially small tables (1 row = probably a label)
            if df.empty or len(df) < 2:
                continue

            markdown = table.export_to_markdown(doc)

            tables.append({
                "index": i,
                "page_num": page_num,
                "markdown": markdown,
                "headers": list(df.columns.astype(str)),
                "rows": len(df),
                "cols": len(df.columns),
                "data": df.fillna("").values.tolist(),
                "is_atomic": True,  # NEVER split this chunk
            })

        except Exception as e:
            log.warning(f" Table {i} could not be extracted: {e}")

    log.info(f" Extracted {len(tables)} tables")
    return tables


# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 5 — Metadata Tagging
# ──────────────────────────────────────────────────────────────────────────────
# Every element (section or table) gets a metadata dict attached.
# This metadata is stored alongside the vector in ChromaDB.
#
# Why metadata matters:
#   → Allows FILTERED retrieval ("only search 2024 10-K documents")
#   → Enables source citation ("found on page 12 of PTC report")
#   → Supports temporal queries ("Apple revenue in fiscal 2024")
#
# Fields we tag:
#   source        → which file this came from
#   doc_type      → research_report / 10-K / 10-Q / 8-K
#   company       → Apple / PTC / etc.
#   fiscal_year   → for time-aware retrieval
#   page_num      → for citations
#   section_title → which section this chunk belongs to
# ══════════════════════════════════════════════════════════════════════════════

def build_metadata(pdf_path: Path, extra: dict | None = None) -> dict:
    """
    Build base metadata for a document from its file path and optional extras.

    Args:
        pdf_path : path of the source PDF (name and full path are recorded)
        extra    : optional key/value pairs merged on top of the defaults;
                   keys in ``extra`` override the base fields

    Returns:
        A new metadata dict; ``parsed_at`` is the current UTC time in ISO
        format.
    """
    meta = {
        "file_name": pdf_path.name,
        "file_path": str(pdf_path),
        "source": "morningstar",
        "doc_type": "research_report",
        "license": "proprietary",
        "parsed_at": datetime.now(timezone.utc).isoformat(),
        "parser": "docling",
    }
    if extra:
        meta.update(extra)
    return meta


# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 6 — Full Document Export
# ──────────────────────────────────────────────────────────────────────────────
# After extracting sections and tables, we also export the full document
# as a single markdown string.
#
# Why?
#   → Useful for quick inspection and debugging
#   → Can be used as a fallback if section-level chunking fails
#   → Gives the LLM a complete document view when needed
# ══════════════════════════════════════════════════════════════════════════════

def export_full_markdown(doc) -> str:
    """Export the entire document as a single markdown string."""
    return doc.export_to_markdown()


def _load_cached_json(path: Path) -> dict:
    """Load a previously written output file, preferring UTF-8."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except UnicodeDecodeError:
        # Older runs wrote the cache with the platform default encoding.
        with open(path) as f:
            return json.load(f)


def _scan_raw_pages(doc) -> tuple[list[int], int]:
    """Return (noise page numbers, highest page number carrying any item)."""
    from docling.datamodel.document import SectionHeaderItem

    raw_sections = []
    last_page = 0

    for item, _depth in doc.iterate_items():
        prov = getattr(item, "prov", None)
        page_num = prov[0].page_no if prov else None
        if page_num:
            last_page = max(last_page, page_num)

        # Same blank-text rule as extract_sections(), so both passes agree
        # on which pages exist and which are noise.
        text = getattr(item, "text", None)
        if not text or not text.strip():
            continue

        raw_sections.append({
            "type": "header" if isinstance(item, SectionHeaderItem) else "text",
            "text": text.strip(),
            "page_num": page_num,
        })

    _, removed_pages = filter_noise_pages(raw_sections)
    return removed_pages, last_page


# ══════════════════════════════════════════════════════════════════════════════
# MAIN PROCESSOR CLASS
# ══════════════════════════════════════════════════════════════════════════════

class PDFProcessor:
    """
    End-to-end PDF processor using Docling.

    Combines all preprocessing steps into a single callable interface.
    Idempotent — skips files that have already been processed (checks cache).
    """

    def __init__(self, output_dir: Path = PROCESSED_DIR):
        """Create the output directory (if needed) and defer converter loading."""
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._converter = None  # lazy load — only initialise when first needed

    @property
    def converter(self):
        """The Docling converter, built on first access and then reused."""
        if self._converter is None:
            log.info("Loading Docling converter (first use) ...")
            self._converter = build_converter()
        return self._converter

    def process(self, pdf_path: str | Path, extra_meta: dict | None = None,
                force: bool = False) -> dict:
        """
        Process a single PDF file through all preprocessing steps.

        Writes two files into ``output_dir``: ``<stem>.json`` (the returned
        dict) and ``<stem>_docling.json`` (the native DoclingDocument dump).
        Both are written as UTF-8.

        Args:
            pdf_path   : Path to the PDF file
            extra_meta : Optional extra metadata (company, fiscal_year, etc.)
            force      : If True, re-process even if output already exists

        Returns:
            Parsed document dict with metadata, sections, tables, markdown.
            When a cached ``<stem>.json`` exists and ``force`` is False, the
            cached content is returned without parsing.
        """
        pdf_path = Path(pdf_path)
        out_path = self.output_dir / f"{pdf_path.stem}.json"

        # Check cache — skip if already processed
        if out_path.exists() and not force:
            log.info(f"SKIP {pdf_path.name} (already processed → {out_path.name})")
            return _load_cached_json(out_path)

        log.info(f"Processing: {pdf_path.name}")

        # ── Step 1: Parse with Docling ────────────────────────────────────────
        result = self.converter.convert(str(pdf_path))
        doc = result.document
        log.info(" Docling parse complete")

        # ── Step 2 + 3: Extract sections (text cleaning happens inside) ───────
        sections = extract_sections(doc)

        # ── Step 4: Extract tables ────────────────────────────────────────────
        # extract_sections() does not report which pages it dropped, so the
        # raw document is scanned again to find the noise pages whose tables
        # must be skipped too.
        # NOTE(review): is_noise_page() flags any page with < 50 chars of
        # text, so a table on a page holding only a short title is skipped as
        # well — confirm that is intended.
        removed_pages, last_page = _scan_raw_pages(doc)
        tables = extract_tables(doc, skip_pages=set(removed_pages))

        # ── Step 5: Build metadata ────────────────────────────────────────────
        metadata = build_metadata(pdf_path, extra_meta)
        metadata["total_sections"] = len(sections)
        metadata["total_tables"] = len(tables)
        # Highest page number seen on ANY item of the raw document. Using the
        # filtered sections would undercount whenever trailing pages
        # (typically disclaimers) were removed as noise.
        metadata["total_pages"] = last_page

        # ── Step 6: Full markdown export ──────────────────────────────────────
        full_markdown = export_full_markdown(doc)

        # ── Assemble final output ─────────────────────────────────────────────
        metadata["removed_pages"] = sorted(removed_pages)  # used by chunker

        parsed = {
            "metadata": metadata,
            "sections": sections,
            "tables": tables,
            "full_markdown": full_markdown,
        }

        # ── Save custom processed JSON ────────────────────────────────────────
        # ensure_ascii=False emits raw non-ASCII characters, so the file must
        # be opened as UTF-8 explicitly; the platform default encoding may
        # not be able to represent them.
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(parsed, f, indent=2, ensure_ascii=False, default=str)

        # ── Save native DoclingDocument (for HybridChunker in Phase 3) ───────
        # HybridChunker needs the original DoclingDocument object.
        # Docling's native format preserves full structural metadata
        # (heading hierarchy, table cell positions, reading order) that
        # our custom JSON does not capture.
        docling_path = out_path.with_name(out_path.stem + "_docling.json")
        with open(docling_path, "w", encoding="utf-8") as f:
            f.write(doc.model_dump_json())
        log.info(f" Saved DoclingDocument → {docling_path.name} "
                 f"({docling_path.stat().st_size / 1024:.1f} KB)")

        size_kb = out_path.stat().st_size / 1024
        log.info(f" Saved → {out_path.name} ({size_kb:.1f} KB)")
        log.info(f" Summary: {metadata['total_pages']} pages | "
                 f"{metadata['total_sections']} sections | "
                 f"{metadata['total_tables']} tables")

        return parsed

    def process_all(self, pdf_dir: Path = RAW_DIR, force: bool = False) -> list[dict]:
        """Process all ``*.pdf`` files in a directory, in sorted name order.

        An exception raised while processing any file propagates and stops
        the batch.
        """
        pdfs = sorted(pdf_dir.glob("*.pdf"))
        log.info(f"Found {len(pdfs)} PDFs in {pdf_dir}")

        results = [self.process(pdf, force=force) for pdf in pdfs]

        log.info(f"Processing complete — {len(results)} documents")
        return results


# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    processor = PDFProcessor()
    results = processor.process_all()

    print("\n" + "=" * 55)
    print("PROCESSING SUMMARY")
    print("=" * 55)
    for r in results:
        m = r["metadata"]
        print(f"\nFile : {m['file_name']}")
        print(f"Pages : {m['total_pages']}")
        print(f"Sections: {m['total_sections']}")
        print(f"Tables : {m['total_tables']}")
        if r["tables"]:
            print("Tables found:")
            for t in r["tables"]:
                print(f" Page {t['page_num']} — "
                      f"{t['rows']} rows × {t['cols']} cols | "
                      f"Headers: {t['headers'][:3]}")