# Financial_bot/src/chunker.py
# (Hugging Face file-view header: uploaded by Pushkya, "Upload 30 files",
#  commit 8299003, 17.9 kB)
"""
chunker.py
==========
Phase 3 – Document Chunking
Converts the structured output from pdf_processor.py into chunks ready for
embedding and storage in ChromaDB.
Why not LangChain SemanticChunker?
-----------------------------------
LangChain's SemanticChunker treats every document as a flat string. It has
no concept of headings, section hierarchy, or whether a block of text is a
table or a paragraph. Docling's HybridChunker, on the other hand, works on
the native DoclingDocument object and understands the full visual structure
parsed by the DocLayNet layout model.
Why not HybridChunker for tables?
-----------------------------------
HybridChunker splits tables to fit within the token limit. For financial
documents, a holdings table with 30 rows becomes 5 fragments — each fragment
loses the column-header context. An LLM reading "Ticker = GS. Sector =
Financial Services." with no table headers cannot reconstruct the original
table structure reliably.
Solution: best of both worlds
-------------------------------
TEXT → Docling HybridChunker (tokenizer-aware, structure-aware,
full heading path recorded in chunk metadata, respects document hierarchy)
TABLES → Atomic markdown pass-through (one complete table = one chunk,
markdown preserves column/row alignment, never split)
This gives semantic coherence for text AND lossless fidelity for tables.
Output format per chunk
------------------------
{
"chunk_id" : "ptc01302411420_text_0042",
"doc_id" : "ptc01302411420",
"chunk_type" : "text" | "table",
"text" : "...", # prose or full markdown table
"is_atomic" : false | true,
"metadata" : {
"source" : "morningstar" | "sec_edgar",
"doc_type" : "research_report" | "10-K" | "10-Q" | "8-K",
"file_name" : "ptc01302411420.pdf",
"page_num" : 5,
"section_title" : "Financial Summary", # immediate heading
"heading_path" : "Results > Financial Summary", # full hierarchy
... # all keys from document metadata
}
}
Usage (as a module)
-------------------
from src.chunker import DocumentChunker
chunker = DocumentChunker()
chunks = chunker.chunk_document("data/processed/morningstar/ptc01302411420.json")
Usage (as a script)
-------------------
python src/chunker.py
python src/chunker.py --force # re-chunk even if output exists
"""
import json
import logging
from pathlib import Path
from datetime import datetime, timezone
# ── Logging ────────────────────────────────────────────────────────────────────
# NOTE(review): basicConfig runs at import time, so merely importing this
# module configures the root logger for the whole process. Consider moving it
# under the `if __name__ == "__main__":` guard.
logging.basicConfig(format="%(asctime)s %(levelname)-8s %(message)s", level=logging.INFO)
log = logging.getLogger(__name__)

# ── Paths ──────────────────────────────────────────────────────────────────────
# Project root: this file lives at <root>/src/chunker.py.
BASE_DIR = Path(__file__).parent.parent
PROCESSED_DIR = BASE_DIR.joinpath("data", "processed")   # input: processed JSONs
CHUNKS_DIR = BASE_DIR.joinpath("data", "chunks")          # output: *_chunks.json

# ── Constants ──────────────────────────────────────────────────────────────────
# Tokenizer HybridChunker uses to measure chunk length; keep it in sync with
# the model that embeds the chunks downstream.
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
MAX_TOKENS = 256   # all-MiniLM-L6-v2 context window
# ══════════════════════════════════════════════════════════════════════════════
# TEXT CHUNKING — Docling HybridChunker
# ──────────────────────────────────────────────────────────────────────────────
# HybridChunker works on the native DoclingDocument, understanding:
# • Section headings and their hierarchy
# • Reading order determined by DocLayNet
# • Token limits of the target embedding model
#
# For each text chunk it produces:
# chunk.text → the prose content
# chunk.meta.headings → list of heading titles from root to this section
# e.g. ["Results", "Financial Summary", "Revenue"]
# chunk.meta.doc_items → original DocItem objects (for page number lookup)
#
# We SKIP chunks whose doc_items contain a TableItem — tables are handled
# separately with our atomic markdown approach.
# ══════════════════════════════════════════════════════════════════════════════
def chunk_text_with_hybrid(
    docling_doc_path : Path,
    doc_meta         : dict,
    removed_pages    : set,
) -> list[dict]:
    """
    Use Docling HybridChunker to split text sections of a document.

    Tables are skipped here — they are handled by chunk_tables_atomic().

    Args:
        docling_doc_path : path to the _docling.json file saved by Phase 2
        doc_meta         : document-level metadata dict (from processed JSON);
                           copied into every chunk's metadata
        removed_pages    : set of page numbers filtered out by the noise filter;
                           a chunk whose first page-located item falls on one
                           of these pages is dropped. None is treated as empty.

    Returns:
        list of text chunk dicts (chunk_type="text"). chunk_id / doc_id are
        not set here; the caller assigns them (see _assign_ids()).
    """
    from docling.chunking import HybridChunker
    from docling.datamodel.document import DoclingDocument, TableItem

    # Tolerate callers that pass None instead of an empty set.
    removed = set(removed_pages or ())

    # Load the native DoclingDocument. JSON is UTF-8 (RFC 8259); decode it
    # explicitly so non-ASCII text loads identically regardless of the
    # platform's default locale encoding (e.g. cp1252 on Windows).
    with open(docling_doc_path, encoding="utf-8") as f:
        dl_doc = DoclingDocument.model_validate_json(f.read())

    chunker = HybridChunker(
        tokenizer   = EMBEDDING_MODEL,
        max_tokens  = MAX_TOKENS,
        merge_peers = True,   # merge small adjacent chunks sharing same heading
    )

    chunks: list[dict] = []
    for chunk in chunker.chunk(dl_doc):
        doc_items = chunk.meta.doc_items

        # ── Skip table chunks ─────────────────────────────────────────────
        # HybridChunker splits large tables by token count, which destroys
        # column-header context. We handle tables separately with full
        # markdown representation (one table = one chunk, never split).
        if any(isinstance(item, TableItem) for item in doc_items):
            continue

        # ── Page number from the first doc item that has provenance ───────
        page_num = next(
            (item.prov[0].page_no for item in doc_items if item.prov),
            None,
        )

        # ── Skip chunks on noise pages ────────────────────────────────────
        if page_num in removed:
            continue

        # ── Skip empty chunks ─────────────────────────────────────────────
        text = chunk.text.strip()
        if not text:
            continue

        # ── Build heading metadata ────────────────────────────────────────
        # The heading path is stored in metadata only; `text` is the chunk's
        # own content without headings prepended.
        headings      = chunk.meta.headings or []
        section_title = headings[-1] if headings else ""
        heading_path  = " > ".join(headings)

        chunks.append({
            "chunk_type" : "text",
            "text"       : text,
            "is_atomic"  : False,
            "metadata"   : {
                **doc_meta,
                "section_title": section_title,
                "heading_path" : heading_path,
                "page_num"     : page_num,
            },
        })

    return chunks
# ══════════════════════════════════════════════════════════════════════════════
# TABLE CHUNKING — Atomic markdown pass-through
# ──────────────────────────────────────────────────────────────────────────────
# Why markdown and not HybridChunker's key=value format?
#
# HybridChunker serialises a table cell as:
# "Goldman Sachs, Top 10 Holdings.Ticker = GS. Goldman Sachs, Top 10
# Holdings.Sector = Financial Services."
#
# Our markdown representation:
# | Company       | Ticker | Sector             |
# | ------------- | ------ | ------------------ |
# | Goldman Sachs | GS     | Financial Services |
#
# LLMs are trained on markdown and read it far more accurately than the
# key=value format. A query about "Goldman Sachs sector" against the
# markdown chunk will produce a correct, well-grounded answer.
# ══════════════════════════════════════════════════════════════════════════════
def chunk_tables_atomic(tables: list[dict], doc_meta: dict) -> list[dict]:
    """
    Convert extracted tables into atomic chunks using markdown representation.

    Each table = exactly one chunk. The `is_atomic` flag instructs downstream
    systems to never split this chunk. Tables whose markdown is missing,
    None, or whitespace-only are skipped.

    Args:
        tables   : list of table dicts from processed JSON (with 'markdown' key;
                   optional 'page_num', 'index', 'rows', 'cols', 'headers')
        doc_meta : document-level metadata dict; copied into every chunk's
                   metadata (table-specific keys take precedence)

    Returns:
        list of table chunk dicts (chunk_type="table"). chunk_id / doc_id are
        not set here; the caller assigns them.
    """
    chunks: list[dict] = []
    for t in tables:
        # `or ""` also covers an explicit None value, which previously
        # raised AttributeError on .strip().
        markdown = (t.get("markdown") or "").strip()
        if not markdown:
            continue

        chunks.append({
            "chunk_type": "table",
            "text"      : markdown,
            "is_atomic" : True,
            "metadata"  : {
                **doc_meta,
                "page_num"    : t.get("page_num"),
                "table_index" : t.get("index"),
                "rows"        : t.get("rows"),
                "cols"        : t.get("cols"),
                # Normalise a missing or null header list to [].
                "col_headers" : t.get("headers") or [],
            },
        })

    return chunks
# ══════════════════════════════════════════════════════════════════════════════
# CHUNK ID ASSIGNMENT
# ──────────────────────────────────────────────────────────────────────────────
# IDs are deterministic: {doc_stem}_{type}_{zero_padded_index}
# Deterministic IDs allow ChromaDB upserts (add-or-update) without duplicates
# when the chunker is re-run after adding new documents.
# ══════════════════════════════════════════════════════════════════════════════
def _assign_ids(chunks: list[dict], doc_stem: str) -> list[dict]:
"""Attach chunk_id and doc_id to every chunk in-place."""
text_idx = table_idx = 0
for chunk in chunks:
if chunk["chunk_type"] == "text":
chunk["chunk_id"] = f"{doc_stem}_text_{text_idx:04d}"
text_idx += 1
else:
chunk["chunk_id"] = f"{doc_stem}_table_{table_idx:03d}"
table_idx += 1
chunk["doc_id"] = doc_stem
return chunks
# ══════════════════════════════════════════════════════════════════════════════
# MAIN CHUNKER CLASS
# ══════════════════════════════════════════════════════════════════════════════
class DocumentChunker:
    """
    End-to-end document chunker.

    Loads a processed JSON + its paired _docling.json (from PDFProcessor),
    applies HybridChunker to text sections, passes tables through atomically
    as markdown, and saves a chunks JSON. The output location under
    ``chunks_dir`` mirrors the input's location under the processed directory.
    """

    def __init__(self, chunks_dir: Path = CHUNKS_DIR):
        """
        Args:
            chunks_dir : root directory for *_chunks.json outputs
                         (created, with parents, if it does not exist)
        """
        self.chunks_dir = Path(chunks_dir)
        self.chunks_dir.mkdir(parents=True, exist_ok=True)

    def _output_path(self, json_path: Path, processed_dir: Path) -> Path:
        """
        Return the chunks-JSON path for an already-resolved *json_path*.

        The sub-directory of *json_path* relative to *processed_dir* is
        mirrored under ``self.chunks_dir``. A file that is not under
        *processed_dir* is written directly into ``self.chunks_dir`` instead
        of failing with ValueError from ``Path.relative_to``.
        """
        try:
            rel_parent = json_path.relative_to(Path(processed_dir).resolve()).parent
        except ValueError:
            rel_parent = Path()
        return self.chunks_dir / rel_parent / f"{json_path.stem}_chunks.json"

    def chunk_document(
        self,
        json_path     : str | Path,
        force         : bool = False,
        *,
        processed_dir : str | Path = PROCESSED_DIR,
    ) -> list[dict]:
        """
        Chunk a single processed document.

        Args:
            json_path     : path to the processed JSON (output of PDFProcessor)
            force         : if True, re-chunk even if output already exists
            processed_dir : root whose sub-directory layout is mirrored under
                            ``self.chunks_dir`` (default: PROCESSED_DIR)

        Returns:
            list of chunk dicts. If the output already exists and ``force`` is
            False, the previously saved chunks are returned unchanged. Returns
            [] (and writes nothing) when the paired _docling.json is missing.

        Raises:
            FileNotFoundError / json.JSONDecodeError for an unreadable input;
            KeyError if the processed JSON has no "metadata" key.
        """
        json_path = Path(json_path).resolve()
        out_path  = self._output_path(json_path, Path(processed_dir))
        doc_stem  = json_path.stem

        if out_path.exists() and not force:
            log.info(f"SKIP {json_path.name} (already chunked → {out_path.name})")
            with open(out_path, encoding="utf-8") as f:
                return json.load(f)["chunks"]

        log.info(f"Chunking: {json_path.name}")

        # ── Load processed JSON ───────────────────────────────────────────
        with open(json_path, encoding="utf-8") as f:
            doc = json.load(f)

        doc_meta      = doc["metadata"]
        tables        = doc.get("tables") or []
        removed_pages = set(doc_meta.get("removed_pages") or [])

        # ── Locate paired _docling.json ───────────────────────────────────
        docling_path = json_path.with_name(json_path.stem + "_docling.json")
        if not docling_path.exists():
            log.warning(
                f" _docling.json not found for {json_path.name}. "
                f"Re-run pdf_processor.py with force=True to regenerate it."
            )
            return []

        # ── Text chunks via HybridChunker ─────────────────────────────────
        text_chunks = chunk_text_with_hybrid(docling_path, doc_meta, removed_pages)
        log.info(f" Text chunks : {len(text_chunks)}")

        # ── Table chunks via atomic markdown pass-through ─────────────────
        table_chunks = chunk_tables_atomic(tables, doc_meta)
        log.info(f" Table chunks : {len(table_chunks)}")

        # ── Combine, assign IDs, save ─────────────────────────────────────
        all_chunks = _assign_ids(text_chunks + table_chunks, doc_stem)

        out_path.parent.mkdir(parents=True, exist_ok=True)
        # ensure_ascii=False emits raw non-ASCII characters, so the file must
        # be opened as UTF-8 explicitly; the locale default may not be able
        # to encode them (UnicodeEncodeError on e.g. cp1252).
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump({
                "doc_id"       : doc_stem,
                "source_file"  : str(json_path),
                "chunked_at"   : datetime.now(timezone.utc).isoformat(),
                "total_chunks" : len(all_chunks),
                "text_chunks"  : len(text_chunks),
                "table_chunks" : len(table_chunks),
                "chunks"       : all_chunks,
            }, f, indent=2, ensure_ascii=False)

        size_kb = out_path.stat().st_size / 1024
        log.info(f" Saved → {out_path.name} ({size_kb:.1f} KB) "
                 f"[{len(text_chunks)} text + {len(table_chunks)} table = "
                 f"{len(all_chunks)} total]")

        return all_chunks

    def chunk_all(
        self,
        processed_dir : Path = PROCESSED_DIR,
        force         : bool = False,
    ) -> dict:
        """
        Chunk all processed JSON files found recursively under *processed_dir*.

        Files whose stem ends with "_docling" (the paired DoclingDocument
        dumps) are not treated as documents. A document that raises is
        logged with its traceback and recorded with a count of 0.

        Returns:
            {filename: chunk_count}
            NOTE(review): keys are bare file names, so two documents with the
            same name in different sub-directories overwrite each other's count.
        """
        # Only process main processed JSONs, not the _docling.json files
        json_files = sorted(
            f for f in Path(processed_dir).rglob("*.json")
            if not f.stem.endswith("_docling")
        )
        log.info(f"Found {len(json_files)} processed documents")

        summary = {}
        for jf in json_files:
            try:
                chunks = self.chunk_document(jf, force=force, processed_dir=processed_dir)
                summary[jf.name] = len(chunks)
            except Exception as e:
                # Batch boundary: keep going, but preserve the traceback.
                log.exception(f" FAILED {jf.name}: {e}")
                summary[jf.name] = 0
        return summary
# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import sys

    # Script entry point: chunk every processed document and print a summary.
    # Pass --force to re-chunk documents that already have output.
    rule = "=" * 60
    force = "--force" in sys.argv

    log.info(rule)
    log.info("Phase 3 – Document Chunker (HybridChunker + atomic tables)")
    log.info(rule)

    summary = DocumentChunker().chunk_all(force=force)

    log.info("\n" + rule)
    log.info("Chunking complete.")
    for fname, n in summary.items():
        log.info(f" {fname:50s} {n:>5} chunks")
    total = sum(summary.values())
    log.info(f" {'TOTAL':50s} {total:>5} chunks")
    log.info(rule)