""" chunker.py ========== Phase 3 – Document Chunking Converts the structured output from pdf_processor.py into chunks ready for embedding and storage in ChromaDB. Why not LangChain SemanticChunker? ----------------------------------- LangChain's SemanticChunker treats every document as a flat string. It has no concept of headings, section hierarchy, or whether a block of text is a table or a paragraph. Docling's HybridChunker, on the other hand, works on the native DoclingDocument object and understands the full visual structure parsed by the DocLayNet layout model. Why not HybridChunker for tables? ----------------------------------- HybridChunker splits tables to fit within the token limit. For financial documents, a holdings table with 30 rows becomes 5 fragments — each fragment loses the column-header context. An LLM reading "Ticker = GS. Sector = Financial Services." with no table headers cannot reconstruct the original table structure reliably. Solution: best of both worlds ------------------------------- TEXT → Docling HybridChunker (tokenizer-aware, structure-aware, full heading-path injection, respects document hierarchy) TABLES → Atomic markdown pass-through (one complete table = one chunk, markdown preserves column/row alignment, never split) This gives semantic coherence for text AND lossless fidelity for tables. Output format per chunk ------------------------ { "chunk_id" : "ptc01302411420_text_0042", "doc_id" : "ptc01302411420", "chunk_type" : "text" | "table", "text" : "...", # prose or full markdown table "is_atomic" : false | true, "metadata" : { "source" : "morningstar" | "sec_edgar", "doc_type" : "research_report" | "10-K" | "10-Q" | "8-K", "file_name" : "ptc01302411420.pdf", "page_num" : 5, "section_title" : "Financial Summary", # immediate heading "heading_path" : "Results > Financial Summary", # full hierarchy ... # all keys from document metadata } } Usage (as a module) ------------------- from src.chunker import DocumentChunker chunker = DocumentChunker() chunks = chunker.chunk_document("data/processed/morningstar/ptc01302411420.json") Usage (as a script) ------------------- python src/chunker.py python src/chunker.py --force # re-chunk even if output exists """ import json import logging from pathlib import Path from datetime import datetime, timezone # ── Logging ──────────────────────────────────────────────────────────────────── logging.basicConfig( level = logging.INFO, format = "%(asctime)s %(levelname)-8s %(message)s" ) log = logging.getLogger(__name__) # ── Paths ────────────────────────────────────────────────────────────────────── BASE_DIR = Path(__file__).parent.parent PROCESSED_DIR = BASE_DIR / "data" / "processed" CHUNKS_DIR = BASE_DIR / "data" / "chunks" # ── Constants ────────────────────────────────────────────────────────────────── EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2" MAX_TOKENS = 256 # all-MiniLM-L6-v2 context window # ══════════════════════════════════════════════════════════════════════════════ # TEXT CHUNKING — Docling HybridChunker # ────────────────────────────────────────────────────────────────────────────── # HybridChunker works on the native DoclingDocument, understanding: # • Section headings and their hierarchy # • Reading order determined by DocLayNet # • Token limits of the target embedding model # # For each text chunk it produces: # chunk.text → the prose content # chunk.meta.headings → list of heading titles from root to this section # e.g. ["Results", "Financial Summary", "Revenue"] # chunk.meta.doc_items → original DocItem objects (for page number lookup) # # We SKIP chunks whose doc_items contain a TableItem — tables are handled # separately with our atomic markdown approach. # ══════════════════════════════════════════════════════════════════════════════ def chunk_text_with_hybrid( docling_doc_path : Path, doc_meta : dict, removed_pages : set, ) -> list[dict]: """ Use Docling HybridChunker to split text sections of a document. Tables are skipped here — they are handled by chunk_tables_atomic(). Args: docling_doc_path : path to the _docling.json file saved by Phase 2 doc_meta : document-level metadata dict (from processed JSON) removed_pages : set of page numbers filtered out by the noise filter Returns: list of text chunk dicts (chunk_type="text") """ from docling.chunking import HybridChunker from docling.datamodel.document import DoclingDocument, TableItem # Load native DoclingDocument with open(docling_doc_path) as f: dl_doc = DoclingDocument.model_validate_json(f.read()) chunker = HybridChunker( tokenizer = EMBEDDING_MODEL, max_tokens = MAX_TOKENS, merge_peers = True, # merge small adjacent chunks sharing same heading ) chunks: list[dict] = [] for chunk in chunker.chunk(dl_doc): # ── Skip table chunks ───────────────────────────────────────────── # HybridChunker splits large tables by token count, which destroys # column-header context. We handle tables separately with full # markdown representation (one table = one chunk, never split). if any(isinstance(item, TableItem) for item in chunk.meta.doc_items): continue # ── Get page number from first doc item ─────────────────────────── page_num = None for item in chunk.meta.doc_items: if item.prov: page_num = item.prov[0].page_no break # ── Skip chunks on noise pages ──────────────────────────────────── if page_num in removed_pages: continue # ── Skip empty chunks ───────────────────────────────────────────── text = chunk.text.strip() if not text: continue # ── Build heading metadata ──────────────────────────────────────── headings = chunk.meta.headings or [] section_title = headings[-1] if headings else "" heading_path = " > ".join(headings) if headings else "" chunks.append({ "chunk_type" : "text", "text" : text, "is_atomic" : False, "metadata" : { **doc_meta, "section_title": section_title, "heading_path" : heading_path, "page_num" : page_num, }, }) return chunks # ══════════════════════════════════════════════════════════════════════════════ # TABLE CHUNKING — Atomic markdown pass-through # ────────────────────────────────────────────────────────────────────────────── # Why markdown and not HybridChunker's key=value format? # # HybridChunker serialises a table cell as: # "Goldman Sachs, Top 10 Holdings.Ticker = GS. Goldman Sachs, Top 10 # Holdings.Sector = Financial Services." # # Our markdown representation: # | Company | Ticker | Sector | # |Goldman Sachs | GS | Financial Services | # # LLMs are trained on markdown and read it far more accurately than the # key=value format. A query about "Goldman Sachs sector" against the # markdown chunk will produce a correct, well-grounded answer. # ══════════════════════════════════════════════════════════════════════════════ def chunk_tables_atomic(tables: list[dict], doc_meta: dict) -> list[dict]: """ Convert extracted tables into atomic chunks using markdown representation. Each table = exactly one chunk. The `is_atomic` flag instructs downstream systems to never split this chunk. Args: tables : list of table dicts from processed JSON (with 'markdown' key) doc_meta : document-level metadata dict Returns: list of table chunk dicts (chunk_type="table") """ chunks: list[dict] = [] for t in tables: markdown = t.get("markdown", "").strip() if not markdown: continue chunks.append({ "chunk_type": "table", "text" : markdown, "is_atomic" : True, "metadata" : { **doc_meta, "page_num" : t.get("page_num"), "table_index" : t.get("index"), "rows" : t.get("rows"), "cols" : t.get("cols"), "col_headers" : t.get("headers", []), }, }) return chunks # ══════════════════════════════════════════════════════════════════════════════ # CHUNK ID ASSIGNMENT # ────────────────────────────────────────────────────────────────────────────── # IDs are deterministic: {doc_stem}_{type}_{zero_padded_index} # Deterministic IDs allow ChromaDB upserts (add-or-update) without duplicates # when the chunker is re-run after adding new documents. # ══════════════════════════════════════════════════════════════════════════════ def _assign_ids(chunks: list[dict], doc_stem: str) -> list[dict]: """Attach chunk_id and doc_id to every chunk in-place.""" text_idx = table_idx = 0 for chunk in chunks: if chunk["chunk_type"] == "text": chunk["chunk_id"] = f"{doc_stem}_text_{text_idx:04d}" text_idx += 1 else: chunk["chunk_id"] = f"{doc_stem}_table_{table_idx:03d}" table_idx += 1 chunk["doc_id"] = doc_stem return chunks # ══════════════════════════════════════════════════════════════════════════════ # MAIN CHUNKER CLASS # ══════════════════════════════════════════════════════════════════════════════ class DocumentChunker: """ End-to-end document chunker. Loads a processed JSON + its paired _docling.json (from PDFProcessor), applies HybridChunker to text sections, passes tables through atomically as markdown, and saves a chunks JSON. """ def __init__(self, chunks_dir: Path = CHUNKS_DIR): self.chunks_dir = Path(chunks_dir) self.chunks_dir.mkdir(parents=True, exist_ok=True) def chunk_document( self, json_path : str | Path, force : bool = False, ) -> list[dict]: """ Chunk a single processed document. Args: json_path : path to the processed JSON (output of PDFProcessor) force : if True, re-chunk even if output already exists Returns: list of chunk dicts """ json_path = Path(json_path).resolve() # Mirror processed/ directory structure under chunks/ rel = json_path.relative_to(PROCESSED_DIR.resolve()) out_path = self.chunks_dir / rel.parent / f"{json_path.stem}_chunks.json" doc_stem = json_path.stem if out_path.exists() and not force: log.info(f"SKIP {json_path.name} (already chunked → {out_path.name})") with open(out_path) as f: return json.load(f)["chunks"] log.info(f"Chunking: {json_path.name}") # ── Load processed JSON ─────────────────────────────────────────── with open(json_path) as f: doc = json.load(f) doc_meta = doc["metadata"] tables = doc.get("tables", []) removed_pages = set(doc_meta.get("removed_pages", [])) # ── Locate paired _docling.json ─────────────────────────────────── docling_path = json_path.with_name(json_path.stem + "_docling.json") if not docling_path.exists(): log.warning( f" _docling.json not found for {json_path.name}. " f"Re-run pdf_processor.py with force=True to regenerate it." ) return [] # ── Text chunks via HybridChunker ───────────────────────────────── text_chunks = chunk_text_with_hybrid(docling_path, doc_meta, removed_pages) log.info(f" Text chunks : {len(text_chunks)}") # ── Table chunks via atomic markdown pass-through ───────────────── table_chunks = chunk_tables_atomic(tables, doc_meta) log.info(f" Table chunks : {len(table_chunks)}") # ── Combine, assign IDs, save ───────────────────────────────────── all_chunks = _assign_ids(text_chunks + table_chunks, doc_stem) out_path.parent.mkdir(parents=True, exist_ok=True) with open(out_path, "w") as f: json.dump({ "doc_id" : doc_stem, "source_file" : str(json_path), "chunked_at" : datetime.now(timezone.utc).isoformat(), "total_chunks" : len(all_chunks), "text_chunks" : len(text_chunks), "table_chunks" : len(table_chunks), "chunks" : all_chunks, }, f, indent=2, ensure_ascii=False) size_kb = out_path.stat().st_size / 1024 log.info(f" Saved → {out_path.name} ({size_kb:.1f} KB) " f"[{len(text_chunks)} text + {len(table_chunks)} table = " f"{len(all_chunks)} total]") return all_chunks def chunk_all( self, processed_dir : Path = PROCESSED_DIR, force : bool = False, ) -> dict: """Chunk all processed JSON files. Returns {filename: chunk_count}.""" json_files = sorted(Path(processed_dir).rglob("*.json")) # Only process main processed JSONs, not the _docling.json files json_files = [f for f in json_files if not f.stem.endswith("_docling")] log.info(f"Found {len(json_files)} processed documents") summary = {} for jf in json_files: try: chunks = self.chunk_document(jf, force=force) summary[jf.name] = len(chunks) except Exception as e: log.error(f" FAILED {jf.name}: {e}") summary[jf.name] = 0 return summary # ── Entry point ──────────────────────────────────────────────────────────────── if __name__ == "__main__": import sys force = "--force" in sys.argv log.info("=" * 60) log.info("Phase 3 – Document Chunker (HybridChunker + atomic tables)") log.info("=" * 60) chunker = DocumentChunker() summary = chunker.chunk_all(force=force) log.info("\n" + "=" * 60) log.info("Chunking complete.") total = 0 for fname, n in summary.items(): log.info(f" {fname:50s} {n:>5} chunks") total += n log.info(f" {'TOTAL':50s} {total:>5} chunks") log.info("=" * 60)