# Financial_bot / src / pdf_processor.py
# (Web-page header captured with this file: uploaded by Pushkya, "Upload 30 files",
#  commit 8299003, verified, 29.3 kB.)
"""
pdf_processor.py
=================
Production-ready PDF preprocessing module using Docling.

What this module does:
    1. Loads a PDF using Docling's document converter
    2. Extracts text sections with their heading hierarchy
    3. Filters out noise pages (cover, TOC, disclaimers, legal boilerplate)
    4. Extracts tables as structured data (markdown + row/col data)
    5. Cleans and normalises text (whitespace, encoding issues)
    6. Attaches rich metadata to every element
    7. Saves the structured output as JSON

Why Docling over PyPDF / pdfplumber?
    - PyPDF gives a raw text dump — tables become garbled single lines
    - pdfplumber is better but still struggles with multi-column layouts
    - Docling runs an AI layout model (DocLayNet) that understands the
      visual structure of the page: columns, tables, headings, captions
    - For financial documents with income statements and data tables
      this structural understanding is non-negotiable

Usage (as a module):
    from src.pdf_processor import PDFProcessor
    processor = PDFProcessor()
    result = processor.process("data/raw/morningstar/ptc01302411420.pdf")

Usage (as a script):
    python src/pdf_processor.py
"""
import re
import json
import logging
from pathlib import Path
from datetime import datetime, timezone
# ── Logging ────────────────────────────────────────────────────────────────────
# Configure root logging once at import time; every message in this module
# goes through the module-level `log` logger.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)
# ── Paths ──────────────────────────────────────────────────────────────────────
# BASE_DIR is two directory levels above this file; raw PDFs are read from
# data/raw/morningstar and results are written to data/processed/morningstar.
BASE_DIR = Path(__file__).parent.parent
RAW_DIR = BASE_DIR.joinpath("data", "raw", "morningstar")
PROCESSED_DIR = BASE_DIR.joinpath("data", "processed", "morningstar")
# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 1 — Build the Docling Converter
# ──────────────────────────────────────────────────────────────────────────────
# We configure Docling with specific pipeline options before parsing.
# These options control which AI models run during parsing.
#
# Options we set:
# do_table_structure = True
# → Runs TableFormer model to reconstruct table rows/columns
# → Without this, table cells are extracted as unordered text
#
# do_ocr = False
# → These PDFs are digital (not scanned images), so OCR is off
# → Turning OCR on for digital PDFs wastes time and adds noise
#
# generate_picture_images = False
# → We don't need embedded chart/figure images
# → Skipping this speeds up parsing significantly
# ══════════════════════════════════════════════════════════════════════════════
def build_converter():
    """
    Create the Docling ``DocumentConverter`` used for every PDF.

    The PDF pipeline is configured with:
        - table structure reconstruction ON
        - OCR OFF (inputs are expected to be digital, not scanned, PDFs)
        - picture image generation OFF

    Docling is imported inside the function so that importing this module
    does not require Docling to be loaded.

    Returns:
        A ``DocumentConverter`` with the PDF format options applied.
    """
    from docling.datamodel.base_models import InputFormat
    from docling.datamodel.pipeline_options import PdfPipelineOptions
    from docling.document_converter import DocumentConverter, PdfFormatOption

    pipeline_options = PdfPipelineOptions()
    pipeline_options.generate_picture_images = False  # skip figure image extraction
    pipeline_options.do_ocr = False                   # skip OCR — digital PDFs only
    pipeline_options.do_table_structure = True        # reconstruct table rows/columns

    pdf_format = PdfFormatOption(pipeline_options=pipeline_options)
    converter = DocumentConverter(format_options={InputFormat.PDF: pdf_format})

    log.info("Docling converter initialised (table_structure=ON, OCR=OFF)")
    return converter
# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 2 — Noise Page Filter
# ──────────────────────────────────────────────────────────────────────────────
# Not every page in a financial PDF is useful. Pages we actively remove:
#
# Cover / Title pages
# → Just document title, author name, date
# → Zero retrieval value — no financial content
#
# Table of Contents / Index pages
# → Lists section names and page numbers
# → Section names are already captured in real section headers
# → Page numbers refer to printed pages, useless in RAG
#
# Disclaimer / Legal pages
# → "Important Disclosure", "General Disclosure", "Risk Warning",
# "Conflicts of Interest", copyright notices
# → ACTIVELY HARMFUL: contains terms like "investment", "securities",
# "risk" that match financial queries but return legal boilerplate
# → Query "what are the risks?" should return risk analysis, NOT this
#
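# Detection strategy (see is_noise_page):
# → A page is noise if it is blank or near-blank (< 50 characters of text), OR
# → ALL of its headers are known boilerplate titles (regardless of text length), OR
# → boilerplate headers outnumber content headers AND its body text is
# shorter than 300 characters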
# ══════════════════════════════════════════════════════════════════════════════
# Known boilerplate section titles, stored in normalised form
# (lower-case, no ® / ™ symbols).
# Used for the exact-match check in _is_noise_header().
NOISE_HEADERS = {
    # Navigation pages
    "contents",
    "table of contents",
    "index",
    # Disclosures / legal boilerplate
    "important disclosure",
    "important disclosures",
    "general disclosure",
    "general disclosures",
    "risk warning",
    "risks",
    "conflicts of interest",
    "third-party distribution",
    "vaneck disclosures",
    "legal disclaimer",
    "disclaimer",
    "disclaimers",
    # Publisher "about" blurbs
    "about morningstar indexes",
    "about morningstar equity research",
}

# Regex: street address line e.g. "22 West Washington Street Chicago, IL 60602 USA"
# (starts with a number, later contains a street-type word as a whole word).
# NOTE(review): any header that starts with a number and contains one of these
# words also matches, e.g. "2024 Outlook: The Way Forward" — confirm this is acceptable.
_ADDRESS_RE = re.compile(
    r"^\d+\s+\w+.*\b(street|st|avenue|ave|boulevard|blvd|road|rd|drive|dr|lane|ln|way)\b",
    flags=re.IGNORECASE,
)
def _normalize_header(text: str) -> str:
    """
    Reduce a header to the canonical form used for boilerplate matching.

    The header is trimmed and lower-cased, trademark symbols (®, ™, ℠) are
    dropped, a trailing "tm" / "(tm)" marker is removed, and runs of two or
    more whitespace characters are collapsed to a single space.

    Example:
        "About Morningstar® Equity Research TM"
        → "about morningstar equity research"
    """
    normalised = text.strip().lower()
    for symbol in ("®", "™", "℠"):
        normalised = normalised.replace(symbol, "")
    # Drop a standalone " tm" / "(tm)" suffix at the very end of the header
    normalised = re.sub(r"\s*\(?\btm\b\)?$", "", normalised)
    # Removing symbols can leave double spaces behind; squeeze them
    normalised = re.sub(r"\s{2,}", " ", normalised)
    return normalised.strip()
def _is_noise_header(raw_header: str) -> bool:
    """
    Decide whether a single header line is boilerplate.

    The header counts as noise when any of these holds (checked in order):
        1. Its normalised form is an exact member of NOISE_HEADERS
        2. Its normalised form ends with 'disclosure', 'disclosures',
           'disclaimer' or 'disclaimers'
           → catches doc-specific titles like "Wide Moat Focus Index Disclosures"
        3. The raw (trimmed) header matches the postal-address pattern
           → "22 West Washington Street Chicago, IL 60602 USA"
    """
    normalised = _normalize_header(raw_header)
    return (
        # 1. known boilerplate title
        normalised in NOISE_HEADERS
        # 2. ends with a disclosure/disclaimer keyword
        or re.search(r"\b(disclosures?|disclaimers?)\s*$", normalised) is not None
        # 3. looks like a street address
        or _ADDRESS_RE.match(raw_header.strip()) is not None
    )
def is_noise_page(page_sections: list[dict]) -> bool:
    """
    Decide whether one page consists only of boilerplate.

    Args:
        page_sections: the section dicts of a single page; each needs a
                       "type" ("header" / "text") and a "text" entry

    Returns:
        True when the page should be dropped:
            - the page has no sections, or fewer than 50 characters in total
              (blank / cover page)
            - Case A: every header on the page is a noise header
              (removed regardless of how much text the page has)
            - Case B: noise headers outnumber content headers AND the body
              text is shorter than 300 characters
        False otherwise, including any page with enough text and no headers.
    """
    if not page_sections:
        return True  # blank page

    # Blank or near-blank page (cover pages often have <50 chars)
    combined = " ".join(sec["text"] for sec in page_sections).strip()
    if len(combined) < 50:
        return True

    header_texts = [sec["text"] for sec in page_sections if sec["type"] == "header"]
    if not header_texts:
        return False  # no headers — let content pages through

    body_text = " ".join(
        sec["text"] for sec in page_sections if sec["type"] == "text"
    ).strip()

    # Classify every header once, then count both groups
    noise_flags = [_is_noise_header(header) for header in header_texts]
    noise_count = sum(noise_flags)
    content_count = len(noise_flags) - noise_count

    # Case A: ALL headers on the page are noise
    if content_count == 0:
        return True

    # Case B: noise headers dominate and there is little body text
    return noise_count > content_count and len(body_text) < 300
def filter_noise_pages(sections: list[dict]) -> tuple[list[dict], list[int]]:
    """
    Drop every section that sits on a noise page.

    Sections are grouped by their "page_num" (a missing or falsy page number
    is grouped under page 0), each page is judged with is_noise_page(), and
    pages are visited in ascending page order.

    Returns:
        filtered_sections : sections of the pages that were kept, in page order
        removed_pages     : page numbers that were filtered out
    """
    from collections import defaultdict

    pages = defaultdict(list)
    for section in sections:
        pages[section.get("page_num") or 0].append(section)

    kept_sections, removed_pages = [], []
    for page_no in sorted(pages):
        page_items = pages[page_no]
        if not is_noise_page(page_items):
            kept_sections += page_items
            continue
        removed_pages.append(page_no)

    if removed_pages:
        log.info(f" Filtered {len(removed_pages)} noise pages: {removed_pages}")
    return kept_sections, removed_pages
# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 3 — Text Cleaning
# ──────────────────────────────────────────────────────────────────────────────
# Raw text from PDFs often contains:
# - Extra whitespace and blank lines between words
# - Hyphenated line breaks ("competi-\ntive" → "competitive")
# - Unicode noise characters (soft hyphens, zero-width spaces)
# - Repeated whitespace inside sentences
#
# We apply a simple cleaning pipeline to fix these before chunking.
# Why clean BEFORE chunking?
# → If we chunk first, each chunk inherits the noise
# → The embedding model will encode noise as part of the meaning
# → Clean text produces cleaner, more accurate embeddings
# ══════════════════════════════════════════════════════════════════════════════
def clean_text(text: str) -> str:
    """
    Clean raw text extracted from a PDF.

    Steps:
        1. Normalise line endings (CRLF / CR → LF) so the newline-based
           steps below also work on Windows-style text
        2. Remove zero-width characters (U+200B, U+200C, U+200D, U+2060, U+FEFF)
        3. Fix hyphenated line breaks ("competi-\\ntion" → "competition"),
           including breaks marked with a soft hyphen (U+00AD)
        4. Remove any remaining soft hyphens
        5. Collapse runs of spaces/tabs into a single space
        6. Collapse 3+ consecutive newlines into 2
        7. Strip leading/trailing whitespace

    Args:
        text: raw text; any falsy value (None, "") yields ""

    Returns:
        The cleaned text.
    """
    if not text:
        return ""

    # Step 1: one newline convention, otherwise "-\r\n" and "\r\n\r\n\r\n"
    # slip past the patterns below
    text = text.replace("\r\n", "\n").replace("\r", "\n")

    # Step 2: drop invisible zero-width characters before looking for
    # line-break hyphens, so "competi-<ZWSP>\ntive" is still repaired
    text = text.translate(str.maketrans("", "", "\u200b\u200c\u200d\u2060\ufeff"))

    # Step 3: re-join words split across lines by a hard or soft hyphen
    text = re.sub(r"[-\u00ad]\n", "", text)

    # Step 4: soft hyphens inside a line are only rendering hints
    text = text.replace("\u00ad", "")

    # Step 5: collapse multiple spaces/tabs into a single space
    text = re.sub(r"[ \t]+", " ", text)

    # Step 6: collapse more than 2 consecutive newlines into 2
    text = re.sub(r"\n{3,}", "\n\n", text)

    return text.strip()
# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 3b — Section Extraction
# ──────────────────────────────────────────────────────────────────────────────
# Docling's document model organises content as a tree of items.
# We iterate over it and separate items into two types:
#
# SectionHeaderItem → A heading (H1, H2, H3 etc.)
# TextItem → A paragraph of body text
#
# Why capture heading level?
# → Heading level tells us where we are in the document hierarchy
# → "Net Income" under H1 "Financial Statements" is different from
# "Net Income" under H2 "Non-GAAP Reconciliation"
# → We store this in metadata so retrieval can filter by section
#
# Why separate headers from text?
# → Headers are short and don't chunk well alone
# → We prefix each text chunk with its parent header for context
# ══════════════════════════════════════════════════════════════════════════════
def extract_sections(doc) -> list[dict]:
    """
    Collect the headers and text blocks of a parsed Docling document.

    Items without (non-blank) text are skipped. Every remaining item becomes
    one dict; items that are ``SectionHeaderItem`` are tagged "header", all
    others "text". Text items carry the most recently seen header as
    ``parent_header``. Sections on noise pages are removed before returning.

    Returns a list of dicts:
        {type, level, text, cleaned_text, page_num, parent_header}
    """
    from docling.datamodel.document import TextItem, SectionHeaderItem

    sections = []
    current_header = ""  # last heading seen so far, used as context for text

    for item, level in doc.iterate_items():
        raw = getattr(item, "text", None)
        if not raw or not raw.strip():
            continue

        stripped = raw.strip()
        is_header = isinstance(item, SectionHeaderItem)
        if is_header:
            current_header = stripped

        sections.append({
            "type": "header" if is_header else "text",
            "level": level,
            "text": stripped,
            "cleaned_text": clean_text(raw),
            # page number of the item's first provenance entry, if any
            "page_num": item.prov[0].page_no if item.prov else None,
            # headers have no parent; text inherits the last heading
            "parent_header": "" if is_header else current_header,
        })

    log.info(f" Extracted {len(sections)} sections "
             f"({sum(1 for s in sections if s['type']=='header')} headers, "
             f"{sum(1 for s in sections if s['type']=='text')} text blocks)")

    # Remove cover, TOC, and disclaimer pages
    sections = filter_noise_pages(sections)[0]
    log.info(f" After noise filter: {len(sections)} sections remain")
    return sections
# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 4 — Table Extraction
# ──────────────────────────────────────────────────────────────────────────────
# Tables are the most important element in financial documents.
# Docling's TableFormer model reconstructs the row/column structure.
#
# For each table we extract:
# markdown → Human-readable, good for LLM context
# data → Raw list of lists for programmatic access
# headers → Column names for metadata tagging
#
# Why keep tables ATOMIC (never split)?
# → A revenue table split across two chunks loses column alignment
# → LLM receiving half a table gives wrong or hallucinated answers
# → Each table is stored as ONE complete chunk, regardless of size
#
# Why convert to markdown?
# → Markdown tables are easy for LLMs to read and parse
# → They preserve column-row relationships in plain text
# ══════════════════════════════════════════════════════════════════════════════
def extract_tables(doc, skip_pages: set | None = None) -> list[dict]:
    """
    Extract all tables from a parsed Docling document.

    Tables on skipped pages and tables with fewer than two data rows are
    ignored. A table whose export raises is logged as a warning and skipped,
    so one bad table never aborts the whole document.

    Args:
        doc        : parsed Docling document exposing ``doc.tables``
        skip_pages : set of page numbers to skip (noise pages); None = skip nothing

    Returns a list of dicts:
        {index, page_num, markdown, headers, rows, cols, data, is_atomic}
        ``index`` is the table's position in ``doc.tables`` (gaps appear where
        tables were skipped).
    """
    skip_pages = skip_pages or set()
    tables = []
    for i, table in enumerate(doc.tables):
        try:
            page_num = table.prov[0].page_no if table.prov else None

            # Skip tables on noise pages (cover, TOC, disclaimer) BEFORE any
            # export work — the exports are the expensive part
            if page_num in skip_pages:
                continue

            df = table.export_to_dataframe(doc)

            # Skip empty or trivially small tables (1 row = probably a label)
            if df.empty or len(df) < 2:
                continue

            tables.append({
                "index": i,
                "page_num": page_num,
                "markdown": table.export_to_markdown(doc),
                "headers": list(df.columns.astype(str)),
                "rows": len(df),
                "cols": len(df.columns),
                "data": df.fillna("").values.tolist(),
                "is_atomic": True,  # NEVER split this chunk
            })
        except Exception as e:
            # Deliberately broad: table extraction is best-effort per table
            log.warning(f" Table {i} could not be extracted: {e}")

    log.info(f" Extracted {len(tables)} tables")
    return tables
# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 5 — Metadata Tagging
# ──────────────────────────────────────────────────────────────────────────────
# Every element (section or table) gets a metadata dict attached.
# This metadata is stored alongside the vector in ChromaDB.
#
# Why metadata matters:
# → Allows FILTERED retrieval ("only search 2024 10-K documents")
# → Enables source citation ("found on page 12 of PTC report")
# → Supports temporal queries ("Apple revenue in fiscal 2024")
#
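# Fields we tag (document level, set in build_metadata):
# file_name / file_path → which file this came from
# source → data provider (currently always "morningstar")
# doc_type → research_report / 10-K / 10-Q / 8-K (currently always "research_report")
# company → Apple / PTC / etc. (passed in by the caller via `extra`)
# fiscal_year → for time-aware retrieval (passed in by the caller via `extra`)
# Fields carried per element rather than per document:
# page_num → for citations (stored on every section and table)
# section_title → which section this chunk belongs to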
# ══════════════════════════════════════════════════════════════════════════════
def build_metadata(pdf_path: Path, extra: dict = None) -> dict:
    """
    Assemble the document-level metadata dict for one PDF.

    Args:
        pdf_path : path of the PDF; supplies ``file_name`` and ``file_path``
        extra    : optional additional entries; they are merged last, so they
                   override the defaults on key collisions

    Returns:
        A new dict with file_name, file_path, source, doc_type, license,
        parsed_at (current UTC time, ISO-8601) and parser, plus ``extra``.
    """
    parsed_at = datetime.now(timezone.utc).isoformat()
    meta = dict(
        file_name=pdf_path.name,
        file_path=str(pdf_path),
        source="morningstar",
        doc_type="research_report",
        license="proprietary",
        parsed_at=parsed_at,
        parser="docling",
    )
    if not extra:
        return meta
    meta.update(extra)
    return meta
# ══════════════════════════════════════════════════════════════════════════════
# PREPROCESSING STEP 6 — Full Document Export
# ──────────────────────────────────────────────────────────────────────────────
# After extracting sections and tables, we also export the full document
# as a single markdown string.
#
# Why?
# → Useful for quick inspection and debugging
# → Can be used as a fallback if section-level chunking fails
# → Gives the LLM a complete document view when needed
# ══════════════════════════════════════════════════════════════════════════════
def export_full_markdown(doc) -> str:
    """
    Return the whole document as one markdown string.

    Thin wrapper around the document's own ``export_to_markdown()``.
    """
    full_markdown = doc.export_to_markdown()
    return full_markdown
# ══════════════════════════════════════════════════════════════════════════════
# MAIN PROCESSOR CLASS
# ══════════════════════════════════════════════════════════════════════════════
class PDFProcessor:
    """
    End-to-end PDF processor using Docling.

    Combines all preprocessing steps into a single callable interface.
    Idempotent — skips files that have already been processed (checks cache).

    For every PDF two files are written into ``output_dir``:
        <stem>.json          custom parsed output (metadata, sections, tables, markdown)
        <stem>_docling.json  the document as dumped by ``doc.model_dump_json()``
    """

    def __init__(self, output_dir: Path = PROCESSED_DIR):
        """
        Args:
            output_dir : directory for processed output (created if missing);
                         a plain string path is accepted as well
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._converter = None  # lazy load — only initialise when first needed

    @property
    def converter(self):
        """The Docling converter, built on first access and then reused."""
        if self._converter is None:
            log.info("Loading Docling converter (first use) ...")
            self._converter = build_converter()
        return self._converter

    @staticmethod
    def _scan_noise_pages(doc) -> tuple[list[int], int]:
        """
        Re-scan the raw document to find noise pages and the last page seen.

        extract_sections() only returns the sections that survived the noise
        filter, so the removed page numbers are recomputed here from the
        unfiltered items. Blank / whitespace-only items are skipped exactly as
        extract_sections() does, so both passes judge the same input.

        Returns:
            removed_pages : page numbers classified as noise
            last_page     : highest page number of any text item (0 if none)
        """
        from docling.datamodel.document import SectionHeaderItem

        raw_sections = []
        for item, _level in doc.iterate_items():
            text = getattr(item, "text", None)
            if not text or not text.strip():
                continue
            raw_sections.append({
                "type": "header" if isinstance(item, SectionHeaderItem) else "text",
                "text": text.strip(),
                "page_num": item.prov[0].page_no if item.prov else None,
            })

        _, removed_pages = filter_noise_pages(raw_sections)
        last_page = max(
            (s["page_num"] for s in raw_sections if s["page_num"]), default=0
        )
        return removed_pages, last_page

    def process(self, pdf_path: str | Path, extra_meta: dict | None = None,
                force: bool = False) -> dict:
        """
        Process a single PDF file through all preprocessing steps.

        Args:
            pdf_path   : Path to the PDF file
            extra_meta : Optional extra metadata (company, fiscal_year, etc.)
            force      : If True, re-process even if output already exists

        Returns:
            Parsed document dict with keys "metadata", "sections", "tables"
            and "full_markdown". When a cached JSON exists (and ``force`` is
            False) the dict is loaded from that file instead.
        """
        pdf_path = Path(pdf_path)
        out_path = self.output_dir / f"{pdf_path.stem}.json"

        # Check cache — skip if already processed
        if out_path.exists() and not force:
            log.info(f"SKIP {pdf_path.name} (already processed → {out_path.name})")
            with open(out_path, encoding="utf-8") as f:
                return json.load(f)

        log.info(f"Processing: {pdf_path.name}")

        # ── Step 1: Parse with Docling ────────────────────────────────────────
        result = self.converter.convert(str(pdf_path))
        doc = result.document
        log.info(" Docling parse complete")

        # ── Step 2 + 3: Extract sections (text cleaning happens inside) ───────
        sections = extract_sections(doc)

        # ── Step 4: Extract tables ────────────────────────────────────────────
        # Tables on pages removed by the noise filter are skipped too
        removed_pages, last_text_page = self._scan_noise_pages(doc)
        tables = extract_tables(doc, skip_pages=set(removed_pages))

        # ── Step 5: Build metadata ────────────────────────────────────────────
        metadata = build_metadata(pdf_path, extra_meta)
        metadata["total_sections"] = len(sections)
        metadata["total_tables"] = len(tables)
        # Count pages from the UNFILTERED content: noise pages (typically the
        # trailing disclaimers) are still pages of the document
        last_table_page = max((t["page_num"] or 0 for t in tables), default=0)
        metadata["total_pages"] = max(last_text_page, last_table_page)

        # ── Step 6: Full markdown export ──────────────────────────────────────
        full_markdown = export_full_markdown(doc)

        # ── Assemble final output ─────────────────────────────────────────────
        metadata["removed_pages"] = sorted(removed_pages)  # used by chunker
        parsed = {
            "metadata": metadata,
            "sections": sections,
            "tables": tables,
            "full_markdown": full_markdown,
        }

        # ── Save custom processed JSON ────────────────────────────────────────
        # Explicit UTF-8: ensure_ascii=False emits raw non-ASCII characters,
        # which a non-UTF-8 locale default encoding may be unable to encode
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(parsed, f, indent=2, ensure_ascii=False, default=str)

        # ── Save native DoclingDocument (for HybridChunker in Phase 3) ───────
        # HybridChunker needs the original DoclingDocument object.
        # Docling's native format preserves full structural metadata
        # (heading hierarchy, table cell positions, reading order) that
        # our custom JSON does not capture.
        docling_path = out_path.with_name(out_path.stem + "_docling.json")
        with open(docling_path, "w", encoding="utf-8") as f:
            f.write(doc.model_dump_json())
        log.info(f" Saved DoclingDocument → {docling_path.name} "
                 f"({docling_path.stat().st_size / 1024:.1f} KB)")

        size_kb = out_path.stat().st_size / 1024
        log.info(f" Saved → {out_path.name} ({size_kb:.1f} KB)")
        log.info(f" Summary: {metadata['total_pages']} pages | "
                 f"{metadata['total_sections']} sections | "
                 f"{metadata['total_tables']} tables")
        return parsed

    def process_all(self, pdf_dir: Path = RAW_DIR,
                    force: bool = False) -> list[dict]:
        """
        Process all ``*.pdf`` files directly inside a directory, in sorted order.

        Args:
            pdf_dir : directory to scan (a plain string path is accepted as well)
            force   : passed through to process() for every file

        Returns:
            One parsed document dict per PDF, in processing order.
        """
        pdf_dir = Path(pdf_dir)
        pdfs = sorted(pdf_dir.glob("*.pdf"))
        log.info(f"Found {len(pdfs)} PDFs in {pdf_dir}")

        results = [self.process(pdf, force=force) for pdf in pdfs]

        log.info(f"Processing complete — {len(results)} documents")
        return results
# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Script mode: process every PDF in the default raw directory and
    # print a short per-file summary to stdout.
    processor = PDFProcessor()
    results = processor.process_all()

    print("\n" + "=" * 55)
    print("PROCESSING SUMMARY")
    print("=" * 55)

    for r in results:
        m = r["metadata"]
        print(f"\nFile : {m['file_name']}")
        print(f"Pages : {m['total_pages']}")
        print(f"Sections: {m['total_sections']}")
        print(f"Tables : {m['total_tables']}")

        if not r["tables"]:
            continue

        # One line per table; only the first three column headers are shown
        print("Tables found:")
        for t in r["tables"]:
            print(f" Page {t['page_num']} — "
                  f"{t['rows']} rows × {t['cols']} cols | "
                  f"Headers: {t['headers'][:3]}")